Kafka Controller Redesign方案

Kafka Controller 是 Kafka 的核心组件,在前面的文章中,已经详细讲述过 Controller 部分的内容。在过去的几年根据大家在生产环境中应用的反馈,Controller 也积累了一些比较大的问题,而针对这些问题的修复,代码的改动量都是非常大的,无疑是一次重构,因此,社区准备在新版的系统里对 Controller 做一些相应的优化(0.11.0及以后的版本),相应的设计方案见:Kafka Controller Redesign,本文的内容就是结合这篇文章做一个简单的总结。

Controller 功能

在一个 Kafka 中,Controller 要处理的事情总结如下表所示:

Controller 目前存在的问题

之所以要重新设计 Controller,是因为现在的 Controller 积累了一些比较难解决的问题,这些问题解决起来,代码改动量都是巨大的,甚至需要改变 controller 部门的设计,基本就跟重构差不多了,下面我们先来了看一下 controller 之前(主要是 0.11.0 之前的版本)存在的一些问题。

目前遇到的比较大的问题有以下几个:

Partition 级别同步 zk 写;

sequential per-partition controller-to-broker requests;

Controller 复杂的并发语义;

代码组织混乱;

控制类请求与数据类请求未分离;

Controller 给 broker 的请求中没有 broker 的 generation信息;

ZkClient 阻碍 Client 的状态管理。

Partition 级别同步 zk 写

zookeeper 的同步写意味着在下次写之前需要等待前面整个过程的结束,而且由于它们都是 partition 粒度的(一个 Partition 一个 Partition 的去执行写操作),对于 Partition 非常多的集群来说,需要等待的时间会更长,Controller 通常会在下面这两个地方做 Partition 级别 zookeeper 同步写操作:

PartitionStateMachine 在进行触发 leader 选举(partition 目的状态是 OnlinePartition),将会触发上面的操作;

ReplicaStateMachine 更新 LeaderAndIsr 信息到 zk(replica 状态转变为 OfflineReplica),这种情况也触发这种情况,它既阻碍了 Controller 进程,也有可能会 zk 造成压力。

sequential per-partition controller-to-broker requests

Controller 在向 Broker 发送请求,有些情况下也是 Partition 粒度去发送的,效率非常低,比如在 Controller 处理 broker shutdown 请求时,这里是按 Partition 级别处理,每处理一个 Partition 都会执行 Partition、Replica 状态变化以及 Metadata 更新,并且调用sendRequestsToBrokers()向 broker 发送请求,这样的话,效率将变得非常低。

Controller 复杂的并发语义

Controller 需要在多个线程之间共享状态信息,这些线程有:

IO threads handling controlled shutdown requests

The ZkClient org.I0Itec.zkclient.ZkEventThread processing zookeeper callbacks sequentially;

The TopicDeletionManager kafka.controller.DeleteTopicsThread;

Per-broker RequestSendThread within ControllerChannelManager.

所有这些线程都需要访问或修改状态信息(ControllerContext),现在它们是通过 ControllerContext 的 controllerLock(排它锁)实现的,Controller 的并发变得虚弱无力。

代码组织混乱

KafkaController 部分的代码组织(KafkaController、PartitionStateMachine 和 ReplicaStateMachine)不是很清晰,比如,下面的问题就很难回答:

where and when does zookeeper get updated?

where and when does a controller-to-broker request get formed?

what impact does a failing zookeeper update or controller-to-broker request have on the cluster state?

这也导致了这部分很多开发者不敢轻易去改动。

控制类请求与数据类请求未分离

现在 broker 收到的请求,有来自 client、broker 和 controller 的请求,这些请求都会被放到同一个 requestQueue 中,它们有着同样的优先级,所以来自 client 的请求很可能会影响来自 controller 请求的处理(如果是 leader 变动的请求,ack 设置的不是 all,这种情况有可能会导致数据丢失)。

Controller 给 broker 的请求中没有 broker 的 generation信息

这里的 Broker generation 代表着一个标识,每当它重新加入集群时,这个标识都会变化。如果 Controller 的请求没有这个信息的话,可能会导致一个重启的 Broker 收到之前的请求,让 Broker 进入到一个错误的状态。

比如,Broker 收到之前的 StopReplica 请求,可能会导致副本同步线程退出。

ZkClient 阻碍 Client 的状态管理

这里的状态管理指的是当 Client 发生重连或会话过期时,Client 可以监控这种状态变化,并做出一些处理,因为开源版的 ZKClient 在处理 notification 时,是线性处理的,一些 notification 会被先放到 ZkEventThread’s queue 中,这样会导致一些最新的 notification 不能及时被处理,特别是与 zk 连接断开重连的情况。

Controller 改进方案

关于上述问题,Kafka 提出了一些改进方案,有些已经在最新版的系统中实现,有的还在规划中。

使用异步的 zk API

Zookeeper 的 client 提供三种执行请求的方式:

同步调用,意味着下次请求需要等待当前当前请求的完成;

异步调用,意味着不需要等待当前请求的完成就可以开始下次请求的执行,并且我们可以通过回调机制去处理请求返回的结果;

单请求的 batch 调用,意味着 batch 内的所有请求都会在一次事务处理中完成,这里需要关注的是 zookeeper 的 server 对单请求的大小是有限制的(jute.maxbuffer)。

文章中给出了三种请求的测试结果,Kafka 最后选取的是异步处理机制,因为对于单请求处理,异步处理更加简洁,并且相比于同步处理还可以保持一个更好的写性能。

improve controller-to-broker request batching

