Apache Kafka 是分布式的流处理平台, 能够发布消息和订阅消息, 并且能够以容错的持久的方式来存储记录数据流, 作为大数据生态的重要组成部分, Apache Kafka主要应用在构建实时的流数据管道,在系统和应用间得到可靠的数据, 并且能够构建转换或响应实时数据流的应用。这里通过用一个小demo展示如何使用 Apache Kafka producer和consumer 来实时发布和订阅数据。
数据的来源是https://covid19api.com/。网站提供完全免费的rest api 新冠数据。如通过以下的Api call 可以获得如下的json.
(https://api.covid19api.com/country/germany/status/confirmed/live?from=2020-03-01T00:00:00Z&to=2020-04-01T00:00:00Z)
{
"Country": "Germany",
"CountryCode": "DE",
"Province": "",
"City": "",
"CityCode": "",
"Lat": "51.17",
"Lon": "10.45",
"Cases": 130,
"Status": "confirmed",
"Date": "2020-03-01T00:00:00Z"
},
{
"Country": "Germany",
"CountryCode": "DE",
"Province": "",
"City": "",
"CityCode": "",
"Lat": "51.17",
"Lon": "10.45",
"Cases": 159,
"Status": "confirmed",
"Date": "2020-03-02T00:00:00Z"
},
{
"Country": "Germany",
"CountryCode": "DE",
"Province": "",
"City": "",
"CityCode": "",
"Lat": "51.17",
"Lon": "10.45",
"Cases": 196,
"Status": "confirmed",
"Date": "2020-03-03T00:00:00Z"
}
在开始数据的发布和订阅之前,首先要开始Kafka 服务。代码如下
(base) cloud_user@yin2c:~$ sudo systemctl start confluent-zookeeper
(base) cloud_user@yin2c:~$ sudo systemctl enable confluent-zookeeper
(base) cloud_user@yin2c:~$ sudo systemctl start confluent-kafka
(base) cloud_user@yin2c:~$ sudo systemctl enable confluent-kafka
之后查看kafka broker是否在运行。
这样Kafka就设置好了,下一步要创建一个话题topic
kafka-topics --bootstrap-server localhost:9092 --create --topic py --partitions 1 --replication-factor 1
接下来用python 来创建消息发布者和订阅者。消息的来源是新冠数据, 通过api call来获取数据, 是德国从4月20号以来每天的现存病例数量, 先创建一个发布者实例, 设置好服务器,然后通过loop 把得到的json数据字典中的每天的病例数量发布到topic 里面。当启动发布者之后, 订阅者就会逐行打印得到的信息。
from kafka import KafkaProducer
from json import loads
import json
import requests
from time import sleep
#list of all data from first date
#URL = "https://api.covid19api.com/total/dayone/country/germany/status/confirmed"
URL ="https://api.covid19api.com/live/country/germany/status/confirmed/date/2020-04-20T13:13:30Z"
req = requests.get(url = URL)
data = req.json()
producer = KafkaProducer(bootstrap_servers = ['localhost:9092'], value_serializer=lambda v: json.dumps(v).encode('utf-8'))
for i in range (len(data)):
file = data[i]
sleep(1)
producer.send('py', value=str(file["Date"].split("T")[0])+':'+str(file["Active"]))
消息的订阅者很简单就是一个监听topic 的订阅者。首先开始订阅者, 由于还没有消息发布, 所以没有信息。当发布者启动之后, 就可以看到信息被逐行打印出来。
代码可以通过我的github 分叉:https://github.com/dtdetianyin/ApacheKafka/tree/master/Corona19%20Data%20processed%20with%20ApacheKafka%20and%20Python
_