最近负责的短信项目业务量增长较多,使用kafka异步发送短信的过程遇到了一些问题,在业务和线上环境上具体的表现就是:
- 外部请求一直得不到处理,kafka的消息出现大量积压。
- 部分请求被反复处理,其对应的kafka消息被反复消费。
对于发送短信这种实时性要求较高的项目,五分钟以前的请求未得到处理的话,基本就已经晚了。不管短信内容是数分钟有效的验证码,还是运维的监控告警,五分钟的延迟都会带来很多问题。
因此,如果可以容忍消息丢失,这时候可以用以下方法止血:
- 更新consumer的消费组,这样旧的积压的消息就不会出现在新的消费组上,新的消费组在创建时就以kafka的LOG-END-OFFSET作为当前偏移量。
- 更新对应的topic,让生产者将新的消息发送到新的topic上,消费者也消费新的topic。
但是,消费者的消费积压一般都是因为消费者自身有问题,可以参考如下文章,本次线上问题和下面文章描述的情况也基本一致:(链接已删除)
下面记录一下项目的问题排查和复现、解决过程。
问题排查和复现
1、消费积压和重复消费出现
kafka消费积压情况:(这时候消费者已经不行了,一直在消费消息,但是位移无法提交,因此在重启生产环境的服务)
后来拿到消费者的服务日志,发现对很多消息存在反复消费的情况:
2、问题排查
2.1、消费者消费缓慢的原因
通过检查消费者日志,发现在消费一条消息时,消费者需要2次更新MongoDB数据库,而每次更新都会花费一秒钟以上。于是检查对应的数据库,发现单个集合数据量已经达到了170万条,且没有添加任何索引,这不慢才怪了。
于是将集合做了归档(rename),随即新建了带有索引的新集合,更新MongoDB的时间缩短到了0.01s,线上问题初步解决。
2.2、kafka offset无法更新的原因
问万能的chatGpt和阅读上面的文章,基本确认是消费者单次拉取消息过多,在默认时间内未完成处理,触发集群rebanlance导致的,如下文:
消费超时会发生什么?
Kafka Handle Error, Client Will Seek Soon: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
报错信息非常良心,简单解释下:
集群以为消费线程挂了,触发了rebanlance(这一批已经给别的消费者线程消费了)。当前消费者线程业务逻辑执行完了再去同步游标报错了,没有提交成功,这就导致了两个消费者线程把同一批消息消息了两遍。
kafka消费者超时解决方案_kafka消费超时_7im0thyZhang的博客-CSDN博客
如果超过了 [max.poll.interval.ms] 所设置的时间,就会被消费组所在的 coordinator 剔除掉,从而导致重平衡,Kafka 重平衡过程中是不能消费的,会导致消费组处于类似 stop the world 的状态下,重平衡过程中也不能提交位移,这会导致消息重复消费从而使得消费组的消费速度下降,导致消息堆积。
kafka消息堆积原因解析kafka消费堆积鸭梨山大哎的博客-CSDN博客
2.3、问题复现
1、在消费者程序中加入线程休眠模拟处理时长(5s);
2、配置max-poll-records参数为500(默认值),使消费者可以单次拉取较多的消息;
3、通过本地工程短时间循环请求接口600次;
4、检查消费者日志和消费积压情况;
消费积压情况:消费者的kafka offset在更新几个之后,逐渐卡在3035,不再更新,如下图
随后,获取消费者服务日志,检查也发现了重复消费的情况。
解决重复消费问题的解决办法验证:
1、配置max-poll-records参数为10,使消费者可以单次最多仅可拉取10条消息
2、在不改变消费者消费能力(速度)的情况下,检查消费情况
3、发现消费积压逐渐减少,且减少的步长就是配置的max-poll-records的值(10)(如下图),说明只要 单次拉取的消息数 * 处理时间
< kafka的超时阈值[max.poll.interval.ms]
(默认五分钟) 即可保证消费结果可成功提交;
3、问题解决
首先,要提升消费者消费能力,涉及到数据库的就添加索引和缓存,涉及到网络连接的考虑添加连接池等;
其次,kafka的相关配置一定要根据自己的需要及时修改,避免一次拉取过多消息无法处理。反复触发rebanlance的话,位移就一点都不会更新了。也就是卡死了。
最后,可以根据需求为kafka配置多线程消费,要注意为topic配置多个分区,注意上游消息生产者不要使用完全一样的key(否则消息无法发送到多个分区),然后再为消费者配置多线程的
KafkaListenerContainerFactory
,注意代码做好线程安全修改即可。