kafka之网络模型总结

弄清楚kafka的网络模型原理,能很好的帮助理解和优化kafka服务。kafka底层的网络通信,没有使用第三方rpc实现,如netty等,而是使用了java的NIO实现的一套自己的通信框架协议。本文主要描述kafka基础网络通信的相关实现原理,版本为1.1.0。

java NIO具体细节不再描述,主要包含3个部分:

  • Channel:连接,如FileChannel、SocketChannel等,表示连接通道,阻塞和非阻塞(FileChannel不支持)模式
  • Buffer:缓存,可在读写两种模式中切换
  • Selector:选择器,可实现一个线程处理多个连接

kafka实现网络模型主要涉及到3个类:

  • SocketServer:实现监听listeners,并构建AcceptorProcessorConnectionQuotas等类,用于接收、处理、解析request和response。此处重要监控指标:NetworkProcessorAvgIdlePercentRequestQueueSizeResponseQueueSize
  • KafkaApis:负责处理broker支持的各种通信协议,如PRODUCE/FETCH/LIST_OFFSETS/LEADER_AND_ISR/HEARTBEAT
  • KafkaRequestHandlerPool:负责接收消息,处理SocketServer接收的请求,并构建response返回给SocketServer。此处重要监控指标:RequestHandlerAvgIdlePercent

通信概况

kafka网络模型_精简

kafka接收消息、处理并返回,主要有以下步骤:

  1. Acceptor:监听OP_ACCEPT,将连接的channel以round-robin的方式选择processor进行处理

  2. Processor:监听连接的OP_READOP_WRITE,主要负责读写和解析数据

    • 读取客户端消息解析后放入requestQueue,解析头部信息
    • responseQueue中的消息发送回客户端
  3. KafkaRequestHandler:从requestQueue中获取连接,根据头部信息获取对应的KafkaApi协议进行相关处理,并通过回调,将处理后的结果通过RequestChannel写到对应ProcessorresponseQueue中,等待Processor线程处理

通信处理详细步骤

kafka网络模型

如图所示,其中,蓝色线表示请求处理及流向,绿色组件表示消息缓存。

  1. SocketServer中,主要功能有:

    • 通过配置的listeners可以监听多个interface/port,构建对应的Acceptor
    • 每个Acceptor构建num.network.threadsProcessor,用于处理连接请求
    • 构建RequestChannel,保存解析好的连接请求
    # 主要逻辑
    private def addProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = synchronized {
        ...
        for (_ <- 0 until newProcessorsPerListener) {
          val processor = newProcessor(nextProcessorId, connectionQuotas, listenerName, securityProtocol, memoryPool)
          listenerProcessors += processor
          requestChannel.addProcessor(processor)
          nextProcessorId += 1
        }
        listenerProcessors.foreach(p => processors.put(p.id, p))
        acceptor.addProcessors(listenerProcessors)
      }
    
  2. Acceptor为线程,主要功能有:

    • 监听OP_ACCEPT事件,不断循环获取已经accept的连接
    • 判断每个ip的连接数是否超过quota限制
    • 通过round-robin的方式,选择Processor,放入Processor对应的newConnections缓存中
    • 代码逻辑主要看run()方法
  3. Processor为线程,主要功能为缓存新建连接、接收并缓存数据、缓存返回信息、处理断开连接等

    • configureNewConnections(): 处理新建的连接,监听OP_READ事件,等待读取数据
    • poll()
      • 真正读取数据,并放入接收缓存队列stagedReceives,缓存所有channel的请求
      • 拿出每个channel的第一个请求,解析协议头部,放入completedReceives缓存中
      • 如果channel写出ready,则进行write,将response返回给客户端
    • processCompletedReceives():将请求解析为Request并放入requestQueue缓存
    # 主要代码逻辑
    override def run() {
        startupComplete()
        try {
          while (isRunning) {
            try {
              // setup any new connections that have been queued up
              configureNewConnections()
              // register any new responses for writing
              processNewResponses()
              poll()
              processCompletedReceives()
              processCompletedSends()
              processDisconnected()
              ...
    
  4. KafkaRequestHandlerPool初始化num.io.threadsKafkaRequestHandler

    • requestQueue中拿去request,并根据协议头,选择对应的KafkaApi进行处理
    • 使用回调,将处理完成的response通过KafkaChannel放入当前channel对应的ProcessorresponseQueue

由此可见,kafka内部通信使用了NIO+缓存+异步,从而极大提升了kafka的单机并发能力。

至此,整个网络处理大概步骤完成,其中很多细节内容,这里不再赘述,感兴趣的同学可以看源码或一起交流。

通信协议

kafka网络模型_协议规范

kafka接收到的数据内容格式如上图所示:

  • 4B: 整个数据的长度,最大为2^31 - 1,超过socket.request.max.bytes则报错,默认是100M
  • Header
    • 2B: api_key
    • 2B: api_version,api_key+api_version,可以决定当前消息的协议,如Producer、Fetcher、LeaderAndISR等
    • 4B: correlation_id,返回给客户端时携带
    • clientId: string
      • 2B: string长度
      • remain: string具体内容
  • Body(Payload),根据不同的协议使用不同的解析方法,此处以Produce协议为例
    • 2B: acks,写入的ack值,-1、0、1等,分别表示写入的可靠性要求
    • 4B: timeout,表示此Produce请求的超时时间
    • topic_data[]: Array
      • 4B: array长度
      • topic_data
        • topic_name: string
          • 2B: string长度
          • remain: string具体内容
        • partition_data[]: Array
          • 4B: array长度
          • partition_data
            • 4B: partition_id
            • 4B: 消息体长度
            • remain: MemoryRecords

KafkaApi支持各种协议,具体可参见源码,这里只大概描述Produce协议的主要内容。基于此,kafka各版本之间可以保证很好的协议兼容性。

优化

基于以上分析,我们通过分析相关监控指标,进行了相关调优工作,主要调优内容为(仅供参考):

zookeeper.session.timeout.ms=30000
num.network.threads=12
num.io.threads=16
num.replica.fetchers=3
replica.fetch.max.bytes=2097152
replica.fetch.response.max.bytes=20971520

调整后,cpu的负载明显提升,更加有效的利用机器资源,processor和handler的idle比例明显提高,isr expand/shrink出现频次和梳理也明显降低,大大提升的集群的稳定性。

结尾

本文主要介绍kafka内部的通信模型,kafka组件还有很多模块,后续会不断深入学习和理解,欢迎大家一起交流和学习。最近Pulsar也有兴起之势,后续也需要学习,真是学无止境啊~~

参考

matt博客

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352