这个在设计文档还是 TODO 状态,具体的方案还没确定,不过基本可以猜测一下,因为目的是提高 batch 发送能力,那么只能是在调用对每个 broker 的 RequestSenderThread 线程发送请求之前,做一下检测,而不是来一个请求立马就发送,这是一个性能与时间的权衡,如果不是立马发送请求,那么可能会带来 broker 短时 metadata 信息的不一致,这个不一致时间不同的应用场景要求是不一样的。

单线程的事件处理模型

采用单线程的时间处理模型将极大简化 Controller 的并发实现,只允许这个线程访问和修改 Controller 的本地状态信息,因此在 Controller 部分也就不需要到处加锁来保证线程安全了。

目前 1.1.0 的实现中,Controller 使用了一个 ControllerEventThread 线程来处理所有的 event,目前可以支持13种不同类型事件:

Idle:代表当前 ControllerEventThread 处理空闲状态;

ControllerChange:Controller 切换处理;

BrokerChange:Broker 变动处理,broker 可能有上线或掉线;

TopicChange:Topic 新增处理;

TopicDeletion:Topic 删除处理;

PartitionReassignment:Partition 副本迁移处理;

AutoLeaderBalance:自动 rebalance 处理;

ManualLeaderBalance:最优 leader 选举处理,这里叫做手动 rebalance,手动去切流量;

ControlledShutdown:优雅关闭 broker;

IsrChange:Isr 变动处理;

LeaderAndIsrResponseReceived;

LogDirChange:Broker 某个目录失败后的处理(比如磁盘坏掉等);

ControllerShutdown:ControllerEventThread 处理这个事件时,会关闭当前线程。

重构集群状态管理

这部分的改动,目前社区也没有一个很好的解决思路,重构这部分的目的是希望 Partition、Replica 的状态管理变得更清晰一些,让我们从代码中可以清楚地明白状态是在什么时间、什么地方、什么条件下被触发的。这个优化其实是跟上面那个有很大关联,采用单线程的事件处理模型,可以让状态管理也变得更清晰。

prioritize controller requests

我们想要把控制类请求与数据类请求分开,提高 controller 请求的优先级,这样的话即使 Broker 中请求有堆积,Broker 也会优先处理控制类的请求。

这部分的优化可以在网络层的 RequestChannel 中做,RequestChannel 可以根据请求的 id 信息把请求分为正常的和优先的,如果请求是 UpdateMetadataRequest、LeaderAndIsrRequest 或者 StopReplicaRequest,那么这个请求的优先级应该提高。实现方案有以下两种:

在请求队列中增加一个优先级队列,优先级高的请求放到 the prioritized request queue 中,优先级低的放到普通请求队列中,但是无论使用一个定时拉取(poll)还是2个定时拉取,都会带来其他的问题,要么是增大普通请求的处理延迟,要么是增大了优先级高请求的延迟;

直接使用优先级队列代替现在的普通队列,设计上更倾向与这一种。

目前这部分在1.1.0中还未实现。

Controller 发送请求中添加 broker 的 generation 信息

generation 信息是用来标识当前 broker 加入集群 epoch 信息,每当 broker 重新加入集群中,该 broker.id 对应的 generation 都应该变化(要求递增),目前有两种实现方案:

为 broker 分配的一个全局唯一的 id,由 controller 广播给其他 broker;

直接使用 zookeeper 的 zxid 信息(broker.id 注册时的 zxid)。

直接使用原生的 Zookeeper client

Client 端的状态管理意味着当 Client 端发生状态变化(像连接中断或回话超时)时,我们有能力做一些操作。其中,zookeeper client 有效的状态(目前的 client 比下面又多了几种状态,这里先不深入)是:

NOT_CONNECTED: the initial state of the client;

CONNECTING: the client is establishing a connection to zookeeper;

CONNECTED: the client has established a connection and session to zookeeper;

CLOSED: the session has closed or expired。

有效的状态转移是:

NOT_CONNECTED > CONNECTING

CONNECTING > CONNECTED

CONNECTING > CLOSED

CONNECTED > CONNECTING

CONNECTED > CLOSED

最开始的设想是直接使用原生 Client 的异步调用方式,这样的话依然可以通过回调方法监控到状态的变化(像连接中断或回话超时),同样,在每次事件处理时,可以通过检查状态信息来监控到 Client 状态的变化,及时做一些处理。

当一个 Client 接收到连接中断的 notification(Client 状态变成了 CONNECTING 状态),它意味着 Client 不能再从 zookeeper 接收到任何 notification 了。如果断开连接,对于 Controller 而言,无论它现在正在做什么它都应该先暂停,因为可能集群的 Controller 已经切换到其他机器上了,只是它还没接收到通知,它如果还在工作,可能会导致集群状态不一致。当连接断开后,Client 可以重新建立连接(re-establish,状态变为 CONNECTED)或者会话过期(状态变为 CLOSED,会话过期是由 zookeeper Server 来决定的)。如果变成了 CONNECTED 状态,Controller 应该重新开始这些暂停的操作,而如果状态变成了 CLOSED 状态,旧的 Controller 就会知道它不再是 controller,应该丢弃掉这些任务。

欢迎工作一到五年的Java工程师朋友们加入Java架构开发: 854393687

群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容

  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,701评论 13 425
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,594评论 18 139
  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 5,293评论 1 15
  • Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: 以时间复杂度为O(1)的方...
    Alukar阅读 3,074评论 0 43
  • 原创 2016-11-22 风茕子 1, 有些男人有迷之自恋,女人长得漂亮打扮得性感,他们就觉得是为了勾引他们。 ...
    pengbaoer阅读 476评论 0 0