kafka 本身确保消息不丢失,有前提条件:
对于已提交到 kafka 的 broker 中的数据,实现持久化的操作,确保消息不丢失
对于可能出现的消息丢失 & 解决方法:
1.producer 端:
- 使用一些无回调的 API 进行数据发送,消息发送后不进行校验;java - send() 函数
解决:推荐使用有回调的 API 进行处理,确保消息正常发送到 kafka - 设置重试机制,producer 端设置重试,对于未正常发送的数据,需要设置重试
- 设置 ack 参数,如果设置成all,则表明所有副本Broker都要接收到消息,该消息才算是“已提交”。消息正常被kafka 的所有副本 broker 处理后返回
2.broker 端:
- 设置参数:unclean.leader.election.enable = false,对于一个Broker落后原先的Leader太多,那么它一旦成为新的Leader,必然会造成消息的丢失。故一般都要将该参数设置成false
- 设置replication.factor >= 3。这也是Broker端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
- 设置min.insync.replicas > 1。这依然是Broker端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于1可以提升消息持久性。在实际环境中千万不要使用默认值1。
3.consumer 端:
- 确保消息消费完成再提交。Consumer端有个参数enable.auto.commit,最好把它设置成false,并采用手动提交位移的方式。