kafka 重置offset

#!/usr/bin/env python
# coding:utf-8

import sys
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata

if __name__ == '__main__':

    if (len(sys.argv) < 6):
        print("usage <kafkaHost> <kafkaPort> <groupid> <topic> <partition> <offset>")
        sys.exit(0)

    kafkaHost = sys.argv[1]
    kafkaPort = sys.argv[2]
    groupid = sys.argv[3]
    topic = sys.argv[4]
    partition = int(sys.argv[5])
    offset = int(sys.argv[6])

    # init kafka consumer
    consumer = KafkaConsumer(group_id=groupid,
                             bootstrap_servers='{kafka_host}:{kafka_port}'.format(
                                 kafka_host=kafkaHost, kafka_port=kafkaPort))

    # 分配topic and partition
    consumer.assign([TopicPartition(topic, partition)])

    offsets = {}
    meta = consumer.partitions_for_topic(topic)
    offsets[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, meta)
    consumer.seek(TopicPartition(topic, partition), offset)

    consumer.commit(offsets)
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容