1. topic相关
1.1 查看kafka集群的所有topic
./bin/kafka-topics.sh --list --zookeeper localhost:2181
1.2 创建名称为topic_test的topic,partitions为1个,副本为1个
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_test
1.3 为topic_test添加partition为10
./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic_test --partitions 10
1.4 删除名称为topic_test的topic(只删除zookeeper内的元数据,消息文件须手动删除)
./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topic_test
1.5 查看topic为topic_test的详细信息
./bin/kafka-topics.sh -zookeeper localhost:2181 -describe -topic topic_test
2. consumer相关
2.1 查看所有consumer的group
/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
2.2 查看group为group_test消费的机器以及partition
./bin/kafka-consumer-groups.sh --describe --group group_test --bootstrap-server localhost:9092
2.3 查询__consumer_offsets topic所有内容
需要注意的是,__consumer_offset是内部使用的topic,外部应用程序是没法读取的,需要在config/consumer.properties关闭exclude.internal.topics这个参数。
exclude.internal.topics=false
# 0.11.0.0之前版本
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties
# 0.11.0.0之后版本(含)
./bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties
__consumer_offsets topic的每一日志项的格式都是:[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]
2.4 计算指定consumer group在__consumer_offsets topic中分区信息,Kafka会使用下面公式计算该group位移保存在__consumer_offsets的哪个分区上:
Math.abs(groupID.hashCode()) % numPartitions
默认情况下__consumer_offsets有50个分区,如果你的系统中consumer group也很多的话,那么这个命令的输出结果会很多。
2.5 查询__consumer_offsets topic所有某个partition的所有内容
# 0.11.0.0之前版本
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --partition 20 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties
# 0.11.0.0之后版本(含)
./bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server localhost:9092 --partition 20 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties
3. 控制台相关
3.1 控制台向topic_test发送数据
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_test
3.2 控制台接收topic_test数据
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_test --from-beginning
4. 服务相关
4.1 启动zookeeper服务
./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
4.2 启动kafka服务
./bin/zookeeper-server-start.sh -daemon ../config/zookeeper.properties
4.3 查看服务启动
ps -ef | grep kafka
5. 遇到的问题
5.1 kafka生产消息在broker上分配不均匀
__consumer_offser大量提交写入导致broker负载不均匀,问题参考
Kafka 如何读取offset topic内容 (__consumer_offsets)
参考
https://kafka.apache.org
__consumer_offser大量提交写入导致broker负载不均匀
Kafka 如何读取offset topic内容 (__consumer_offsets)