由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。
创建一个topic
kafka-topics.sh --create --topic bigdata --zookeeper zkKafka1:2181 --partitions 2 --replication-factor 2
Created topic "bigdata".
连接上这个topic准备发消息
kafka-console-producer.sh --broker-list zkKafka1:9092 --topic bigdata
> hello
启动一个消费者消费消息
kafka-console-consumer.sh --zookeeper zkKafka1:2181 --topic bigdata
查看zk
zkCli.sh
[zk: localhost:2181(CONNECTED) 0] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, kafka-manager, latest_producer_id_block, zookeeper]
[zk: localhost:2181(CONNECTED) 2] ls /brokers
[ids, seqid, topics]
[zk: localhost:2181(CONNECTED) 3] ls /brokers/ids
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics
[__consumer_offsets, bigdata, first]
[zk: localhost:2181(CONNECTED) 5] ls /consumers
# 消费者组
[console-consumer-90976]
[zk: localhost:2181(CONNECTED) 6] ls /consumers/console-consumer-90976
[ids, offsets, owners]
[zk: localhost:2181(CONNECTED) 7] ls /consumers/console-consumer-90976/offsets
[bigdata]
[zk: localhost:2181(CONNECTED) 8] ls /consumers/console-consumer-90976/offsets/bigdata
[0, 1]
[zk: localhost:2181(CONNECTED) 10] get /consumers/console-consumer-90976/offsets/bigdata/0
1
Kafka 0.9 版本之前,consumer默认将offset保存在zookeeper中,从0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为__consumer_offsets。
1)修改配置文件consumer.properties
exclude.internal.topics=false
2)读取offset
# 重新启动一个消费者
kafka-console-consumer.sh --bootstrap-server zkKafka1:9092 --topic bigdata
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper zkKafka1:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.conf
ig config/consumer.properties --from-beginning
[console-consumer-17599,bigdata,0]::[OffsetMetadata[2,NO_METADATA],CommitTime 1644921362785,ExpirationTime 1645007762785]
[console-consumer-17599,bigdata,1]::[OffsetMetadata[2,NO_METADATA],CommitTime 1644921362785,ExpirationTime 1645007762785]
[console-consumer-17599,bigdata,0]::[OffsetMetadata[3,NO_METADATA],CommitTime 1644921367772,ExpirationTime 1645007767772]
[console-consumer-17599,bigdata,1]::[OffsetMetadata[2,NO_METADATA],CommitTime 1644921367772,ExpirationTime 1645007767772]
[console-consumer-17599,bigdata,0]::[OffsetMetadata[3,NO_METADATA],CommitTime 1644921372775,ExpirationTime 1645007772775]
[console-consumer-17599,bigdata,1]::[OffsetMetadata[2,NO_METADATA],CommitTime 1644921372775,ExpirationTime 1645007772775]
[console-consumer-17599,bigdata,0]::[OffsetMetadata[3,NO_METADATA],CommitTime 1644921377777,ExpirationTime 1645007777777]
[console-consumer-17599,bigdata,1]::[OffsetMetadata[2,NO_METADATA],CommitTime 1644921377777,ExpirationTime 1645007777777]