一文带你玩转kafka消费者

1 前言

《一文带你快速入门kafka》《结合源码了解Kafka生产者》 写过的知识点与前提这里就不重复写了。这里会和前面有一点联系,但是不多。

2 消费者与消费组

在 Kafka 中,会有消费组这么一个概念,每一个消费者都有一个对应的消费组。当一个消息发布到 topic 后,只会被订阅这个 topic 的每个消费组中的一个消费者消费。下面我们通过一些示例来进一步了解消费者与消费组之间的关系。
1.某主题有4个分区,两个消费组订阅,具体如下图(下图来自《深入理解Kafka:核心设计与实践原理》)

image.png

2.在实际工作中,我们消费者往往是多个节点,并且在发布时基本都是一个节点发好了再发;在遇到突发流量导致的消息堆积时,有时我们可能会选择增加消费者节点来处理。下面的示例图其实就是按照这些工作会遇到的场景来的:目前有一个主题,它有7个分区
2.1.订阅这个主题的消费者第一个节点启动了
image.png

2.2.接着第二个节点也启动好了(下图来自《深入理解Kafka:核心设计与实践原理》)
image.png

2.3.如此类推,一直发布到C4,共5个节点,正常情况下消息消费情况都没有堆积,但是遇到活动时会有激增,这时我们会给这个消费组增加消费者提升整体消费能力,如下图增加到了C6
image.png

是不是越多越好呢?再加多一个消费者C7,如下
image.png

事实上,默认的分配策略下每一个分区只能被同一个消费组内的一个消费者消费

3 分区分配策略

在前面说了消费者与消费组的关系的同时,也用示例展示了增加消费者在默认情况下分区是怎么分配的。不过这个分区分配策略是可以通过配置 partition.assignment.strategy 修改的。接下来,我们看下 Kafka 都提供了哪些分配策略。
下面会反复涉及到 consumerId 或者叫 memberId,这是 org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor.MemberInfo 中的一个属性,具体如下图

memberId.png

3.1 RangeAssignor

这个是 Kafka 的默认分配策略。假设当前有一个主题被一个消费组订阅了,这个策略会先将这个消费组内的消费者按照 consumerId 的字典序排序,然后为每个消费者划分固定的分区范围。如果不能平均分配,那么字典序靠前的消费者将会被多分配一个分区。
举个例子:假设现在这个主题的分区数为 x,消费者数为 y,n = x/y,m=x%y,那么前面 m 个消费者会被分配到 n+1 个分区,后面的只有 n 个分区。
从公式不难看出,当 m 总是 0,并且消费者的性能都相差无几,那么不管有多少个主题,这个分配策略都很是合理,每个消费者的性能都得到了充分的利用,并且负载很均匀。
但现实不可能总是这么理想的,打个比方,现在有两个主题,它们分区均为 3,而订阅它们的消费组的消费者只有 2 个,那么此时会得到以下结果(这个例子来自于kafka-client的源码注释):
消费者 C0:t0p0,t0p1,t1p0,t1p1
消费者 C1:t0p2,t1p2
这时,C0 的压力明显高于 C1。

3.2 RoundRobinAssignor

这个策略是先将消费者的 consumerId 按字典序排序,然后通过轮询的方式来分配分区。
用 3.1 的第二个例子,用这个策略得到的结果如下:
消费者 C0:t0p0,t0p2,t1p1
消费者 C1:t0p1,t1p0,t1p2
但是实际工作中,我们每个主题的分区数有可能是不同的,另外组内的消费者订阅的主题也可能是不同的,这样就有可能导致分配不均匀了。
我们来看下官方说明的一个极端例子


RoundRobinAssignor.png

这里是说,消费组内有 3 个消费组,消费组订阅了 3 个主题,这些主题的分区数分别是 1,2,3。其中 C0 订阅了 t0,C1 订阅了 t0 和 t1,C2 订阅了 t0,t1,t2,分配结果如上图。

3.3 StickyAssignor

这个分配策略有两个特性:

  1. 分区的分配要尽可能均匀
  2. 分区的分配尽可能与上次分配的结果保持一致
    当这两个特性冲突时,第一个优先于第二个。

