(八)kafka消息顺序健壮机制
A. 消息顺序
- 消息顺序写入的定义
- 仅保证每个partition内的messages是顺序的。
- 基于向partition写入数据是以append的模式追加的,故而可以确保其顺序性。
- 不保证topic维度下的全部顺序。
- 保障顺序消费的手段
- producer需要保证相关消息写入同一个partition,以次保障顺序,比如一个订单下的多条消息需要进入同一个partition。
- broker可利用只创建一个partition来强制保证,但此举会严重损失吞吐能力。
- consumer需保证一个线程消费一个partition的对应关系,注意如下问题:
- 单线程时,消费顺序必然是顺序的,但是消费能力不足导致消息积压;
- 多线程时,避免消息直接被扔入pool,把消息根据策略放入内存队列中,队列下游用单线程方式拉取消费。基本策略有两种,record.partition() & record.key(),前者拿到分区id,丢入对应的内存队列;后者拿到上游设置的key,hash后丢入对应的内存队列。
B. 消息不丢失
- 丢失消息的潜在场景
- producer的配置问题
- 设置acks为0,即不等待partition leader副本所在broker应答,存在发送失败的隐患。
- 设置retries为0,那么即使acks = 1或-1,也不会进行重发的尝试,仍然存在丢失风险。
- broker的配置问题
- 配置replics为1,则无备份资料,若单节点down后数据就会丢失。
- 未设置数据强制刷盘,单节点机器重启会丢数据。
- 未设置min.insync,replicas=2,isr某个时刻列表中若只有1个副本,该副本所在broker该时间段内宕机则会丢数据。
-
设置允许非isr内副本作为leader,即设置
unclean.leader.election.enable=true,亦会丢失数据。
- producer的配置问题
- 保障手段
- producer优化
- acks设置为all,确保所有副本所在的broker都收到message。
- retries设置为Integer.MAX_VALUE,保证重试次数。
- broker优化
- replicas设置大于2(默认3),保证多节点存储数据。
- 使用
log.flush.interval.messages=1确保强制刷盘,默认不控制。 -
min.insync,replicas=2保证至少isr中有2个及以上存活副本,关闭unclean.leader.election.enable。
- consumer优化:关闭自动提交(
enable.auto.commit为false),使用异步或者同步提交。 - 注意点:开启上述所有保障手段后,吞吐量会呈现出恐怖的下降趋势。
- producer优化
C. 消息不重复(消息幂等)
- 常见策略
- 基于DB唯一键:消息内容拼成唯一key。创建幂等表,就两列id & key。
- 基于缓存:判断key是否在缓存中,若在缓存就跳过,不在则存入后处理。
- 常见场景
- producer遇到网络抖动,broker已经写入数据,但producer以为自己超时重发了信息。
- consumer若一次拉取500条messages,业务处理了200条,虽然consumer自己down了(未提交消费位移200),那么原来的200会被重复消费。