kafka常用命令脚本

python3 基于kafka-python 2.0.2实现的常用命令工具

转载请注明出处

import argparse
import json
import logging
import re

from kafka import KafkaAdminClient, KafkaConsumer
from kafka.admin.new_topic import NewTopic
from kafka.errors import UnknownTopicOrPartitionError
from kafka.structs import TopicPartition

# pip install kafka-python
conf = {
    'bootstrap_servers': 'kafka:9092',
    'security_protocol': 'SASL_PLAINTEXT',
    'sasl_mechanism': 'SCRAM-SHA-256',
    'sasl_plain_username': 'admin',
    'sasl_plain_password': 'adminsecret'
}

# logging.basicConfig(level=logging.ERROR, format='%(asctime)s %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
# 创建 logger
logger = logging.getLogger(__name__)
# 设置日志级别为 INFO
logger.setLevel(logging.DEBUG)
# 创建控制台日志
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
# 创建文件日志
file_handler = logging.FileHandler('kafka-console.log')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
# 将处理器添加到 logger
logger.addHandler(console_handler)
logger.addHandler(file_handler)


# 定义过滤函数
def filter_dict_by_regex(dictionary, regex):
    # 如果没有提供有效的正则表达式,则返回原始的键列表
    if regex is None:
        return list(dictionary)

    pattern = re.compile(regex)
    matched_keys = [key for key in dictionary if pattern.match(key)]
    return matched_keys


def list_topics():
    print('>>>list topic')
    # 创建 KafkaAdminClient 实例
    admin_client = KafkaAdminClient(**conf)
    topics = admin_client.list_topics()
    for key in topics:
        if '__consumer_offsets' != key:
            print(key)
    admin_client.close()


def delete_topic(topic):
    print(f">>>deleting topic {topic}")
    # 创建 KafkaAdminClient 实例
    admin_client = KafkaAdminClient(**conf)
    # 构建要删除的主题列表
    topics = admin_client.list_topics()
    filter_topic_list = filter_dict_by_regex(topics, topic)
    deletion_result = admin_client.delete_topics(filter_topic_list)
    # 等待主题删除完成
    for topic, error_code in deletion_result.topic_error_codes:
        if error_code == 0:
            print(f"删除主题 '{topic}' 成功")
        else:
            print(f"删除主题 '{topic}' 失败, error_code='{error_code}'")
    admin_client.close()


def create_topic(topic, partitions, replication):
    print(f">>> create topic {topic} --partitions {partitions} --replication-factor {replication}")
    # 创建 KafkaAdminClient 实例
    admin_client = KafkaAdminClient(**conf)
    topics = [NewTopic(topic, num_partitions=partitions, replication_factor=replication)]
    admin_client.create_topics(topics)
    print(f"创建主题 '{topic}' 成功")
    admin_client.close()


def show_offset(topic, group_id):
    print('>>> show offset', topic, group_id)
    conf['group_id'] = group_id
    consumer = KafkaConsumer(topic, **conf)
    try:
        partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
        # total
        toff = consumer.end_offsets(partitions)
        toff = [(key.partition, toff[key]) for key in toff.keys()]
        toff.sort()
        print("total offset: {}".format(str(toff)))
        # current
        coff = [(x.partition, consumer.committed(x)) for x in partitions]
        coff.sort()
        print("current offset: {}".format(str(coff)))
        # cal sum and left
        toff_sum = sum([x[1] for x in toff])
        cur_sum = sum([x[1] for x in coff if x[1] is not None])
        left_sum = toff_sum - cur_sum
        print("kafka left: {}".format(left_sum))
    except Exception as e:
        print("topic no found! error:", e)
    finally:
        consumer.close()


def consumer_topic(topic, offset):
    print('>>> consumer topic', topic)
    # 判断topic是否存在
    admin_client = KafkaAdminClient(**conf)
    try:
        topics = admin_client.list_topics()
        if topic not in topics:
            print(f"Topic '{topic}' does not exist.")
            return None
    except UnknownTopicOrPartitionError:
        print(f"Topic '{topic}' does not exist.")
    finally:
        admin_client.close()
    # 消费
    conf['group_id'] = 'kafka-console-test-consumer'
    conf['auto_offset_reset'] = offset
    consumer = KafkaConsumer(topic, **conf)
    try:
        for message in consumer:
            logger.info(f"receive offset:({message.partition},{message.offset})")
            logger.debug(f"value: {json.loads(message.value)}")
    except KeyboardInterrupt:
        pass
    finally:
        consumer.close()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Topic Management Script')
    subparsers = parser.add_subparsers(dest='command')

    # 创建子命令 delete
    del_parser = subparsers.add_parser('delete', help='Delete something')
    del_parser.add_argument('-t', dest='topic', type=str, help='Topic name', required=True)
    del_parser.set_defaults(func=delete_topic)

    # 创建子命令 list
    list_parser = subparsers.add_parser('list', help='List all topics')
    list_parser.set_defaults(func=list_topics)

    # 创建子命令 create
    create_parser = subparsers.add_parser('create', help='Create Topic')
    create_parser.add_argument('-t', dest='topic', type=str, help='Topic name', required=True)
    create_parser.add_argument('-p', dest='partitions', type=int, help='Number of partitions', required=True)
    create_parser.add_argument('-r', dest='replication', type=int, help='Replication factor', required=True)
    create_parser.set_defaults(func=create_topic)

    # 查询offset
    show_offset_parser = subparsers.add_parser('offset', help='Show Offset')
    show_offset_parser.add_argument('-t', dest='topic', type=str, help='Topic name', required=True)
    show_offset_parser.add_argument('-g', dest='group_id', type=str, help='groupId', required=True)
    show_offset_parser.set_defaults(func=show_offset)

    # 实时消费
    consumer_parser = subparsers.add_parser('consumer', help='Consumer Topic')
    consumer_parser.add_argument('-t', dest='topic', type=str, help='Topic name', required=True)
    # consumer_parser.add_argument('-o', dest='offset', type=str, help='offset (earliest or latest)', required=True)
    consumer_parser.set_defaults(func=consumer_topic)

    args = parser.parse_args()
    if hasattr(args, 'func'):
        if args.command == 'delete':
            args.func(args.topic)
        elif args.command == 'create':
            args.func(args.topic, args.partitions, args.replication)
        elif args.command == 'offset':
            args.func(args.topic, args.group_id)
        elif args.command == 'consumer':
            args.func(args.topic, 'latest')
        else:
            args.func()
    else:
        parser.print_help()

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容