下面举一些例子来了解它:

  1. 一个消费组内有 3 个消费者,消费组订阅了 4 个主题,每个主题均有 2 个分区,最终分配结果如下
    C0:t0p0, t1p1, t3p0
    C1:t0p1, t2p0, t3p1
    C2:t1p0, t2p1
    现在 C1 因为各种原因不在消费组中了,这时会重新分配分区,结果如下
    C0:t0p0, t1p1, t3p0, t2p0
    C2:t1p0, t2p1, t0p1, t3p1
  2. 一个消费组内有 3 个消费者(C0,C1,C2),消费组订阅了 3 个主题(t0,t1,t2),它们各自的分区数分别是 1,2,3,C0 订阅了 t0,C1 订阅了 t0,t1,C2 订阅了 t0,t1,t2,分配结果如下
    C0:t0p0
    C1:t1p0, t1p1
    C2:t2p0, t2p1, t2p2
    假设现在 C0 离开了消费组,那么从新分配的结果如下
    C1:t1p0, t1p1, t0p0
    C2:t2p0, t2p1, t2p2

3.4 自定义分区分配策略

前面三个都是 kafka-client 提供的三种分配策略,如果都不满足业务场景的需求,那么我们可以自定义一个分配策略。自定义分配策略需要实现接口 org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
application.yml配置如下

spring:
  kafka:
    bootstrap-servers: "192.168.226.140:9092"
    consumer:
      group-id: ${spring.application.name}-${spring.profiles.active}
      properties:
        partition:
          assignment:
            strategy: org.apache.kafka.clients.consumer.RangeAssignor # 指定分区策略

RangeAssignor(这里用 Kafka 的默认策略说明怎么自定义策略)
关系类图如下


RangeAssignor.png

RangeAssignor 源码如下

public class RangeAssignor extends AbstractPartitionAssignor {
    public static final String RANGE_ASSIGNOR_NAME = "range";

    /**
     * 因为会有多个分区策略,而这个方法会返回与每个分区策略一一对应的唯一名,所以可以通过这个方法匹配到唯一的分区策略
     */
    @Override
    public String name() {
        return RANGE_ASSIGNOR_NAME;
    }

    /**
     * key-topic value-订阅了该 topic 的消费者集合
     * @param consumerMetadata key-consumerId value-Subscription(它是ConsumerPartitionAssignor的内部类,它拥有消费者的订阅信息)
     * @return key-topic value-订阅了该 topic 的消费者集合
     */
    private Map<String, List<MemberInfo>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
        Map<String, List<MemberInfo>> topicToConsumers = new HashMap<>();
        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
            String consumerId = subscriptionEntry.getKey();
            MemberInfo memberInfo = new MemberInfo(consumerId, subscriptionEntry.getValue().groupInstanceId());
            for (String topic : subscriptionEntry.getValue().topics()) {
                put(topicToConsumers, topic, memberInfo);
            }
        }
        return topicToConsumers;
    }

    /**
     * 重写这个方法可以自定义自己的分区策略
     */
    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        // key-topic value-订阅了该 topic 的消费者集合
        Map<String, List<MemberInfo>> consumersPerTopic = consumersPerTopic(subscriptions);

        // key-consumerId value-这个 consumerId 被分配的分区
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        for (String memberId : subscriptions.keySet())
            assignment.put(memberId, new ArrayList<>());

        // 按照主题的维度遍历
        for (Map.Entry<String, List<MemberInfo>> topicEntry : consumersPerTopic.entrySet()) {
            String topic = topicEntry.getKey();
            // 订阅了该 topic 的消费者
            List<MemberInfo> consumersForTopic = topicEntry.getValue();

            // topic 的分区数
            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic == null)
                continue;

            // 对消费者进行排序
            Collections.sort(consumersForTopic);

            // 每个消费者至少被分配到的分区数
            int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
            // 前 consumersWithExtraPartition 个消费者被分配到的分区数 = numPartitionsPerConsumer + 1
            int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
            for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
                int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
                int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
                assignment.get(consumersForTopic.get(i).memberId).addAll(partitions.subList(start, start + length));
            }
        }
        return assignment;
    }
}

