# -*- coding: utf-8 -*-
"""
Create by Mr.Hao on 2019/12/6.
"""
#pip install kafka-python
import hashlib
import os
import time
import pymysql
import json
import requests
from kafka import KafkaConsumer
from kafka import TopicPartition
consumer = KafkaConsumer(
bootstrap_servers = "127.0.0.1:9092", # kafka集群地址
group_id = "newConsumerTest1", # 消费组id
client_id = '8eaa8c81edfd41f28a50f9121ad14572',
auto_offset_reset="latest",
max_poll_records=10, # 每次最大消费数量
enable_auto_commit = True, # 每过一段时间自动提交所有已消费的消息(在迭代时提交)
auto_commit_interval_ms = 5000, # 自动提交的周期(毫秒)
)
partition = TopicPartition('auto_datacenter_spider_snapshot', 4)
res = consumer.poll(10)
start = 20905270
end = 20905280
consumer.assign([partition])
consumer.seek(partition, offset=start)
#consumer.seek_to_end() 默认读取最新数据
for msg in consumer: # 迭代器,等待下一条消息
offset, value = msg.offset, msg.value
if msg.offset > end:
break
jdate = json.loads(value)
print offset,"====>>>>",jdate.get("crawler_time"), jdate.get("taskId")," url_md5:", jdate.get("url_md5")
Python 指定offset 读取消息(Kafka-Python)
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...