一、consumer特点
- consumer不使用zk保存消费位移(offset),而是使用内部的消息队列。因为当数据量很大且读写频繁时,ZK的读写性能会成为瓶颈。同时Kafka的备份机制可实现位移的高可用管理。
- consumer放弃多线程消费不同分区的设计,采用类似Linux epoll的轮询机制,使得consumer使用单线程就可以管理连向不同broker的多个Socket,既减少了线程间开销,也简化了系统设计。
- 消费者组的集中式管理,所有的组成员管理都交由集中式协调者(coordinator)负责。
kafka检查点机制(checkpointing)会定期对offset进行持久化,从而简化应答机制。
二、位移(offset)
这里的位移指的是consumer端的offset,不同于分区日志中的位移。
consumer实例会为它消费的分区维护属于自己的位置信息,来记录当前消费了多少条消息,即分区中当前最新消费消息的位置。这即是位移(offset)。
把消费端的位移保存在服务器端,会有以下三个问题:
- broker变为有状态的,增加同步成本,影响伸缩性。
- 需要引入应答机制来确认消费成功。
- 需保存很多offset,导致数据结构复杂化。
位移提交
consumer定期向Kafka集群汇报消费数据的进度,这被称为位移提交。
旧版本consumer会定期把位移信息提交到zk下的固定节点上。
新版本consumer把位移信息提交到Kafka的内部topic(__consumer_offsets)上。有50个分区。
默认情况下,consumer是自动提交位移。
手动提交位移分为同步和异步两种方式。
位移管理
offset是实现消息交付语义保证的基石。
常见3种消息交付语义保证:
- 最多一次处理语义:消息可能丢失,但不会重复。
- 最少一次处理语义:消息不会丢失,但可能重复。(kafka默认)
- 精确一次处理语义:消息有且仅被处理一次。
从0.11.0.0版本开始支持事务以及精确一次处理语义。
- 上次提交位移:consumer最近一次提交的offset值。
- 当前位置:consumer已读取但尚未提交时的位置。
- 水位(HW):也被称为高水位,属于分区日志的概念,consumer只可以读取水位之下的消息。
- 日志终端位移(Log End Offset,LEO):也被称为日志最新位移,属于分区日志的概念。表示某个分区副本当前保存消息对应的最大的位移值。正常情况下比水位值大。只有分区所有副本都保存了某条消息,该分区的leader副本才会向上移动水位值。
coordinator
consumer在broker中选择某个broker作为消费者组的coordinator,用于组成员管理、消费分配方案制定以及提交位移等。选择coordinator的依据是内部topic(__consumer_offsets)。
consumer提交位移是通过向coordinator发送位移提交请求。
三、消息轮询
consumer是用来读取消息的,而且要能够同时读取多个topic的多个分区的消息。若要实现并行的消息读取,
一种方法是使用多线程,为每个要读取的分区都创建一个专有的线程去消费(旧版consumer模式)。
另一种方法是采用类似于epoll或select等,用一个线程同时管理多个socket连接,同时与多个broker通信,实现消息的并行读取(新版consumer模式)。
一旦consumer订阅了topic。所有的消费逻辑包括rebalance、消息获取、coordinator管理、异步任务结果的处理和位移提交等都会在poll方法中被执行。这样可以在一个线程中管理所有的consumer I/O操作。
当poll方法被首次调用时,新的消费者组会被创建并根据对应的位移重设策略(auto.offest.reset)来设定消费者组的位移。rebalance完成后都会将位置设置为上次已提交的位移。
一定要在finally中手动关闭consumer,这样不仅可以清除consumer创建的socket资源,还会主动通知coordinator离组,从而更快开启新一轮rebalance。
新版本的consumer不是线程安全的。但可以安全的在另一个线程中调用consumer.wakeup()。
poll方法使用建议:
- consumer需要定期执行其它子任务:poll(较小超时时间)+ 运行标识布尔变量volatie
- consumer不需要定期执行子任务: poll(MAX_VALUE)+ 捕获WakeupException
四、消费者组
消费者用消费者组(group.id)标记自己,topic的消息会被发送到消费者组的一个消费者实例上。
consumer group是实现伸缩性、容错性的机制。某个consumer出现故障,consumer group会立即将已崩溃consumer负责的分区转交给其他consumer,从而保证整个group继续工作,不丢失数据。
kafka目前只能提供单个分区内的消息顺序,而不能实现topic级别的消息顺序,如果要实现topic级别的消息读取顺序,只能通过使consumer group中只有一个consumer实例的方式来间接实现。
消费者组重平衡(consumer group rebalance)
consumer group的rebalance本质上是一组协议,规定了消费者组达成分配分区一致结果的过程。
和旧版本consumer依托zk进行rebalance不同,新版本consumer使用kafka的组协调协议。对于每个组而言,kafka的某个broker会被选举为组协调者。coordinator负责对组的状态进行管理,它的主要职责是当新成员到达时促成组内所有成员达成新的分区分配方案,即coordinator负责对组执行rebalance操作。
发生下面变更,会触发rebalance:
- 组成员
- 主题数
- 分区数
consumer的分区分配策略
range(默认)、round-robin和sticky。
rebalance generation
隔离每次rebalance的数据,rebalance generation用于标识某次rebalance。有利于防止无效的offset提交。
rebalance流程
consumer group在执行rebalance前需要确定coordinator所在的broker,并创建与该broker相互通信的socket连接。确定coordinator的算法与确定offset被提交到__consumer_offsets目标分区的算法是相同的。算法如下:
- 第一步、 计算Math.abs(groupID.hashCode) % offsets.topic.num.partitions参数值(默认是50),假设是10。
- 第二步、寻找__consumer_offsets分区10的leader副本所在的broker,该broker即为这个消费者组的coordinator。
rebalance主要分为两步:加入组和同步更新分配方案。
group的leader和coordinator不是同一个概念。leader是某个consumer,coordinator是某个broker。leader负责为整个group的所有成员制定分配方案,而非coordinator。
rebalance监听器
rebalance监听器用于手动提交位移到第三方存储或者在rebalance前后执行审计操作。
consumer group状态机
新版本consumer依赖于broker端的组协调者(coordinator)来管理组内的所有consumer实例并负责把分配方案下发到每个consumer上。分配方案是由组内的leader consumer根据指定的分区分配策略制定的。
group管理协议
coordinator的组管理协议分两个阶段,即组成员加入阶段和状态同步阶段。第一个阶段用于为group指定active成员并从中选出leader consumer。第二个阶段则让leader consumer制定分配方案并同步到其他组成员中。
可以理解为,第一阶段收集所有consumer的topic订阅信息,第二个阶段利用上面信息给consumer分配要消费的分区。
五、解序列化
六、多线程消费实例
- 每个线程维护一个KafkaConsumer
- 单个KafkaConsumer实例+多worker线程
参考
《Apache Kafka实战 -- 胡夕》