注意:在第 2 章节末尾写了默认策略下一个分区只能被同个消费组内的一个消费者消费,但是这不是绝对的,我们可以通过自定义策略使得一个分区可被同一个消费组内的多个消费者消费

那如果想将消息广播给消费组内所有消费者怎么办呢?请看以下的示例代码

package com.example.czl.kafka.kafka.consumer.partition.strategy;

import com.google.common.collect.Maps;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * @author CaiZhuliang
 * @date 2023/7/1
 */
public class BroadcastAssignor extends AbstractPartitionAssignor {
    public static final String BROADCAST_ASSIGNOR_NAME = "broadcast";

    /**
     * 将消息广播给订阅的消费组内的所有消费者
     * @param partitionsPerTopic key-topic value-topic的分区数
     * @param subscriptions key-consumerId value-Subscription(它是ConsumerPartitionAssignor的内部类,它拥有消费者的订阅信息)
     * @return key-consumerId(具体消费者的ID) value-这个 consumerId 被分配的分区
     */
    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
        // key-topic value-订阅了该 topic 的消费者的consumerId集合
        Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
        // key-consumerId(具体消费者的ID) value-这个 consumerId 被分配的分区
        Map<String, List<TopicPartition>> assignment = Maps.newHashMapWithExpectedSize(subscriptions.size());
        subscriptions.keySet().forEach(consumerId -> assignment.put(consumerId, new ArrayList<>()));
        // 针对每个主题,为每个订阅的消费者分配所有的分区
        consumersPerTopic.forEach((topic, consumerIdList) -> {
            Integer partitionNumForTopic = partitionsPerTopic.get(topic);
            if (null != partitionNumForTopic && CollectionUtils.isNotEmpty(consumerIdList)) {
                List<TopicPartition> partitionList = AbstractPartitionAssignor.partitions(topic, partitionNumForTopic);
                if (CollectionUtils.isNotEmpty(partitionList)) {
                    consumerIdList.forEach(consumerId -> assignment.get(consumerId).addAll(partitionList));
                }
            }
        });
        return assignment;
    }

    /**
     * key-topic value-订阅了该 topic 的消费者集合
     * @param subscriptions key-consumerId value-Subscription(它是ConsumerPartitionAssignor的内部类,它拥有消费者的订阅信息)
     * @return key-topic value-订阅了该 topic 的消费者的consumerId集合
     */
    private Map<String, List<String>> consumersPerTopic(Map<String, Subscription> subscriptions) {
        Map<String, List<String>> topicToConsumers = Maps.newHashMap();
        for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
            // 一般为 consumer-{groupId}-{consumerIndex}-{UUID}
            String consumerId = subscriptionEntry.getKey();
            for (String topic : subscriptionEntry.getValue().topics()) {
                put(topicToConsumers, topic, consumerId);
            }
        }
        return topicToConsumers;
    }

    @Override
    public String name() {
        return BROADCAST_ASSIGNOR_NAME;
    }
}

这段代码只是展示怎么实现广播而已,但是如果直接使用在生产是有问题的。比如:consumer1 提交了 offset 为 10,这时 consumer2 紧接着提交了 offset 为 12,此时 consumer1 重启了,那么 consumer1 就丢失了部分消息。
为什么会这样呢?如果看过 《一文带你快速入门kafka》 的话,就会对下图有印象

kafka与zookeeper.png

消费者提交位移存储在 zk 上,具体路径是 /consumers/所属的消费组ID/offsets/主题/节点ID-分区号 下的,所以上面的 BroadcastAssignor 广播会导致消费者之间的位移互相覆盖,节点没重启本地就还有缓存记录自己的 offset,但是一旦重启就要到 zk 拿数据了,这样就会有问题。
除了这个问题,还有别的问题,因此,想实现广播功能,需要自己去完善代码才行,这里我就不完善了。

4 反序列化

《结合源码了解Kafka生产者》 提到 Producer 需要将消息对象序列化成字节数组才能发送,那么很容易想到我们 consumer 接收到的必然是字节数组,需要反序列化处理。
如果生产者是自定义的序列化器,那么消费者的反序列化往往也需要自定义了,那么要怎么做呢?只要实现接口 org.apache.kafka.common.serialization.Deserializer 即可。下面是源码

public interface Deserializer<T> extends Closeable {

