利用python做一个kafka的查看工具

著名的KafkaManager工具应该都听说过,可是在实际的使用过程中发现一些问题,最重要的是当topic过多以后数据的刷新很慢,而且无法查看到当前topic的消费者组。
于是决定自己做一个,做一个工具最好的选中无非是python了,速度快嘛,同时需要一个节目,这里选择了PyQt5,当然也可以选择TK和wxpython,

OK,先上最终效果图


image.png

首先需要安装模块,python的强大也是各种强大的模块
这里我们需要用到的模块是pykafka、PyQt5、pyqt5-tools、pyinstaller
安装方式用pip
pip install pykafka
pip install PyQt5
pip install pyqt5-tools
pip install pyinstaller


1、先画好界面


image.png

2、实现一个KafkaManager获取数据,这个类我们用一个定时器,参考timer的实现

class KafkaManger(Thread):
    def __init__(self,host,timeInterval):
        Thread.__init__(self)
        self._host = host
        self._interval = timeInterval
        self.finished = Event()
    def cancel(self):
        self.finished.set()
    def run(self):
        while not self.finished.is_set():
            self._update()
            time.sleep(self._interval)

接着,我们需要用到的是pykafka里面的Cluster
from pykafka import Cluster,handlers
在init里面添加链接kafka集群

self._handler = handlers.ThreadingHandler()
self._cluster = Cluster(hosts=self._host,handler=self._handler)

实现update函数来更新Cluster的数据

    def _update(self):
        self._cluster.update()

生产者相关信息的实现
获取所有的topic

    def gettopic(self):
        return self._cluster.topics.keys()

获取topic的offset情况

    def getoffsets_topic(self,topic):
        with self._lock_topic:
            if topic in self._cluster.topics:
                descrip = self._cluster.topics[topic]
                earlistoffset = descrip.earliest_available_offsets()
                latestoffset = descrip.latest_available_offsets()
                topicdescrip = {}
                for id, partition in descrip.partitions.items():
                    leader = partition.leader.id
                    Replicas = []
                    isr = []
                    for i in partition.replicas:
                        Replicas.append(i.id)
                    for i in partition.isr:
                        isr.append(i.id)
                    topicdescrip[id] = TopicDescrip(id, leader, Replicas, isr,
                                                    earlistoffset[id].offset[0],
                                                    latestoffset[id].offset[0])
                return topicdescrip
            else:
                topicdescrip = {}
                for i in range(8):
                    topicdescrip[i] = TopicDescrip(i,
                                                   0,
                                                   0,
                                                   0,
                                                   0,
                                                   0,
                                                   )
                return topicdescrip

这样获取topic相关的信息就完成了,可以获取到所有的topic和某个topic的详细信息


获取consumer的信息
获取所有的consumer

    def getconsumers(self):
        return self._cluster.get_managed_group_descriptions().keys()

获取consumer的消费信息

    def getoffsets(self,group_id):
        with self._lock_consumer:
            group_descrips = self._cluster.get_managed_group_descriptions()
            if group_id in group_descrips:
                if group_id == b'KafkaManagerOffsetCache':
                    return None
                descrips = group_descrips[group_id]
                if descrips[5] != {}:
                    descrip4topic = {}
                    for member_id, groupMember in descrips[5].items():
                        partitions = groupMember[4].partition_assignment[0][1]
                        topics = groupMember[3].topic_names
                        for topic in topics:
                            reqs = [PartitionOffsetFetchRequest(topic, i) for i in
                                    self._cluster.topics[topic].partitions]
                            offset = self._cluster.get_group_coordinator(group_id).fetch_consumer_group_offsets(
                                group_id, reqs)
                            descrip = {}
                            for partition in partitions:
                                descrip[partition] = ConsumerDescrip(partition,
                                                                      offset.topics[topic][partition].offset,
                                                                      group_id,
                                                                      member_id,
                                                                      groupMember[1],
                                                                      groupMember[2],
                                                                      groupMember[3].topic_names,
                                                                        )
                            descrip4topic[topic] = descrip
                    return descrip4topic
                else:
                    descrip = {}
                    descrip4topic = {}
                    for i in range(8):
                        descrip[i] = ConsumerDescrip(i,
                                                 0,
                                                 b'-',
                                                 b'-',
                                                 b'-',
                                                 b'-',
                                                 [b'-'],
                                                 )
                    descrip4topic[b''] = descrip
                    return descrip4topic
            return None

