上一节讲到了如何通过构建ProducerRecord对象,选择对应的序列化方式发送数据到Kafka,这一节我们来讲如何创建一个消费者,然后消费指定topic的对象。
一、消费者、消费者组和分区再均衡
1.消费者和消费者组
首先要理清消费者和消费者组这两个概念。Kafka官网上对于这两个概念有一段言简意赅的表述:
Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.
If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances.If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.
这里我总结了大概有几个意思:
1.消费者组是一群具有相同标记的消费者实例的集合
2.一个消费者组只消费一次来自同一个topic的同一条记录。假设topicA有记录A,意味着一个消费者组内如果有三个消费者,那么他们只有一个消费者能消费到这条记录A。
3.每个消费者实例可以部署在同一台机器的不同进程上,也可以部署在不同机器上。
上面我们理清了消费者和消费者组的关系,那么消费者组和主题之间,或者说消费者和主题分区之间又存在什么联系呢?
这里我们用《Kafka权威指南》给我们列举的几种情况。
主题T1有4个分区,消费者群组1订阅了主题T1,因为群组1内消费者实例只有一个,那么这个消费者1实例将消费T1内分区0-3的消息。
如果此时消费者群组1内的消费者实例增加到2个,则由再均衡监听器给两个消费者实例分配要消费的分区,这里可以看到2 个消费者收到4 个分区的消息。
如果群组G I 有4 个消费者,那么每个消费者可以分配到一个分区。
如果我们往群组里添加更多的消费者,超过主题的分区数量,那么有一部分消费者就会被闲置,不会接收到任何消息。
在上面的例子里,如果新增一个只包含一个消费者的群组G2,那么这个消费者将从主题T1 上接收所有的消息,与群组G1 之间互不影响。群组G2 可以增加更多的消费者,每个消费者可以消费若干个分区,就像群组G1 那样。
2.分区再均衡
再均衡:分区的所有权从一个消费者转移到另一个消费者,这种行为成为再均衡。
消费者通过发送心跳给群组协调器(broker)维持他和分区之间的联系。一旦消费者停机,或者有新的消费者实例加入,协调器会触发一次再均衡,然后由群主(一般是第一个加入消费者组的消费者实例)给每一个消费者分配分区。
二、创建和使用消费者
1.创建(配置)
创建的过程其实和创建生产者的步骤很相似,就是new一个KafkaConsumer对象,在此之前先在Properties类设置好配置参数,一般必填的有bootstrap.servers,group.id,key.deserializer和value.deserializer。
除了上面这些主要配置,还有一些比较重要的配置项。
enable.auto.commit:该属性指定了消费者是否自动提交偏移量,默认值是true 。为了尽量避免出现重复数据和数据丢失,可以把它设为false ,由自己控制何时提交偏移量。如果把它设为true ,还可以通过配置auto.commit.interval.ms属性来控制提交的频率。
partition.assignment.strategy:PartitionAssignor 根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者。Kafka有两个默认的分配策略:Range和RoundRobin。Range会把主题的若干个连续的分区分配给消费者。RoundRobin把主题的所有分区逐个分配给消费者。
2.订阅主题
创建好消费者后订阅主题,用subscribe方法添加主题列表。
consumer.subscribe(Collections.singletonList("customerCountries"));
3.轮询
消息轮询是消费者API的核心,通过一个简单的轮询向服务器请求数据。一旦消费者订阅了主题,轮询就会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据。
三、提交和偏移量
每次调用poll()方法,它总是返回由生产者写入Kafka 但还没有被消费者读取过的记录,我们因此可以追踪到哪些记录是被群组里的哪个消费者读取的。消费者可以使用Kafka 来追踪消息在分区里的位置(偏移量)。消费者可以手动或者自动提交分区最近消费的位置,这个消息记录会保存在一个叫作_consumer_offset 的特殊主题上,记录带上了消费者实例在每个分区的偏移量。
这种提交方式会在再均衡之后导致两种情况的产生,如下所示:
如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。
如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。
KafkaConsumer API 提供了很多种方式来提交偏移量。
1.自动提交:消费者自动提交偏移量。如果enable.autocommit被设为true ,那么每过5s消费者会自动把从poll() 方法接收到的最大偏移量提交上去。提交时间间隔由auto.commit.interval.ms 控制,默认值是5s。
2.提交当前偏移量:消费者API 提供了另一种提交偏移量的方式, 开发者可以在必要的时候提交当前偏移盘,而不是基于时间间隔。这里只需要设置auto.commit.offset=false。这里使用commitSync() 方法,它会一直尝试直至提交成功。
3.异步提交:弥补手动提交导致应用程序阻塞的情况。但是无法避免重复提交偏移量导致重复消费的问题(因为没有重试机制)
4.同步和异步组合提交:可保证在关闭消费者或者再均衡前的最后一次提交能成功(有同步提交作为重试)
5.提交特定偏移量:消费者API 允许在调用commitSync()和commitAsync()方法时传进去希望提交的分区和偏移量的map。
下面是具体的实现例子:
四、再均衡监听器
新建一个ConsumerRebalanceListener实例类,实现两个方法供消费者调用:
(1) public void onPartitionsRevoked(Collection<TopicPartition> partitions)方法会在再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了。
(2) public void onPartitionsAssigned(Collection<TopicPartition> partitions)方法会在重新分配分区之后和消费者开始读取消息之前被调用。
五、从特定偏移量处开始处理记录
如果你想从分区的起始位置开始读取消息,或者直接跳到分区的末尾开始读取消息, 可以使用seekToBeginning(Collection<TopicPartition> tp) 和seekToEnd(Collection<TopicPartition> tp)这两个方法。
六、如何退出
如果确定要退出循环,需要通过另一个线程调用consumer.wakeup()方法。如果循环运行在主线程里,可以在ShutdownHook 里调用该方法。要记住, consumer.wakeup() 是消费者唯一一个可以从其他线程里安全调用的方法。
七、反序列化器
反序列化器可以使用Kafka默认的基本类型反序列化器如StringDeserializer,IntDeserializer,可以使用ArvoDeserializer,也可以自定义反序列化器类。