    /**
     * Configure this class.
     * @param configs configs in key/value pairs
     * @param isKey whether is for key or value
     */
    default void configure(Map<String, ?> configs, boolean isKey) {
        // configs 中有3个参数用来配置反序列化的编码类型:key.deserializer.encoding、value.deserializer.encoding 和 deserializer.encoding
    }

    /**
     * Deserialize a record value from a byte array into a value or object.
     * @param topic topic associated with the data
     * @param data serialized bytes; may be null; implementations are recommended to handle null by returning a value or null rather than throwing an exception.
     * @return deserialized typed data; may be null
     */
    T deserialize(String topic, byte[] data);

    /**
     * Deserialize a record value from a byte array into a value or object.
     * @param topic topic associated with the data
     * @param headers headers associated with the record; may be empty.
     * @param data serialized bytes; may be null; implementations are recommended to handle null by returning a value or null rather than throwing an exception.
     * @return deserialized typed data; may be null
     */
    default T deserialize(String topic, Headers headers, byte[] data) {
        return deserialize(topic, data);
    }

    /**
     * Close this deserializer.
     * <p>
     * This method must be idempotent as it may be called multiple times.
     */
    @Override
    default void close() {
        // intentionally left blank
    }
}

不过最好不要自定义序列化器,因为在实际工作中,开发水平良莠不齐,人员流动大,这种会增加生产者和消费者的耦合度的做法,最好慎重。如果实在需要自定义,可以考虑一些成熟的序列化工具,如:ProtoBuf。具体怎么使用请看官网:https://protobuf.dev/

5 消息消费

我们使用 springboot 集成的 kafka,使用上很方便,但是也屏蔽了很多细节,容易让很多刚接触 kafka 的人遇到问题时无从下手。本质上 spring 也只是封装了 kafka-client 而已。因此,这里会直接使用 kafka-client 来讲述消息如何消费的,再去看 spring 的源码就很容易理解并且知道要怎么做才能实现自己想要的功能了。
系统学习过 MQ 的应该知道消息消费一般有两种模式:

  • 推模式
    服务端主动推送给消费者
  • 拉模式
    消费者主动去服务端拉取消息
    Kafka 采用的是拉模式。

下面我们来看下如何使用 kafka-client 实现消费者代码

package com.example.czl.kafka.kafka.consumer;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.*;

/**
 * @author CaiZhuliang
 * @date 2023/7/3
 */
@Slf4j
@Component
public class MyKafkaConsumer {
    private static final int MAX_QUEUE_SIZE = 1000;

    @Value("${spring.kafka.bootstrap-servers:localhost:9092}")
    private String brokerServer;

    private final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1,
            new BasicThreadFactory.Builder().namingPattern("MyKafkaConsumer-schedule-pool-%d").daemon(true).build(),
            (r, e) -> {
                BlockingQueue<Runnable> queue = e.getQueue();
                if (null != queue && queue.size() > MAX_QUEUE_SIZE) {
                    log.warn("MyKafkaConsumer - 触发拒绝策略");
                    throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e);
                }
            });

    /**
     * 初始化构建 consumer 所需要的配置参数
     * @return Properties
     */
    public Properties initConfig() {
        // 下面是构建 KafkaConsumer 的必要参数
        Properties properties = new Properties();
        // 平常用 spring-kafka,key 和 value 的反序列化器 spring 会帮我们默认为 StringDeserializer,所以不用我们自己去指定了
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("bootstrap.servers", brokerServer);
        // 消费组ID(或者叫消费组名称),spring 会帮我们给默认值"",但是这里就需要自己赋值了
        properties.put("group.id", "myGroupId");
        // 消费者客户端ID
        properties.put("client.id", StringUtils.joinWith("-", "consumer", "myGroupId", 0, UUID.randomUUID()));
        return properties;
    }

    @PostConstruct
    public void pollMsg() {
        Properties prop = initConfig();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop)) {
            // 订阅主题
            consumer.subscribe(Lists.newArrayList("test-topic-2"));
            // 定时拉取消息
            scheduledExecutorService.scheduleAtFixedRate(() -> {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
                for (ConsumerRecord<String, String> record : records) {
                    log.info("MyKafkaConsumer - topic : {}, partition : {}, offset : {}, key : {}, value ; {}",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
                }
            }, 0L, 2L, TimeUnit.SECONDS);
        } catch (Exception e) {
            log.error("MyKafkaConsumer - 消费信息发生异常", e);
        }
    }
}

