# Common command-line tools implemented in Python 3 on top of kafka-python 2.0.2.
# Please credit the source when reposting.
import argparse
import json
import logging
import re
from kafka import KafkaAdminClient, KafkaConsumer
from kafka.admin.new_topic import NewTopic
from kafka.errors import UnknownTopicOrPartitionError
from kafka.structs import TopicPartition
# pip install kafka-python
# Connection settings shared by every client this script creates; the mapping
# is unpacked as keyword arguments into KafkaAdminClient / KafkaConsumer.
# NOTE(review): the broker address and SASL credentials are hard-coded here;
# consider loading them from the environment or a config file.
conf = dict(
    bootstrap_servers='kafka:9092',
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username='admin',
    sasl_plain_password='adminsecret',
)
# Module logger. The logger itself accepts DEBUG and above; each handler then
# applies its own, possibly stricter, threshold.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# One format shared by both handlers.
formatter = logging.Formatter('%(asctime)s %(levelname)s - %(message)s')

# Console handler: INFO and above.
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)

# File handler: DEBUG and above, written to kafka-console.log in the current
# working directory. The encoding is pinned to UTF-8 so that non-ASCII text in
# logged message payloads does not depend on the platform's locale encoding.
file_handler = logging.FileHandler('kafka-console.log', encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)

# Attach both handlers to the module logger.
logger.addHandler(console_handler)
logger.addHandler(file_handler)
# 定义过滤函数
def filter_dict_by_regex(dictionary, regex):
    """Return the keys of *dictionary* selected by *regex*, as a list.

    Args:
        dictionary: Any iterable of string keys (iterating a dict yields its
            keys).
        regex: Regular-expression source text, or ``None`` to select every
            key.

    Returns:
        A new list of the selected keys in iteration order.

    Note:
        Matching uses ``re.match`` semantics: the pattern is anchored at the
        start of the key but not at the end, so ``'test'`` also selects
        ``'test1'``.
    """
    if regex is not None:
        matches_prefix = re.compile(regex).match
        return list(filter(matches_prefix, dictionary))
    # No pattern supplied: every key is selected.
    return list(dictionary)
def list_topics():
    """Print every topic name on the cluster except ``__consumer_offsets``.

    A ``KafkaAdminClient`` is created from the module-level ``conf`` and is
    always closed before returning, including when the listing request
    raises.
    """
    print('>>>list topic')
    admin_client = KafkaAdminClient(**conf)
    try:
        for name in admin_client.list_topics():
            # Hide the offsets topic from the listing.
            if name != '__consumer_offsets':
                print(name)
    finally:
        # Release the client's connections even if the listing failed.
        admin_client.close()
def delete_topic(topic):
    """Delete every topic whose name is selected by the regex *topic*.

    The existing topic names are fetched from the cluster and filtered with
    ``filter_dict_by_regex``; the selected names are deleted in one request
    and a per-topic result line is printed.

    Args:
        topic: Regular expression applied to existing topic names. Matching
            is anchored at the start of the name only, so ``'test'`` also
            selects ``'test1'``.

    Exceptions raised by the admin client propagate to the caller; the client
    is closed in every case.
    """
    print(f">>>deleting topic {topic}")
    admin_client = KafkaAdminClient(**conf)
    try:
        # Resolve the pattern against the topics that actually exist.
        matched_topics = filter_dict_by_regex(admin_client.list_topics(), topic)
        if not matched_topics:
            # Avoid sending an empty delete request that gives no feedback.
            print(f"No topic matches '{topic}'; nothing to delete.")
            return
        deletion_result = admin_client.delete_topics(matched_topics)
        # Report the outcome of each requested deletion.
        for name, error_code in deletion_result.topic_error_codes:
            if error_code == 0:
                print(f"删除主题 '{name}' 成功")
            else:
                print(f"删除主题 '{name}' 失败, error_code='{error_code}'")
    finally:
        # Release the client's connections even if a request failed.
        admin_client.close()
def create_topic(topic, partitions, replication):
    """Create a single topic and print a confirmation line.

    Args:
        topic: Name of the topic to create.
        partitions: Number of partitions, passed as ``num_partitions``.
        replication: Replication factor, passed as ``replication_factor``.

    Any exception raised by ``create_topics`` propagates to the caller, in
    which case the success line is not printed. The admin client is closed in
    every case.
    """
    print(f">>> create topic {topic} --partitions {partitions} --replication-factor {replication}")
    admin_client = KafkaAdminClient(**conf)
    try:
        new_topic = NewTopic(topic, num_partitions=partitions, replication_factor=replication)
        admin_client.create_topics([new_topic])
        print(f"创建主题 '{topic}' 成功")
    finally:
        # Release the client's connections even if creation failed.
        admin_client.close()
