Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
在Python环境中使用Kafka
主要分为几个步骤
- 安装
JDK
(使用JDK 8) - 安装
Kafka
(编译版) - 启动
Kafka
自带的zookeeper
服务 - 启动
Kafka
服务并建立Topic
- 安装并使用
kafka-python
调度Kafka
producer.py
负责生成消息并将其传递进对应的Topic
中
from kafka impor KafkaProducer
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
msg = "Message".encode('utf-8')
producer.send('KafkaTest', msg)
consumer.py
则负责接收消息
from kafka impor KafkaConsumer
consumer = KafkaConsumer('KafkaTest', bootstrap_servers=['127.0.0.1:9092'])
for msg in consumer:
recv = f"Topic: {msg.topic}, Partition: {msg.partition}, Key: {msg.key}, Value: {msg.value}"
print(recv)
启动consumer.py
后,其中的consumer就会进入等待消息的状态,只要producer.py
中发送了消息,consumer就会将其打印出来
Topic: KafkaTest, Partition: 0, Key: None, Value: b'Message'