不管拉取消息的业务逻辑是怎样的,但是重点是一定都是用 KafkaConsumer 写实现。因此,我们需要关心 KafkaConsumer 都提供了哪些方法。下面挑一部分介绍:

  • poll(Duration timeout)
    该方法用于从 Kafka 服务器拉取一批消息。它会阻塞等待,直到有消息可用或超过指定的超时时间。返回的是一个记录集(ConsumerRecords)对象,包含了拉取到的一批消息记录。
  • subscribe(Collection<String> topics)
    该方法用于订阅一个或多个主题,让消费者开始接收这些主题的消息。可以通过传递一个主题列表来订阅多个主题。
  • subscribe(Pattern pattern)
    与上述方法类似,但使用正则表达式模式来匹配要订阅的主题。
  • assign(Collection<TopicPartition> partitions)
    该方法用于手动分配特定的分区给消费者。通过传递一个分区列表,消费者将仅消费这些分区的消息。
  • seek(TopicPartition partition, long offset)
    该方法用于将消费者的偏移量(offset)移动到指定的分区和偏移量位置。消费者将从指定的偏移量开始消费消息。
  • seekToBeginning(Collection<TopicPartition> partitions)
    该方法将消费者的偏移量移动到指定分区的起始位置(最早的可用偏移量)。
  • seekToEnd(Collection<TopicPartition> partitions)
    该方法将消费者的偏移量移动到指定分区的末尾位置(最新的可用偏移量)。
  • pause(Collection<TopicPartition> partitions)
    暂停从请求的分区获取数据。将来对 poll(Duration) 的调用将不会从这些分区返回任何记录,直到使用resume(Collection) 恢复这些记录。注意,该方法不影响分区订阅。特别是,当使用自动分配时,它不会导致组重新平衡。
  • resume(Collection<TopicPartition> partitions)
    恢复已通过pause(Collection)暂停的指定分区。如果有任何要获取的记录,对 poll(Duration) 的新调用将从这些分区返回记录。如果分区之前没有暂停,则此方法是无操作的。

5.1 顺序消费

实际工作中,我们有时会对消息的消费顺序有要求,这时我们就要做到顺序消费了。下面提供几个思路给大家参考:

  • 单分区单消费者
    前面说过了,Kafka 分区内是有序的,所以单分区单消费者是可以保证顺序消费的
  • 消息键(Key)
    前面有提到过当 Key 不为空时,会根据 Key 来确定 partition。因此,我们可以将需要有序消费的一类消息通过 key 归类,然后每个消费者只订阅一个分区即可

6 位移提交

每一条消息在自己的分区中都有一个唯一的标识——offset。对于消费者而言,消费者也有自己的 offset,大家要注意区分下。而当消费者消费完消息后,需要将已消费的消息offset提交(持久化在 zk 上)。
举个例子:x 表示某一次拉取操作中此分区消息的最大偏移量,假设当前消费者已经消费了 x 位置上的消息,该位置用 lastConsumedOffset 标识。如下图


消费位移.png

注意:这个例子中,如果消费者提交位移,那么提交的位移是 x+1,而不是 x!
实际工作中,我们消费消息时不对不面对两个问题:

  • 消息重复消费
    假设现在我拉取 [x+1, x+5],在消费到 x+3 时发生异常,我还没提交位移,那么下次再次拉取消息时依然会从 x+1 开始拉取,所以 x+1 和 x+2 就会重复消费。
  • 消息丢失
    假设现在我拉取[x+1, x+5],提交了位移,然后再去实际处理消息,处理到 x+3 时发生了异常,那么就会产生消息丢失。

6.1 自动提交位移与手动提交位移

相信大家已经想到了,这两个问题和我们提交位移的时机是密切相关的,那么我们是什么时候提交位移的呢?Kafka 通过配置 enable.auto.commit 控制是否自动提交位移,默认是 true;再由配置 auto.commit.interval.ms 控制自动提交的时间间隔,默认是 5000 毫秒。
对于需要精细粒度的场景,Kafka 的自动提交就无法满足了。这时我们可以关闭自动提交,手动提交位移,自己控制。手动提交位移的方法看下图

