kafka-topic
创建主题
2.3版本创建命令
kafka-topics --bootstrap-server localhost:9092 --create --topic xxxtopic --partitions 1 --replication-factor 1
0.10.2.1版本创建命令
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic xxxtopic --partitions 1 --replication-factor 3
0.10.2.1版本创建命令,并指明分区及副本所在的broker上
(下面示例代表,有两个分区,每个分区有四个副本,第一个分区副本在4,5,6,7broker上,第二个分区在 1,2,3,4broker上)
kafka-topics.sh --zookeeper xxx.xxx.xxx.xxx:2181 --create --topic xxxtopic --replica-assignment 4:5:6:7 1:2:3:4
查看所有主题lead副本分布情况,AR 集合,ISR 集合
2.3版本查看命令
kafka-topics --bootstrap-server localhost:9092 --describe --topic topicName1,topicName2
0.10.2.1版本查看命令
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic xxxtopic
查看所有包含覆盖配置的主题
kafka-topics --bootstrap-server localhost:9092 --describe --topics-with-overrides 如果需要查看某一个主题或者多个主题加上 --topic topicName1,topicName2
找出所有包含失效副本的主题以及分区,即分区的 ISR 集合元素数量小于 AR 集合元素数量
kafka-topics --bootstrap-server localhost:9092 --describe --under-replicated-partitions 如果需要查看某一个主题或者多个主题加上 --topic topicName1,topicName2
找出所有leader副本不可用的主题以及分区
kafka-topics --bootstrap-server localhost:9092 --describe --unavailable-partitions 如果需要查看某一个主题或者多个主题加上 --topic topicName1,topicName2
kafka-console-consumer
查看kafka 位移主题消息
kafka-console-consumer --bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'
kafka-consumer-groups
查看所有的消费者组
kafka-consumer-groups --bootstrap-server localhost:9092 --list
查看消费者组信息,包括LAG等
kafka-consumer-groups.sh --bootstrap-server xxx.xxx.xxx.xxx:9092 --describe --group xxxtopic
查看消费组状态信息
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group testConsumerGroup1 --state
查看消费组内的消费者成员信息
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group testConsumerGroup1 --members
查看消费组内的消费者成员信息,以及每个消费者成员的分配情况
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group testConsumerGroup1 --members --verbose
kafka-run-class
查看日志信息
kafka-run-class kafka.tools.DumpLogSegments --files 00000000000000000000.log 如果需要输出消息内容加上 --print-data-log 选项
查看JMX数据指标
kafka-run-class kafka.tools.JmxTool --object-name kafka.utils:type=Throttler,name=cleaner-io --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi --date-format "YYYY-MM-dd HH:mm:ss" --attributes Count --reporting-interval 1000
--attributes 指定要查询的JMX属性名称
--date-format 显示的日期格式
--jmx-url 指定要链接的JMX接口
--object-name 指定要查询的JMX MBean 名称
--reporting-interval 指定实时查询的时间间隔,时间单位 毫秒,默认每2秒查询一次
kafka-dump-log(kafka 2.0.0 版本开始才有此shell脚本)
查看日志信息
kafka-dump-log --files 00000000000000000000.log 如果需要输出消息内容加上 --print-data-log 选项
修改topic分区的副本系数
0.10.2.1版本命令
kafka-reassign-partitions.sh --zookeeper xxxxx:2181 -reassignment-json-file partitions-to-move.json -execute
partitions-to-move.json 文件内容如下
{
"version": 1,
"partitions": [
{
"topic": "xxxtopic",// topic
"partition": 0, // 分区编号
"replicas": [ // 新副本需要分布到哪些 broker,1、2、3 表示 broker id
1,
2,
3
]
}
]
}
分区重分配获取分区分配方案
kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file kafkaReassign.json --broker-list 1,2,3 --generate
kafkaReassign.json 文件内容格式如下
{
"topics": [
{
"topic": "xxxtopic" // topic 的名字
}
],
"version": 1
}
执行命令后结果如下
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF8
Current partition replica assignment(当前分区分布情况)
{"version":1,"partitions":[{"topic":"xxxtopic","partition":3(分区id),"replicas":[2(brokerId)]},{"topic":"xxxtopic","partition":4,"replicas":[3]},{"topic":"xxxtopic","partition":2,"replicas":[1]},{"topic":"xxxtopic","partition":1,"replicas":[3]},{"topic":"xxxtopic","partition":0,"replicas":[2]},{"topic":"xxxtopic","partition":5,"replicas":[1]}]}
Proposed partition reassignment configuration(生成的分区分配方案)
{"version":1,"partitions":[{"topic":"xxxtopic","partition":4(分区id),"replicas":[3(brokerId)]},{"topic":"xxxtopic","partition":3,"replicas":[2]},{"topic":"xxxtopic","partition":2,"replicas":[3]},{"topic":"xxxtopic","partition":1,"replicas":[2]},{"topic":"xxxtopic","partition":0,"replicas":[3]},{"topic":"xxxtopic","partition":5,"replicas":[2]}]}