一、消费方式
只要提到消息队列,必然会涉及到其推模式(push)和拉模式(pull),在kafka中,同样会涉及到。
consumer采用的时pull模式,从broker中去拉取消息。
为什么采用pull模式?
如果采用push模式,很容易造成消息的堆积,因为broker控制消息的推送速率,消息数量大的话,很难使每个消费者很难适应消息推送速率。
采用pull模式则可以使每个消费者以自身的消费能力去消费。
pull模式有什么不足?kafka如何解决这一问题?
如果kafka没有消息,则每次消费者拉去的都是空的数据,会使得其陷入循环,一直返回空数据。针对这一情况,每次消费数据的时候,消费者会带有一个时长参数timeout,当返回空数据时,消费者会等待timeout的时间,再去消费。
二、分区分配策略
一个topic有多个partition,所以必然会涉及到partition的分配问题,即确定那个 partition 由哪个 consumer 来消费。
Kafka 有两种分配策略,一是 Range,一是 RoundRobin。
有以下情况会导致消费者分区的重新分配:
1) 当consumer group的成员数量增加或者减少。
2) 当消费者订阅的主题的partition数量变更。
2.1 Range
Range策略是对每个主题而言的。它会将每个topic的分区从0往后一次排列。
其分配算法是用topic分区的总个数除以消费者个数(这里指的消费者是同一个消费者组内的,因为不同组会得到topic的全量消息),除尽的话则消费者均匀分配,除不尽的话,在前面的消费者会多消费一个分区。
我们列举三种情况来说明它:
1)有一个topic,有6个partition,有两个消费者,则经过首次分区分配后,会形成如下的形式:
如上所示,将6个分区均匀的分给了两个消费者,6除以三除尽了,所以前三个属于consumer1,后三个属于consumer2。
2)有一个topic,有7个partition,有两个消费者,则经过首次分区分配后,会形成如下的形式:
如上所示,consumer1比consumer2多消费了一个分区的数据。
3)有两个topic,每个有5个partition,有两个消费者,则经过首次分区分配后,会形成如下的形式:
如上所示,发现consumer1总共消费6分区,而consumer2只消费4个。
综上所述,能够看出Range一个较为明显的弊端。
2.2 RoundRobin策略
将所有topic的partition组成TopicAndPartition列表,然后对TopicAndPartition列表按照hashCode进行排序。
使用RoundRobin策略必须满足以下条件:
1.同一个Consumer Group里面的所有consumer的num.streams必须相等(关于这个num.streams我还不知道是什么东西,知道的可以帮忙解答下)。
2.每个consumer订阅的topic必须相同。
分以下三种情况展示:
1)假设有个topic,有六个分区,假设6个分区经过hashcode的排序后顺序是5,0,4,3,2,1, 两个消费者:consumer1,consumer2,分配结果如下所示:
2)假设有个topic,有7个分区,假设6个分区经过hashcode的排序后顺序是6,5,0,4,3,2,1, 两个消费者:consumer1,consumer2,分配结果如下所示:
如上所示,consumer1会比consumer2多消费一个分区,但是也仅会多一个。没有range策略的问题。
2)假设有两个topic,每个有5个分区,假设6个分区经过hashcode的排序后顺序是topic1-0,topic1-4,topic1-3,topic1-2,topic1-1,topic2-0,topic2-4,topic2-3,topic2-2,topic2-1 ,两个消费者:consumer1,consumer2,分配结果如下所示:
综上所述,RoundRobin与Range最大的区别:
Range是为某个topic完成了分区分派以后,再进行下一个topic的分区分派;
RoundRobin是首先将这个group中的所有consumer订阅的所有的topic-partition按顺序展开,依次对于每一个topic-partition,在consumer进行round robin,为这个topic-partition选择一个consumer。
三、offset
由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。
Kafka 0.9 版本之前,consumer 默认将 offset 保存在 Zookeeper 中,从 0.9 版本开始,consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic 为__consumer_offsets。
当有消费者第一次消费kafka数据时就会自动创建,它的副本数不受集群配置的topic副本数限制,分区数默认50(可以配置),默认压缩策略为compact。
使用以下的命令可以查看__consumer_offsets:
[root@localhost kafka_2.12-2.7.0]# bin/kafka-consumer-groups.sh --bootstrap-server 192.168.184.134:9092 --describe --group test
查看结果如下所示:
[root@localhost kafka_2.12-2.7.0]# bin/kafka-consumer-groups.sh --bootstrap-server 192.168.184.134:9092 --describe --group test
Consumer group 'test' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test test-kafka1 9 0 0 0 - - -
test test-kafka2 9 0 0 0 - - -
test test-kafka2 2 0 0 0 - - -
test test-kafka1 0 0 0 0 - - -
test test-kafka2 4 0 0 0 - - -
test test-kafka1 2 0 0 0 - - -
test test-kafka2 6 0 0 0 - - -
test test-kafka1 4 0 0 0 - - -
test test-kafka 3 202 203 1 - - -
test test-kafka2 8 0 0 0 - - -
test test-kafka1 6 0 0 0 - - -
test test-kafka 1 74 75 1 - - -
test test-kafka 7 73 74 1 - - -
test test-kafka 5 74 75 1 - - -
test test-kafka2 0 0 0 0 - - -
test test-kafka 9 75 76 1 - - -
test test-kafka1 8 0 0 0 - - -
test test-kafka1 1 0 0 0 - - -
test test-kafka 0 75 76 1 - - -
test test-kafka2 1 1 1 0 - - -
test test-kafka1 3 0 0 0 - - -
test test-kafka2 3 0 0 0 - - -
test test-kafka1 5 0 0 0 - - -
test test-kafka 4 75 76 1 - - -
test test-kafka2 5 0 0 0 - - -
test test-kafka1 7 0 0 0 - - -
test test-kafka 2 75 76 1 - - -
test test-kafka2 7 0 0 0 - - -
test test-kafka 8 74 75 1 - - -
test test-kafka 6 73 74 1 - - -
GROUP:消费者组的id
TOPIC:主题
PARTITION:分区
CURRENT-OFFSET:当前消费的offset
LOG-END-OFFSET:最后一个offset
当CURRENT-OFFSET与LOG-END-OFFSET相等时,表示当前分区的所有消息都被消费了。
四、消费者组
kafka区别于其他的消息队列,有着自己的特性,由于消费者组的引入,使得一条消息不但能够一对一的被消费者组内的唯一消费者消费,也可以被不通消费者组的消费者同时消费。
总结起来这个消费者组和消费者的关系:如果应用需要读取全量消息,那么请为该应用设置一个消费组;如果该应用消费能力不足,那么可以考虑在这个消费组里增加消费者。
通过下图举例:在springboot中使用以下方式配置:
spring:
kafka:
consumer:
group-id: test