手动提交消费位移.png

基于微服务的思想,我个人认为服务就应该独立自理,自己的问题自己解决。既然 Kafka 简便完善的 API 给我们处理重复消费消息丢失这两个问题,我们不防在自己消费者端给出解决方案。如果实现良好,并且能做的比较通用,那么还可以抽出来做成公用组件。

可能这里会有读者提出了 Kafka 的幂等和事务,这两个后面也会讲,不过它们也是有自己的局限性的。总的来说,还是那句话,都独立服务了,自己的事情就自己处理。生产者需要保证自己百分百投递;服务端要自己做到高可用别且不丢数据;消费者自己做好幂等和百分百消费即可。

7 再均衡

再均衡是指分区的所属权从一个消费者转移到同一个消费组的另一个消费者的行为

再均衡为消费组的高可用性和伸缩性提供了保障,让我们可以安全地往消费组内添加或删除消费者。不过,在再均衡的期间,消费组的消费者是无法消费消息的。另外,再均衡也可能会导致消息重复消费,比如 消费者A 拉取了 分区a 5条消息,只消费了前两条并且还没提交消费位移,此时就发生了再均衡,分区a 交由 消费者B 消费,那么之前被 消费者A 消费过的两条消息会被 消费者B 重新消费。

当然,作为一个成熟的技术,Kafka 有预留口子让我们可以在再均衡之前或者之后做点事情,我们只需要利用 org.apache.kafka.clients.consumer.ConsumerRebalanceListener 即可。下面是示例代码

// 省略代码
Properties prop = initConfig();
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop)) {
    consumer.subscribe(Lists.newArrayList("topicName"), new ConsumerRebalanceListener() {
                /**
                 * 这个方法会在再均衡开始之前和消费者停止读取消息之后被调用
                 * @param partitions 再均衡发生前所分配的分区
                 */
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // do something
                }

                /**
                 * 这个方法会在再均衡结束之后和消费者开始读取消息之前被调用
                 * @param partitions 再均衡发生后所分配到的分区
                 */
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // do something
                }
            });
}
// 省略代码

8 消费者拦截器

要实现消费者拦截器,需要以下步骤(一样还是使用 kafka-client 来实现):

  1. 配置消费者拦截器(如果是在 springboot 上集成的,那么在 application.yml 上配置即可)

    // 注册拦截器
    props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
    
  2. 实现 org.apache.kafka.clients.consumer.ConsumerInterceptor

    import org.apache.kafka.clients.consumer.*;
    
    import java.util.*;
    
    /**
     * @author CaiZhuliang
     * @date 2023/7/9
     */
    public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {
    
        /**
         * 这是在 KafkaConsumer.poll(java.time.Duration) 返回消息之前调用的该方法允许修改消息,在这种情况下将返回新消息。此方法可返回的消息数量没
         * 有限制。即拦截器可以过滤消息或生成新消息。此方法引发的任何异常都将被调用者捕获并记录,但不会往上传递。由于消费者可能运行多个拦截器,因此将按
         * 照 ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG 指定的顺序调用特定拦截器的 onConsume() 回调。列表中的第一个拦截器获取需要消费的消息,
         * 后面的拦截器将传递前一个拦截器返回的消息,依此类推。
         * @param records 客户端要使用的记录或列表中先前拦截器返回的记录
         * @return 由拦截器修改的记录或与传递给此方法的记录相同的记录
         */
        @Override
        public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
            // 在消费消息之前拦截和处理消息
            for (ConsumerRecord<String, String> record : records) {
                // 对消息进行自定义处理
                String modifiedValue = record.value() + " - Modified";
                ConsumerRecord<String, String> modifiedRecord = new ConsumerRecord<>(
                        record.topic(), record.partition(), record.offset(), record.key(), modifiedValue);
                // 替换原始的消息记录
                records = new ConsumerRecords<>(records.records(record.topic()), modifiedRecord);
            }
            return records;
        }
    
        /**
         * 当提交偏移量时调用此方法。 此方法抛出的任何异常都将被调用者忽略
         * @param offsets key-partition of topic value-带有关联元数据的分区偏移量映射
         */
        @Override
        public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
            // 在提交位移之前拦截
        }
    
        @Override
        public void close() {
            // 关闭拦截器时的清理操作
        }
    }
    

