环境准备
前置:KAFKA_HOME及ZK_IP_PORT_LIST的取值视具体的环境信息来定
kafka的安装目录及启动
cd $KAFKA_HOME && ./bin/kafka-server-start.sh config/server.propertieskafka创建topic
单zk节点:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testzk集群:
kafka-topics.sh --create --zookeeper $ZK_IP_PORT_LIST--replication-factor 1 --partitions 1 --topic txt查看topic信息
单zk节点:
./bin/kafka-topics.sh --zookeeper localhost:2181 --listzk集群:
kafka-topics.sh --list --zookeeper $ZK_IP_PORT_LIST查看指定的topic信息
单zk节点:
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic my-topic显示如下信息:
Topic:my-topic PartitionCount:1 ReplicationFactor:1 Configs:Topic: my-topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
生产消息
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test在python中使用producer.send("txt", line.encode('utf-8')) 注意输入的消息需要转bytes
消费消息
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginningpython实践
前置:运行命令pip3 install kafka-python安装工具包
简单的消息生产及消费代码示例,参见链接中python实践部分的代码