kafka-python文档
一、consumer
1. 常用api
#建立连接
consumer = KafkaConsumer(bootstrap_servers=['ip1:port','ip2:port'],
api_version=(0,10),group_id='my_group')
# topic所有的partition
consumer.partitions_for_topic(topic)
# 构造topicPartition对象
tps = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
# 为consumer分配分区
consumer.assign(tps)
# kafka每个分区的最新offset
consumer.end_offsets(tps)
# 当前groupid 每个分区消费到的位置
for i in range(len(tps)):
consumer.position(tps[i])
# 消费数据
for message in consumer:
partition = message.partition
offset = message.offset
value = message.value
# 重置offset
for i in range(len(tps)):
consumer.seek(tps[i], partition_offset[i]) #partition_offset保存每
# partition_offset保存每个分区的起始消费位置
# 形如{0:123, 1:345 },表示0分区从123开始再次消费
二、producer
三、其他
3.1 json处理
额外的包:
pip install msgpack
import msgpack
producer:
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})
consumer:
KafkaConsumer(value_deserializer=msgpack.unpackb)
此时得到的value是dict类型
最后编辑于 :
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。