背景
写一个链上的竞猜小程序,由于交易的数据都在kafka里,并且使用了sasl进行认证。所以需要连接到kafka获取交易数据。解决两个问题: 1、连接kafka,2、sasl认证。直接上代码
代码示例
from kafka import KafkaConsumer
import time
import json
BOOTSTRAP_SERVERS='localhost:9102'
TOPIC='topic'
consumer = KafkaConsumer(TOPIC,
bootstrap_servers=BOOTSTRAP_SERVERS,
auto_offset_reset='earliest',
security_protocol='SASL_PLAINTEXT',
sasl_mechanism='PLAIN',
sasl_plain_username='username',
sasl_plain_password='password',
api_version=(0,10),
receive_buffer_bytes=1024,
enable_auto_commit='False')
# consumer_timeout_ms=1000)
# Consumption log
for msg in consumer:
print(msg)
填坑记录
- 参数必须要指定api_version,否则在连接时会提示一个check_version的异常,注意中间是逗号
- 默认是从最新的消息开始消费,如果想要获取全部消息,需要设置auto_offset_reset参数,值为earliest
- 参数security_protocol='SASL_PLAINTEXT'指定后, sasl_mechanism参数只能是PLAIN,否则连接后获取不到消息。
- consumer_timeout_ms 参数没设置,程序会一直阻塞。否则会再设置的时间到期后继续执行。