1 Receiving Messages
Receiving messages requires a MessageListenerContainer and a MessageListener, and there are two ways to provide them: one is to create them explicitly, the other is to let the @KafkaListener annotation generate them automatically.
The explicit approach needs a @Bean method that builds the container:
@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.21.225:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // disable auto commit of offsets; commits are driven by the container's AckMode
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return props;
    }

    @Bean
    public KafkaMessageListenerContainer<String, String> getContainer() {
        ContainerProperties containerProperties = new ContainerProperties("testtopic");
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL);
        // use a custom MessageListener (a sketch of this class follows the configuration)
        containerProperties.setMessageListener(new StringMsgListener());
        containerProperties.setGroupId("stringGroup");
        DefaultKafkaConsumerFactory<String, String> cf =
                new DefaultKafkaConsumerFactory<>(consumerConfigs());
        KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer =
                new KafkaMessageListenerContainer<>(cf, containerProperties);
        return kafkaMessageListenerContainer;
    }
}
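The configuration above references a custom StringMsgListener. Because the container uses AckMode.MANUAL, the listener needs to receive and call an Acknowledgment, so a minimal sketch of such a class (the name and body here are illustrative) could look like this:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;

public class StringMsgListener implements AcknowledgingMessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        System.out.println("received: " + record.value());
        // with AckMode.MANUAL the offset is only committed after acknowledge() is called
        acknowledgment.acknowledge();
    }
}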
Spring provides two MessageListenerContainer implementations:
- KafkaMessageListenerContainer
- ConcurrentMessageListenerContainer
The KafkaMessageListenerContainer receives all messages from all topics/partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.
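For comparison, here is a minimal sketch of the concurrent variant, written as another bean method in the KafkaConfig class above and reusing the same consumerConfigs(); setConcurrency(3) makes it delegate to three KafkaMessageListenerContainer instances, each with its own KafkaConsumer and polling thread:

@Bean
public ConcurrentMessageListenerContainer<String, String> concurrentContainer() {
    ContainerProperties containerProperties = new ContainerProperties("testtopic");
    containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL);
    containerProperties.setMessageListener(new StringMsgListener());
    containerProperties.setGroupId("stringGroup");
    DefaultKafkaConsumerFactory<String, String> cf =
            new DefaultKafkaConsumerFactory<>(consumerConfigs());
    ConcurrentMessageListenerContainer<String, String> container =
            new ConcurrentMessageListenerContainer<>(cf, containerProperties);
    container.setConcurrency(3); // should not exceed the partition count of "testtopic"
    return container;
}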
The constructors of KafkaMessageListenerContainer are:
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties,
TopicPartitionInitialOffset... topicPartitions)
The ConsumerFactory is responsible for creating the kafka-clients KafkaConsumer instance, while ContainerProperties holds the listener configuration.
Note: the KafkaConsumer properties are the ones supplied by the consumerConfigs() method shown earlier.
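As a sketch of the second constructor (the partition numbers and offsets here are purely illustrative), the container can be pinned to explicit partitions and starting offsets instead of subscribing by topic name; in that case the ListenerConsumer calls consumer.assign() rather than consumer.subscribe(), as we will see in its constructor later:

@Bean
public KafkaMessageListenerContainer<String, String> assignedPartitionsContainer() {
    ContainerProperties containerProperties = new ContainerProperties("testtopic");
    containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL);
    containerProperties.setMessageListener(new StringMsgListener());
    containerProperties.setGroupId("stringGroup");
    DefaultKafkaConsumerFactory<String, String> cf =
            new DefaultKafkaConsumerFactory<>(consumerConfigs());
    // read partitions 0 and 1 of "testtopic", starting from offset 0
    KafkaMessageListenerContainer<String, String> container =
            new KafkaMessageListenerContainer<>(cf, containerProperties,
                    new TopicPartitionInitialOffset("testtopic", 0, 0L),
                    new TopicPartitionInitialOffset("testtopic", 1, 0L));
    return container;
}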
Now let's look at the structure of KafkaMessageListenerContainer. Its main fields are:
public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
private final AbstractMessageListenerContainer<K, V> container;
private final TopicPartitionInitialOffset[] topicPartitions;
private volatile ListenerConsumer listenerConsumer;
private volatile ListenableFuture<?> listenerConsumerFuture;
private GenericMessageListener<?> listener;
private String clientIdSuffix;
}
The fields inherited from AbstractMessageListenerContainer are:
public abstract class AbstractMessageListenerContainer<K, V>
implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware {
/**
* The default {@link org.springframework.context.SmartLifecycle} phase for listener
* containers {@value #DEFAULT_PHASE}.
*/
public static final int DEFAULT_PHASE = Integer.MAX_VALUE - 100; // late phase
protected final Log logger = LogFactory.getLog(this.getClass()); // NOSONAR
protected final ConsumerFactory<K, V> consumerFactory; // NOSONAR (final)
private final ContainerProperties containerProperties;
private final Object lifecycleMonitor = new Object();
private String beanName;
private ApplicationEventPublisher applicationEventPublisher;
private GenericErrorHandler<?> errorHandler;
private boolean autoStartup = true;
private int phase = DEFAULT_PHASE;
private AfterRollbackProcessor<K, V> afterRollbackProcessor = new DefaultAfterRollbackProcessor<>();
private volatile boolean running = false;
private volatile boolean paused;
}
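The autoStartup and phase fields tie the container into Spring's SmartLifecycle handling (see the call stack at the end of this article). If setAutoStartup(false) is called on the container bean, the container can be started and stopped by hand; a small sketch, with illustrative class and method names:

import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class ContainerSwitch {

    private final KafkaMessageListenerContainer<String, String> container;

    public ContainerSwitch(KafkaMessageListenerContainer<String, String> container) {
        this.container = container;
    }

    public void startConsuming() {
        if (!this.container.isRunning()) {
            this.container.start(); // AbstractMessageListenerContainer.start() -> doStart()
        }
    }

    public void stopConsuming() {
        if (this.container.isRunning()) {
            this.container.stop();
        }
    }
}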
Now let's look at the private class ListenerConsumer, which wraps the kafka-clients Consumer and implements SchedulingAwareRunnable and ConsumerSeekCallback. Its main fields are:
private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback {
private final ContainerProperties containerProperties = getContainerProperties();
private final Consumer<K, V> consumer;
private final GenericMessageListener<?> genericListener;
private final MessageListener<K, V> listener;
private volatile Thread consumerThread;
private boolean consumerPaused;
The ListenerConsumer is created in KafkaMessageListenerContainer's doStart() method:
@Override
protected void doStart() {
if (isRunning()) {
return;
}
if (this.clientIdSuffix == null) { // stand-alone container
checkTopics();
}
ContainerProperties containerProperties = getContainerProperties();
if (!this.consumerFactory.isAutoCommit()) {
AckMode ackMode = containerProperties.getAckMode();
if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {
Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
}
if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))
&& containerProperties.getAckTime() == 0) {
containerProperties.setAckTime(5000);
}
}
Object messageListener = containerProperties.getMessageListener();
Assert.state(messageListener != null, "A MessageListener is required");
if (containerProperties.getConsumerTaskExecutor() == null) {
SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
(getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setConsumerTaskExecutor(consumerExecutor);
}
Assert.state(messageListener instanceof GenericMessageListener, "Listener must be a GenericListener");
this.listener = (GenericMessageListener<?>) messageListener;
ListenerType listenerType = ListenerUtils.determineListenerType(this.listener);
if (this.listener instanceof DelegatingMessageListener) {
Object delegating = this.listener;
while (delegating instanceof DelegatingMessageListener) {
delegating = ((DelegatingMessageListener<?>) delegating).getDelegate();
}
listenerType = ListenerUtils.determineListenerType(delegating);
}
this.listenerConsumer = new ListenerConsumer(this.listener, listenerType);
setRunning(true);
this.listenerConsumerFuture = containerProperties
.getConsumerTaskExecutor()
.submitListenable(this.listenerConsumer);
}
As we can see, the ListenerConsumer constructor takes two arguments: the GenericMessageListener we supplied when building the ContainerProperties, and a listenerType derived from the listener's concrete type. If containerProperties does not specify a consumerTaskExecutor, the listenerConsumer is submitted to a newly created SimpleAsyncTaskExecutor to consume data; if an executor is specified, that executor runs the consumption loop (a sketch of supplying one follows the ListenerUtils listing below). The listenerType is determined by ListenerUtils.determineListenerType:
public final class ListenerUtils {
private ListenerUtils() {
super();
}
public static ListenerType determineListenerType(Object listener) {
Assert.notNull(listener, "Listener cannot be null");
ListenerType listenerType;
if (listener instanceof AcknowledgingConsumerAwareMessageListener
|| listener instanceof BatchAcknowledgingConsumerAwareMessageListener) {
listenerType = ListenerType.ACKNOWLEDGING_CONSUMER_AWARE;
}
else if (listener instanceof ConsumerAwareMessageListener
|| listener instanceof BatchConsumerAwareMessageListener) {
listenerType = ListenerType.CONSUMER_AWARE;
}
else if (listener instanceof AcknowledgingMessageListener
|| listener instanceof BatchAcknowledgingMessageListener) {
listenerType = ListenerType.ACKNOWLEDGING;
}
else if (listener instanceof GenericMessageListener) {
listenerType = ListenerType.SIMPLE;
}
else {
throw new IllegalArgumentException("Unsupported listener type: " + listener.getClass().getName());
}
return listenerType;
}
}
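A sketch of supplying the polling executor yourself, added to the getContainer() bean method above (the thread-name prefix is illustrative); SimpleAsyncTaskExecutor implements the AsyncListenableTaskExecutor type that ContainerProperties expects:

// name the polling thread instead of relying on the default "<beanName>-C-" executor
containerProperties.setConsumerTaskExecutor(new SimpleAsyncTaskExecutor("string-consumer-"));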
Next, let's see what the ListenerConsumer constructor does. The constructor is fairly long, so only the key code is shown:
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
Assert.state(!this.isAnyManualAck || !this.autoCommit,
"Consumer cannot be configured for auto commit for ackMode " + this.containerProperties.getAckMode());
//create the kafka-clients Consumer object
final Consumer<K, V> consumer =
KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
this.consumerGroupId,
this.containerProperties.getClientId(),
KafkaMessageListenerContainer.this.clientIdSuffix);
//subscribe to the topic(s), or assign explicit partitions
if (KafkaMessageListenerContainer.this.topicPartitions == null) {
if (this.containerProperties.getTopicPattern() != null) {
consumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
}
else {
consumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
}
}
else {
List<TopicPartitionInitialOffset> topicPartitions =
Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
this.definedPartitions = new HashMap<>(topicPartitions.size());
for (TopicPartitionInitialOffset topicPartition : topicPartitions) {
this.definedPartitions.put(topicPartition.topicPartition(),
new OffsetMetadata(topicPartition.initialOffset(), topicPartition.isRelativeToCurrent(),
topicPartition.getPosition()));
}
consumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
}
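The topicPattern branch corresponds to building ContainerProperties from a regular expression instead of fixed topic names; a minimal sketch (the pattern itself is illustrative):

// subscribe to every topic whose name matches the regex (java.util.regex.Pattern);
// matching topics created later are picked up when the consumer refreshes its metadata
ContainerProperties patternProperties = new ContainerProperties(Pattern.compile("test.*"));
patternProperties.setAckMode(ContainerProperties.AckMode.MANUAL);
patternProperties.setMessageListener(new StringMsgListener());
patternProperties.setGroupId("stringGroup");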
Once the ListenerConsumer has been created, KafkaMessageListenerContainer runs its run() method on the consumer task executor to poll and consume records. The key code is:
@Override
public void run() {
//record the current thread, i.e. the one provided by the consumerTaskExecutor
this.consumerThread = Thread.currentThread();
while (isRunning()) {
try {
if (!this.autoCommit && !this.isRecordAck) {
processCommits();
}
processSeeks();
if (!this.consumerPaused && isPaused()) {
this.consumer.pause(this.consumer.assignment());
this.consumerPaused = true;
if (this.logger.isDebugEnabled()) {
this.logger.debug("Paused consumption from: " + this.consumer.paused());
}
publishConsumerPausedEvent(this.consumer.assignment());
}
//poll for records
ConsumerRecords<K, V> records = this.consumer.poll(this.pollTimeout);
this.lastPoll = System.currentTimeMillis();
if (this.consumerPaused && !isPaused()) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Resuming consumption from: " + this.consumer.paused());
}
Set<TopicPartition> paused = this.consumer.paused();
this.consumer.resume(paused);
this.consumerPaused = false;
publishConsumerResumedEvent(paused);
}
if (records != null && this.logger.isDebugEnabled()) {
this.logger.debug("Received: " + records.count() + " records");
if (records.count() > 0 && this.logger.isTraceEnabled()) {
this.logger.trace(records.partitions().stream()
.flatMap(p -> records.records(p).stream())
// map to same format as send metadata toString()
.map(r -> r.topic() + "-" + r.partition() + "@" + r.offset())
.collect(Collectors.toList()));
}
}
if (records != null && records.count() > 0) {
if (this.containerProperties.getIdleEventInterval() != null) {
lastReceive = System.currentTimeMillis();
}
//invoke the listener to process the records
invokeListener(records);
}
else {
if (this.containerProperties.getIdleEventInterval() != null) {
long now = System.currentTimeMillis();
if (now > lastReceive + this.containerProperties.getIdleEventInterval()
&& now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
publishIdleContainerEvent(now - lastReceive, this.isConsumerAwareListener
? this.consumer : null, this.consumerPaused);
lastAlertAt = now;
if (this.genericListener instanceof ConsumerSeekAware) {
seekPartitions(getAssignedPartitions(), true);
}
}
}
}
}
catch (WakeupException e) {
// Ignore, we're stopping
}
catch (NoOffsetForPartitionException nofpe) {
this.fatalError = true;
ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
break;
}
catch (Exception e) {
handleConsumerException(e);
}
}
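The else branch only publishes idle events when idleEventInterval is set, for example by calling containerProperties.setIdleEventInterval(60000L) in the getContainer() bean above; a sketch of reacting to those events (the class name is illustrative):

import org.springframework.context.ApplicationListener;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.stereotype.Component;

@Component
public class IdleEventListener implements ApplicationListener<ListenerContainerIdleEvent> {

    @Override
    public void onApplicationEvent(ListenerContainerIdleEvent event) {
        // fired from the polling loop when no records arrived for idleEventInterval ms
        System.out.println("container " + event.getListenerId()
                + " idle for " + event.getIdleTime() + " ms");
    }
}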
Eventually doInvokeRecordListener is called, which invokes the listener's onMessage method to process each record:
private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> record,
@SuppressWarnings("rawtypes") Producer producer,
Iterator<ConsumerRecord<K, V>> iterator) throws Error {
switch (this.listenerType) {
case ACKNOWLEDGING_CONSUMER_AWARE:
this.listener.onMessage(record,
this.isAnyManualAck
? new ConsumerAcknowledgment(record)
: null, this.consumer);
break;
case CONSUMER_AWARE:
this.listener.onMessage(record, this.consumer);
break;
case ACKNOWLEDGING:
this.listener.onMessage(record,
this.isAnyManualAck
? new ConsumerAcknowledgment(record)
: null);
break;
case SIMPLE:
this.listener.onMessage(record);
break;
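
For comparison with the ACKNOWLEDGING case used by the configuration in this article, a CONSUMER_AWARE listener receives the underlying Consumer instead of an Acknowledgment; a minimal illustrative sketch:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.ConsumerAwareMessageListener;

public class ConsumerAwareStringListener implements ConsumerAwareMessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> record, Consumer<?, ?> consumer) {
        System.out.println("received: " + record.value());
        // direct access to the Consumer, e.g. for a synchronous commit
        consumer.commitSync();
    }
}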

Finally, let's see how Spring ends up calling the doStart() method, as shown by this call stack:
- doStart:232, KafkaMessageListenerContainer (org.springframework.kafka.listener)
- start:257, AbstractMessageListenerContainer (org.springframework.kafka.listener)
- doStart:182, DefaultLifecycleProcessor (org.springframework.context.support)
- access$200:53, DefaultLifecycleProcessor (org.springframework.context.support)
- start:360, DefaultLifecycleProcessor$LifecycleGroup (org.springframework.context.support)
- startBeans:158, DefaultLifecycleProcessor (org.springframework.context.support)
- onRefresh:122, DefaultLifecycleProcessor (org.springframework.context.support)
- finishRefresh:879, AbstractApplicationContext (org.springframework.context.support)
- refresh:549, AbstractApplicationContext (org.springframework.context.support)
- refresh:775, SpringApplication (org.springframework.boot)
- refreshContext:397, SpringApplication (org.springframework.boot)
- run:316, SpringApplication (org.springframework.boot)
- run:1260, SpringApplication (org.springframework.boot)
- run:1248, SpringApplication (org.springframework.boot)
- main:14, KafkaconsumerApplication (com.example.kafkaconsumer)