kafka-python操作

kafka-python文档

一、consumer

1. 常用api

#建立连接
consumer = KafkaConsumer(bootstrap_servers=['ip1:port','ip2:port'],
                         api_version=(0,10),group_id='my_group')

# topic所有的partition
consumer.partitions_for_topic(topic)

# 构造topicPartition对象
tps = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]

# 为consumer分配分区
consumer.assign(tps)

# kafka每个分区的最新offset
consumer.end_offsets(tps)

# 当前groupid 每个分区消费到的位置
for i in range(len(tps)):
    consumer.position(tps[i])

# 消费数据
for message in consumer:
    partition = message.partition
    offset = message.offset
    value = message.value

# 重置offset
for i in range(len(tps)):
    consumer.seek(tps[i], partition_offset[i])    #partition_offset保存每
# partition_offset保存每个分区的起始消费位置
# 形如{0:123, 1:345 },表示0分区从123开始再次消费

二、producer


三、其他

3.1 json处理

额外的包:  
pip install msgpack
import msgpack
producer:
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})
consumer:
KafkaConsumer(value_deserializer=msgpack.unpackb)
此时得到的value是dict类型
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,747评论 13 425
  • 本文转载自http://dataunion.org/?p=9307 背景介绍Kafka简介Kafka是一种分布式的...
    Bottle丶Fish阅读 5,491评论 0 34
  • 背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: 以时间复杂度为O...
    高广超阅读 12,884评论 8 167
  • 七月过去,八月已开始一天,小记一下。 七月初签订的合约之二,(之一没有签订)计划学习规范,计划整理台帐,计划重整检...
    虎妞034阅读 178评论 0 0
  • 原来社会就是这样,不管哪个行业,哪个岗位,或多或少都有些那么不尽人意。虽说每天9小时不算多,但是每周要上一天全天...
    陈同学ccc阅读 202评论 0 0