Kafka源码解析-kafka启动过程分析

闲来无事,大概翻翻kafka源码,就我个人看源码的习惯而言,

1、先到官网上看看功能介绍

2、大概扫扫架构图

3、看看系统启动了哪些重要组件,这样比较容易切入

4、最后就是在使用中带着问题去看源码


由于第一次写文章,各位看官也就手下留个情

源码中找到启动脚本kafka-server-start.sh可以看到调用的是kafka包下的Kafka类,准确说这是scala object类型

这个过程简单明了,就是object类型的fromProps方法产生KafkaServerStartable对象,

同时定义了一个RunTime hook钩子回调函数,它实现了KafkaServerStartable对象的shutdown方法

接下来就执行红框中方法

kafkaServerStartable.startup()

kafkaServerStartable.awaitShutdown()

显然是启动和等待关闭两个方法,接下来重点看下kafkaServerStartable.startup()都做了哪些事情

KafkaServerStartable本身是个代理,具体执行逻辑交给了KafkaServer去做,重点就是startup()、shutdown()

1、定义了两个AtomicBoolean类型的变量判断如果正在关闭服务及已经完成启动则返回

2、原子操作isStartingUp.compareAndSet判断如果正在启动,则不执行canStartup下方函数

在canStartup function中,先设置brokerState.newState(Starting),这里用状态机类维护BrokerStates状态,继承BrokerStates特质在scala中用模式匹配方式替代java的枚举效果,这里穿插下BrokerStates状态机有哪些状态,见下图:


好了接下来回到启动函数执行中,

3、initZkClient(time)进入代码看到主要是构建连接zk的client变量KafkaZkClient,再者就是在zk         上创建PersistentPath

4、getOrGenerateClusterId(zkClient)利用zkClient创建一个base64位加密的uuid作为_clusterId

5、generate brokerId

getBrokerIdAndOfflineDirs(),此方法的逻辑见下图:


解释下上图的代码

1、判断配置中如果不存在brokerId并且配置brokerId是自生成的,则依赖zk生成一个sequence序列作为brokerId

2、根据配置config.logDirs下面对应的brokerMetadataCheckpoints找到多个brokerIdSet,抛异常这就说明了多个broker共享一个logDirs,这肯定会出问题

3、根据配置config.logDirs下面对应的brokerMetadataCheckpoints找到对应的brokerId等于1但和配置中的brokerId不相等,也抛异常

4、除了以上3中情况外如果从meta.properties能找到唯一一个brokerId,则赋值给当前服务节点brokerId,另外这个offlineDirs主要是记录当前节点配

      置下的meta.properties无法读取的N个LogDirs,即不正常的目录有哪些,主要给后面的LogManager对象使用,后面再分析继续往下走

如上图所示,这个是启动kafka的动态参数服务,主要利用zk分布式一致性,将动态参数写到zk路径下对所有节点可见


然后根据配置的后台线程数创建一个KafkaScheduler对象并将之启动,KafkaScheduler对象到底起了什么作用呢?这个貌似还挺重要,咱们进去看下,首先定义一个特质Scheduler如下图所示,4个方法,启动、关闭、判断是否启动、schedule,到这儿基本就明白这个特质是用来干什么的了,调度用的,来看看特质实现类KafkaScheduler的内部实现,先上个图看看此类内部结构




分析下KafkaScheduler的内部实现,它有两个重要成员变量,一个是ScheduledThreadPoolExecutor类型的线程池executor,另外一个原子类型的AtomicInteger主要维护KafkaScheduler生命周期状态,

startup()方法用了并发控制synchronized判断KafkaScheduler没有启动的情况下,根据节点配置指定的线程数、线程池执行策略、以及线程池生成ThreadFactory(控制生成线程名称以及是否后台启动)

shutdown()方法控制着线程池的销毁 

scheduleOnce()调度一次执行即完成的task,实际实现由schedule()完成


实现过程:首先确保KafkaScheduler没有关闭;利用CoreUtils构建Runnable线程,线程执行体由入参func:()=>Uinit实现,这里也能看到scala函数一等公民带来的好处,编程更灵活了,下来就是JDK线程池实现类的具体实现了,这个大家都很清楚

resizeThreadPool()对外暴露的重置线程池个数

isStarted()同步方法判断线程池是否为空

ensureRunning()判断是否启动

好了以上应该对KafkaScheduler有了一个整体上的认识,今天先到这里了



接着上次的继续分析源码,这一节主要看kafka如何实现指标数据的收集,UML类图方法参数类型除了java基本类型能使用之外,其他类型还不知道怎么添加,要是有知道的给说下,图没法画了,纯汉字描述吧


指标发布接口定义了四种方法,初始化、指标参数更改时、指标移除时、关闭


JmxReporter类实现指标发布接口,LOCK并发安全控制,Map结构mbeas存储JMX MBean name->KafkaMbean的映射关系,


KafkaMbean实现javax.management的DynamicMBean,objectName表示Mbean的名称,metrics存储KafkaMetric名称和实体KafkaMetric对应关系

KafkaMetric实现Metric接口,


KafkaMetric内部维护指标描述类MetricName、锁lock、时间工具Time、MetricConfig指标配置以及MetricValueProvider指标value提供接口实例



从上面的record方法已经清楚的表达了只要发生了记录值,第一会将当前最新时间赋值给lastRecordTime,同时将值传递给List类型的stats,完成avg、min、max等function函数的值更新


add方法是将MetricName或CompoundStat类型最终转化为KafkaMetric对象,register到Metrics和Mapmetrics;//指标名称->KafkaMetric类映射管理


hasExpired用来做超时判断用的

我们来看此环节最后一个指标流转管理类Metrics


Metrics实例变量如上,再来看看此类构造方法


如果设置enableExpiration=true,将会构造一个个数为1的线程池,同时添加一个kafka-metrics-count的监控指标,那ExpireSensorTask中做了什么工作?


简单来说就是迭代所有传感器sensors,判断如果长时间没有record数据并且超时了,则移除当前sensor,再来看下removeSensor


先定义一个childSensors赋值为空搁在上面,

1、如果sensor.remove(name,sesor)移除返回true,说明原map中有维护,前面也说过,sensor中挂了很多Kafka指标对象,则一一解除

2、如果子节点childrenSensors包含sensor也做移除操作,这里看的不是很明白,传感器按照设计布局,将会是个多叉树结构,自身没必要

     再关联自身

3、遍历当前sersor的所有父节点,做父子关系解除

4、最后判断如果当前sensor有子节点childSensors,递归编译子节点做removeSensor操作,说直白点就是父节点都删除了,下面挂的所有子节点做        删除操作


到此为止,我们已经明白了上述4行代码表达的含义,我们继续回到kafkaServer类中继续分析



最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,542评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,596评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,021评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,682评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,792评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,985评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,107评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,845评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,299评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,612评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,747评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,441评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,072评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,828评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,069评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,545评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,658评论 2 350