一、Consumer的两套相关API
- The high-level Consumer API:提供一个从 kafka 消费数据的高层抽象
high-level consumer API 提供了 consumer group 的语义,一个消息只能被 group 内的一个 consumer 所消费
且 consumer 消费消息时不关注 offset,最后一个 offset 由 zookeeper 保存。
可以多线程使用 high-level consumer API ,但要注意:
-1. 如果消费线程大于 patition 数量,则有些线程将收不到消息
-2. 如果 patition 数量大于线程数,则有些线程多收到多个 patition 的消息
-3. 如果一个线程消费多个 patition,则无法保证你收到的消息的顺序,而一个 patition 内的消息是有序的
- The SimpleConsumer API:需要开发人员实现更多的底层细节
使用 SimpleConsumer API 实现对 patition 更多的控制权:
- 多次读取一个消息
- 只消费一个 patition 中的部分消息
- 使用事务来保证一个消息仅被消费一次
但是使用这个API 时,需要自己管理partition、offset、broker、leader 等:
- 必须在应用程序中跟踪 offset,从而确定下一条应该消费哪条消息
- 应用程序需要通过程序获知每个 Partition 的 leader 是谁
- 需要处理 leader 的变更
使用 SimpleConsumer API 的一般流程如下:
- 查找到一个“活着”的 broker,并且找出每个 partition 的 leader
- 找出每个 partition 的 follower
- 定义好请求,该请求应该能描述应用程序需要哪些数据
- fetch 数据
- 识别 leader 的变化,并对之作出必要的响应
以下针对 high-level Consumer API 进行说明。
二、Consumer Group
kafka 的分配单位是 patition。每个 consumer 都属于一个 group,一个 partition 只能被同一个 group 内的一个 consumer 所消费(也就保障了一个消息只能被 group 内的一个 consuemr 所消费),但是多个 group 可以同时消费这个 partition。
kafka 的设计目标之一就是同时实现离线处理和实时处理,根据这一特性,可以使用 spark/Storm 这些实时处理系统对消息在线处理,同时使用 Hadoop 批处理系统进行离线处理,还可以将数据备份到另一个数据中心,只需要保证这三者属于不同的 consumer group。如下图所示:
三、消费方式
consumer 采用 pull 模式从 broker 中读取数据。
push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。
对于 Kafka 而言,pull 模式更合适,它可简化 broker 的设计,consumer 可自主控制消费消息的速率,同时 consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
四、consumer 重平衡
当有 consumer 加入或退出、以及 partition 的改变(如 broker 加入或退出)时会触发 rebalance。
Consumer rebalance算法如下:
- 将目标 topic 下的所有 partirtion 排序,存于PT
- 对某 consumer group 下所有 consumer 排序,存于 CG,第 i 个consumer 记为 Ci
- N=size(PT)/size(CG),向上取整
- 解除 Ci 对原来分配的 partition 的消费权(i从0开始)
- 将第iN到(i+1)N-1个 partition 分配给 Ci
在 0.8.*版本,每个 consumer 都只负责调整自己所消费的 partition,为了保证整个consumer group 的一致性,当一个 consumer 触发了 rebalance 时,该 consumer group 内的其它所有其它 consumer 也应该同时触发 rebalance。
0.9.0+版本的重平衡机制见https://zhuanlan.zhihu.com/p/98770059?from_voters_page=true