1. Kafka简介
Kafka 是一种分布式的,基于发布/订阅的消息系统,主要设计目标如下:
- 以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对 TB 级以上数据也能保证常数时间复杂度的访问性能。
- 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条以上消息的传输。
- 支持 Kafka Server 间的消息分区,及分布式消费,同时保证每个 Partition 内的消息顺序传输。
- 同时支持离线数据处理和实时数据处理。
- Scale out:支持在线水平扩展。
1.1 为什么使用消息系统
解耦
在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。冗余
有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。灵活性 & 峰值处理能力
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。顺序保证
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性。缓冲
在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽可能的快速。该缓冲有助于控制和优化数据流经过系统的速度。异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
1.2 常用Message Queue对比
RabbitMQ
RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。Redis
Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。ZeroMQ
ZeroMQ号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。ZeroMQ具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演这个服务器角色。你只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后你就可以愉快的在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,也就是说如果宕机,数据将会丢失。其中,Twitter的Storm 0.9.0以前的版本中默认使用ZeroMQ作为数据流的传输(Storm从0.9版本开始同时支持ZeroMQ和Netty作为传输模块)。ActiveMQ
ActiveMQ是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。Kafka/Jafka
Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制统一了在线和离线的消息处理。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。
2. Kafka架构介绍
2.1. 基础概念
-
Producer
负责发布消息到Kafka broker -
Consumer
消息消费者,向Kafka broker读取消息的客户端。 -
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。在 Kafka 中,消息以主题(Topic)来分类,每一个主题都对应一个「消息队列」,这有点儿类似于数据库中的表。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
但是如果我们把所有同类的消息都塞入到一个“中心”队列中,势必缺少可伸缩性,无论是生产者/消费者数目的增加,还是消息数量的增加,都可能耗尽系统的性能或存储。 -
Partition
Parition是物理上的概念,每个Topic包含一个或多个Partition。 -
Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker。 -
Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
2.2. Kafka拓扑结构
如上图所示,一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
2.3. Topic & Partition
Topic 在逻辑上可以被认为是一个 Queue,每条消费都必须指定它的 Topic,可以简单理解为必须指明把这条消息放进哪个 Queue 里。我们把一类消息按照主题来分类,有点类似于数据库中的表。
为了使得 Kafka 的吞吐率可以线性提高,物理上把 Topic 分成一个或多个 Partition。对应到系统上就是一个或若干个目录。
如果一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不同的消息可以并行写入不同broker的不同Partition里,极大的提高了吞吐率。
可以在$KAFKA_HOME/config/server.properties中通过配置项num.partitions来指定新建Topic的默认Partition数量,也可在创建Topic时通过参数指定,同时也可以在Topic创建之后通过Kafka提供的工具修改。
假设我们现在 Kafka 集群只有一个 Broker,我们创建 2 个 Topic 名称分别为:「Topic1」和「Topic2」,Partition 数量分别为 1、2。
那么我们的根目录下就会创建如下三个文件夹:
| --topic1-0
| --topic2-0
| --topic2-1
在 Kafka 的文件存储中,同一个 Topic 下有多个不同的 Partition,每个 Partition 都为一个目录。
而每一个目录又被平均分配成多个大小相等的 Segment File 中,Segment File 又由 index file 和 data file 组成,他们总是成对出现,后缀 ".index" 和 ".log" 分表表示 Segment 索引文件和数据文件。
现在假设我们设置每个 Segment 大小为 500 MB,并启动生产者向 topic1 中写入大量数据,topic1-0 文件夹中就会产生类似如下的一些文件:
| --topic1-0
| --00000000000000000000.index
| --00000000000000000000.log
| --00000000000000368769.index
| --00000000000000368769.log
| --00000000000000737337.index
| --00000000000000737337.log
| --00000000000001105814.index
| --00000000000001105814.log
| --topic2-0
| --topic2-1
Segment 是 Kafka 文件存储的最小单位。Segment 文件命名规则:Partition 全局的第一个 Segment 从 0 开始,后续每个 Segment 文件名为上一个 Segment 文件最后一条消息的 offset 值。
数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。如 00000000000000368769.index 和 00000000000000368769.log。
以上面的一对 Segment File 为例,说明一下索引文件和数据文件对应关系:
其中以索引文件中元数据
<3, 497>
为例,依次在数据文件中表示第 3
个 Message(在全局 Partition 表示第 368769 + 3 = 368772
个 message)以及该消息的物理偏移地址为 497
。
注意该 Index
文件并不是从0开始,也不是每次递增 1 的,这是因为 Kafka 采取稀疏索引存储的方式,每隔一定字节的数据建立一条索引。
它减少了索引文件大小,使得能够把 Index
映射到内存,降低了查询时的磁盘 IO 开销,同时也并没有给查询带来太多的时间消耗。
因为其文件名为上一个 Segment
最后一条消息的 Offset
,所以当需要查找一个指定 Offset
的 Message
时,通过在所有 Segment
的文件名中进行二分查找就能找到它归属的 Segment
。
再在其 Index 文件中找到其对应到文件上的物理位置,就能拿出该 Message。
由于消息在 Partition
的 Segment
数据文件中是顺序读写的,且消息消费后不会删除(删除策略是针对过期的 Segment
文件),这是顺序磁盘 IO 存储设计师 Kafka 高性能很重要的原因。
Kafka
是如何准确的知道 Message 的偏移的呢?这是因为在 Kafka 定义了标准的数据存储结构,在 Partition 中的每一条 Message 都包含了以下三个属性:
Offset
:表示 Message 在当前 Partition 中的偏移量,是一个逻辑上的值,唯一确定了 Partition 中的一条 Message,可以简单的认为是一个 ID。
MessageSize
:表示 Message 内容 Data 的大小。
Data
:Message 的具体内容。
因为每条消息都被append到该Partition中,属于顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。
如何根据offset查找message
例如读取 offset=368776的 message,需要通过下面2个步骤查找:
-
第一步查找 segment file 上述图为例,其中00000000000000000000.index 表示最开始的文件,起始偏移量(offset)为 0。第二个文件00000000000000368769.index 的消息量起始偏移量为368770 = 368769 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据 offset 二分查找文件列表,就可以快速定位到具体文件。 当offset=368776时定位到00000000000000368769.index | log。
- 第二步通过 segment file 查找 message 通过第一步定位到 segment file,当 offset=368776时,依次定位到00000000000000368769.index 的元数据物理位置和
00000000000000368769.log 的物理偏移地址,然后再通过00000000000000368769.log 顺序查找直到offset=368776 为止。
如何根据timeindex查找message
Kafka 从0.10.0.0版本起,为分片日志文件中新增了一个 .timeindex 的索引文件,可以根据时间戳定位消息。同样我们可以通过脚本 kafka-dump-log.sh 查看时间索引的文件内容。
- 首先定位分片,将 1570793423501 与每个分片的最大时间戳进行对比(最大时间戳取时间索引文件的最后一条记录时间,如果时间为 0 则取该日志分段的最近修改时间),直到找到大于或等于 1570793423501 的日志分段,因此会定位到时间索引文件00000000000003257573.timeindex,其最大时间戳为 1570793423505。
- 重复 Offset 找到 log 文件的步骤。
分区分配策略
Kafka提供了三个分区分配策略:RangeAssignor、RoundRobinAssignor以及StickyAssignor,下面简单介绍下各个算法的实现。
-
RangeAssignor:kafka默认会采用此策略进行分区分配,主要流程如下:
假设一个消费组中存在两个消费者{C0,C1},该消费组订阅了三个主题{T1,T2,T3},每个主题分别存在三个分区,一共就有9个分区{TP1,TP2,...,TP9}。通过以上算法我们可以得到D=4,R=1,那么消费组C0将消费的分区为{TP1,TP2,TP3,TP4,TP5},C1将消费分区{TP6,TP7,TP8,TP9}。这里存在一个问题,如果不能均分,那么前面的几个消费者将会多消费一个分区。
- 将所有订阅主题下的分区进行排序得到集合TP={TP0,Tp1,...,TPN+1}。
- 对消费组中的所有消费者根据名字进行字典排序得到集合CG={C0,C1,...,CM+1}。
- 计算D=N/M,R=N%M。
-
消费者Ci获取消费分区起始位置=D*i+min(i,R)
,Ci获取的分区总数=D+(if (i+1>R)0 else 1)
。
-
RoundRobinAssignor:使用该策略需要满足以下两个条件:1) 消费组中的所有消费者应该订阅主题相同;2) 同一个消费组的所有消费者在实例化时给每个主题指定相同的流数。
- 对所有主题的所有分区根据主题+分区得到的哈希值进行排序。
- 对所有消费者按字典排序。
- 通过轮询的方式将分区分配给消费者。
-
StickyAssignor:该分配方式在0.11版本开始引入,主要是保证以下特性:
- 尽可能的保证分配均衡;
- 当重新分配时,保留尽可能多的现有分配。
其中第一条的优先级要大于第二条。
2.4. Broker 和集群(Cluster)
一个 Kafka 服务器也称为 Broker,它接受生产者发送的消息并存入磁盘;Broker 同时服务消费者拉取分区消息的请求,返回目前已经提交的消息。使用特定的机器硬件,一个 Broker 每秒可以处理成千上万的分区和百万量级的消息。
若干个 Broker 组成一个集群(Cluster),其中集群内某个 Broker 会成为集群控制器(Cluster Controller),它负责管理集群,包括分配分区到 Broker、监控 Broker 故障等。
在集群内,一个分区由一个 Broker 负责,这个 Broker 也称为这个分区的 Leader。
对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka提供两种策略删除旧数据:一是基于时间,二是基于Partition文件大小。例如可以通过配置$KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据,也可在Partition文件超过1GB时删除旧数据,配置如下所示:
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according to the retention policies
log.retention.check.interval.ms=300000
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
这里要注意,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高Kafka性能无关。选择怎样的删除策略只与磁盘以及具体的需求有关。另外,Kafka会为每一个Consumer Group保留一些metadata信息——当前消费的消息的position,也即offset。
这个offset由Consumer控制。正常情况下Consumer会在消费完一条消息后递增该offset。当然,Consumer也可将offset设成一个较小的值,重新消费一些消息。因为offet由Consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些消费过,也不需要通过broker去保证同一个Consumer Group只有一个Consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。
2.5. Producer
Producer发送消息到broker时,会根据Paritition机制选择将其存储到哪一个Partition。如果Partition机制设置合理,所有消息可以均匀分布到不同的Partition里,这样就实现了负载均衡。
- 指明 Partition 的情况下,直接将给定的Value 作为 Partition 的值
- 没有指明 Partition 但有 Key 的情况下,将Key 的 Hash 值与分区数取余得到Partition值。
- 既没有 Partition 又没有 Key 的情况下,第一次调用时随机生成一个整数(后面每次调用都在这个整数上自增),将这个值与可用的分区数取余,得到 Partition 值,也就是常说的 Round-Robin 轮询算法。
为保证 Producer 发送的数据,能可靠地发送到指定的 Topic,Topic 的每个Partition 收到 Producer 发送的数据后,都需要向 Producer 发送 ACK。如果Producer 收到 ACK,就会进行下一轮的发送,否则重新发送数据。
ack参数设置及意义
生产端往kafka集群发送消息时,可以通过request.required.acks参数来设置数据的可靠性级别
1:默认为1,表示在ISR中的leader副本成功接收到数据并确认后再发送下一条消息,如果主节点宕机则可能出现数据丢失场景,详细分析可参考前面提到的副本章节。
0:表示生产端不需要等待节点的确认就可以继续发送下一批数据,这种情况下数据传输效率最高,但是数据的可靠性最低。
-1:表示生产端需要等待ISR中的所有副本节点都收到数据之后才算消息写入成功,可靠性最高,但是性能最低,如果服务端的min.insync.replicas值设置为1,那么在这种情况下允许ISR集合只有一个副本,因此也会存在数据丢失的情况。
幂等特性
幂等性:同一个操作任意执行多次产生的影响或效果与一次执行影响相同。
幂等的关键在于服务端能否识别出请求是否重复,然后过滤掉这些重复请求,通常情况下需要以下信息来实现幂等特性:
- 唯一标识:判断某个请求是否重复,需要有一个唯一性标识,然后服务端就能根据这个唯一标识来判断是否为重复请求。
-
记录已经处理过的请求:服务端需要记录已经处理过的请求,然后根据唯一标识来判断是否是重复请求,如果已经处理过,则直接拒绝或者不做任何操作返回成功。
kafka中Producer端的幂等性是指当发送同一条消息时,消息在集群中只会被持久化一次,其幂等是在以下条件中才成立: - 只能保证生产端在单个会话内的幂等,如果生产端因为某些原因意外挂掉然后重启,此时是没办法保证幂等的,因为这时没办法获取到之前的状态信息,即无法做到垮会话级别的幂等。
- 幂等性不能垮多个主题分区,只能保证单个分区内的幂等,涉及到多个消息分区时,中间的状态并没有同步。
如果要支持垮会话或者垮多个消息分区的情况,则需要使用kafka的事务性
来实现。
为了实现生成端的幂等语义,引入了Producer ID(PID)
与Sequence Number
的概念:
-
Producer ID(PID)
:每个生产者在初始化时都会分配一个唯一的PID,PID的分配对于用户来说是透明的。 -
Sequence Number(序列号)
:对于给定的PID而言,序列号从0开始单调递增,每个主题分区均会产生一个独立序列号,生产者在发送消息时会给每条消息添加一个序列号。broker端缓存了已经提交消息的序列号,只有比缓存分区中最后提交消息的序列号大1的消息才会被接受,其他会被拒绝。
生产端消息发送流程的幂等处理
下面简单介绍下支持幂等的消息发送端工作流程
生产端通过Kafkaproducer会将数据添加到RecordAccumulator中,数据添加时会判断是否需要新建一个ProducerBatch。
生产端后台启动发送线程,会判断当前的PID是否需要重置,重置的原因是因为某些消息分区的batch重试多次仍然失败最后因为超时而被移除,这个时候序列号无法连续,导致后续消息无法发送,因此会重置PID,并将相关缓存信息清空,这个时候消息会丢失。
发送线程判断是否需要新申请PID,如果需要则会阻塞直到获取到PID信息。
发送线程在调用sendProducerData()方法发送数据时,会进行以下判断:
判断主题分区是否可以继续发送、PID是否有效、如果是重试batch需要判断之前的batch是否发送完成,如果没有发送完成则会跳过当前主题分区的消息发送,直到前面的batch发送完成。
如果对应ProducerBatch没有分配对应的PID与序列号信息,则会在这里进行设置。
服务端消息接受流程的幂等处理
服务端(broker)在收到生产端发送的数据写请求之后,会进行一些判断来决定是否可以写入数据,这里也主要介绍关于幂等相关的操作流程。
- 如果请求设置了幂等特性,则会检查是否对ClusterResource有IdempotentWrite权限,如果没有,则会返回错误CLUSTER_AUTHORIZATION_FAILED。
- 检查是否有PID信息
- 根据batch的序列号检查该batch是否重复,服务端会缓存每个PID对应主题分区的最近5个batch信息,如果有重复,则直接返回写入成功,但是不会执行真正的数据写入操作。
- 如果有PID且非重复batch,则进行以下操作:
- 判断该PID是否已经存在缓存中。
- 如果不存在则判断序列号是否是从0开始,如果是则表示为新的PID,在缓存中记录PID的信息(包括PID、epoch以及序列号信息),然后执行数据写入操作;如果不存在但是序列号不是从0开始,则直接返回错误,表示PID在服务端以及过期或者PID写的数据已经过期。
- 如果PID存在,则会检查PID的epoch版本是否与服务端一致,如果不一致且序列号不是从0开始,则返回错误。如果epoch不一致但是序列号是从0开始,则可以正常写入。
- 如果epoch版本一致,则会查询缓存中最近一次序列号是否连续,不连续则会返回错误,否则正常写入。
2.6. Consumer
假设这么个场景:我们从 Kafka 中读取消息,并且进行检查,最后产生结果数据。
我们可以创建一个消费者实例去做这件事情,但如果生产者写入消息的速度比消费者读取的速度快怎么办呢?
这样随着时间增长,消息堆积越来越严重。对于这种场景,我们需要增加多个消费者来进行水平扩展。
Kafka 消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息。
假设有一个 T1 主题,该主题有 4 个分区;同时我们有一个消费组 G1,这个消费组只有一个消费者 C1。那么消费者 C1 将会收到这 4 个分区的消息。如果我们增加新的消费者 C2 到消费组 G1,那么每个消费者将会分别收到两个分区的消息。相当于 T1 Topic 内的 Partition 均分给了 G1 消费的所有消费者,在这里 C1 消费 P0 和 P2,C2 消费P1 和 P3。
如果增加到 4 个消费者,那么每个消费者将会分别收到一个分区的消息。这时候每个消费者都处理其中一个分区,满负载运行。
但如果我们继续增加消费者到这个消费组,剩余的消费者将会空闲,不会收到任何消息。
总而言之,我们可以通过增加消费组的消费者来进行水平扩展提升消费能力。
这也是为什么建议创建主题时使用比较多的分区数,这样可以在消费负载高的情况下增加消费者来提升性能。
另外,消费者的数量不应该比分区数多,因为多出来的消费者是空闲的,没有任何帮助。
如果我们的 C1 处理消息仍然还有瓶颈,我们如何优化和处理?
把 C1 内部的消息进行二次 Sharding,开启多个Goroutine Worker 进行消费,为了保障 Offset 提交的正确性,需要使用 WaterMark 机制,保障最小的 Offset 保存,才能往 Broker 提交。
2.7. Consumer Group
Kafka 一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这
个消息。
使用Consumer high level API时,同一Topic的一条消息只能被同一个Consumer Group内的一个Consumer消费,但多个Consumer Group可同时消费这一消息。
这是Kafka用来实现一个Topic消息的广播(发给所有的Consumer)和单播(发给某一个Consumer)的手段。一个Topic可以对应多个Consumer Group。如果需要实现广播,只要每个Consumer有一个独立的Group就可以了。要实现单播只要所有的Consumer在同一个Group里。用Consumer Group还可以将Consumer进行自由的分组而不需要多次发送消息到不同的Topic。 下面这个例子更清晰地展示了Kafka Consumer Group的特性。首先创建一个Topic (名为topic1,包含3个Partition),然后创建一个属于group1的Consumer实例,并创建三个属于group2的Consumer实例,最后通过Producer向topic1发送key分别为1,2,3的消息。结果发现属于group1的Consumer收到了所有的这三条消息,同时group2中的3个Consumer分别收到了key为1,2,3的消息。如下图所示。
2.7.1 Rebalance
可以看到,当新的消费者加入消费组,它会消费一个或多个分区,而这些分区之前是由其他消费者负责的。另外,当消费者离开消费组(比如重启、宕机等)时,它所消费的分区会分配给其他分区。这种现象称为重平衡(Rebalance)。
重平衡是 Kafka 一个很重要的性质,这个性质保证了高可用和水平扩展。不过也需要注意到,在重平衡期间,所有消费者都不能消费消息,因此会造成整个消费组短暂的不可用。
而且,将分区进行重平衡也会导致原来的消费者状态过期,从而导致消费者需要重新更新状态,这段期间也会降低消费性能。
消费者通过定期发送心跳(Hearbeat)到一个作为组协调者(Group Coordinator)的 Broker 来保持在消费组内存活。这个 Broker 不是固定的,每个消费组都可能不同。当消费者拉取消息或者提交时,便会发送心跳。如果消费者超过一定时间没有发送心跳,那么它的会话(Session)就会过期,组协调者会认为该消费者已经宕机,然后触发重平衡。
可以看到,从消费者宕机到会话过期是有一定时间的,这段时间内该消费者的分区都不能进行消息消费。通常情况下,我们可以进行优雅关闭,这样消费者会发送离开的消息到组协调者,这样组协调者可以立即进行重平衡而不需要等待会话过期。
在 0.10.1 版本,Kafka 对心跳机制进行了修改,将发送心跳与拉取消息进行分离,这样使得发送心跳的频率不受拉取的频率影响。
另外更高版本的 Kafka 支持配置一个消费者多长时间不拉取消息但仍然保持存活,这个配置可以避免活锁(livelock)。活锁,是指应用没有故障但是由于某些原因不能进一步消费。
但是活锁也很容易导致连锁故障,当消费端下游的组件性能退化,那么消息消费会变的很慢,会很容易出发livelock 的重新均衡机制,反而影响吞吐。
2.8. Push vs. Pull
作为一个消息系统,Kafka遵循了传统的方式,选择由Producer向broker push消息并由Consumer从broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用push模式。事实上,push模式和pull模式各有优劣。
push模式
很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成Consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式
则可以根据Consumer的消费能力以适当的速率消费消息。
对于Kafka而言,pull模式更合适。pull模式可简化broker的设计,Consumer可自主控制消费消息的速率,同时Consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
2.9. Kafka消息交付的保证性
有这么几种可能的消息交付的保证性(delivery guarantee):
-
At most once
消息可能会丢,但绝不会重复传输 -
At least one
消息绝不会丢,但可能会重复传输 -
Exactly once
每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的
2.9.1 At most once
-
读完消息先commit再处理消息。这种模式下,如果Consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于
at most once
(消息会丢,但不重复)。
2.9.2 At least one
- 当
Producer
向Broker
发送数据后,会进行 commit,如果commit成功,由于Replica
副本机制的存在,则意味着消息不会丢失。但是Producer
发送数据给Broker
后,遇到网络问题而造成通信中断,那么Producer
就无法准确判断该消息是否已经被提交(commit),这就可能造成at least once
(消息绝不会丢,但可能会重复传输)。 -
读完消息先处理再commit。这种模式下,如果在处理完消息之后commit之前Consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。这就对应于
at least once
(消息不丢,但被多次重复处理)。
2.9.3 Exactly once
-
在 Kafka 0.11.0.0 之前, 如果
Producer
没有收到消息 commit 的响应结果,它只能重新发送消息,确保消息已经被正确的传输到Broker
,重新发送的时候会将消息再次写入日志中;而在 0.11.0.0 版本之后,**Producer
支持幂等传递选项,保证重新发送不会导致消息在日志出现重复**。为了实现这个,Broker
为Producer
分配了一个ID,发往同一 Partition 的消息会附带Sequence Number。并通过每条消息的序列号进行去重。也支持了类似事务语义来保证将消息发送到多个Topic
分区中,保证所有消息要么都写入成功,要么都失败,这个主要用在Topic
之间的exactly once
(每条消息肯定会被传输一次且仅传输一次)。其中启用幂等传递的方法配置:
enable.idempotence = true
。启用事务支持的方法配置:设置属性
transcational.id = "指定值"
。
3. Kafka高可用设计
3.1 Replication
Kafka 在0.8以前的版本中,并不提供 HA 机制,一旦一个或多个 Broker 宕机,则宕机期间其上所有 Partition 都无法继续提供服务。若该 Broker 永远不能再恢复,亦或磁盘故障,则其上数据将丢失。
在没有 Replication 的情况下,一旦某机器宕机或者某个 Broker 停止工作则会造成整个系统的可用性降低。随着集群规模的增加,整个集群中出现该类异常的几率大大增加,因此对于生产系统而言 Replication 机制的引入非常重要。
为了更好的做负载均衡,Kafka尽量将所有的Partition均匀分配到整个集群上。一个典型的部署方式是一个Topic的Partition数量大于Broker的数量。同时为了提高Kafka的容错能力,也需要将同一个Partition的Replica尽量分散到不同的机器。实际上,如果所有的Replica都在同一个Broker上,那一旦该Broker宕机,该Partition的所有Replica都无法工作,也就达不到HA的效果。同时,如果某个Broker宕机了,需要保证它上面的负载可以被均匀的分配到其它幸存的所有Broker上。
Kafka分配Replica的算法如下:
- 将所有Broker(假设共n个Broker)和待分配的Partition排序
- 将第i个Partition分配到第(i mod n)个Broker上
- 将第i个Partition的第j个Replica分配到第((i + j) mod n)个Broker上
Kafka的Data Replication 需要解决如下问题:
- 怎样传播消息
- 在向Producer发送ACK前需要保证有多少个Replica已经收到该消息
- 怎样处理某个Replica不工作的情况
- 怎样处理Failed Replica恢复回来的情况
3.2 怎样传播消息
- Producer在发布消息到某个Partition时,先通过 Metadata (通过 Broker 获取并且缓存在 Producer 内) 找到该 Partition 的Leader,Producer只将该消息发送到该Partition的Leader。 Leader会将该消息写入其本地Log。
- 每个Follower都从Leader pull数据。Follower存储的数据顺序与Leader保持一致。Follower在收到该消息后,立即向Leader发送ACK, 而后将数据写入其Log。
- 一旦Leader收到了ISR中的所有Replica的ACK,该消息就被认为已经commit了,Leader将增加HW并且向Producer发送ACK。
为了提高性能,每个Follower在接收到数据后就立马向Leader发送ACK,而非等到数据写入Log中。因此,对于已经commit的消息,Kafka只能保证它被存于多个Replica的内存中,而不能保证它们被持久化到磁盘中,也就不能完全保证异常发生后该条消息一定能被Consumer消费。但考虑到这种场景非常少见,可以认为这种方式在性能和数据持久化上做了一个比较好的平衡。在将来的版本中,Kafka会考虑提供更高的持久性。
Consumer读消息也是从Leader读取,只有被commit过的消息(offset低于HW的消息)才会暴露给Consumer。
Kafka Replication的数据流如下图所示:3.3 向Producer发送ACK前需要保证有多少个Replica已经收到该消息
Kafka处理失败需要明确定义一个Broker是否“活着”。对于Kafka而言,Kafka存活包含两个条件:
- 它必须维护与Zookeeper的session(这个通过Zookeeper的Heartbeat机制来实现)
- 从副本的最后一条消息的 Offset 需要与主副本的最后一条消息 Offset 差值不超过设定阈值(replica.lag.max.messages)或者副本的 LEO 落后于主副本的 LEO 时长不大于设定阈值(replica.lag.time.max.ms),官方推荐使用后者判断,并在新版本 Kafka0.10.0 移除了replica.lag.max.messages 参数。
Leader会跟踪与其保持同步的Replica列表,该列表称为ISR(即in-sync Replica)
。如果一个Follower宕机,或者落后太多,Leader将把它从ISR中移除。当其再次满足以上条件之后又会被重新加入集合中。
ISR 的引入主要是解决同步副本与异步复制两种方案各自的缺陷:
- 同步副本中如果有个副本宕机或者超时就会拖慢该副本组的整体性能。
- 如果仅仅使用异步副本,当所有的副本消息均远落后于主副本时,一旦主副本宕机重新选举,那么就会存在消息丢失情况。
Follower可以批量的从Leader复制数据,这样极大的提高复制性能(批量写磁盘),极大减少了Follower与Leader的差距
一条消息只有被ISR里的所有Follower都从Leader复制过去才会被认为已提交。这样就避免了部分数据被写进了Leader,还没来得及被任何Follower复制就宕机了,而造成数据丢失(Consumer无法消费这些数据)。而对于Producer而言,它可以选择是否等待消息commit,这可以通过request.required.acks
来设置。这种机制确保了只要ISR有一个或以上的Follower,一条被commit的消息就不会丢失。
3.4 主从数据同步流程详解
初始时 Leader 和 Follower 的 HW(High Watermark)
和 LEO(Log End Offset)
都是0。Leader 中的 remote LEO 指的就是Leader 端保存的 follower LEO,也被初始化成 0。
Follower 发送 Fetch 请求在 Leader 处理完 Producer 请求之后。Producer 给该 Topic 分区发送了一条消息。
- 把消息写入写底层 Log(同时也就自动地更新了 Leader 的 LEO)。
- 尝试更新 Leader HW 值。我们已经假设此时 Follower 尚未发送 Fetch 请求,那么 Leader 端保存的 remote LEO 依然是0,因此 Leader 会比较它自己的 LEO 值和 remote LEO 值,发现最小值是 0,与当前 HW 值相同,故不会更新分区 HW 值。
本例中当 Follower 发送 Fetch 请求时,Leader 端的处理依次是:
• 读取底层 Log 数据。
• 更新 remote LEO = 0(为什么是 0? 因为此时 Follower 还没有写入这条消息。Leader 如何
确认 Follower 还未写入呢?这是通过 Follower 发来的 Fetch 请求中的 Fetch Offset 来确定
的)。
• 尝试更新分区 HW —— 此时 Leader LEO = 1,remote LEO = 0,故分区 HW 值= min(leader
LEO, follower remote LEO) = 0。
• 把数据和当前分区 HW 值(依然是0)发送给 Follower 副本。
而 Follower 副本接收到 Fetch Response 后依次执行下列操作:
• 写入本地 Log(同时更新 Follower LEO)。
• 更新 Follower HW —— 比较本地 LEO 和当前 Leader HW 取小者,故 Follower HW = 0。
Follower 发来了第二轮 Fetch 请求,Leader 端接收到后仍然会依次执行下列操作:
• 读取底层 Log 数据。
• 更新 remote LEO = 1(这次为什么是1了? 因为这轮 FETCH RPC 携带的 Fetch Offset 是1,那么为什么这轮携带的就是1了呢,因为上一轮结束后 Follower LEO 被更新为1了)。
• 尝试更新分区 HW —— 此时 Leader LEO = 1,remote LEO = 1,故分区 HW 值= min(leader LEO, follower remote LEO) = 1。
• 把数据(实际上没有数据)和当前分区 HW 值(已更新为1)发送给 Follower 副本。
同样地,Follower 副本接收到 Fetch Response 后依次执行下列操作:
• 写入本地 Log,当然没东西可写,故 Follower LEO 也不会变化,依然是1。
• 更新 Follower HW —— 比较本地 LEO 和当前 Leader HW 取小者。由于此时两者都是1,故更新 Follower HW = 1 。
• Producer 端发送消息后 Broker 端完整的处理流程就讲完了。此时消息已经成功地被复制到Leader 和 Follower 的 Log 中且分区 HW 是1,表明 Consumer 能够消费 offset = 0 的这条消息。下面我们来分析下 Produce 和 Fetch 请求交互的第二种情况。
第二种情况:Fetch 请求保存在 purgatory 中 Produce 请求到来。
这种情况实际上和第一种情况差不多。前面说过了,当 Leader 无法立即满足 Fetch 返回要求的时候(比如没有数据),那么该 Fetch 请求会被暂存到 Leader 端的purgatory 中,待时机成熟时会尝试再次处理它。不过 Kafka 不会无限期地将其缓存着,默认有个超时时间(500ms),一旦超时时间已过,则这个请求会被强制完成。不过我们要讨论的场景是在寄存期间,Producer 发送 Produce 请求从而使之满足了条件从而被唤醒。
此时,Leader 端处理流程如下:
- Leader 写入本地 Log(同时自动更新 Leader LEO)。
- 尝试唤醒在 purgatory 中寄存的 Fetch 请求。
- 尝试更新分区 HW。
数据丢失场景(更新了LEO,但未更新HW时,主从先后故障)
初始情况为主副本 A 已经写入了两条消息,对应 HW=1,LEO=2,LEOB=1,从副本 B 写入了一条消息,对应HW=1,LEO=1。- 此时从副本 B 向主副本 A 发起 fetchOffset=1 请求,主副本收到请求之后更新LEOB=1,表示副本 B 已经收到了消息0,然后尝试更新 HW 值,in(LEO,LEOB)=1,即不需要更新,然后将消息1以及当前分区 HW=1 返回给从副本 B,从副本 B 收到响应之后写入日志并更新LEO=2,然后更新其 HW=1,虽然已经写入了两条消息,但是 HW 值需要在下一轮的请求才会更新为2。
- 此时从副本 B 重启,重启之后会根据 HW 值进行日志截断,即消息1会被删除。
- 从副本 B 向主副本 A 发送 fetchOffset=1 请求,如果此时主副本 A 没有什么异常,则跟第二步一样没有什么问题,假设此时主副本也宕机了,那么从副本 B 会变成主副本。
- 当副本 A 恢复之后会变成从副本并根据 HW 值进行日志截断,即把消息 1 丢失,此时消息 1 就永久丢失了。
数据不一致场景(更新了LEO,但未更新HW时,旧主故障,从成为主并写入了新数据,旧主恢复后成为从,主从HW一致但数据不一致)
- 初始状态为主副本 A 已经写入了两条消息对应HW=1,LEO=2,LEOB=1,从副本 B 也同步了两条消息,对应 HW=1,LEO=2。
- 此时从副本 B 向主副本发送 fetchOffset=2 请求,主副本 A 在收到请求后更新分区 HW=2 并将该值返回给从副本 B,如果此时从副本 B 宕机则会导致HW 值写入失败。
- 我们假设此时主副本 A 也宕机了,从副本 B 先恢复并成为主副本,此时会发生日志截断,只保留消息 0,然后对外提供服务,假设外部写入了一个消息 1(这个消息与之前的消息 1不一样,用不同的颜色标识不同消息)。
- 等副本 A 起来之后会变成从副本,不会发生日志截断,因为 HW=2,但是对应位移 1 的消息其实是不一致的。
Leader Eepoch
为了解决数据丢失及数据不一致的问题,在新版的 Kafka(0.11.0.0)引入了Leader Epoch 概念。
Leader Epoch 表示一个键值对 <epoch, offset>,其中 Eepoch 表示 Leader 主副本的版本号,从 0 开始编码,当 Leader 每变更一次就会+1,Offset 表示该 Epoch 版本的主副本写入第一条消息的位置。
比如 <0,0> 表示第一个主副本从位移 0 开始写入消息,<1,100> 表示第二个主副本版本号为1并从位移 100 开始写入消息,主副本会将该信息保存在缓存中并定期写入到 CheckPoint 文件中,每次发生主副本切换都会去从缓存中查询该信息。
引入了Leader Eepoch后的数据丢失场景:
如图所示,当从副本 B 重启之后向主副本 A 发送offsetsForLeaderEpochRequest,Epoch 主从副本相等,则 A 返回当前的 LEO=2,从副本 B 中没有任何大于2 的位移,因此不需要截断。
- 当从副本 B 向主副本 A 发送 fetchoffset=2 请求时,A宕机,所以从副本 B 成为主副本,并更新 Epoch 值为<epoch=1, offset=2>,HW 值更新为 2。
- 当 A 恢复之后成为从副本,并向 B 发送 fetcheOffset=2请求,B 返回 HW=2,则从副本 A 更新 HW=2。
- 主副本 B 接受外界的写请求,从副本 A 向主副本 A 不断
发起数据同步请求。
从上可以看出引入 Leader Epoch 值之后避免了前面提到的数据丢失情况,但是这里需要注意的是如果在上面的第一步,从副本 B 起来之后向主副本 A 发送offsetsForLeaderEpochRequest 请求失败,即主副本 A同时也宕机了,那么消息 1 就会丢失,具体可见下面数据不一致场景中有提到。
引入了Leader Eepoch后的数据不一致场景:
- 从副本 B 恢复之后向主副本 A 发送offsetsForLeaderEpochRequest 请求,由于主
副本也宕机了,因此副本 B 将变成主副本并将消息1 截断,此时接受到新消息 1 的写入。 - 副本 A 恢复之后变成从副本并向主副本 A 发送offsetsForLeaderEpochRequest 请求,请求的Epoch 值小于主副本 B,因此主副本 B 会返回epoch=1 时的开始位移,即 lastoffset=1,因此从副本 A 会截断消息 1。
- 从副本 A 从主副本 B 拉取消息,并更新 Epoch 值<epoch=1, offset=1>。
可以看出 Epoch 的引入可以避免数据不一致,但是两个副本均宕机,则还是存在数据丢失的场景。
3.5 Leader Election
引入 Replication 之后,同一个 Partition 可能会有多个 Replica,而这时需要在这些Replication 之间选出一个 Leader,Producer 和 Consumer 只与这个 Leader 交互,其它 Replica 作为 Follower 从 Leader 中复制数据。
因为需要保证同一个 Partition 的多个 Replica 之间的数据一致性(其中一个宕机后其它 Replica必须要能继续服务并且即不能造成数据重复也不能造成数据丢失)。
如果没有一个 Leader,所有 Replica 都可同时读/写数据,那就需要保证多个 Replica 之间互相(N×N 条通路)同步数据,数据的一致性和有序性非常难保证,大大增加了 Replication 实现的复杂性,同时也增加了出现异常的几率。而引入 Leader 后,只有 Leader 负责数据读写,Follower 只向 Leader 顺序 Fetch 数据(N 条通路),系统更加简单且高效。
由于 Kafka 集群依赖 ZooKeeper 集群,所以最简单最直观的方案是,所有 Follower都在 ZooKeeper 上设置一个 Watch,一旦 Leader 宕机,其对应的 Ephemeral Znode 会自动删除,此时所有 Follower 都尝试创建该节点,而创建成功者(ZooKeeper 保证只有一个能创建成功)即是新的 Leader,其它 Replica 即为Follower。
前面的方案有以下缺点:
- Split-Brain (脑裂): 这是由 ZooKeeper 的特性引起的,虽然 ZooKeeper 能保证所有Watch 按顺序触发,但并不能保证同一时刻所有 Replica “看”到的状态是一样的,这就可能造成不同 Replica 的响应不一致 。
- Herd Effect (羊群效应): 如果宕机的那个 Broker 上的 Partition 比较多,会造成多个Watch 被触发,造成集群内大量的调整。
- ZooKeeper( 负载过重) : 每个 Replica 都要为此在 ZooKeeper 上注册一个 Watch,当集群规模增加到几千个 Partition 时 ZooKeeper 负载会过重。
Controller
Kafka 的 Leader Election 方案解决了上述问题,它在所有 Broker 中选出一个Controller,所有 Partition 的 Leader 选举都由 Controller 决定。Controller 会将Leader 的改变直接通过 RPC 的方式(比 ZooKeeper Queue 的方式更高效)通知需为此作为响应的 Broker。
Kafka 集群 Controller 的选举过程如下 :
- 每个 Broker 都会在 Controller Path (/controller)上注册一个 Watch。
- 当前 Controller 失败时,对应的 Controller Path 会自动消失(因为它是 Ephemeral Node),此时该 Watch 被 fire,所有“活”着的 Broker 都会去竞选成为新的 Controller(创建新的Controller Path),但是只会有一个竞选成功(这点由 ZooKeeper 保证)。
- 竞选成功者即为新的 Leader,竞选失败者则重新在新的 Controller Path 上注册 Watch。因为ZooKeeper 的 Watch 是一次性的,被 fire 一次之后即失效,所以需要重新注册。
Kafka Partition Leader 的选举过程如下 (由 Controller 执行): - 从 ZooKeeper 中读取当前分区的所有 ISR(in-sync replicas)集合。
- 调用配置的分区选择算法选择分区的 Leader。
Kafka在Zookeeper中动态维护了一个ISR(in-sync replicas),这个ISR里的所有Replica都跟上了leader,只有ISR里的成员才有被选为Leader的可能。在这种模式下,对于f+1个Replica,一个Partition能在保证不丢失已经commit的消息的前提下容忍f个Replica的失败。在大多数使用场景中,这种模式是非常有利的。事实上,为了容忍f个Replica的失败,Majority Vote和ISR在commit前需要等待的Replica数量是一样的,但是ISR需要的总的Replica的个数几乎是Majority Vote的一半。
3.6. 如何处理所有Replica都不工作
上文提到,在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某个Partition的所有Replica都宕机了,就无法保证数据不丢失了。这种情况下有两种可行的方案:
- 等待ISR中的任一个Replica“活”过来,并且选它作为Leader(强一致性,不可用时间相对较长)
- 选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader(高可用性)
Kafka0.8.*使用了第二种方式。根据Kafka的文档,在以后的版本中,Kafka支持用户通过配置选择这两种方式中的一种,从而根据不同的使用场景选择高可用性还是强一致性。
3.7 broker故障恢复过程
- Controller在Zookeeper注册Watch,一旦有Broker宕机(这是用宕机代表任何让系统认为其die的情景,包括但不限于机器断电,网络不可用,GC导致的Stop The World,进程crash等),其在Zookeeper对应的znode会自动被删除,Zookeeper会fire Controller注册的watch,Controller读取最新的幸存的Broker
- Controller决定set_p,该集合包含了宕机的所有Broker上的所有Partition
- 对set_p中的每一个Partition执行以下操作:
3.1. 从/brokers/topics/[topic]/partitions/[partition]/state
读取该Partition
当前的ISR
3.2. 决定该Partition的新Leader。如果当前ISR中有至少一个Replica还幸存,则选择其中一个作为新Leader,新的ISR则包含当前ISR中所有幸存的Replica。否则选择该Partition中任意一个幸存的Replica作为新的Leader以及ISR(该场景下可能会有潜在的数据丢失)。如果该Partition的所有Replica都宕机了,则将新的Leader设置为-1。
3.3. 将新的Leader,ISR和新的leader_epoch及controller_epoch写入/brokers/topics/[topic]/partitions/[partition]/state。注意,该操作只有其version在3.1至3.3的过程中无变化时才会执行,否则跳转到3.1 -
直接通过RPC向set_p相关的Broker发送LeaderAndISRRequest命令。Controller可以在一个RPC操作中发送多个命令从而提高效率。
4.kafka为什么高性能
架构层面:
• Partition 级别并行:Broker、Disk、Consumer 端
• ISR:避免同步个别副本时拖慢整体副本组性能,同时还能避免主从节点间数据落后过多导致的消息丢失
I/O 层面:
• Batch 读写:减少I/O次数,增加吞吐量
• 磁盘顺序 I/O:在某些情况下,顺序磁盘访问比随机内存访问更快
• Page Cache:将Index及消息缓存到Page Cache中,提升处理效率
• Zero Copy:减少内核态与用户态之间的I/O次数
• 压缩:log压缩及消息压缩,节省磁盘空间,节省字节大小
References:
https://kafka.apache.org/documentation/#design
https://www.jianshu.com/p/bde902c57e80
https://mp.weixin.qq.com/s/fX26tCdYSMgwM54_2CpVrw
https://zhuanlan.zhihu.com/p/27587872
https://mp.weixin.qq.com/s/X301soSDWRfOemQhk9AuPw
http://www.jasongj.com/2015/08/09/KafkaColumn1/
http://www.jasongj.com/2015/08/09/KafkaColumn2/
http://www.jasongj.com/2015/08/09/KafkaColumn3/
http://www.jasongj.com/2015/08/09/KafkaColumn4/
http://www.jasongj.com/2015/08/09/KafkaColumn5/
http://www.jasongj.com/2015/08/09/KafkaColumn6/
http://www.jasongj.com/2015/08/09/KafkaColumn7/
https://www.cnblogs.com/wxd0108/p/6519973.html
https://cloud.tencent.com/developer/article/1589157
https://zhuanlan.zhihu.com/p/459610418