CommitFailedException异常处理

所谓 CommitFailedException,顾名思义就是 Consumer 客户端在提交位移时出现了错误或异常,而且还是那种不可恢复的严重异常。如果异常是可恢复的瞬时错误,提交位移的 API 自己就能规避它们了,因为很多提交位移的 API 方法是支持自动错误重试的,比如之前提到的commitSync 方法。

在官网对该异常的处理有的一段描述:

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

这段话前半部分的意思是,本次提交位移失败了,原因是消费者组已经开启了 Rebalance 过程,并且将要提交位移的分区分配给了另一个消费者实例。出现这个情况的原因是,你的消费者实例连续两次调用 poll 方法的时间间隔超过了期望的 max.poll.interval.ms 参数值。这通常表明,你的消费者实例花费了太长的时间进行消息处理,耽误了调用 poll 方法。

在后半部分,社区给出了两个相应的解决办法:

    1. 增加期望的时间间隔 max.poll.interval.ms 参数值。
    1. 减少 poll 方法一次性返回的消息数量,即减少 max.poll.records 参数值。

从源代码方面来说,CommitFailedException 异常通常发生在手动提交位移时,即用户显式调用 KafkaConsumer.commitSync() 方法时。从使用场景来说,有两种典型的场景可能遭遇该异常。

场景一

当处理消息的总时间超过了预设的max.poll.interval.ms参数时,Kafka Consumer 端会抛出 CommitFailedException 异常。你只需要写一个 Consumer 程序,使用 KafkaConsumer.subscribe 方法随意订阅一个主题,之后设置 Consumer 端参数 max.poll.interval.ms=5 秒,最后在循环调用KafkaConsumer.poll方法之间,插入Thread.sleep(6000)手动提交位移,就可以成功复现这个异常了。在这里,我展示一下主要的代码逻辑。

…
Properties props = new Properties();
…
props.put("max.poll.interval.ms", 5000);  // 设置处理时间
consumer.subscribe(Arrays.asList("test-topic"));
 
while (true) {
    ConsumerRecords<String, String> records = 
        consumer.poll(Duration.ofSeconds(1));
    // 使用 Thread.sleep 模拟真实的消息处理逻辑,超过了设置的时间
    Thread.sleep(6000L);
    consumer.commitSync();
}

如果要防止这种场景下抛出异常,你需要简化你的消息处理逻辑。具体来说有 4 种方法。

  1. 缩短单条消息的处理时间。比如,之前下游系统消费一条消息的时间是 100 毫秒,优化之后成功地下降到 50 毫秒,那么此时 Consumer 端的 TPS 就提升了一倍。
  2. 增加 Consumer 端允许下游系统消费一批消息的最大时长。这取决于 Consumer 端参数max.poll.interval.ms的值。该参数的默认值是 5 分钟。如果你的消费逻辑不能简化,那么提高该参数值是一个不错的办法。
  3. 减少下游系统一次性消费的消息总数。这取决于 Consumer 端参数 max.poll.records 的值。当前该参数的默认值是 500 条,表明调用一次 KafkaConsumer.poll 方法,最多返回 500 条消息。可以说,该参数规定了单次 poll 方法能够返回的消息总数的上限。
  4. 下游系统使用多线程来加速消费。这应该算是“最高级”同时也是最难实现的解决办法了。具体的思路就是,让下游系统手动创建多个消费线程处理 poll 方法返回的一批消息。之前你使用 Kafka Consumer 消费数据更多是单线程的,所以当消费速度无法匹及 Kafka Consumer 消息返回的速度时,它就会抛出 CommitFailedException 异常。如果是多线程,你就可以灵活地控制线程数量,随时调整消费承载能力,再配以目前多核的硬件条件,该方法可谓是防止 CommitFailedException 最高档的解决之道。不过,凡事有利就有弊,这个方法实现起来并不容易,特别是在多个线程间如何处理位移提交这个问题上,更是极容易出错。

