著名的KafkaManager工具应该都听说过,可是在实际的使用过程中发现一些问题,最重要的是当topic过多以后数据的刷新很慢,而且无法查看到当前topic的消费者组。
于是决定自己做一个,做一个工具最好的选中无非是python了,速度快嘛,同时需要一个节目,这里选择了PyQt5,当然也可以选择TK和wxpython,
OK,先上最终效果图
首先需要安装模块,python的强大也是各种强大的模块
这里我们需要用到的模块是pykafka、PyQt5、pyqt5-tools、pyinstaller
安装方式用pip
pip install pykafka
pip install PyQt5
pip install pyqt5-tools
pip install pyinstaller
1、先画好界面
2、实现一个KafkaManager获取数据,这个类我们用一个定时器,参考timer的实现
class KafkaManger(Thread):
def __init__(self,host,timeInterval):
Thread.__init__(self)
self._host = host
self._interval = timeInterval
self.finished = Event()
def cancel(self):
self.finished.set()
def run(self):
while not self.finished.is_set():
self._update()
time.sleep(self._interval)
接着,我们需要用到的是pykafka里面的Cluster
from pykafka import Cluster,handlers
在init里面添加链接kafka集群
self._handler = handlers.ThreadingHandler()
self._cluster = Cluster(hosts=self._host,handler=self._handler)
实现update函数来更新Cluster的数据
def _update(self):
self._cluster.update()
生产者相关信息的实现
获取所有的topic
def gettopic(self):
return self._cluster.topics.keys()
获取topic的offset情况
def getoffsets_topic(self,topic):
with self._lock_topic:
if topic in self._cluster.topics:
descrip = self._cluster.topics[topic]
earlistoffset = descrip.earliest_available_offsets()
latestoffset = descrip.latest_available_offsets()
topicdescrip = {}
for id, partition in descrip.partitions.items():
leader = partition.leader.id
Replicas = []
isr = []
for i in partition.replicas:
Replicas.append(i.id)
for i in partition.isr:
isr.append(i.id)
topicdescrip[id] = TopicDescrip(id, leader, Replicas, isr,
earlistoffset[id].offset[0],
latestoffset[id].offset[0])
return topicdescrip
else:
topicdescrip = {}
for i in range(8):
topicdescrip[i] = TopicDescrip(i,
0,
0,
0,
0,
0,
)
return topicdescrip
这样获取topic相关的信息就完成了,可以获取到所有的topic和某个topic的详细信息
获取consumer的信息
获取所有的consumer
def getconsumers(self):
return self._cluster.get_managed_group_descriptions().keys()
获取consumer的消费信息
def getoffsets(self,group_id):
with self._lock_consumer:
group_descrips = self._cluster.get_managed_group_descriptions()
if group_id in group_descrips:
if group_id == b'KafkaManagerOffsetCache':
return None
descrips = group_descrips[group_id]
if descrips[5] != {}:
descrip4topic = {}
for member_id, groupMember in descrips[5].items():
partitions = groupMember[4].partition_assignment[0][1]
topics = groupMember[3].topic_names
for topic in topics:
reqs = [PartitionOffsetFetchRequest(topic, i) for i in
self._cluster.topics[topic].partitions]
offset = self._cluster.get_group_coordinator(group_id).fetch_consumer_group_offsets(
group_id, reqs)
descrip = {}
for partition in partitions:
descrip[partition] = ConsumerDescrip(partition,
offset.topics[topic][partition].offset,
group_id,
member_id,
groupMember[1],
groupMember[2],
groupMember[3].topic_names,
)
descrip4topic[topic] = descrip
return descrip4topic
else:
descrip = {}
descrip4topic = {}
for i in range(8):
descrip[i] = ConsumerDescrip(i,
0,
b'-',
b'-',
b'-',
b'-',
[b'-'],
)
descrip4topic[b''] = descrip
return descrip4topic
return None
接下来我们还要更新topic与consumer的对应关系
def updatetopic2group(self):
group_descrips = self._cluster.get_managed_group_descriptions()
for group_id,descrips in group_descrips.items():
if group_id == b'KafkaManagerOffsetCache' or descrips[5] == {}:
continue
for member_id,groupMember in descrips[5].items():
topics = groupMember[3].topic_names
for topic in topics:
self._topic2group[topic].add(group_id)
获取对应关系接口
def gettopic2group(self, topic):
if topic in self._topic2group:
return self._topic2group[topic]
return {}
删除topic的接口,之前记得可以得到当前topic在哪个broker的,后来没找到,干脆直接遍历所有的broker来删除
def deletetopic(self,topic):
for id,broker in self._cluster.brokers.items():
try:
broker.delete_topics([topic,])
except Exception as e:
pass
接下来是QT的部分
之前画好的界面用pyuic转化为python代码
然后定义KafkaTool
class KafkaTool(QtWidgets.QWidget,Ui_KafkaTool):
def __init__(self,parent = None):
super(KafkaTool,self).__init__(parent=parent)
self.setupUi(self)
self._started = False
self._initUi()
self._createconnections()
初始化界面,建立对应的信号槽
def _initUi(self):
self.unconnect.setEnabled(False)
self.topic.setSortingEnabled(True)
self.consumer.setSortingEnabled(True)
self.stackedWidget.setEnabled(False)
self.tabWidget.clear()
self.topic_menu = QtWidgets.QMenu(self.topic)
self.topic_menu.addAction(self.action_delete)
self.topic_menu.addAction(self.action_fresh)
self.topic.setContextMenuPolicy(QtCore.Qt.CustomContextMenu)
def _createconnections(self):
self.connect.pressed.connect(self._connect)
self.unconnect.pressed.connect(self._unconnect)
self.fresh.pressed.connect(self._fresh_topic)
self.topic.doubleClicked.connect(self.showtopic)
self.freshconsumer.pressed.connect(self._fresh_consumer)
self.consumer.doubleClicked.connect(self.showconsumer)
self.lineEdit_search_topic.textChanged.connect(self.search_topic)
self.lineEdit_search_consume.textChanged.connect(self.search_consume)
self.topic.customContextMenuRequested.connect(self.show_topic_menu)
self.action_delete.triggered.connect(self.delete_topic)
self.action_fresh.triggered.connect(self._fresh_topic)
链接与断开集群
def _connect(self):
self._host = self.host.text()
try:
self._cluster = ClusterManager(host=self._host,timeInterval=5)
self._cluster.start()
self._started = True
self.stackedWidget.setEnabled(True)
self.connect.setEnabled(False)
self.unconnect.setEnabled(True)
except Exception as e:
print(e)
def _unconnect(self):
self._cluster.cancel()
self._cluster.join(10)
self._started = False
self.stackedWidget.setEnabled(False)
self.unconnect.setEnabled(False)
self.connect.setEnabled(True)
展示topic与consumer
def _fresh_topic(self):
try:
self.topic.clear()
topics = self._cluster.gettopic()
for topic in topics:
self.topic.addItem(topic.decode())
except Exception as e:
print(e)
def _fresh_consumer(self):
consumers = self._cluster.getconsumers()
self.updategrouplist(consumers)
def updategrouplist(self,group_ids):
self.consumer.clear()
for group_id in group_ids:
self.consumer.addItem(group_id.decode('utf-8'))
展示topic的详细信息
def showtopic(self,index):
try:
topic = index.data()
group_ids = self._cluster.gettopic2group(topic.encode('utf-8'))
self.updategrouplist(group_ids)
offsets = self._cluster.getoffsets_topic(topic.encode('utf-8'))
total_offset = 0
self.topic_decrips.clear()
self.topic_decrips.setHorizontalHeaderLabels(['分区','Leader','Replicas','ISR','最早偏移量','最晚偏移量','总偏移量'])
for id,descrip in offsets.items():
self.topic_decrips.setItem(id, 0, QTableWidgetItem(str(id)))
self.topic_decrips.setItem(id, 1, QTableWidgetItem(str(descrip.Leader)))
self.topic_decrips.setItem(id, 2, QTableWidgetItem(str(descrip.Replicas)))
self.topic_decrips.setItem(id, 3, QTableWidgetItem(str(descrip.ISR)))
self.topic_decrips.setItem(id, 4, QTableWidgetItem(str(descrip.earlist)))
self.topic_decrips.setItem(id, 5, QTableWidgetItem(str(descrip.latist)))
total_offset += descrip.latist
#self.topic_decrips.setItem(id, 6, QTableWidgetItem(str(id)))
self.topic_decrips.setItem(0,6,QTableWidgetItem(str(total_offset)))
except Exception as e:
print(e)
展示consumer的详细信息
def showconsumer(self,index):
try:
consumer = index.data()
offsets4topic = self._cluster.getoffsets(consumer.encode('utf-8'))
self.tabWidget.clear()
if offsets4topic is not None:
for topic,offsets in offsets4topic.items():
topicoffsets = self._cluster.getoffsets_topic(topic)
self.tabWidget.addTab(Table_Consumer(topicoffsets,offsets,self.tabWidget),topic.decode('utf-8','replace'))
except Exception as e:
print(e)
class Table_Consumer(Ui_Descrip,QtWidgets.QTableWidget):
def __init__(self,topicoffsets,descrips:dict,parent = None):
super(Table_Consumer,self).__init__(parent=parent)
self.setupUi(self)
self.setdata(topicoffsets,descrips)
def setdata(self,topicoffsets,descrips:dict):
for id, descrip in descrips.items():
self.consumer_descrips.setItem(id, 0, QTableWidgetItem(str(id)))
self.consumer_descrips.setItem(id, 1, QTableWidgetItem(str(topicoffsets[id].latist)))
self.consumer_descrips.setItem(id, 2, QTableWidgetItem(str(descrip.LogSize)))
self.consumer_descrips.setItem(id, 3, QTableWidgetItem(str(topicoffsets[id].latist - descrip.LogSize)))
self.consumer_descrips.setItem(id, 4, QTableWidgetItem(str(descrip.group_id.decode('utf-8'))))
self.consumer_descrips.setItem(id, 5, QTableWidgetItem(str(descrip.member_id.decode('utf-8'))))
self.consumer_descrips.setItem(id, 6, QTableWidgetItem(str(descrip.client_id.decode('utf-8'))))
self.consumer_descrips.setItem(id, 7, QTableWidgetItem(str(descrip.client_host.decode('utf-8'))))
实现topic与consumer的搜索框
def search_topic(self,a0):
num = self.topic.count()
for i in range(num):
self.topic.item(i).setHidden(True)
items = self.topic.findItems(a0,QtCore.Qt.MatchContains)
for item in items:
item.setHidden(False)
def search_consume(self,a0):
num = self.consumer.count()
for i in range(num):
self.consumer.item(i).setHidden(True)
items = self.consumer.findItems(a0,QtCore.Qt.MatchContains)
for item in items:
item.setHidden(False)
ok,大功告成,代码简陋,轻看
计划后续实现读取指定offset的数据,最后再搞个网页版的,待更新...