SpringRabbitmq-MessageListenerContainer

MessageListenerContainer

定义2个方法:

void setupMessageListener(Object messageListener); 设置messageListener
MessageConverter getMessageConverter(); 得到MessageConverter(用于转换接收到的Message的)

AbstractMessageListenerContainer

    
    static final int DEFAULT_FAILED_DECLARATION_RETRY_INTERVAL = 5000;
        
    public static final boolean DEFAULT_DEBATCHING_ENABLED = true;

    public static final int DEFAULT_PREFETCH_COUNT = 250;

    /**
     * The default recovery interval: 5000 ms = 5 seconds.
     */
    public static final long DEFAULT_RECOVERY_INTERVAL = 5000;

    public static final long DEFAULT_SHUTDOWN_TIMEOUT = 5000;

    private final ContainerDelegate delegate = this::actualInvokeListener;

    protected final Object consumersMonitor = new Object(); //NOSONAR

    private final Map<String, Object> consumerArgs = new HashMap<String, Object>();

    private ContainerDelegate proxy = this.delegate;

    private long shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;

    private ApplicationEventPublisher applicationEventPublisher;

    private PlatformTransactionManager transactionManager;

    private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute();

    private String beanName;

    private Executor taskExecutor = new SimpleAsyncTaskExecutor();

    private boolean taskExecutorSet;

    private BackOff recoveryBackOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, FixedBackOff.UNLIMITED_ATTEMPTS);

    private MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();

    private RabbitAdmin rabbitAdmin;

    private boolean missingQueuesFatal = true;

    private boolean missingQueuesFatalSet;

    private boolean possibleAuthenticationFailureFatal = true;

    private boolean possibleAuthenticationFailureFatalSet;

    private boolean autoDeclare = true;

    private boolean mismatchedQueuesFatal = false;

    private long failedDeclarationRetryInterval = DEFAULT_FAILED_DECLARATION_RETRY_INTERVAL;

    private boolean autoStartup = true;

    private int phase = Integer.MAX_VALUE;

    private volatile boolean active = false;

    private volatile boolean running = false;

    private final Object lifecycleMonitor = new Object();

    private volatile List<String> queueNames = new CopyOnWriteArrayList<String>();

    private ErrorHandler errorHandler = new ConditionalRejectingErrorHandler();

    private MessageConverter messageConverter;
        是否暴露channel的listener给已经注册的ChannelAwareMessageListener?
    private boolean exposeListenerChannel = true;

    private volatile Object messageListener;

    private volatile AcknowledgeMode acknowledgeMode = AcknowledgeMode.AUTO;

    private volatile boolean deBatchingEnabled = DEFAULT_DEBATCHING_ENABLED;

    private volatile boolean initialized;

    private Collection<MessagePostProcessor> afterReceivePostProcessors;

    private volatile ApplicationContext applicationContext;

    private String listenerId;

    private Advice[] adviceChain = new Advice[0];

    private ConsumerTagStrategy consumerTagStrategy;

    private volatile boolean exclusive;

    private volatile boolean noLocal;

    private volatile boolean defaultRequeueRejected = true;

    private volatile int prefetchCount = DEFAULT_PREFETCH_COUNT;

    private long idleEventInterval;

    private volatile long lastReceive = System.currentTimeMillis();

    private boolean statefulRetryFatalWithNullMessageId = true;

    private ConditionalExceptionLogger exclusiveConsumerExceptionLogger = new DefaultExclusiveConsumerLogger();

    private boolean alwaysRequeueWithTxManagerRollback;

    private String lookupKeyQualifier = "";

    private boolean forceCloseChannel = true

方法

