查看 kafka broker offset

参考:https://zhuanlan.zhihu.com/p/571040157

如果确认没有使用 zk 或者其他外部存储来保存消费者的 offset 信息,那么一般默认在 kafka 的 __consumer_offsets 这个 topic 中保存 offset。

这个系统 topic 默认不能被用户消费,那么就需要解读它的数据文件了(当然可以在配置文件 config/consumer.properties 中添加 exclude.internal.topics=false,默认是 true,让用户可以消费系统 topic):

# 共 50 个分区,每台机器的分区数 = 50*replicas/brokers
$ ll data/ | grep -E "__consumer_offsets-" -c
# 分区的数据文件
$ ll data/__consumer_offsets-*/*.log

log 文件是二进制文件,需要 kafka 提供的方法 kafka.tools.DumpLogSegments 来读取

假设业务的 topic 信息如下:

$ topicName=TEST
$ consumerGroupId=TEST_CG

1、计算 offset 信息保存位置

计算这个 topic 的消费组 offset 信息保存在哪个 __consumer_offsets 分区

  • 计算公式:
Math.abs(groupID.hashCode()) % numPartitions
  • Java 代码:HashCode.java
public class HashCode {
        public static void main(String args[]) {
                String Str = new String("TEST");
                System.out.println(Math.abs(Str.hashCode()%50));
        }
}
  • 执行计算:
$ javac HashCode.java
$ java HashCode
11

结果为 11,就是说 offset 数据保存在 __consumer_offsets-11 分区里面

2、查看 __consumer_offsets 的分区 11 的信息

$ /app/kafka/pkg/kafka_2.12-2.5.1/bin/kafka-topics.sh --bootstrap-server 192.168.0.1:9092 --describe --topic __consumer_offsets | grep "Partition: 11"
        Topic: __consumer_offsets       Partition: 11   Leader: 2       Replicas: 2,1,3 Isr: 2,1,3

可知分区 11 在 broker.id = 2,1,3 的这3台机器上,leader id 是 2,查看 broker.id:

$ grep broker.id conf/broker.properties

broker.id=2

其实定位 __consumer_offsets 分区 11 的方法挺多的:

  • 上面阐述的 --describe --topic __consumer_offsets
  • --describe 业务 topic 的分区信息
  • ls data/ | grep __consumer_offsets-11 或者 ls data/ | grep ${topicName}-

因为 ACL 的原因,无法使用 describe 去获取业务 topic 的分区信息时,--describe --topic __consumer_offsets 获或者 ls data/ 还是可以帮得上忙的。

3、提取 offset 信息

因为 __consumer_offsets-11 的 leader 的 broker.id = 2,那么只需要看这台机器的 log 文件即可,修改时间最新的那个就是我们想要的文件,这里是 00000000000004458198.log

$ ll -tr data/__consumer_offsets-11/*.log
-rw-r--r-- 1 test test   4804 Mar  6 21:20 data/__consumer_offsets-11/00000000000000000000.log
-rw-r--r-- 1 test test 315708 Mar  8 11:49 data/__consumer_offsets-11/00000000000004458198.log

开始读取消费组的 offset 信息

$ logfile=$(ll -tr data/__consumer_offsets-11/*.log | tail -n 1 | awk '{print $9}') && echo $logfile
$ /app/kafka/pkg/kafka_2.12-2.5.1/bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
    --deep-iteration \
    --print-data-log \
   --files ${logfile}  | less

baseOffset: 4460036 lastOffset: 4460036 count: 1 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 335511 LogAppendTime: 1678255976868 size: 161 magic: 2 compresscodec: NONE crc: 4126368907 isvalid: true
| offset: 4460036 LogAppendTime: 1678255976868 keysize: 67 valuesize: 24 sequence: 0 headerKeys: [] key: ^@^A^@!TEST_CG^@^TEST^@^@^@^F payload: ^@^C^@^@^@^@^@   le����^@^@^@^@^A����
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容