python kafka 学习笔记(一)

Windows 安装及利用Python操作kafka

1. 安装zookeeper

https://zookeeper.apache.org/releases.html

下载解压文件后

将conf文件夹下的 zoo_sample.cfg 文件名改成 zoo.cfg,打开 zoo.cfg文件,修改dataDir的值路径,将dataDir=/tmp/zookeeper改成
dataDir=E:\apache-zookeeper-3.5.9-bin\data
并加入
dataLogDir=E:\apache-zookeeper-3.5.9-bin\log
data和log文件夹可以自己创建

启动zookeeper

打开cmd 路径切换到解压的文件夹下的bin文件夹
运行 zkServer.cmd

2. 安装kafka

下载kafka文件:

http://kafka.apache.org/downloads.html
解压完成后进行properties文件配置:
新建logs文件夹(自己命名)
找到config文件夹下的server.properties文件,将log.dirs的值改成log.dirs=E:\work\kafka_2.12-0.10.2.0\kafka-logs(改为自己的地址)

启动Kafka

打开新的cmd 路径切换到解压的文件夹下,输入如下命令
.\bin\windows\kafka-server-start.bat .\config\server.properties

3. 使用python操作Kafka

python安装kafka

pip install kafka-python
不知道是否是因为执行过pip install kafka

使用
# 生产者
import json
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])

for i in range(3):
    msg = 'test %d' % i
    print(msg)
    producer.send('test',msg.encode())
producer.close()

# 消费者
from kafka import KafkaConsumer

consumer = KafkaConsumer('test', bootstrap_servers=['127.0.0.1:9092'])
for msg in consumer:
    print(msg.value, type(msg.value))
  • 错误记录
    使用KafkaProducer的send函数时,报了如下错误:
assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))
AssertionError

错误原因:send函数的value_bytes是str类型

解决办法:

对字符串使用encode()
字符串类str里有一个==encode()==方法,它是从字符串向比特流的编码过程。
bytes类型恰好有个==decode()==方法,它是从比特流向字符串解码的过程。

关闭kafka

.\bin\windows\kafka-server-stop.bat

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容