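Spring Kafka Study Notes

1 Receiving Messages

Receiving messages requires a MessageListenerContainer and a MessageListener. There are two ways to provide them:
create them explicitly, or let the @KafkaListener annotation generate them automatically.

The explicit approach needs a @Bean method that builds the container: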

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public Map<String, Object> consumerConfigs(){
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.21.225:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        //disable auto commit offsets
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return props;
    }
    
    @Bean
    public KafkaMessageListenerContainer<String, String> getContainer() {
        ContainerProperties containerProperties = new ContainerProperties("testtopic");
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL);

        // use a custom MessageListener
        containerProperties.setMessageListener(new StringMsgListerner());
        containerProperties.setGroupId("stringGroup");

        // key and value types match the StringDeserializer settings in consumerConfigs()
        DefaultKafkaConsumerFactory<String, String> cf =
                new DefaultKafkaConsumerFactory<>(consumerConfigs());

        KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer =
                new KafkaMessageListenerContainer<>(cf, containerProperties);

        return kafkaMessageListenerContainer;
    }
}

Spring provides two MessageListenerContainer implementations:

  1. KafkaMessageListenerContainer
  2. ConcurrentMessageListenerContainer

The KafkaMessageListenerContainer receives all messages from all topics/partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.

The constructors of KafkaMessageListenerContainer are:

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties,
                    TopicPartitionInitialOffset... topicPartitions)

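ConsumerFactory creates the Kafka client's KafkaConsumer instances, and ContainerProperties holds the listener settings.
Note: the KafkaConsumer properties come from the consumerConfigs method.

Next, look at the structure of KafkaMessageListenerContainer.

[image: structure of KafkaMessageListenerContainer]

Its main members are: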

public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {

    private final AbstractMessageListenerContainer<K, V> container;

    private final TopicPartitionInitialOffset[] topicPartitions;

    private volatile ListenerConsumer listenerConsumer;

    private volatile ListenableFuture<?> listenerConsumerFuture;

    private GenericMessageListener<?> listener;

    private String clientIdSuffix;
}

The members inherited from AbstractMessageListenerContainer are:

public abstract class AbstractMessageListenerContainer<K, V>
        implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware {

    /**
     * The default {@link org.springframework.context.SmartLifecycle} phase for listener
     * containers {@value #DEFAULT_PHASE}.
     */
    public static final int DEFAULT_PHASE = Integer.MAX_VALUE - 100; // late phase

    protected final Log logger = LogFactory.getLog(this.getClass()); // NOSONAR

    protected final ConsumerFactory<K, V> consumerFactory; // NOSONAR (final)

    private final ContainerProperties containerProperties;

    private final Object lifecycleMonitor = new Object();

    private String beanName;

    private ApplicationEventPublisher applicationEventPublisher;

    private GenericErrorHandler<?> errorHandler;

    private boolean autoStartup = true;

    private int phase = DEFAULT_PHASE;

    private AfterRollbackProcessor<K, V> afterRollbackProcessor = new DefaultAfterRollbackProcessor<>();

    private volatile boolean running = false;

    private volatile boolean paused;
}

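Now look at the private inner class ListenerConsumer, which wraps the Kafka client Consumer. Its inheritance hierarchy is:

[image: ListenerConsumer inheritance diagram]

Its main fields are: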

private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback {
    private final ContainerProperties containerProperties = getContainerProperties();
    
    private final Consumer<K, V> consumer;
    
    private final GenericMessageListener<?> genericListener;

    private final MessageListener<K, V> listener;
    
    private volatile Thread consumerThread;
    
    private boolean consumerPaused;

The ListenerConsumer is created in the doStart method of KafkaMessageListenerContainer:

    @Override
    protected void doStart() {
        if (isRunning()) {
            return;
        }
        if (this.clientIdSuffix == null) { // stand-alone container
            checkTopics();
        }
        ContainerProperties containerProperties = getContainerProperties();
        if (!this.consumerFactory.isAutoCommit()) {
            AckMode ackMode = containerProperties.getAckMode();
            if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {
                Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
            }
            if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))
                    && containerProperties.getAckTime() == 0) {
                containerProperties.setAckTime(5000);
            }
        }

        Object messageListener = containerProperties.getMessageListener();
        Assert.state(messageListener != null, "A MessageListener is required");
        if (containerProperties.getConsumerTaskExecutor() == null) {
            SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                    (getBeanName() == null ? "" : getBeanName()) + "-C-");
            containerProperties.setConsumerTaskExecutor(consumerExecutor);
        }
        Assert.state(messageListener instanceof GenericMessageListener, "Listener must be a GenericListener");
        this.listener = (GenericMessageListener<?>) messageListener;
        ListenerType listenerType = ListenerUtils.determineListenerType(this.listener);
        if (this.listener instanceof DelegatingMessageListener) {
            Object delegating = this.listener;
            while (delegating instanceof DelegatingMessageListener) {
                delegating = ((DelegatingMessageListener<?>) delegating).getDelegate();
            }
            listenerType = ListenerUtils.determineListenerType(delegating);
        }
        this.listenerConsumer = new ListenerConsumer(this.listener, listenerType);
        setRunning(true);
        this.listenerConsumerFuture = containerProperties
                .getConsumerTaskExecutor()
                .submitListenable(this.listenerConsumer);
    }
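
The executor fallback in doStart above can be modeled with plain JDK classes. The following is a minimal sketch, not Spring code: `ExecutorFallbackSketch`, `resolveExecutor`, and `threadNameOf` are hypothetical names, and a cached thread pool stands in for SimpleAsyncTaskExecutor (which starts a new thread per task). It only illustrates the rule "use the configured executor if there is one, otherwise create a default whose thread names start with the bean name plus -C-".

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of the executor fallback; all names here are hypothetical.
class ExecutorFallbackSketch {

    // Return the configured executor, or build a default one named "<beanName>-C-<n>".
    static ExecutorService resolveExecutor(ExecutorService configured, String beanName) {
        if (configured != null) {
            return configured;
        }
        String prefix = (beanName == null ? "" : beanName) + "-C-";
        AtomicInteger counter = new AtomicInteger();
        return Executors.newCachedThreadPool(task -> {
            Thread thread = new Thread(task, prefix + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        });
    }

    // Run a trivial task on the executor and report which thread ran it.
    static String threadNameOf(ExecutorService executor) {
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return executor.submit(task).get();
        }
        catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = resolveExecutor(null, "getContainer");
        System.out.println(threadNameOf(executor));
        executor.shutdown();
    }
}
```

Naming the thread after the bean makes it easier to tell from a log line or thread dump which container a consumer thread belongs to.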

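If containerProperties does not specify an executor, the listenerConsumer is submitted to a SimpleAsyncTaskExecutor to consume data; if an executor is specified, that executor runs the consumer.

As the code shows, the ListenerConsumer constructor takes two arguments. The first is the GenericMessageListener that we set on ContainerProperties. The second is listenerType, which ListenerUtils.determineListenerType derives from the listener's type: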

public final class ListenerUtils {

    private ListenerUtils() {
        super();
    }

    public static ListenerType determineListenerType(Object listener) {
        Assert.notNull(listener, "Listener cannot be null");
        ListenerType listenerType;
        if (listener instanceof AcknowledgingConsumerAwareMessageListener
                || listener instanceof BatchAcknowledgingConsumerAwareMessageListener) {
            listenerType = ListenerType.ACKNOWLEDGING_CONSUMER_AWARE;
        }
        else if (listener instanceof ConsumerAwareMessageListener
                || listener instanceof BatchConsumerAwareMessageListener) {
            listenerType = ListenerType.CONSUMER_AWARE;
        }
        else if (listener instanceof AcknowledgingMessageListener
                || listener instanceof BatchAcknowledgingMessageListener) {
            listenerType = ListenerType.ACKNOWLEDGING;
        }
        else if (listener instanceof GenericMessageListener) {
            listenerType = ListenerType.SIMPLE;
        }
        else {
            throw new IllegalArgumentException("Unsupported listener type: " + listener.getClass().getName());
        }
        return listenerType;
    }

}
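
The order of the instanceof checks above matters, because every specialized listener is also a generic listener. The following is a minimal sketch of that idea using hypothetical stand-in interfaces (`GenericListener`, `AckListener`, and so on are not Spring Kafka types): the most specific interfaces are tested first and the generic one last.

```java
// Simplified model of determineListenerType; the interfaces are hypothetical stand-ins.
class ListenerTypeSketch {

    enum ListenerType { ACKNOWLEDGING_CONSUMER_AWARE, CONSUMER_AWARE, ACKNOWLEDGING, SIMPLE }

    interface GenericListener { }
    interface AckConsumerAwareListener extends GenericListener { }
    interface ConsumerAwareListener extends GenericListener { }
    interface AckListener extends GenericListener { }

    static ListenerType determineListenerType(Object listener) {
        if (listener == null) {
            throw new IllegalArgumentException("Listener cannot be null");
        }
        // Specific interfaces first: each of them would also match GenericListener.
        if (listener instanceof AckConsumerAwareListener) {
            return ListenerType.ACKNOWLEDGING_CONSUMER_AWARE;
        }
        if (listener instanceof ConsumerAwareListener) {
            return ListenerType.CONSUMER_AWARE;
        }
        if (listener instanceof AckListener) {
            return ListenerType.ACKNOWLEDGING;
        }
        if (listener instanceof GenericListener) {
            return ListenerType.SIMPLE;
        }
        throw new IllegalArgumentException("Unsupported listener type: " + listener.getClass().getName());
    }

    public static void main(String[] args) {
        System.out.println(determineListenerType(new AckListener() { }));     // prints ACKNOWLEDGING
        System.out.println(determineListenerType(new GenericListener() { })); // prints SIMPLE
    }
}
```

If the GenericListener check came first, every listener would be classified as SIMPLE and the acknowledgment or consumer arguments would never be passed.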

Next, look at what the ListenerConsumer constructor does. It is long, so only the key code is shown:

        ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
            Assert.state(!this.isAnyManualAck || !this.autoCommit,
                    "Consumer cannot be configured for auto commit for ackMode " + this.containerProperties.getAckMode());
                    
            // create the Kafka client Consumer object
            final Consumer<K, V> consumer =
                    KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
                            this.consumerGroupId,
                            this.containerProperties.getClientId(),
                            KafkaMessageListenerContainer.this.clientIdSuffix);
            
            // subscribe to topics, or assign explicit partitions if they were given
            if (KafkaMessageListenerContainer.this.topicPartitions == null) {
                if (this.containerProperties.getTopicPattern() != null) {
                    consumer.subscribe(this.containerProperties.getTopicPattern(), rebalanceListener);
                }
                else {
                    consumer.subscribe(Arrays.asList(this.containerProperties.getTopics()), rebalanceListener);
                }
            }
            else {
                List<TopicPartitionInitialOffset> topicPartitions =
                        Arrays.asList(KafkaMessageListenerContainer.this.topicPartitions);
                this.definedPartitions = new HashMap<>(topicPartitions.size());
                for (TopicPartitionInitialOffset topicPartition : topicPartitions) {
                    this.definedPartitions.put(topicPartition.topicPartition(),
                            new OffsetMetadata(topicPartition.initialOffset(), topicPartition.isRelativeToCurrent(),
                                    topicPartition.getPosition()));
                }
                consumer.assign(new ArrayList<>(this.definedPartitions.keySet()));
            }               

Once the ListenerConsumer has been created, the container submits it to the executor, which calls its run method to fetch and consume records. The key code is:

@Override
public void run() {
    // remember the executor thread that this consumer runs on
    this.consumerThread = Thread.currentThread();
    while (isRunning()) {
                try {
                    if (!this.autoCommit && !this.isRecordAck) {
                        processCommits();
                    }
                    processSeeks();
                    if (!this.consumerPaused && isPaused()) {
                        this.consumer.pause(this.consumer.assignment());
                        this.consumerPaused = true;
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Paused consumption from: " + this.consumer.paused());
                        }
                        publishConsumerPausedEvent(this.consumer.assignment());
                    }
                    
                    // poll for records
                    ConsumerRecords<K, V> records = this.consumer.poll(this.pollTimeout);
                    this.lastPoll = System.currentTimeMillis();
                    if (this.consumerPaused && !isPaused()) {
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Resuming consumption from: " + this.consumer.paused());
                        }
                        Set<TopicPartition> paused = this.consumer.paused();
                        this.consumer.resume(paused);
                        this.consumerPaused = false;
                        publishConsumerResumedEvent(paused);
                    }
                    if (records != null && this.logger.isDebugEnabled()) {
                        this.logger.debug("Received: " + records.count() + " records");
                        if (records.count() > 0 && this.logger.isTraceEnabled()) {
                            this.logger.trace(records.partitions().stream()
                                    .flatMap(p -> records.records(p).stream())
                                    // map to same format as send metadata toString()
                                    .map(r -> r.topic() + "-" + r.partition() + "@" + r.offset())
                                    .collect(Collectors.toList()));
                        }
                    }
                    if (records != null && records.count() > 0) {
                        if (this.containerProperties.getIdleEventInterval() != null) {
                            lastReceive = System.currentTimeMillis();
                        }
                        
                        // hand the records to the listener
                        invokeListener(records);
                    }
                    else {
                        if (this.containerProperties.getIdleEventInterval() != null) {
                            long now = System.currentTimeMillis();
                            if (now > lastReceive + this.containerProperties.getIdleEventInterval()
                                    && now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
                                publishIdleContainerEvent(now - lastReceive, this.isConsumerAwareListener
                                        ? this.consumer : null, this.consumerPaused);
                                lastAlertAt = now;
                                if (this.genericListener instanceof ConsumerSeekAware) {
                                    seekPartitions(getAssignedPartitions(), true);
                                }
                            }
                        }
                    }
                }
                catch (WakeupException e) {
                    // Ignore, we're stopping
                }
                catch (NoOffsetForPartitionException nofpe) {
                    this.fatalError = true;
                    ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
                    break;
                }
                catch (Exception e) {
                    handleConsumerException(e);
                }
            }
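
Stripped of pause/resume, idle events, and commits, the loop above reduces to "poll with a timeout, hand non-empty results to the listener, repeat while running". The following is a minimal sketch of that shape using only the JDK. It is a simplified model, not the Spring implementation: `PollLoopSketch` and `consumeAll` are hypothetical names, a BlockingQueue stands in for the Kafka Consumer, and a `java.util.function.Consumer` stands in for the listener.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Simplified model of the ListenerConsumer loop; all names here are hypothetical.
class PollLoopSketch implements Runnable {

    private final BlockingQueue<String> source;  // stands in for the Kafka consumer
    private final Consumer<String> listener;     // stands in for the message listener
    private volatile boolean running = true;

    PollLoopSketch(BlockingQueue<String> source, Consumer<String> listener) {
        this.source = source;
        this.listener = listener;
    }

    @Override
    public void run() {
        while (running) {
            try {
                // poll with a timeout so the running flag is re-checked regularly
                String next = source.poll(50, TimeUnit.MILLISECONDS);
                if (next != null) {
                    listener.accept(next);
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break; // stopping
            }
            catch (RuntimeException e) {
                // a listener failure is reported, and the loop keeps polling
                System.err.println("Listener failed: " + e);
            }
        }
    }

    void stop() {
        running = false;
    }

    // Feed the given records through the loop on its own thread and return what the listener saw.
    static List<String> consumeAll(List<String> records) {
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(records.size());
        PollLoopSketch loop = new PollLoopSketch(new LinkedBlockingQueue<>(records), r -> {
            received.add(r);
            done.countDown();
        });
        Thread consumerThread = new Thread(loop, "sketch-C-1");
        consumerThread.start();
        try {
            done.await(5, TimeUnit.SECONDS);
            loop.stop();
            consumerThread.join();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        finally {
            loop.stop();
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        System.out.println(consumeAll(Arrays.asList("a", "b", "c"))); // prints [a, b, c]
    }
}
```

The poll timeout bounds how long the loop can go without re-checking the running flag, which is why stopping it takes at most about one timeout interval.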

invokeListener eventually calls doInvokeRecordListener, which calls the listener's onMessage method to process each record:

private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> record,
                @SuppressWarnings("rawtypes") Producer producer,
                Iterator<ConsumerRecord<K, V>> iterator) throws Error {
                
                switch (this.listenerType) {
                    case ACKNOWLEDGING_CONSUMER_AWARE:
                        this.listener.onMessage(record,
                                this.isAnyManualAck
                                        ? new ConsumerAcknowledgment(record)
                                        : null, this.consumer);
                        break;
                    case CONSUMER_AWARE:
                        this.listener.onMessage(record, this.consumer);
                        break;
                    case ACKNOWLEDGING:
                        this.listener.onMessage(record,
                                this.isAnyManualAck
                                        ? new ConsumerAcknowledgment(record)
                                        : null);
                        break;
                    case SIMPLE:
                        this.listener.onMessage(record);
                        break;
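
In the ACKNOWLEDGING branches above, the listener receives an acknowledgment object only when a manual ack mode is in use. The following is a minimal sketch of how such an acknowledgment could feed the processCommits step of the run loop. It is a simplified model under stated assumptions, not the Spring implementation: `ManualAckSketch`, `Message`, and this `Acknowledgment` interface are hypothetical, and a map stands in for the offsets committed to Kafka. It follows the Kafka convention that the committed offset is the offset of the next record to read, that is, the acknowledged offset plus one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;

// Simplified model of manual acknowledgment; all names here are hypothetical.
class ManualAckSketch {

    interface Acknowledgment {
        void acknowledge();
    }

    static final class Message {
        final int partition;
        final long offset;

        Message(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    private final Queue<Message> acks = new ConcurrentLinkedQueue<>();
    private final Map<Integer, Long> committed = new HashMap<>();

    // Pass an Acknowledgment only in manual ack mode; otherwise the listener gets null.
    void invokeListener(Message message, boolean manualAck, BiConsumer<Message, Acknowledgment> listener) {
        Acknowledgment ack = null;
        if (manualAck) {
            ack = () -> acks.add(message); // acknowledging only queues the message
        }
        listener.accept(message, ack);
    }

    // Called from the poll loop: turn queued acks into committed offsets (offset + 1).
    void processCommits() {
        Message acked;
        while ((acked = acks.poll()) != null) {
            committed.merge(acked.partition, acked.offset + 1, Long::max);
        }
    }

    Map<Integer, Long> committedOffsets() {
        return committed;
    }

    public static void main(String[] args) {
        ManualAckSketch container = new ManualAckSketch();
        container.invokeListener(new Message(0, 41), true, (message, ack) -> ack.acknowledge());
        container.processCommits();
        System.out.println(container.committedOffsets()); // prints {0=42}
    }
}
```

Queuing the ack instead of committing inside acknowledge() keeps all commit work on the polling thread, which matters because the Kafka consumer is not safe for use from multiple threads.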

Finally, here is how Spring reaches the doStart method, shown as a call stack with the innermost frame first:

  • doStart:232, KafkaMessageListenerContainer (org.springframework.kafka.listener)
  • start:257, AbstractMessageListenerContainer (org.springframework.kafka.listener)
  • doStart:182, DefaultLifecycleProcessor (org.springframework.context.support)
  • access$200:53, DefaultLifecycleProcessor (org.springframework.context.support)
  • start:360, DefaultLifecycleProcessor$LifecycleGroup (org.springframework.context.support)
  • startBeans:158, DefaultLifecycleProcessor (org.springframework.context.support)
  • onRefresh:122, DefaultLifecycleProcessor (org.springframework.context.support)
  • finishRefresh:879, AbstractApplicationContext (org.springframework.context.support)
  • refresh:549, AbstractApplicationContext (org.springframework.context.support)
  • refresh:775, SpringApplication (org.springframework.boot)
  • refreshContext:397, SpringApplication (org.springframework.boot)
  • run:316, SpringApplication (org.springframework.boot)
  • run:1260, SpringApplication (org.springframework.boot)
  • run:1248, SpringApplication (org.springframework.boot)
  • main:14, KafkaconsumerApplication (com.example.kafkaconsumer)