checkMessageListener 检查messageListener的类型必须是MessageListener或ChannelAwareMessageListener
@Override
    public final void afterPropertiesSet() {
        super.afterPropertiesSet();父类检查ConnectionFactory存在
        Assert.state(
                this.exposeListenerChannel || !getAcknowledgeMode().isManual(),
                "You cannot acknowledge messages manually if the channel is not exposed to the listener "
                        + "(please check your configuration and set exposeListenerChannel=true or acknowledgeMode!=MANUAL)");
检查
        Assert.state(
                !(getAcknowledgeMode().isAutoAck() && isChannelTransacted()),
                "The acknowledgeMode is NONE (autoack in Rabbit terms) which is not consistent with having a "
                        + "transactional channel. Either use a different AcknowledgeMode or make sure channelTransacted=false");
        validateConfiguration();
        initialize();
    }
    // -------------------------------------------------------------------------
    // Lifecycle methods for starting and stopping the container
    // -------------------------------------------------------------------------

    /**
     * Initialize this container.
     * <p>
     * Creates a Rabbit Connection and calls {@link #doInitialize()}.
     */
    public void initialize() {
        try {
            //获取锁并唤醒锁上等待的所有线程
            synchronized (this.lifecycleMonitor) {
                this.lifecycleMonitor.notifyAll();
            }
            //将delegate的和内含的adviceChain 生成代理
            initializeProxy(this.delegate);
            //?????????
            checkMissingQueuesFatalFromProperty();
            //??????
            checkPossibleAuthenticationFailureFatalFromProperty();
            //留给子类使用
            doInitialize();
            //??????
            if (!this.isExposeListenerChannel() && this.transactionManager != null) {
                logger.warn("exposeListenerChannel=false is ignored when using a TransactionManager");
            }
            //默认生成一个SimpleAsyncTaskExecutor,并将标志位设置为true
            if (!this.taskExecutorSet && StringUtils.hasText(this.getBeanName())) {
                this.taskExecutor = new SimpleAsyncTaskExecutor(this.getBeanName() + "-");
                this.taskExecutorSet = true;
            }
            //默认事务标志位
            if (this.transactionManager != null) {
                if (!isChannelTransacted()) {
                    logger.debug("The 'channelTransacted' is coerced to 'true', when 'transactionManager' is provided");
                    setChannelTransacted(true);
                }

            }
        }
        catch (Exception ex) {
            throw convertRabbitAccessException(ex);
        }
    }
shutdown方法,设置标志位。doShutdown方法留给子类
    @Override
    public void start() {
      //根据条件判断是否调用afterPropertiesSet
        if (isRunning()) {
            return;
        }
        if (!this.initialized) {
            synchronized (this.lifecycleMonitor) {
                if (!this.initialized) {
                    afterPropertiesSet();
                    this.initialized = true;
                }
            }
        }
        try {
            if (logger.isDebugEnabled()) {
                logger.debug("Starting Rabbit listener container.");
            }
      //
            configureAdminIfNeeded();
            checkMismatchedQueues();
            doStart();
        }
        catch (Exception ex) {
            throw convertRabbitAccessException(ex);
        }
    }

SimpleMessageListenerContainer

    /**
     * Re-initializes this container's Rabbit message consumers, if not initialized already.           
     * Then submits each consumer  to this container's task executor. 再次初始化容器的        
     * message consumers。如果已经初始化了,提交每一个consumer到容器的task executor
     * @throws Exception Any Exception.
     */
    @Override
    protected void doStart() throws Exception {
              //第一步里面主要是将MessageListener希望监听的Queue和Container包含的Queue比较,如果container缺少了任一一个希望的QueueName,抛出异常。
        if (getMessageListener() instanceof ListenerContainerAware) {
            Collection<String> expectedQueueNames = ((ListenerContainerAware) getMessageListener()).expectedQueueNames();
            if (expectedQueueNames != null) {
                String[] queueNames = getQueueNames();
                Assert.state(expectedQueueNames.size() == queueNames.length,
                        "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
                                + Arrays.asList(queueNames));
                boolean found = false;
                for (String queueName : queueNames) {
                    if (expectedQueueNames.contains(queueName)) {
                        found = true;
                    }
                    else {
                        found = false;
                        break;
                    }
                }
                Assert.state(found, () -> "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
                        + Arrays.asList(queueNames));
            }
        }
    //第二步,调用父类的super.doStart();
        super.doStart();
    //第三步,获取consumersMonitor锁,调用initializeConsumers初始化consumers(BlockingQueueConsumer)。
    //再根据consumer初始化AsyncMessageProcessingConsumer。
    //然后提交给Executor。然后遍历proccessor,调用processor.getStartupException。
        synchronized (this.consumersMonitor) {
            if (this.consumers != null) {
                throw new IllegalStateException("A stopped container should not have consumers");
            }
            int newConsumers = initializeConsumers();
            if (this.consumers == null) {
                logger.info("Consumers were initialized and then cleared " +
                        "(presumably the container was stopped concurrently)");
                return;
            }
            if (newConsumers <= 0) {
                if (logger.isInfoEnabled()) {
                    logger.info("Consumers are already running");
                }
                return;
            }
            Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
            for (BlockingQueueConsumer consumer : this.consumers) {
                AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
                processors.add(processor);
                getTaskExecutor().execute(processor);
                if (getApplicationEventPublisher() != null) {
                    getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
                }
            }
            for (AsyncMessageProcessingConsumer processor : processors) {
                FatalListenerStartupException startupException = processor.getStartupException();
                if (startupException != null) {
                    throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
                }
            }
        }
    }

