本文命令均在kafka2.11-1.0.1下使用,如果端口冲突,此时需要加JMX_PORT=10111(或者其他数字) 来更改端口号
1. 查看kafka下有多少topic
bin/kafka-topics.sh --list --zookeeper header-1:2181
2.创建topic
bin/kafka-topics.sh --create --topic test-topic --zookeeper header-1:2181
\ --config max.message.bytes=10485760 --partitions 1 --replication-factor 2
--config后可以添加许多参数,比如
参数名称 | 含义 |
---|---|
max.message.bytes | 接收消息的最大字节数 |
partitions | 分区数 |
replication-factor | 副本个数 |
3.往topic写消息
/usr/lib/kafka-current/bin/kafka-console-producer.sh --broker-list worker-ip:9092 --topic test-topic
4.消费topic里的消息
/usr/lib/kafka-current/bin/kafka-console-consumer.sh --bootstrap-server worker-ip:9092 --topic test-topic --from-beginning
增加--from-beginning表示从头消费,否则去掉表示最新消费
5.查看topic信息
bin/kafka-topics.sh --describe --zookeeper header-1:2181 --topic test-topic
6.修改topic分区数
bin/kafka-topics.sh --zookeeper header-1:2181 -alter --partitions 10 --topic test-topic
只增不减
7.为topic增加时间戳
kafka-topics.sh --zookeeper -header-1:2181 --topic test-topic --alter --config message.timestamp.type=LogAppendTime