接下来我们还要更新topic与consumer的对应关系

    def updatetopic2group(self):
        group_descrips = self._cluster.get_managed_group_descriptions()
        for group_id,descrips in group_descrips.items():
            if group_id == b'KafkaManagerOffsetCache' or descrips[5] == {}:
                continue
            for member_id,groupMember in descrips[5].items():
                topics = groupMember[3].topic_names
                for topic in topics:
                    self._topic2group[topic].add(group_id)

获取对应关系接口

    def gettopic2group(self, topic):
        if topic in self._topic2group:
            return self._topic2group[topic]
        return {}

删除topic的接口,之前记得可以得到当前topic在哪个broker的,后来没找到,干脆直接遍历所有的broker来删除

    def deletetopic(self,topic):
        for id,broker in self._cluster.brokers.items():
            try:
                broker.delete_topics([topic,])
            except Exception as e:
                pass

接下来是QT的部分
之前画好的界面用pyuic转化为python代码
然后定义KafkaTool

class KafkaTool(QtWidgets.QWidget,Ui_KafkaTool):
    def __init__(self,parent = None):
        super(KafkaTool,self).__init__(parent=parent)
        self.setupUi(self)
        self._started = False
        self._initUi()
        self._createconnections()

初始化界面,建立对应的信号槽

    def _initUi(self):
        self.unconnect.setEnabled(False)
        self.topic.setSortingEnabled(True)
        self.consumer.setSortingEnabled(True)
        self.stackedWidget.setEnabled(False)
        self.tabWidget.clear()
        self.topic_menu = QtWidgets.QMenu(self.topic)
        self.topic_menu.addAction(self.action_delete)
        self.topic_menu.addAction(self.action_fresh)
        self.topic.setContextMenuPolicy(QtCore.Qt.CustomContextMenu)

    def _createconnections(self):
        self.connect.pressed.connect(self._connect)
        self.unconnect.pressed.connect(self._unconnect)
        self.fresh.pressed.connect(self._fresh_topic)
        self.topic.doubleClicked.connect(self.showtopic)
        self.freshconsumer.pressed.connect(self._fresh_consumer)
        self.consumer.doubleClicked.connect(self.showconsumer)
        self.lineEdit_search_topic.textChanged.connect(self.search_topic)
        self.lineEdit_search_consume.textChanged.connect(self.search_consume)
        self.topic.customContextMenuRequested.connect(self.show_topic_menu)
        self.action_delete.triggered.connect(self.delete_topic)
        self.action_fresh.triggered.connect(self._fresh_topic)

链接与断开集群

    def _connect(self):
        self._host = self.host.text()
        try:
            self._cluster = ClusterManager(host=self._host,timeInterval=5)
            self._cluster.start()
            self._started = True
            self.stackedWidget.setEnabled(True)
            self.connect.setEnabled(False)
            self.unconnect.setEnabled(True)
        except Exception as e:
            print(e)

    def _unconnect(self):
        self._cluster.cancel()
        self._cluster.join(10)
        self._started = False
        self.stackedWidget.setEnabled(False)
        self.unconnect.setEnabled(False)
        self.connect.setEnabled(True)

展示topic与consumer

    def _fresh_topic(self):
        try:
            self.topic.clear()
            topics = self._cluster.gettopic()
            for topic in topics:
                self.topic.addItem(topic.decode())
        except Exception as e:
            print(e)
    def _fresh_consumer(self):
            consumers = self._cluster.getconsumers()
            self.updategrouplist(consumers)
    def updategrouplist(self,group_ids):
        self.consumer.clear()
        for group_id in group_ids:
            self.consumer.addItem(group_id.decode('utf-8'))

