序
本文主要聊一下spring for kafka的retry
AbstractRetryingMessageListenerAdapter
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/adapter/AbstractRetryingMessageListenerAdapter.java
主要有两个实现类RetryingAcknowledgingMessageListenerAdapter以及RetryingMessageListenerAdapter
RetryingAcknowledgingMessageListenerAdapter
public class RetryingAcknowledgingMessageListenerAdapter<K, V>
extends AbstractRetryingMessageListenerAdapter<K, V, AcknowledgingMessageListener<K, V>>
implements AcknowledgingMessageListener<K, V> {
private final AcknowledgingMessageListener<K, V> delegate;
/**
* Construct an instance with the provided template and delegate. The exception will
* be thrown to the container after retries are exhausted.
* @param messageListener the listener delegate.
* @param retryTemplate the template.
*/
public RetryingAcknowledgingMessageListenerAdapter(AcknowledgingMessageListener<K, V> messageListener,
RetryTemplate retryTemplate) {
this(messageListener, retryTemplate, null);
}
/**
* Construct an instance with the provided template, callback and delegate.
* @param messageListener the listener delegate.
* @param retryTemplate the template.
* @param recoveryCallback the recovery callback; if null, the exception will be
* thrown to the container after retries are exhausted.
*/
public RetryingAcknowledgingMessageListenerAdapter(AcknowledgingMessageListener<K, V> messageListener,
RetryTemplate retryTemplate, RecoveryCallback<? extends Object> recoveryCallback) {
super(messageListener, retryTemplate, recoveryCallback);
Assert.notNull(messageListener, "'messageListener' cannot be null");
this.delegate = messageListener;
}
@SuppressWarnings("unchecked")
@Override
public void onMessage(final ConsumerRecord<K, V> record, final Acknowledgment acknowledgment) {
getRetryTemplate().execute(new RetryCallback<Object, KafkaException>() {
@Override
public Void doWithRetry(RetryContext context) throws KafkaException {
context.setAttribute(CONTEXT_RECORD, record);
context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
RetryingAcknowledgingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment);
return null;
}
}, (RecoveryCallback<Object>) getRecoveryCallback());
}
}
RetryingMessageListenerAdapter
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/adapter/RetryingMessageListenerAdapter.java
public class RetryingMessageListenerAdapter<K, V>
extends AbstractRetryingMessageListenerAdapter<K, V, MessageListener<K, V>>
implements MessageListener<K, V> {
/**
* Construct an instance with the provided template and delegate. The exception will
* be thrown to the container after retries are exhausted.
* @param messageListener the delegate listener.
* @param retryTemplate the template.
*/
public RetryingMessageListenerAdapter(MessageListener<K, V> messageListener, RetryTemplate retryTemplate) {
this(messageListener, retryTemplate, null);
}
/**
* Construct an instance with the provided template, callback and delegate.
* @param messageListener the delegate listener.
* @param retryTemplate the template.
* @param recoveryCallback the recovery callback; if null, the exception will be
* thrown to the container after retries are exhausted.
*/
public RetryingMessageListenerAdapter(MessageListener<K, V> messageListener, RetryTemplate retryTemplate,
RecoveryCallback<? extends Object> recoveryCallback) {
super(messageListener, retryTemplate, recoveryCallback);
Assert.notNull(messageListener, "'messageListener' cannot be null");
}
@SuppressWarnings("unchecked")
@Override
public void onMessage(final ConsumerRecord<K, V> record) {
getRetryTemplate().execute(new RetryCallback<Object, KafkaException>() {
@Override
public Void doWithRetry(RetryContext context) throws KafkaException {
context.setAttribute(CONTEXT_RECORD, record);
RetryingMessageListenerAdapter.this.delegate.onMessage(record);
return null;
}
}, (RecoveryCallback<Object>) getRecoveryCallback());
}
}
具体是哪种listener呢
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java
private void setupMessageListener(MessageListenerContainer container, MessageConverter messageConverter) {
Object messageListener = createMessageListener(container, messageConverter);
Assert.state(messageListener != null, "Endpoint [" + this + "] must provide a non null message listener");
if (this.retryTemplate != null) {
if (messageListener instanceof AcknowledgingMessageListener) {
messageListener = new RetryingAcknowledgingMessageListenerAdapter<>(
(AcknowledgingMessageListener<K, V>) messageListener, this.retryTemplate,
this.recoveryCallback);
}
else {
messageListener = new RetryingMessageListenerAdapter<>((MessageListener<K, V>) messageListener,
this.retryTemplate, (RecoveryCallback<Object>) this.recoveryCallback);
}
}
if (this.recordFilterStrategy != null) {
if (messageListener instanceof AcknowledgingMessageListener) {
messageListener = new FilteringAcknowledgingMessageListenerAdapter<>(
(AcknowledgingMessageListener<K, V>) messageListener, this.recordFilterStrategy,
this.ackDiscarded);
}
else if (messageListener instanceof MessageListener) {
messageListener = new FilteringMessageListenerAdapter<>((MessageListener<K, V>) messageListener,
this.recordFilterStrategy);
}
else if (messageListener instanceof BatchAcknowledgingMessageListener) {
messageListener = new FilteringBatchAcknowledgingMessageListenerAdapter<>(
(BatchAcknowledgingMessageListener<K, V>) messageListener, this.recordFilterStrategy,
this.ackDiscarded);
}
else if (messageListener instanceof BatchMessageListener) {
messageListener = new FilteringBatchMessageListenerAdapter<>(
(BatchMessageListener<K, V>) messageListener, this.recordFilterStrategy);
}
}
container.setupMessageListener(messageListener);
}
如果retryTemplate不为null的话,会先判断是不是AcknowledgingMessageListener的子类,如果是则创建RetryingAcknowledgingMessageListenerAdapter,如果不是则创建RetryingMessageListenerAdapter
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java
protected MessagingMessageListenerAdapter<K, V> createMessageListenerInstance(MessageConverter messageConverter) {
if (isBatchListener()) {
BatchMessagingMessageListenerAdapter<K, V> messageListener = new BatchMessagingMessageListenerAdapter<K, V>(
this.bean, this.method);
if (messageConverter instanceof BatchMessageConverter) {
messageListener.setBatchMessageConverter((BatchMessageConverter) messageConverter);
}
return messageListener;
}
else {
RecordMessagingMessageListenerAdapter<K, V> messageListener =
new RecordMessagingMessageListenerAdapter<K, V>(this.bean, this.method);
if (messageConverter instanceof RecordMessageConverter) {
messageListener.setMessageConverter((RecordMessageConverter) messageConverter);
}
return messageListener;
}
}
其中RecordMessagingMessageListenerAdapter实现了AcknowledgingMessageListener接口
public class RecordMessagingMessageListenerAdapter<K, V> extends MessagingMessageListenerAdapter<K, V>
implements MessageListener<K, V>, AcknowledgingMessageListener<K, V> {
//......
}