def show_offset(topic, group_id):
    """Print end offsets, committed offsets and the remaining lag for a group.

    Three lines are printed: the end offset of every partition of *topic*,
    the offset committed by *group_id* for every partition (``None`` where
    nothing is committed), and the difference between the two sums.
    Partitions without a committed offset contribute 0 to the committed sum.

    Args:
        topic: Topic to inspect.
        group_id: Consumer group whose committed offsets are read.

    Errors are reported on stdout rather than raised; the consumer is always
    closed.
    """
    print('>>> show offset', topic, group_id)
    # Build a per-call copy so the shared module-level conf is not polluted
    # with a group_id that other clients would then inherit.
    consumer_conf = {**conf, 'group_id': group_id}
    consumer = KafkaConsumer(topic, **consumer_conf)
    try:
        partition_ids = consumer.partitions_for_topic(topic)
        if not partition_ids:
            # No partitions reported (e.g. the topic does not exist).
            print(f"topic no found! error: no partitions known for topic '{topic}'")
            return
        partitions = [TopicPartition(topic, p) for p in partition_ids]

        # End offset of each partition, ordered by partition number.
        end_offsets = consumer.end_offsets(partitions)
        toff = sorted((tp.partition, end) for tp, end in end_offsets.items())
        print("total offset: {}".format(str(toff)))

        # Offset committed by the group for each partition. Partition numbers
        # are unique, so sorting never has to compare the offset values.
        coff = sorted((tp.partition, consumer.committed(tp)) for tp in partitions)
        print("current offset: {}".format(str(coff)))

        # Remaining messages = total end offsets - total committed offsets.
        toff_sum = sum(end for _, end in toff)
        cur_sum = sum(committed for _, committed in coff if committed is not None)
        print("kafka left: {}".format(toff_sum - cur_sum))
    except Exception as e:
        # Best-effort CLI boundary: report the failure instead of a traceback.
        # The message no longer blames a missing topic for unrelated errors.
        print("show offset failed! error:", e)
    finally:
        consumer.close()
def _decode_message_value(raw):
    """Return *raw* parsed as JSON, or *raw* unchanged when it is not valid JSON."""
    try:
        return json.loads(raw)
    except (TypeError, ValueError):
        # TypeError: value is None or an unsupported type.
        # ValueError: covers json.JSONDecodeError and UnicodeDecodeError.
        return raw


def consumer_topic(topic, offset):
    """Consume *topic* until interrupted, logging each received message.

    The topic's existence is checked first; if it is missing a message is
    printed and ``None`` is returned without consuming. Otherwise every
    message's partition/offset is logged at INFO and its value at DEBUG. The
    value is logged as parsed JSON when possible and as the raw value
    otherwise, so one malformed payload does not stop consumption.

    Args:
        topic: Topic to consume.
        offset: Value passed as ``auto_offset_reset`` to the consumer.

    Returns:
        ``None``.
    """
    print('>>> consumer topic', topic)
    # Check that the topic exists before subscribing.
    admin_client = KafkaAdminClient(**conf)
    try:
        if topic not in admin_client.list_topics():
            print(f"Topic '{topic}' does not exist.")
            return None
    except UnknownTopicOrPartitionError:
        print(f"Topic '{topic}' does not exist.")
        # Stop here as well; previously this branch fell through and consumed.
        return None
    finally:
        admin_client.close()

    # Build a per-call copy so the shared module-level conf is not mutated.
    consumer_conf = {
        **conf,
        'group_id': 'kafka-console-test-consumer',
        'auto_offset_reset': offset,
    }
    consumer = KafkaConsumer(topic, **consumer_conf)
    try:
        for message in consumer:
            logger.info("receive offset:(%s,%s)", message.partition, message.offset)
            logger.debug("value: %s", _decode_message_value(message.value))
    except KeyboardInterrupt:
        # Ctrl-C is the intended way to stop the consumer.
        pass
    finally:
        consumer.close()
def main(argv=None):
    """Parse the command line and run the selected sub-command.

    Sub-commands: ``delete``, ``list``, ``create``, ``offset`` and
    ``consumer``. Each sub-parser stores its handler in ``func`` and uses
    ``dest`` names equal to the handler's parameter names, so the parsed
    options are forwarded to the handler as keyword arguments. When no
    sub-command is given the help text is printed.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(description='Topic Management Script')
    subparsers = parser.add_subparsers(dest='command')

    # delete: remove topics selected by a regex.
    del_parser = subparsers.add_parser('delete', help='Delete something')
    del_parser.add_argument('-t', dest='topic', type=str, help='Topic name', required=True)
    del_parser.set_defaults(func=delete_topic)

    # list: show all topics.
    list_parser = subparsers.add_parser('list', help='List all topics')
    list_parser.set_defaults(func=list_topics)

    # create: create one topic.
    create_parser = subparsers.add_parser('create', help='Create Topic')
    create_parser.add_argument('-t', dest='topic', type=str, help='Topic name', required=True)
    create_parser.add_argument('-p', dest='partitions', type=int, help='Number of partitions', required=True)
    create_parser.add_argument('-r', dest='replication', type=int, help='Replication factor', required=True)
    create_parser.set_defaults(func=create_topic)

    # offset: show end/committed offsets for a consumer group.
    show_offset_parser = subparsers.add_parser('offset', help='Show Offset')
    show_offset_parser.add_argument('-t', dest='topic', type=str, help='Topic name', required=True)
    show_offset_parser.add_argument('-g', dest='group_id', type=str, help='groupId', required=True)
    show_offset_parser.set_defaults(func=show_offset)

    # consumer: consume a topic in real time.
    consumer_parser = subparsers.add_parser('consumer', help='Consumer Topic')
    consumer_parser.add_argument('-t', dest='topic', type=str, help='Topic name', required=True)
    # Optional; defaults to 'latest', which was previously the fixed value.
    consumer_parser.add_argument(
        '-o',
        dest='offset',
        type=str,
        choices=('earliest', 'latest'),
        default='latest',
        help='offset (earliest or latest)',
    )
    consumer_parser.set_defaults(func=consumer_topic)

    args = parser.parse_args(argv)
    if not hasattr(args, 'func'):
        # No sub-command was given.
        parser.print_help()
        return

    # Everything except the bookkeeping entries is a handler argument.
    handler_kwargs = {
        key: value
        for key, value in vars(args).items()
        if key not in ('command', 'func')
    }
    args.func(**handler_kwargs)


if __name__ == "__main__":
    main()