环境准备
前置:KAFKA_HOME及ZK_IP_PORT_LIST的取值视具体的环境信息来定
跳转kafka的安装目录,然后启动,命令如下:
cd $KAFKA_HOME && ./bin/kafka-server-start.sh config/server.properties
创建topic
单zk节点:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
zk集群:
kafka-topics.sh --create --zookeeper $ZK_IP_PORT_LIST--replication-factor 1 --partitions 1 --topic txt
查看topic信息
单zk节点:
./bin/kafka-topics.sh --zookeeper localhost:2181 --list
zk集群:
kafka-topics.sh --list --zookeeper $ZK_IP_PORT_LIST
查看指定的topic信息
单zk节点:
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-topic
显示如下信息:
Topic:my-topic PartitionCount:1 ReplicationFactor:1 Configs:Topic: my-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
生产消息
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
在python中使用producer.send("txt", line.encode('utf-8')) 注意输入的消息需要转bytes
消费消息
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
python实践
前置:运行命令pip3 install kafka-python安装工具包
简单的消息生产及消费代码示例,参见链接中python实践部分的代码