(五)kafka生产 & 消费问题
A. 重平衡rebalance
1. 基础定义
- 触发机制
- 消费组consumer group内成员数发生变化
- 分区partition数目发生变化
- coordinator节点故障(消费者会在所有broker中选一个作为consumer group的coordinator,用于保存consumer提交的offset)
- 重平衡rebalance实现消费组内成员的故障转移
- 消费组内有a,b,c三个消费者,分别消费partition 0,1,2;
- 若消费者a故障则partition 0就不会被消费,这不可被接受,此时需要使用rebalance来解决该问题;
- rebalance会将partition 0 分给剩下正常的消费者(b c中的一个);
- 重新分配后就完成了消费者a的故障转移。
- 重平衡rebalance实现动态分区分配
- 某个topic的partition数目新增(理论上partition不会减少),原本3台brokers,该topic共计6个partitons,现在新增到12个partitions;
- 消费组内的成员和partition的对应关系需相应进行变化,不然新增的6个partitions无人消费;
- 通过rebalance,将新增的6个partition分配给消费者,也可通过新增组内消费者(新拉起线程)。
2. 重平衡具体流程
- 重平衡rebalance的宏观过程
- empty状态:该状态即消费组新建,内部尚无消费者,出现的第一个消费者后会发送FIND_COORDINATOR以开启首次rebalance;
- preparing rebalance状态:消费者客户端确认自己的coordinator后,向其发出JOIN_GROUP的请求,coordinator收到后将整体状态从empty变为preparing rebalance;
- completing rebalance状态:当组内所有的消费者都发送完JOIN_GROUP的请求后(或者rebalance超时),状态转化成completing rebalance;
- stable状态:kafka broker返回所有JOIN_GROUP的响应后,leader客户端收到后发送SYNC_GROUP。broker收到该请求后,状态就设置成stable,完成reblance。
- 重平衡rebalance的微观过程
- FIND_COORDINATOR时,会带上自身的group.id,计算其coordinator在哪里(coordinatorId=groupId.hash % 50),获得的是具体的分区号;
- broker响应消费者客户端的JOIN_GROUP请求时,会选一个作为消费者客户端的leader并告知它就是leader,给它发送元数据,让leader进行分配分区;
- 在发送SYNC_GROUP请求时,leader会把分区分配方案告知broker,其余消费者客户端发送SYNC_GROUP时,直接返回他们分配结果,告知对应消费内容;
- 为何分四个状态:首次请求需要3次(find,join和sync),但是正常运行的consumer仅需2次(join和sync,他们知道coorinator在哪里),除非是coordinator牺牲导致的reblance,那么老的consumer也要3次请求。
3. 线上问题排查 & 修复
- 线上排查重平衡rebalance
- 概述:触发只有三个可能,需线上排查的场景一般为消费者减少导致的rebalance,其他两个较容易发现(新增partiton是人为的,coordinator down的话会有告警)。
- 消费者减少 ———— 心跳超时
- 认为消费者牺牲了,一般是因为消费者机器负载过高,其cpu无暇发送心跳;
- 心跳超时有onExpireHeartbeat的日志打印
- 消费者减少 ———— poll间隔超过配置
- poll即为拉取数据频率,间隔超出配置是因为poll出数据后业务处理过慢,要么优化业务逻辑,要么多线程消费、异步处理;
- 较依赖排除法判断,若消费者减少且没看到是心跳超时日志,基本就是poll间隔超过配置。
- 消费者如何感知重平衡rebalance
- 概述:消费组有关的metadata都在coordinator中,消费者的心跳(heart beat)和消费位置(offset commit)都会和coordinator交互。
- 心跳:每次提交心跳都会带上group的纪元值(类似epoch),每次kafka broker认定完成一次rebalance纪元值都会+1,若果“心跳计数器”对不上就会开启重平衡。
- 提交消费: 过程和心跳是一样的。主要是依靠状态和纪元来进行感知,状态生效于rebalance过程中,纪元则在join完成阶段后生效。
- 如何减小重平衡rebalance的副作用
- 概述:rebalance类似于java中的stw,消费者发送join group时是停止消费的。
- 降低影响面的手段
- 一个consumer group中避免有过多的consumers,做好隔离分类工作;
- 避免消费者consumer变动过于频繁;
- 对consumer做好cpu的监控,避免高负载导致心跳丢失;
- 设置合适的poll间隔,建议使用异步处理poll出来的信息;
- 配置静态消费者(static member),降低rebalance频次(具体参考博文)。
B. 生产者producer客户端框架
1. 基础定义
- 生产者结构和模式
- producer仅有两个类别的线程组成,业务线程(主线程) & sender线程(网络线程)。
- producer把消息推送至broker,其默认发送的方式是同步发送。
- producer初始化过程
- 代入公共参数,例如
maxRequestSize, totalMemorySize, compressionType
等; - 设置主线程的三个组件——拦截器、序列化器 & 分区器;
- 启动网络线程sender;
- 注意:初始化阶段producer不会同broker建立连接,在首次调用send方法时才会检查对应的partition的metadata,从metadata中获取目标partition的leader副本的broker地址,开始建立通信。
- 代入公共参数,例如
2. 生产者Producer具体工作流程
- 主线程
- 概述:三个组件拦截器、分区器 & 序列化器承担其主要功能,负责处理要发送的业务数据,同时承担batch批处理和RecordAccumulator缓冲区。
- 拦截器:处理业务数据过程中,通过实现producerInterceptor方法定制需要的拦截效果。
- 序列化器:把需要发送的数据按照指定的序列化器进行序列化。
- 分区器:将发送的数据进行分区分配(消费者的分区分配策略主要应用于rebalance场景下,与该分区分配场景不一致),一般情况下会根据默认分区器的算法决定该消息数据发往哪个partition,主要分为是否指定key的场景。
- 指定key:根据hash计算key后确定发往哪个partition,潜在破坏kafka负载均衡的可能性,致使topic的分区不平均,broker承载量差异偏大。
- 未指定key:根据当前存活的分区进行轮询,属默认情况下的方案。
- batch:顺序完成上方三个步骤后消息会按照不同的分区打包,以batch形式发送以提升网络利用率。
-
linger.ms
:等待组包的最大时长,默认值为0,即不等待直接发送,若对时延不敏感可设置为10ms。 -
batch.size
:组包的大小限制,默认是16kb。注意,必须调节linger.ms
的默认值以后再调整batch.size
才有意义。
-
- RecordAccumulator:本质就是一个内存队列,是线程安全的map,将上一步获得的batch暂存一段时间等待后续sender线程来调用,默认参数
buffer.memory
为32M。
- 网络线程sender
- 工作流程
- 拉取数据:从RecordAccumulator缓冲区拉取batch,根据partition把batch封装成request;
- 查询目标broker:查询metadata确认去哪个broker,通过socketChannel将request发送至目标broker,broker由网络模型中的accpetor接收request,通过processor线程一路往下(细节参考第四章节的kafka服务端网络通信模型);
- 得到request的结果(success, fail, timeout)后,将结果回调告知producer;
- 关键参数
-
request.timeout.ms
:sender每次获取RecordAccumulator中的首个batch,其余保持等待,若超过request.timeout.ms就被清除。 -
max.in.flight
:设置异步发送时生效,request发送给socketChannel前先送至inFlightRequsts队列中,同时把request给到selector去发送,inFlightRequsts用于记录给哪些broker发送了多少request。该队列本质是hashmap的结构,且sender是单线程,不用在意线程安全问题。参数max.in.flight.requests.per.connection
会限制producer发出的请求,默认为5。当5个消息发出后,producer没有收到broker的响应,那么就会抛出异常。 -
selector
:nio中的多路复用器,kafka改进后进行了封装,其功能是将request通过send方法最终发送出去。 - 回调:数据发送后,等待响应。max.in.flight中超时的请求,也会被处理成response。
-
- 工作流程
3. 增加Producer吞吐量手段
- 单对象:kafka-java-producer的客户端(即主线程部分)是线程安全的,所以使用时只创建一个producer。
- 异步发送:若没有严格的消息丢失、消息顺序要求,可以选择异步发送(同步的话max.in.flight这个参数是不起效的),并且ack选择1降低等待成本(仅需确认leader收到即可)。
- 调整缓冲区大小:recordAccumulator的
buffer.memory
可以从默认的32m调整到64或者128m,让更多的消息组包成batch后可以被塞入这个内存队列。 - 优化batch:首先修改
linger.ms
,默认是0;其次可以修改batch.size
确保个别较大的消息可以塞入,该操作可以增加吞吐量但会增加时延,业务对时延敏感的不建议调整。 - 回调处理优化:sender发送完毕后会执行业务的回调以获取发送成功与否的反馈,这部分回调函数的逻辑是开发优化重点。
- jvm参数优化:使用jstat查看jvm gc情况,根据实际调整,防止频繁fullGC。
- 排查顺序:排查吞吐量偏低的优先级为回调函数 > 上方罗列的生产者参数 > jvm参数 > 网络问题 > kafka broker配置。
C. 消费者consumer客户端框架
1. 基础定义
- 消费者结构概述
- 作为broker的下游,其重要功能是保障及时消费、rebalance分区分配 & 保证coordinator交互的leader完成这一动作等。
- 核心链路涉及四个步骤:初始化、消息消费、位移提交 & 心跳。
- 消费者consumer初始化
- 基础参数代入设置,例如clientID & groupID等。
- 拦截器 & 序列化模块的初始化。
- 网络化模块初始化。
- 协调器(coordinator)初始化。
- 消息拉取组件(fetcher)初始化。
2. 消费者Consumer具体工作流程
- 消息消费步骤拆分
- 订阅:初始化consumer完成后,在消息消费前需要订阅topic list,具体有三种方式。
- 手动指定一个topic列表给消费者,使用
consumer.subscribe(Collections.singletonList(this.topic))
。 - 强制指定目标消费分区,丧失消费组特性,使用
consumer.assign(partitions)
,此举会影响rebalance故障转移的效果,不建议使用。 - 使用正则匹配topic并可设置rebalance的回调方法,一般用于灵活匹配topic列表时,具体方法为
consumer.subscribe(pattern, callback)
。
- 手动指定一个topic列表给消费者,使用
- 消费 —— poll整体流程
- 整体逻辑就是基于while(true)这个循环进行消费,poll(timeout)来控制具体细节,即在poll过程中若所有对应的分区没有未消费的消息,则broker最多等待时长为timeout的数值;
- poll若等待达到了timeout时长,仍未有新消息写入则返回一个空值并进去下轮poll;若正常读取到新数据,则立刻拿到数据返回并准备下轮拉取;
- 一般poll的timeout设置在100ms - 1000ms之间,不建议太短,可能导致broker高频处理消费者请求(实际都是空值,因为producer没写入),建议设置为500ms。
- 消费 —— 核心函数pollOnce(remaining)
- 激活coordinator:利用Find_Coordinator方法确认自身对应的coordinator,随后激活消费组(即启动心跳与coordinator交互,发送join_group),最后判断是否执行自动提交。
- 从缓存队列completeFetchs拉取数据并返回。
- 缓存中若无数据则发送fetch请求,后续会将数据存在缓存队列中。
- 订阅:初始化consumer完成后,在消息消费前需要订阅topic list,具体有三种方式。
- 位移提交:消费完需要让broker知道,通过commit offset(具体入口也在pollOnce),具体有三种方式。
- 自动提交:保证高吞吐但可能出现重复消息,开启
enable.auto.commit=true
,每5s提交一次。 - 异步提交:兼顾可靠性和吞吐量,是kafka consumer默认的提交位移方式,每消费完就发送一条commit_offset请求,但不等待broker的响应。
- 同步提交:保障消息不丢失但吞吐量降低,每消费完就发送一条commit_offset请求,需等待broker回复后才可继续消费。
- 自动提交:保证高吞吐但可能出现重复消息,开启
- 心跳:心跳不启动于初始化,启动于poolOnce阶段的首次poll方法。该线程是守护线程故priority较低,cpu负载高时可能会抢占不到资源导致心跳中断。心跳线程具体工作内容如下:
- 检测心跳是否超时,若超时改coordinator为unknown,下一轮poll开启rebalance。
- 检测两次poll的间隔是否大于max.poll.interval.ms,默认是5分钟,超过了也会发出leaveGroup请求,于下轮poll开启rebalance。
- 检测是否到了心跳间隔时间,若没到就设置重试间隔为100ms。
- 时间到后,即发送心跳请求,并设置对应listener。