举个实战例子,我曾经在 YY 遇到一个这样的场景:我需要订阅主播开播后的实时截图通过 AI 识别判断后作为开播封面。当时流媒体有一个 topic 是可以满足我需求的,但是流媒体这个 topic 的应用场景过于丰富(因为流媒体对接了很多部门,所以场景较多,但是关于主播开播的,有很多都是放在了这个 topic 上,如 ai识别,5秒图等)。而我只需要 type 是 5秒图 的消息,这时就需要过滤消息了(实际情况是,我不仅只要5秒图,而且要过滤游戏主播,影视主播和黑名单主播)。

除此之外,如果我们想实现 消息TTL 的功能,也可以通过拦截器实现。

9 多线程

KafkaConsumer 是非线程安全的,不过 KafkaConsumer 所提供的 public 修饰的方法内部都会调用 acquire 这个方法

  • acquire()

    这个方法的注释:获取轻锁以保护该消费者免受多线程访问。然而,当锁不可用时,我们不会阻塞,而是抛出异常 java.util.ConcurrentModificationException

在 KafkaConsumer 中,acquire 和 release 是成对存在的。和我们平常使用 ReentrantLock 一样,用了 lock,肯定就会需要 unlock 释放锁。

我们工作中,一般都会使用默认配置,并且更多可能是使用 springboot 直接集成 kafka 使用,所以下图是我们最常用的一种消费方式:线程封闭(下图来自《深入理解Kafka:核心设计与实践原理》)


一个线程对应一个KafkaConsumer 实例.png

我们实际工作过程中,难免会遇到生产者发送消息的速度大于消费者消费的速度(上面提及的开播封面的例子就是如此),这时我们就需要多线程来提升消费者的消费能力了。我个人的一个优化方式如下图


Consumer多线程处理消息.png

10 重要的消费者参数

在 3.4 写消费者的示例代码时,已经体现了一些必要的配置,而实际工作中,除了这些必要的配置以外,还有很多配置,但是他们都有默认值,所以我们不太关心。只不过如果想更好地性能调优或者进行排障,就需要了解更多配置了。

  1. fetch.min.bytes

    该参数用来配置 Consumer 在一次拉取中拉取最小的数据量,默认值为 1 B。如果可拉取消息不能超过阈值,就会进行等待

  2. fetch.max.bytes

    该参数配置用来配置 Consumer 在一次拉取中拉取最大的数据量,默认为 52428800 B,即 50 MB

  3. fetch.max.wait.ms

    这个参数配合 fetch.min.bytes 一起使用。当可拉取数据量一直不超过阈值,不可能一直阻塞,所以会有一个最大等待时间,默认为 500 ms

  4. max.partition.fetch.bytes

    这个参数用来配置从每个分区里返回给 Consumer 的最大数据量,默认为 1048576 B,即 1 MB

  5. max.poll.records

    这个参数用来配置 Consumer 在一次拉取中可以拉取的最大消息数,默认为 500 条

  6. connections.max.idle.ms

    这个参数用来指定在多久之后关闭闲置的连接,默认是 540000 ms

  7. receive.buffer.bytes

    这个参数用来设置 Socket 接收消息缓冲区的大小,默认为 65536 B,即 64 KB。如果设置为 -1,则使用操作系统的默认值

  8. request.timeout.ms

    这个参数用来配置 Consumer 等待请求响应的最长时间,默认为 30000 ms

  9. metadata.max.age.ms

    这个参数用来配置元数据的过期时间,默认为 30000 ms。如果元数据在这个指定的时间内没有更新,则会强制更新,哪怕最终结果是没有任何变更

  10. reconnect.backoff.ms

    这个参数用来配置尝试重新连接指定主机之前的等待时间,避免频繁地连接 broker,默认为 50 ms

  11. retry.backoff.ms

    这个参数用来配置尝试重新发送失败的请求到指定的主题分区之前的等待时间,默认为 100 ms

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容