1、消费组:分区只能被组中一个消费者消费;消费者>分区个数,有空闲
2、分区策略:1)界限分配(默认) 2)轮询分配 3)粘性分配(均匀、尽可能与上次相同)
3、再均衡:分区所属变,删除/添加消费者 1)弊端:不可读、慢、重复消费 2)避免
4、再平衡全流程:1)状态机 2)流程
一、消费组
消费者只消费配到分区中消息。每个分区只能被一个消费组中一个消费者消费
分区固定,一味增加消费者,并不会提升消费能力,如消费者个数大于分区个数,就有消费者配不到分区。如:一共有8个消费者,7个分区,C7空闲
二、消费端分区策略
partition.assignment.strategy 设置分区分配策略(消费者与主题间)
1、RangeAssignor界限分配(默认)
原理:(消费者+分区)总数整除获得跨度,按跨度平均分配给所有消费者。订阅主题消费者按名称排序,每个消费者固定分区范围。如不够平均分,靠前消费者多分一个分区。
组内消费者 C0 和 C1,都订阅主题 t0 和 t1,每个主题4个分区,结果为:
3个分区,并不均匀
2、RoundRobinAssignor轮询分配
组内消费者及订阅topic分区按字典排序,轮询方式 分区逐个分每个消费者。
1)如订阅相同,分配均匀。2)不同,就不完全轮询,可能不均匀。
例:3个消费者(C0、C1 和 C2),t0、t0、t1、t2主题分别3个分区,消费组订阅 t0p0、t1p0、t1p1、t2p0、t2p1、t2p2 6个分区。
C0 订t0,C1 订t0 和 t1,C2 订 t0、t1 和 t2,最终结果:
3、StickyAssignor粘性分配
1、目的:分配均匀、尽可能与上次分配相同
2、例:消费者C0、C1 和 C2,订阅t0、t1、t2、t3,每个主题有2个。消费组订阅 t0p0、t0p1、t1p0、t1p1、t2p0、t2p1、t3p0、t3p1 8个分区:
C1 脱离消费组,“黏性”:尽可能让前后两次相同,减少损耗及异常:
三、再均衡(Rebalance)
分区所属权转到另一消费者,安全删除/添加消费者
1、弊端
1)发生期间,组内消费者无法读消息
2)慢,组里几百个Consumer,Rebalance几小时
3)丢失消费者当前状态:如消费完没提交位移就再均衡,分给组内另一消费者,重复消费
2、Rebalance 发生时机
1)组成员数、2)订阅topic数、3)topic分区数变化 4)gc:Consumer 端频繁Full GC导致长时间停顿,也会引发 Rebalance
组成员数引发Rebalance如何避免(后两类业务调整导致,不可控):
1)设置session.timeout.ms,默认10 秒没有收Consumer 心跳,认为挂
2)设置heartbeat.interval.ms,心跳请求频率,心跳间隔时间
3)设置max.poll.interval.ms,Consumer 端应用程序调用 poll()时间间隔,5 分钟(默认)无法消费完 poll 方法返回消息,Consumer 发起“离开组”请求,Coordinator 开启Rebalance
ps:GroupCoordinator 管理消费组、负责再均衡。
四、消费者组再平衡全流程
过程:靠消费者端心跳线程(Heartbeat Thread),通知其他消费者
“REBALANCE_IN_PROGRESS”装进心跳请求,发给消费者。消费者发现心跳响应中包含“REBALANCE_IN_PROGRESS”,知道重平衡开始
1、消费者组状态机
Broker 的协调者完成重平衡,状态机(State Machine)实现
1)5 种状态:Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable。
2)流转:加入/退时,Stable 跳PreparingRebalance 状态,现存成员重新申请加入组。
Kafka 定期自动删除/过期位移条件就是组Empty 状态。如消费者组停掉了很长时间(超过 7 天),Kafka 很可能就把该组的位移数据删除了。
2、消费者端重平衡流程
消费者端,重平衡两个步骤:加入组和等Leader消费者分配。JoinGroup和 SyncGroup请求
1.加入组:
1)向协调器发JoinGroup请求,2)上报订阅topic,这样协调器就可收集所有订阅信息
2.选择消费组领导者
1)协调者选领导者,2)收集所有成员订阅信息,3)制定分区消费分配方案
3.选举分区分配策略
消费者投票决定
1)协调器会收集支持,组成候选集candidates。2)消费者从候选集candidates 中找出第一个自身支持策略,投票。3)计算选票数
如消费者不支持报出异常 IllegalArgumentException:Member does not support protocol。
4.发送 SyncGroup 请求
1)协调器把消费者组订阅信息,装进 JoinGroup 请求响应体中,2)发给领导者统一做分配方案,3)领导者发送SyncGroup请求给协调器
5.响应SyncGroup
消费者发送SyncGroup请求,领导者请求内容为空,接收一个SyncGroup响应,接受订阅信息
https://mp.weixin.qq.com/s/C6dfvzFkNDYgiNeZ4eWPBQ