1.消费者和消费者组的关系
Kafka并不删除已消费的消息,为了实现传统Message Queue消息只被消费一次的语义,Kafka保证每条消息在同一个Consumer Group里只会被某一个Consumer消费。与传统Message Queue不同的是,Kafka还允许不同Consumer Group同时消费同一条消息,这一特性可以为消息的多元化处理提供支持。
消费者组
测试:创建一个Topic (名为topic1),再创建一个属于group1的Consumer实例,并创建三个属于group2的Consumer实例,然后通过Producer向topic1发送Key分别为1,2,3的消息。结果发现属于group1的Consumer收到了所有的这三条消息,同时group2中的3个Consumer分别收到了Key为1,2,3的消息,如下图所示。
消费者组测试
2.kafka消费者Consumer Rebalance
(1)Kafka的Rebalance(再均衡:在同一个消费者组当中,分区的所有权从一个消费者转移到另外一个消费者)机制,Rebalance顾名思义就是重新均衡消费者消费。Rebalance的过程如下:
渐进式rebalance
第一步:所有成员都向coordinator发送请求,请求入组。一旦所有成员都发送了请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader。
第二步:leader开始分配消费方案,指明具体哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案发给coordinator。coordinator接收到分配方案之后会把方案发给各个consumer,这样组内的所有成员就都知道自己应该消费哪些分区了。
(2)以下的场景会触发Consumer Rebalance操作:
①新的消费者加入Consumer Group;②有消费者主动退出Consumer Group;③Consumer Group订阅的任何一个Topic出现分区数量的变化。
(3)默认情况下,Kafka提供了两种分配策略:Range和RoundRobin。
Range策略:①对一个topic中的partition进行排序;②对消费者按字典进行排序;③遍历排序后的partition的方式分配给消费者。
例子:有两个消费者C0和C1,两个topic(t0,t1),每个topic有三个分区p(0-2),那么采用Range策略,分配出的结果为:
C0: [t0-p0, t0-p1, t1-p0, t1-p1]
C1: [t0-p2, t1-p2]
RoundRobin策略:RoundRobin策略和Range策略类型,唯一的区别就是Range策略分配partition时,是按照topic逐次划分的。RoundRobin策略则是将所有topic的所有分区一起排序,然后遍历partition分配给消费者。
例子:有两个消费者C0和C1,两个topic(t0,t1),每个topic有三个分区p(0-2),那么采用RoundRobin策略,分配出的结果为:
C0: [t0-p0, t0-p2, t1-p1]
C1: [t0-p1, t1-p0, t1-p2]
(4)Kafka对消息的分配是以Partition为单位分配的,而非以每一条消息作为分配单元。这样设计的劣势:无法保证同一个Consumer Group里的Consumer均匀消费数据。优势:①每个Consumer不用都跟大量的Broker通信,减少通信开销;②降低了分配难度,实现也更简单;③因为同一个Partition里的数据是有序的,这种设计可以保证每个Partition里的数据可以被有序消费。
如果某Consumer Group中Consumer实例数量少于Partition数量,则至少有一个Consumer会消费多个Partition的数据,如果Consumer的数量与Partition数量相同,则正好一个Consumer消费一个Partition的数据。而如果Consumer的数量多于Partition的数量时,会有部分Consumer无法消费该Topic下任何一条消息。
3.非线程安全的Kafka Consumer实现多线程消费
(1)线程封闭,即为每个线程实例化一个Consumer对象。说明:一个线程对应一个Consumer 实例,称之为消费线程。一个消费线程可以消费一个或多个分区中的消息,所有的消费线程都隶属于同一个消费组。
封闭线程消费
(2)消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是一个,也可以是多个,每个线程维护专属的Consumer实例,处理消息则交由特定的线程池来做,从而实现消息获取与消息处理的真正解耦。
线程池消费
4.指定offset或者timestamp消费数据
指定offset:Kafka是通过seek() 方法来指定消费的,在执行seek() 方法之前要去执行一次poll()方法,等到分配到分区之后会去对应partition的指定offset开始消费。
指定timestamp:Kafka的offsetsForTimes() 方法,通过timestamp 来查询与此对应的分区位置。offsetsForTimes() 方法的参数 timestampsToSearch 是一个Map类型,key为待查询的partition,而 value为待查询的时间戳,该方法会返回时间戳大于等于待查询时间的第一条消息对应的位置和时间戳。
5.消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1
在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的。而在新消费者客户端中,消费位移存储在 Kafka 内部的主题__consumer_offsets 中。
当前消费者需要提交的消费位移是offset+1。
6.造成重复消费的场景
(1)Rebalance:一个consumer正在消费一个partition的消息A,还没有消费完,发生了rebalance(加入了一个consumer),从而导致消息A没有消费成功,rebalance后,另一个consumer又把这条消息消费一遍。
(2)消费者端手动提交:如果先消费消息,再更新offset位置,导致消息重复消费。
(3)消费者端自动提交:设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。
(4)生产者端:生产者因为业务问题导致的宕机,在重启之后可能数据会重发。
7.造成消息漏消费的场景
(1)自动提交:设置offset为自动定时提交,当offset被自动定时提交时,数据还在内存中未处理,此时刚好把线程kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。
(2)生产者端:发送消息设置的是fire-and-forget(发后即忘),它只管往 Kafka 中发送消息而并不关心消息是否正确到达。不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式的性能最高,可靠性也最差。
(3)消费者端:先提交位移,但是消息还没消费完就宕机了,造成了消息没有被消费。自动位移提交同理acks没有设置为all。如果在broker还没把消息同步到其他broker的时候宕机了,那么消息将会丢失。