参考:https://zhuanlan.zhihu.com/p/571040157
如果确认没有使用 zk 或者其他外部存储来保存消费者的 offset 信息,那么一般默认在 kafka 的 __consumer_offsets 这个 topic 中保存 offset。
这个系统 topic 默认不能被用户消费,那么就需要解读它的数据文件了(当然可以在配置文件 config/consumer.properties 中添加 exclude.internal.topics=false,默认是 true,让用户可以消费系统 topic):
# 共 50 个分区,每台机器的分区数 = 50*replicas/brokers
$ ll data/ | grep -E "__consumer_offsets-" -c
# 分区的数据文件
$ ll data/__consumer_offsets-*/*.log
log 文件是二进制文件,需要 kafka 提供的方法 kafka.tools.DumpLogSegments 来读取
假设业务的 topic 信息如下:
$ topicName=TEST
$ consumerGroupId=TEST_CG
1、计算 offset 信息保存位置
计算这个 topic 的消费组 offset 信息保存在哪个 __consumer_offsets 分区
- 计算公式:
Math.abs(groupID.hashCode()) % numPartitions
- Java 代码:HashCode.java
public class HashCode {
public static void main(String args[]) {
String Str = new String("TEST");
System.out.println(Math.abs(Str.hashCode()%50));
}
}
- 执行计算:
$ javac HashCode.java
$ java HashCode
11
结果为 11,就是说 offset 数据保存在 __consumer_offsets-11 分区里面
2、查看 __consumer_offsets 的分区 11 的信息
$ /app/kafka/pkg/kafka_2.12-2.5.1/bin/kafka-topics.sh --bootstrap-server 192.168.0.1:9092 --describe --topic __consumer_offsets | grep "Partition: 11"
Topic: __consumer_offsets Partition: 11 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
可知分区 11 在 broker.id = 2,1,3 的这3台机器上,leader id 是 2,查看 broker.id:
$ grep broker.id conf/broker.properties
broker.id=2
其实定位 __consumer_offsets 分区 11 的方法挺多的:
- 上面阐述的 --describe --topic __consumer_offsets
- --describe 业务 topic 的分区信息
-
ls data/ | grep __consumer_offsets-11
或者ls data/ | grep ${topicName}-
因为 ACL 的原因,无法使用 describe 去获取业务 topic 的分区信息时,
--describe --topic __consumer_offsets
获或者ls data/
还是可以帮得上忙的。
3、提取 offset 信息
因为 __consumer_offsets-11 的 leader 的 broker.id = 2,那么只需要看这台机器的 log 文件即可,修改时间最新的那个就是我们想要的文件,这里是 00000000000004458198.log
$ ll -tr data/__consumer_offsets-11/*.log
-rw-r--r-- 1 test test 4804 Mar 6 21:20 data/__consumer_offsets-11/00000000000000000000.log
-rw-r--r-- 1 test test 315708 Mar 8 11:49 data/__consumer_offsets-11/00000000000004458198.log
开始读取消费组的 offset 信息
$ logfile=$(ll -tr data/__consumer_offsets-11/*.log | tail -n 1 | awk '{print $9}') && echo $logfile
$ /app/kafka/pkg/kafka_2.12-2.5.1/bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
--deep-iteration \
--print-data-log \
--files ${logfile} | less
baseOffset: 4460036 lastOffset: 4460036 count: 1 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 335511 LogAppendTime: 1678255976868 size: 161 magic: 2 compresscodec: NONE crc: 4126368907 isvalid: true
| offset: 4460036 LogAppendTime: 1678255976868 keysize: 67 valuesize: 24 sequence: 0 headerKeys: [] key: ^@^A^@!TEST_CG^@^TEST^@^@^@^F payload: ^@^C^@^@^@^@^@ le����^@^@^@^@^A����