kafka-python 获取topic lag值

说真,这个问题看上去很简单,但“得益”与kafka-python神奇的文档,真的不算简单,反正我是搜了半天还看了半天源码。直接上代码吧

from kafka import SimpleClient, KafkaConsumer
from kafka.common import OffsetRequestPayload, TopicPartition

def get_topic_offset(brokers, topic):
    """
    获取一个topic的offset值的和
    """
    client = SimpleClient(brokers)
    partitions = client.topic_partitions[topic]
    offset_requests = [OffsetRequestPayload(topic, p, -1, 1) for p in partitions.keys()]
    offsets_responses = client.send_offset_request(offset_requests)
    return sum([r.offsets[0] for r in offsets_responses])


def get_group_offset(brokers, group_id, topic):
    """
    获取一个topic特定group已经消费的offset值的和
    """
    consumer = KafkaConsumer(bootstrap_servers=brokers,
                             group_id=group_id,
                             )
    pts = [TopicPartition(topic=topic, partition=i) for i in
           consumer.partitions_for_topic(topic)]
    result = consumer._coordinator.fetch_committed_offsets(pts)
    return sum([r.offset for r in result.values()])


if __name__ == '__main__':
    topic_offset = get_topic_offset("brokers", "topic")
    group_offset = get_group_offset("brokers", "group_id", "topic")
    lag = topic_offset - group_offset
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 转自:https://weibo.com/ttarticle/p/show?id=2309404129469920...
    xpf2000阅读 10,187评论 0 48
  • Web UI测试自动化 splinter - web UI测试工具,基于selnium封装。 selenium -...
    Thea0216阅读 11,391评论 2 48
  • 知识经常被认为是储存在昏暗图书馆里堆满尘埃的书架上的死物。遗憾的是,图书馆里沉寂的气氛会使人想起教堂的葬礼或...
    邓洁儿阅读 1,118评论 0 1
  • 听了20多期吐槽课程,对幽默也有了一定了解,感觉自己在段子手的道路上似乎又前进了一步。 许多内容,听过了之后也不过...
    乌卓阅读 4,862评论 0 4
  • 沙子的自由 风无法囚禁住他 愚蠢的海浪也不能 和风一样愚蠢 自信的神 不了解他们自己 除非他们首先失去自己 然后沙...
    瓦尔登野人阅读 2,548评论 0 0