Kafka Consumer

一、consumer特点
  • consumer不使用zk保存消费位移(offset),而是使用内部的消息队列。因为当数据量很大且读写频繁时,ZK的读写性能会成为瓶颈。同时Kafka的备份机制可实现位移的高可用管理。
  • consumer放弃多线程消费不同分区的设计,采用类似Linux epoll的轮询机制,使得consumer使用单线程就可以管理连向不同broker的多个Socket,既减少了线程间开销,也简化了系统设计。
  • 消费者组的集中式管理,所有的组成员管理都交由集中式协调者(coordinator)负责。

kafka检查点机制(checkpointing)会定期对offset进行持久化,从而简化应答机制。

二、位移(offset)

这里的位移指的是consumer端的offset,不同于分区日志中的位移。
consumer实例会为它消费的分区维护属于自己的位置信息,来记录当前消费了多少条消息,即分区中当前最新消费消息的位置。这即是位移(offset)。
把消费端的位移保存在服务器端,会有以下三个问题:

  • broker变为有状态的,增加同步成本,影响伸缩性。
  • 需要引入应答机制来确认消费成功。
  • 需保存很多offset,导致数据结构复杂化。
位移提交

consumer定期向Kafka集群汇报消费数据的进度,这被称为位移提交。
旧版本consumer会定期把位移信息提交到zk下的固定节点上。
新版本consumer把位移信息提交到Kafka的内部topic(__consumer_offsets)上。有50个分区。
默认情况下,consumer是自动提交位移。
手动提交位移分为同步和异步两种方式。

位移管理

offset是实现消息交付语义保证的基石。
常见3种消息交付语义保证:

  • 最多一次处理语义:消息可能丢失,但不会重复。
  • 最少一次处理语义:消息不会丢失,但可能重复。(kafka默认
  • 精确一次处理语义:消息有且仅被处理一次。

从0.11.0.0版本开始支持事务以及精确一次处理语义。

  • 上次提交位移:consumer最近一次提交的offset值。
  • 当前位置:consumer已读取但尚未提交时的位置。
  • 水位(HW):也被称为高水位,属于分区日志的概念,consumer只可以读取水位之下的消息。
  • 日志终端位移(Log End Offset,LEO):也被称为日志最新位移,属于分区日志的概念。表示某个分区副本当前保存消息对应的最大的位移值。正常情况下比水位值大。只有分区所有副本都保存了某条消息,该分区的leader副本才会向上移动水位值。
coordinator

consumer在broker中选择某个broker作为消费者组的coordinator,用于组成员管理、消费分配方案制定以及提交位移等。选择coordinator的依据是内部topic(__consumer_offsets)。
consumer提交位移是通过向coordinator发送位移提交请求。

三、消息轮询

consumer是用来读取消息的,而且要能够同时读取多个topic的多个分区的消息。若要实现并行的消息读取,
一种方法是使用多线程,为每个要读取的分区都创建一个专有的线程去消费(旧版consumer模式)。
另一种方法是采用类似于epoll或select等,用一个线程同时管理多个socket连接,同时与多个broker通信,实现消息的并行读取(新版consumer模式)。
一旦consumer订阅了topic。所有的消费逻辑包括rebalance、消息获取、coordinator管理、异步任务结果的处理和位移提交等都会在poll方法中被执行。这样可以在一个线程中管理所有的consumer I/O操作。
当poll方法被首次调用时,新的消费者组会被创建并根据对应的位移重设策略(auto.offest.reset)来设定消费者组的位移。rebalance完成后都会将位置设置为上次已提交的位移。
一定要在finally中手动关闭consumer,这样不仅可以清除consumer创建的socket资源,还会主动通知coordinator离组,从而更快开启新一轮rebalance。
新版本的consumer不是线程安全的。但可以安全的在另一个线程中调用consumer.wakeup()。
poll方法使用建议:

  • consumer需要定期执行其它子任务:poll(较小超时时间)+ 运行标识布尔变量volatie
  • consumer不需要定期执行子任务: poll(MAX_VALUE)+ 捕获WakeupException

四、消费者组

消费者用消费者组(group.id)标记自己,topic的消息会被发送到消费者组的一个消费者实例上。
consumer group是实现伸缩性、容错性的机制。某个consumer出现故障,consumer group会立即将已崩溃consumer负责的分区转交给其他consumer,从而保证整个group继续工作,不丢失数据。
kafka目前只能提供单个分区内的消息顺序,而不能实现topic级别的消息顺序,如果要实现topic级别的消息读取顺序,只能通过使consumer group中只有一个consumer实例的方式来间接实现。

消费者组重平衡(consumer group rebalance)

consumer group的rebalance本质上是一组协议,规定了消费者组达成分配分区一致结果的过程。
和旧版本consumer依托zk进行rebalance不同,新版本consumer使用kafka的组协调协议。对于每个组而言,kafka的某个broker会被选举为组协调者。coordinator负责对组的状态进行管理,它的主要职责是当新成员到达时促成组内所有成员达成新的分区分配方案,即coordinator负责对组执行rebalance操作。
发生下面变更,会触发rebalance:

  • 组成员
  • 主题数
  • 分区数
consumer的分区分配策略

range(默认)、round-robin和sticky。

rebalance generation

隔离每次rebalance的数据,rebalance generation用于标识某次rebalance。有利于防止无效的offset提交。

rebalance流程

consumer group在执行rebalance前需要确定coordinator所在的broker,并创建与该broker相互通信的socket连接。确定coordinator的算法与确定offset被提交到__consumer_offsets目标分区的算法是相同的。算法如下:

  • 第一步、 计算Math.abs(groupID.hashCode) % offsets.topic.num.partitions参数值(默认是50),假设是10。
  • 第二步、寻找__consumer_offsets分区10的leader副本所在的broker,该broker即为这个消费者组的coordinator。

rebalance主要分为两步:加入组和同步更新分配方案。

group的leader和coordinator不是同一个概念。leader是某个consumer,coordinator是某个broker。leader负责为整个group的所有成员制定分配方案,而非coordinator。

rebalance监听器

rebalance监听器用于手动提交位移到第三方存储或者在rebalance前后执行审计操作。

consumer group状态机

新版本consumer依赖于broker端的组协调者(coordinator)来管理组内的所有consumer实例并负责把分配方案下发到每个consumer上。分配方案是由组内的leader consumer根据指定的分区分配策略制定的。

group管理协议

coordinator的组管理协议分两个阶段,即组成员加入阶段和状态同步阶段。第一个阶段用于为group指定active成员并从中选出leader consumer。第二个阶段则让leader consumer制定分配方案并同步到其他组成员中。
可以理解为,第一阶段收集所有consumer的topic订阅信息,第二个阶段利用上面信息给consumer分配要消费的分区。

五、解序列化

六、多线程消费实例

  • 每个线程维护一个KafkaConsumer
  • 单个KafkaConsumer实例+多worker线程

参考

《Apache Kafka实战 -- 胡夕》

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容