BlockingQueueConsumer

    public BlockingQueueConsumer(ConnectionFactory connectionFactory,
            MessagePropertiesConverter messagePropertiesConverter,
            ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
            boolean transactional, int prefetchCount, boolean defaultRequeueRejected,
            Map<String, Object> consumerArgs, boolean noLocal, boolean exclusive, String... queues) {
        this.connectionFactory = connectionFactory;
        this.messagePropertiesConverter = messagePropertiesConverter;
        this.activeObjectCounter = activeObjectCounter;
        this.acknowledgeMode = acknowledgeMode;
        this.transactional = transactional;
        this.prefetchCount = prefetchCount;
        this.defaultRequeueRejected = defaultRequeueRejected;
        if (consumerArgs != null && consumerArgs.size() > 0) {
            this.consumerArgs.putAll(consumerArgs);
        }
        this.noLocal = noLocal;
        this.exclusive = exclusive;
        this.queues = Arrays.copyOf(queues, queues.length);
        this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount);
    }
    public void start() throws AmqpException {
        if (logger.isDebugEnabled()) {
            logger.debug("Starting consumer " + this);
        }

        this.thread = Thread.currentThread();

        try {
//1.先得到ResourceHolder,得到channel,如果channel的实质是AutorecoveringChannel,为其添加addRecoveryListener,既对象本身。
            this.resourceHolder = ConnectionFactoryUtils.getTransactionalResourceHolder(this.connectionFactory,
                    this.transactional);
            this.channel = this.resourceHolder.getChannel();
            addRecoveryListener();
        }
        catch (AmqpAuthenticationException e) {
            throw new FatalListenerStartupException("Authentication failure", e);
        }
  //2.?????
        this.consumer = new InternalConsumer(this.channel);
        this.deliveryTags.clear();
        this.activeObjectCounter.add(this);

        // mirrored queue might be being moved
        int passiveDeclareRetries = this.declarationRetries;
        this.declaring = true;
        do {
            if (cancelled()) {
                break;
            }
            try {
                attemptPassiveDeclarations();
                if (passiveDeclareRetries < this.declarationRetries && logger.isInfoEnabled()) {
                    logger.info("Queue declaration succeeded after retrying");
                }
                passiveDeclareRetries = 0;
            }
            catch (DeclarationException e) {
                if (passiveDeclareRetries > 0 && this.channel.isOpen()) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Queue declaration failed; retries left=" + (passiveDeclareRetries), e);
                        try {
                            Thread.sleep(this.failedDeclarationRetryInterval);
                        }
                        catch (InterruptedException e1) {
                            this.declaring = false;
                            Thread.currentThread().interrupt();
                            this.activeObjectCounter.release(this);
                            throw RabbitExceptionTranslator.convertRabbitAccessException(e1);
                        }
                    }
                }
                else if (e.getFailedQueues().size() < this.queues.length) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Not all queues are available; only listening on those that are - configured: "
                                + Arrays.asList(this.queues) + "; not available: " + e.getFailedQueues());
                    }
                    this.missingQueues.addAll(e.getFailedQueues());
                    this.lastRetryDeclaration = System.currentTimeMillis();
                }
                else {
                    this.declaring = false;
                    this.activeObjectCounter.release(this);
                    throw new QueuesNotAvailableException("Cannot prepare queue for listener. "
                            + "Either the queue doesn't exist or the broker will not allow us to use it.", e);
                }
            }
        }
        while (passiveDeclareRetries-- > 0 && !cancelled());
        this.declaring = false;

        if (!this.acknowledgeMode.isAutoAck() && !cancelled()) {
            // Set basicQos before calling basicConsume (otherwise if we are not acking the broker
            // will send blocks of 100 messages)
            try {
                this.channel.basicQos(this.prefetchCount);
            }
            catch (IOException e) {
                this.activeObjectCounter.release(this);
                throw new AmqpIOException(e);
            }
        }


        try {
            if (!cancelled()) {
                for (String queueName : this.queues) {
                    if (!this.missingQueues.contains(queueName)) {
                        consumeFromQueue(queueName);
                    }
                }
            }
        }
        catch (IOException e) {
            throw RabbitExceptionTranslator.convertRabbitAccessException(e);
        }
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,163评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,301评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,089评论 0 352
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,093评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,110评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,079评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,005评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,840评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,278评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,497评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,394评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,980评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,628评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,649评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,548评论 2 352

推荐阅读更多精彩内容