MessageListenerContainer
定义2个方法:
void setupMessageListener(Object messageListener); 设置messageListener
MessageConverter getMessageConverter(); 得到MessageConverter(用于转换接收到的Message的)
AbstractMessageListenerContainer
static final int DEFAULT_FAILED_DECLARATION_RETRY_INTERVAL = 5000;
public static final boolean DEFAULT_DEBATCHING_ENABLED = true;
public static final int DEFAULT_PREFETCH_COUNT = 250;
/**
* The default recovery interval: 5000 ms = 5 seconds.
*/
public static final long DEFAULT_RECOVERY_INTERVAL = 5000;
public static final long DEFAULT_SHUTDOWN_TIMEOUT = 5000;
private final ContainerDelegate delegate = this::actualInvokeListener;
protected final Object consumersMonitor = new Object(); //NOSONAR
private final Map<String, Object> consumerArgs = new HashMap<String, Object>();
private ContainerDelegate proxy = this.delegate;
private long shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
private ApplicationEventPublisher applicationEventPublisher;
private PlatformTransactionManager transactionManager;
private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute();
private String beanName;
private Executor taskExecutor = new SimpleAsyncTaskExecutor();
private boolean taskExecutorSet;
private BackOff recoveryBackOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, FixedBackOff.UNLIMITED_ATTEMPTS);
private MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
private RabbitAdmin rabbitAdmin;
private boolean missingQueuesFatal = true;
private boolean missingQueuesFatalSet;
private boolean possibleAuthenticationFailureFatal = true;
private boolean possibleAuthenticationFailureFatalSet;
private boolean autoDeclare = true;
private boolean mismatchedQueuesFatal = false;
private long failedDeclarationRetryInterval = DEFAULT_FAILED_DECLARATION_RETRY_INTERVAL;
private boolean autoStartup = true;
private int phase = Integer.MAX_VALUE;
private volatile boolean active = false;
private volatile boolean running = false;
private final Object lifecycleMonitor = new Object();
private volatile List<String> queueNames = new CopyOnWriteArrayList<String>();
private ErrorHandler errorHandler = new ConditionalRejectingErrorHandler();
private MessageConverter messageConverter;
是否暴露channel的listener给已经注册的ChannelAwareMessageListener?
private boolean exposeListenerChannel = true;
private volatile Object messageListener;
private volatile AcknowledgeMode acknowledgeMode = AcknowledgeMode.AUTO;
private volatile boolean deBatchingEnabled = DEFAULT_DEBATCHING_ENABLED;
private volatile boolean initialized;
private Collection<MessagePostProcessor> afterReceivePostProcessors;
private volatile ApplicationContext applicationContext;
private String listenerId;
private Advice[] adviceChain = new Advice[0];
private ConsumerTagStrategy consumerTagStrategy;
private volatile boolean exclusive;
private volatile boolean noLocal;
private volatile boolean defaultRequeueRejected = true;
private volatile int prefetchCount = DEFAULT_PREFETCH_COUNT;
private long idleEventInterval;
private volatile long lastReceive = System.currentTimeMillis();
private boolean statefulRetryFatalWithNullMessageId = true;
private ConditionalExceptionLogger exclusiveConsumerExceptionLogger = new DefaultExclusiveConsumerLogger();
private boolean alwaysRequeueWithTxManagerRollback;
private String lookupKeyQualifier = "";
private boolean forceCloseChannel = true
方法
checkMessageListener 检查messageListener的类型必须是MessageListener或ChannelAwareMessageListener
@Override
public final void afterPropertiesSet() {
super.afterPropertiesSet();父类检查ConnectionFactory存在
Assert.state(
this.exposeListenerChannel || !getAcknowledgeMode().isManual(),
"You cannot acknowledge messages manually if the channel is not exposed to the listener "
+ "(please check your configuration and set exposeListenerChannel=true or acknowledgeMode!=MANUAL)");
检查
Assert.state(
!(getAcknowledgeMode().isAutoAck() && isChannelTransacted()),
"The acknowledgeMode is NONE (autoack in Rabbit terms) which is not consistent with having a "
+ "transactional channel. Either use a different AcknowledgeMode or make sure channelTransacted=false");
validateConfiguration();
initialize();
}
// -------------------------------------------------------------------------
// Lifecycle methods for starting and stopping the container
// -------------------------------------------------------------------------
/**
* Initialize this container.
* <p>
* Creates a Rabbit Connection and calls {@link #doInitialize()}.
*/
public void initialize() {
try {
//获取锁并唤醒锁上等待的所有线程
synchronized (this.lifecycleMonitor) {
this.lifecycleMonitor.notifyAll();
}
//将delegate的和内含的adviceChain 生成代理
initializeProxy(this.delegate);
//?????????
checkMissingQueuesFatalFromProperty();
//??????
checkPossibleAuthenticationFailureFatalFromProperty();
//留给子类使用
doInitialize();
//??????
if (!this.isExposeListenerChannel() && this.transactionManager != null) {
logger.warn("exposeListenerChannel=false is ignored when using a TransactionManager");
}
//默认生成一个SimpleAsyncTaskExecutor,并将标志位设置为true
if (!this.taskExecutorSet && StringUtils.hasText(this.getBeanName())) {
this.taskExecutor = new SimpleAsyncTaskExecutor(this.getBeanName() + "-");
this.taskExecutorSet = true;
}
//默认事务标志位
if (this.transactionManager != null) {
if (!isChannelTransacted()) {
logger.debug("The 'channelTransacted' is coerced to 'true', when 'transactionManager' is provided");
setChannelTransacted(true);
}
}
}
catch (Exception ex) {
throw convertRabbitAccessException(ex);
}
}
shutdown方法,设置标志位。doShutdown方法留给子类
@Override
public void start() {
//根据条件判断是否调用afterPropertiesSet
if (isRunning()) {
return;
}
if (!this.initialized) {
synchronized (this.lifecycleMonitor) {
if (!this.initialized) {
afterPropertiesSet();
this.initialized = true;
}
}
}
try {
if (logger.isDebugEnabled()) {
logger.debug("Starting Rabbit listener container.");
}
//
configureAdminIfNeeded();
checkMismatchedQueues();
doStart();
}
catch (Exception ex) {
throw convertRabbitAccessException(ex);
}
}
SimpleMessageListenerContainer
/**
* Re-initializes this container's Rabbit message consumers, if not initialized already.
* Then submits each consumer to this container's task executor. 再次初始化容器的
* message consumers。如果已经初始化了,提交每一个consumer到容器的task executor
* @throws Exception Any Exception.
*/
@Override
protected void doStart() throws Exception {
//第一步里面主要是将MessageListener希望监听的Queue和Container包含的Queue比较,如果container缺少了任一一个希望的QueueName,抛出异常。
if (getMessageListener() instanceof ListenerContainerAware) {
Collection<String> expectedQueueNames = ((ListenerContainerAware) getMessageListener()).expectedQueueNames();
if (expectedQueueNames != null) {
String[] queueNames = getQueueNames();
Assert.state(expectedQueueNames.size() == queueNames.length,
"Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
+ Arrays.asList(queueNames));
boolean found = false;
for (String queueName : queueNames) {
if (expectedQueueNames.contains(queueName)) {
found = true;
}
else {
found = false;
break;
}
}
Assert.state(found, () -> "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
+ Arrays.asList(queueNames));
}
}
//第二步,调用父类的super.doStart();
super.doStart();
//第三步,获取consumersMonitor锁,调用initializeConsumers初始化consumers(BlockingQueueConsumer)。
//再根据consumer初始化AsyncMessageProcessingConsumer。
//然后提交给Executor。然后遍历proccessor,调用processor.getStartupException。
synchronized (this.consumersMonitor) {
if (this.consumers != null) {
throw new IllegalStateException("A stopped container should not have consumers");
}
int newConsumers = initializeConsumers();
if (this.consumers == null) {
logger.info("Consumers were initialized and then cleared " +
"(presumably the container was stopped concurrently)");
return;
}
if (newConsumers <= 0) {
if (logger.isInfoEnabled()) {
logger.info("Consumers are already running");
}
return;
}
Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
for (BlockingQueueConsumer consumer : this.consumers) {
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
getTaskExecutor().execute(processor);
if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
}
}
for (AsyncMessageProcessingConsumer processor : processors) {
FatalListenerStartupException startupException = processor.getStartupException();
if (startupException != null) {
throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
}
}
}
}
BlockingQueueConsumer
public BlockingQueueConsumer(ConnectionFactory connectionFactory,
MessagePropertiesConverter messagePropertiesConverter,
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
boolean transactional, int prefetchCount, boolean defaultRequeueRejected,
Map<String, Object> consumerArgs, boolean noLocal, boolean exclusive, String... queues) {
this.connectionFactory = connectionFactory;
this.messagePropertiesConverter = messagePropertiesConverter;
this.activeObjectCounter = activeObjectCounter;
this.acknowledgeMode = acknowledgeMode;
this.transactional = transactional;
this.prefetchCount = prefetchCount;
this.defaultRequeueRejected = defaultRequeueRejected;
if (consumerArgs != null && consumerArgs.size() > 0) {
this.consumerArgs.putAll(consumerArgs);
}
this.noLocal = noLocal;
this.exclusive = exclusive;
this.queues = Arrays.copyOf(queues, queues.length);
this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount);
}
public void start() throws AmqpException {
if (logger.isDebugEnabled()) {
logger.debug("Starting consumer " + this);
}
this.thread = Thread.currentThread();
try {
//1.先得到ResourceHolder,得到channel,如果channel的实质是AutorecoveringChannel,为其添加addRecoveryListener,既对象本身。
this.resourceHolder = ConnectionFactoryUtils.getTransactionalResourceHolder(this.connectionFactory,
this.transactional);
this.channel = this.resourceHolder.getChannel();
addRecoveryListener();
}
catch (AmqpAuthenticationException e) {
throw new FatalListenerStartupException("Authentication failure", e);
}
//2.?????
this.consumer = new InternalConsumer(this.channel);
this.deliveryTags.clear();
this.activeObjectCounter.add(this);
// mirrored queue might be being moved
int passiveDeclareRetries = this.declarationRetries;
this.declaring = true;
do {
if (cancelled()) {
break;
}
try {
attemptPassiveDeclarations();
if (passiveDeclareRetries < this.declarationRetries && logger.isInfoEnabled()) {
logger.info("Queue declaration succeeded after retrying");
}
passiveDeclareRetries = 0;
}
catch (DeclarationException e) {
if (passiveDeclareRetries > 0 && this.channel.isOpen()) {
if (logger.isWarnEnabled()) {
logger.warn("Queue declaration failed; retries left=" + (passiveDeclareRetries), e);
try {
Thread.sleep(this.failedDeclarationRetryInterval);
}
catch (InterruptedException e1) {
this.declaring = false;
Thread.currentThread().interrupt();
this.activeObjectCounter.release(this);
throw RabbitExceptionTranslator.convertRabbitAccessException(e1);
}
}
}
else if (e.getFailedQueues().size() < this.queues.length) {
if (logger.isWarnEnabled()) {
logger.warn("Not all queues are available; only listening on those that are - configured: "
+ Arrays.asList(this.queues) + "; not available: " + e.getFailedQueues());
}
this.missingQueues.addAll(e.getFailedQueues());
this.lastRetryDeclaration = System.currentTimeMillis();
}
else {
this.declaring = false;
this.activeObjectCounter.release(this);
throw new QueuesNotAvailableException("Cannot prepare queue for listener. "
+ "Either the queue doesn't exist or the broker will not allow us to use it.", e);
}
}
}
while (passiveDeclareRetries-- > 0 && !cancelled());
this.declaring = false;
if (!this.acknowledgeMode.isAutoAck() && !cancelled()) {
// Set basicQos before calling basicConsume (otherwise if we are not acking the broker
// will send blocks of 100 messages)
try {
this.channel.basicQos(this.prefetchCount);
}
catch (IOException e) {
this.activeObjectCounter.release(this);
throw new AmqpIOException(e);
}
}
try {
if (!cancelled()) {
for (String queueName : this.queues) {
if (!this.missingQueues.contains(queueName)) {
consumeFromQueue(queueName);
}
}
}
}
catch (IOException e) {
throw RabbitExceptionTranslator.convertRabbitAccessException(e);
}
}