综合以上这 4 个处理方法,我个人推荐你首先尝试采用方法 1 来预防此异常的发生。优化下游系统的消费逻辑是百利而无一害的法子,不像方法 2、3 那样涉及到 Kafka Consumer 端 TPS 与消费延时(Latency)的权衡。如果方法 1 实现起来有难度,那么你可以按照下面的法则来实践方法 2、3。

首先,你需要弄清楚你的下游系统消费每条消息的平均延时是多少。比如你的消费逻辑是从 Kafka 获取到消息后写入到下游的 MongoDB 中,假设访问 MongoDB 的平均延时不超过 2 秒,那么你可以认为消息处理需要花费 2 秒的时间。如果按照 max.poll.records 等于 500 来计算,一批消息的总消费时长大约是 1000 秒,因此你的 Consumer 端的 max.poll.interval.ms参数值就不能低于 1000 秒。如果你使用默认配置,那默认值 5 分钟显然是不够的,你将有很大概率遭遇CommitFailedException 异常。将 max.poll.interval.ms 增加到 1000 秒以上的做法就属于上面的第 2 种方法。

除了调整 max.poll.interval.ms 之外,你还可以选择调整 max.poll.records 值,减少每次 poll 方法返回的消息数。还拿刚才的例子来说,你可以设置 max.poll.records 值为 150,甚至更少,这样每批消息的总消费时长不会超过 300 秒(150*2=300),即 max.poll.interval.ms 的默认值 5 分钟。这种减少max.poll.records值的做法就属于上面提到的方法 3。

场景二

之前我们花了很多时间学习 Kafka 的消费者,不过大都集中在消费者组上,即所谓的 Consumer Group。其实,Kafka Java Consumer 端还提供了一个名为 Standalone Consumer 的独立消费者。它没有消费者组的概念,每个消费者实例都是独立工作的,彼此之间毫无联系。不过,你需要注意的是,独立消费者的位移提交机制和消费者组是一样的,因此独立消费者的位移提交也必须遵守之前说的那些规定,比如独立消费者也要指定 group.id 参数才能提交位移。你可能会觉得奇怪,既然是独立消费者,为什么还要指定 group.id 呢?没办法,谁让社区就是这么设计的呢?总之,消费者组和独立消费者在使用之前都要指定 group.id。

现在问题来了,如果你的应用中同时出现了设置相同 group.id 值的消费者组程序和独立消费者程序,那么当独立消费者程序手动提交位移时,Kafka 就会立即抛出 CommitFailedException 异常,因为 Kafka 无法识别这个具有相同 group.id 的消费者实例,于是就向它返回一个错误,表明它不是消费者组内合法的成员。

虽然说这个场景很冷门,但也并非完全不会遇到。在一个大型公司中,特别是那些将 Kafka 作为全公司级消息引擎系统的公司中,每个部门或团队都可能有自己的消费者应用,谁能保证各自的 Consumer 程序配置的 group.id 没有重复呢?一旦出现不凑巧的重复,发生了上面提到的这种场景,你使用之前提到的哪种方法都不能规避该异常。令人沮丧的是,无论是刚才哪个版本的异常说明,都完全没有提及这个场景,因此,如果是这个原因引发的 CommitFailedException 异常,前面的 4 种方法全部都是无效的。

更为尴尬的是,无论是社区官网,还是网上的文章,都没有提到过这种使用场景。我个人认为,这应该算是 Kafka 的一个 bug。比起返回 CommitFailedException 异常只是表明提交位移失败,更好的做法应该是,在 Consumer 端应用程序的某个地方,能够以日志或其他方式友善地提示你错误的原因,这样你才能正确处理甚至是预防该异常。

总结

总结一下,今天我们详细讨论了 Kafka Consumer 端经常碰到的 CommitFailedException 异常。我们从它的含义说起,再到它出现的时机和场景,以及每种场景下的应对之道。


©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,525评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,203评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,862评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,728评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,743评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,590评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,330评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,244评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,693评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,885评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,001评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,723评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,343评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,919评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,042评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,191评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,955评论 2 355

推荐阅读更多精彩内容