展示topic的详细信息

    def showtopic(self,index):
        try:
            topic = index.data()
            group_ids = self._cluster.gettopic2group(topic.encode('utf-8'))
            self.updategrouplist(group_ids)
            offsets = self._cluster.getoffsets_topic(topic.encode('utf-8'))
            total_offset = 0
            self.topic_decrips.clear()
            self.topic_decrips.setHorizontalHeaderLabels(['分区','Leader','Replicas','ISR','最早偏移量','最晚偏移量','总偏移量'])
            for id,descrip in offsets.items():
                self.topic_decrips.setItem(id, 0, QTableWidgetItem(str(id)))
                self.topic_decrips.setItem(id, 1, QTableWidgetItem(str(descrip.Leader)))
                self.topic_decrips.setItem(id, 2, QTableWidgetItem(str(descrip.Replicas)))
                self.topic_decrips.setItem(id, 3, QTableWidgetItem(str(descrip.ISR)))
                self.topic_decrips.setItem(id, 4, QTableWidgetItem(str(descrip.earlist)))
                self.topic_decrips.setItem(id, 5, QTableWidgetItem(str(descrip.latist)))
                total_offset += descrip.latist
                #self.topic_decrips.setItem(id, 6, QTableWidgetItem(str(id)))
            self.topic_decrips.setItem(0,6,QTableWidgetItem(str(total_offset)))
        except Exception as e:
            print(e)

展示consumer的详细信息

    def showconsumer(self,index):
        try:
            consumer = index.data()
            offsets4topic = self._cluster.getoffsets(consumer.encode('utf-8'))
            self.tabWidget.clear()
            if offsets4topic is not None:
                for topic,offsets in offsets4topic.items():
                    topicoffsets = self._cluster.getoffsets_topic(topic)
                    self.tabWidget.addTab(Table_Consumer(topicoffsets,offsets,self.tabWidget),topic.decode('utf-8','replace'))
        except Exception as e:
            print(e)
class Table_Consumer(Ui_Descrip,QtWidgets.QTableWidget):
    def __init__(self,topicoffsets,descrips:dict,parent = None):
        super(Table_Consumer,self).__init__(parent=parent)
        self.setupUi(self)
        self.setdata(topicoffsets,descrips)

    def setdata(self,topicoffsets,descrips:dict):
        for id, descrip in descrips.items():
            self.consumer_descrips.setItem(id, 0, QTableWidgetItem(str(id)))
            self.consumer_descrips.setItem(id, 1, QTableWidgetItem(str(topicoffsets[id].latist)))
            self.consumer_descrips.setItem(id, 2, QTableWidgetItem(str(descrip.LogSize)))
            self.consumer_descrips.setItem(id, 3, QTableWidgetItem(str(topicoffsets[id].latist - descrip.LogSize)))
            self.consumer_descrips.setItem(id, 4, QTableWidgetItem(str(descrip.group_id.decode('utf-8'))))
            self.consumer_descrips.setItem(id, 5, QTableWidgetItem(str(descrip.member_id.decode('utf-8'))))
            self.consumer_descrips.setItem(id, 6, QTableWidgetItem(str(descrip.client_id.decode('utf-8'))))
            self.consumer_descrips.setItem(id, 7, QTableWidgetItem(str(descrip.client_host.decode('utf-8'))))

实现topic与consumer的搜索框

    def search_topic(self,a0):
        num = self.topic.count()
        for i in range(num):
            self.topic.item(i).setHidden(True)
        items = self.topic.findItems(a0,QtCore.Qt.MatchContains)
        for item in items:
            item.setHidden(False)

    def search_consume(self,a0):
        num = self.consumer.count()
        for i in range(num):
            self.consumer.item(i).setHidden(True)
        items = self.consumer.findItems(a0,QtCore.Qt.MatchContains)
        for item in items:
            item.setHidden(False)

ok,大功告成,代码简陋,轻看
计划后续实现读取指定offset的数据,最后再搞个网页版的,待更新...

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352

推荐阅读更多精彩内容

  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,721评论 13 425
  • 一、为什么需要消息系统 1.解耦: 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。 2.冗...
    java成功之路阅读 1,479评论 0 3
  • 分布式系统中,系统由多个子系统组成,数据需要在子系统中高性能、低延迟的流转。Kafka是"发布-订阅"消息系统,是...
    Goooooooooooal阅读 1,563评论 0 0
  • 微博上看到雪庵老师的新作品,一下子很受触动。画面中,冰冷的笼子外,一只小鸟独自伫立,瞪圆了眼睛瞅着笼门,脑子...
    shine8181阅读 446评论 0 0
  • 在图书馆的人都好可爱, 晚上,图书馆闭馆拥挤的电梯里,貌似是两个熟人相遇 -哟,上自习呀~你考研? -不考 -那你...
    HH牛奶想做的很多阅读 197评论 0 0