✎摘要
对于一个成熟的消息中间件而言,消息格式不仅关系到功能维度的扩展,还牵涉到性能维度的优化。随着Kafka的迅猛发展,其消息格式也在不断的升级改进,从0.8.x版本开始到现在的1.1.x版本,Kafka的消息格式也经历了3个版本。本文这里主要来讲述Kafka的三个版本的消息格式的演变,文章偏长,建议先关注后鉴定。
Kafka根据topic(主题)对消息进行分类,发布到Kafka集群的每条消息都需要指定一个topic,每个topic将被分为多个partition(分区)。每个partition在存储层面是追加log(日志)文件,任何发布到此partition的消息都会被追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),offset为一个long型的数值,它唯一标记一条消息。
每一条消息被发送到Kafka中,其会根据一定的规则选择被存储到哪一个partition中。如果规则设置的合理,所有的消息可以均匀分布到不同的partition里,这样就实现了水平扩展。如上图,每个partition由其上附着的每一条消息组成,如果消息格式设计的不够精炼,那么其功能和性能都会大打折扣。比如有冗余字段,势必会使得partition不必要的增大,进而不仅使得存储的开销变大、网络传输的开销变大,也会使得Kafka的性能下降;又比如缺少字段,在最初的Kafka消息版本中没有timestamp字段,对内部而言,其影响了日志保存、切分策略,对外部而言,其影响了消息审计、端到端延迟等功能的扩展,虽然可以在消息体内部添加一个时间戳,但是解析变长的消息体会带来额外的开销,而存储在消息体(参考下图中的value字段)前面可以通过指针偏量获取其值而容易解析,进而减少了开销(可以查看v1版本),虽然相比于没有timestamp字段的开销会差一点。如此分析,仅在一个字段的一增一减之间就有这么多门道,那么Kafka具体是怎么做的呢?本文只针对Kafka 0.8.x版本开始做相应说明,对于之前的版本不做陈述。
v0版本
对于Kafka消息格式的第一个版本,我们把它称之为v0,在Kafka 0.10.0版本之前都是采用的这个消息格式。注意如无特殊说明,我们只讨论消息未压缩的情形。
在具体讲述v0版的消息(Record)格式之前,我们先来了解下消息集(Message Set)的概念。消息包含键值对(key/value)和一些元数据(crc32、magic、attributes),而消息集是包含offset、message size以及一系列消息的集合。消息集不仅是存储于磁盘以及在网络上传输的基本形式,而且是kafka中压缩的基本单元。v0版的消息集结构如下图所示:
每条消息都一个offset用来标志它在partition中的偏移量,这个offset是逻辑值,而非实际物理偏移值。在未压缩的情况下,对于v0和v1版本而言,每个消息集中只包含一条消息,消息集中的offset也是消息的offset(压缩的情况有所不同,下面会介绍)。大多数人会把上图的全部内容看成是消息的格式,这里需要注意Record和Message Set的区别。message size表示消息集中的消息大小。offset和message size总体表示为消息集的日志头部(LOG_OVERHEAD),总大小固定为12B。
而消息(Record)格式是从crc32开始算起中各个字段的解释如下:
crc32(4B):crc32校验值。校验范围为magic至value之间。
magic(1B):消息格式版本号,此版本的magic值为0。
attributes(1B):消息的属性。总共占1个字节,低3位表示压缩类型:0表示NONE、1表示GZIP、2表示SNAPPY、3表示LZ4(LZ4自Kafka 0.9.x引入),其余位保留。
key length(4B):表示消息的key的长度。如果为-1,则表示没有设置key,即key=null。
key:可选,如果没有key则无此字段。
value length(4B):实际消息体的长度。如果为-1,则表示消息为空。
value:消息体。可以为空,比如tomnstone消息。
由上图我们也可以得知,v0版本中一个消息的最小长度(RECORD_OVERHEAD_V0)为crc32 + magic + attributes + key length + value length = 4B + 1B + 1B + 4B + 4B =14B,也就是说v0版本中一条消息的最小长度为14B,如果小于这个值,那么这就是一条破损的消息而不被接受。
这里我们来做一个测试,首先创建一个partition数和副本数都为1的topic,名称为“msg_format_v0”:
[root@node1 kafka_2.10-0.8.2.1]# bin/kafka-topics.sh --create
--zookeeper localhost:2181--topic msg_format_v0 --partitions1--replication-factor1
[root@node1 kafka_2.10-0.8.2.1]# bin/kafka-topics.sh --describe
--zookeeper localhost:2181--topic msg_format_v0
Topic:msg_format_v0 PartitionCount:1ReplicationFactor:1Configs:
Topic: msg_format_v0 Partition:0Leader:0Replicas:0Isr:0
然后往msg_format_v0中发送一条key="key",value="value"的消息,之后查看对应的日志:
[root@node1 kafka_2.10-0.8.2.1]# bin/kafka-run-class.shkafka.tools.DumpLogSegments
--files /tmp/kafka-logs/msg_format_v0-0/00000000000000000000.log
Dumping /tmp/kafka-logs-08/msg_format_v0-0/00000000000000000000.log
Starting offset:0
offset:0position:0isvalid:truepayloadsize:5magic:0
compresscodec: NoCompressionCodec crc:592888119keysize:3
查看消息的大小,即00000000000000000000.log文件的大小为34B,其值正好等于LOG_OVERHEAD+RECORD_OVERHEAD_V0 + 3B的key + 5B的value = 12B + 14B + 3B + 5B = 34B。
[root@node1 msg_format_v0-0]#ll*.log
-rw-r--r--1rootroot34Apr26 02:5200000000000000000000.log
我们再发送一条key=null, value="value"的消息,之后查看日志的大小:
[root@node3 msg_format_v0-0]#ll*.log
-rw-r--r--1rootroot65Apr26 02:5600000000000000000000.log
日志大小为65B,减去上一条34B的消息,可以得知本条消息的大小为31B,正好等于LOG_OVERHEAD+RECORD_OVERHEAD_V0 + 5B的value = 12B + 14B+ 5B = 31B。
消息压缩
压缩率是压缩后的大小与压缩前的对比。例如:把100MB的文件压缩后是90MB,压缩率为90 / 100 * 100% =90%,压缩率一般是越小压缩效果越好。常见的压缩算法是数据量越大压缩率越低,一条消息通常不会太大,这就导致压缩率比较高,压缩效果不太好。而kafka实现的压缩方式是将多条消息一起进行压缩,这样可以保证较高的压缩效果。而且在一般情况下,生产者发送的压缩数据在kafka broker中也是保持压缩状态进行存储,消费者从服务端获取也是压缩的消息,消费者在处理消息之前才会解压消息,这样保持了端到端的压缩。
压缩后的消息格式与非压缩的消息格式类似,但是分为内外两层,参考下图:
压缩后的外层消息(wrapper message)中的key为null,所以图左部分没有画出key这一部分,value字段中保存的是多条压缩消息(inner message, 内层消息),其中Record表示的是从crc32到value的消息格式。当生产者创建压缩消息的时候,对内部压缩消息设置的offset是从0开始为每个内部消息分配offset,详细可以参考下图右部:
kafka broker在为外层消息分配offset的时候,会根据外层消息中记录的内层消息的个数为外层消息分配offset,此offset的值是内层消息中最后一条消息原本的offset值。参考上图,对于未压缩的情形,图右内层消息最后一条的offset理应是1030,但是被压缩之后就变成了5,而这个1030被赋予给了外层的offset。这种压缩的情形下,如果要找到内层消息集合的起始移位,首先要解压缩内层,然后遍历每一条消息,然后才能推到出起始位移,代价昂贵。
v1版本
kafka从0.10.0版本开始到0.11.0版本之前所使用的消息格式版本为v1,其比v0版本就多了一个timestamp字段,表示消息的时间戳。v1版本的结构图如下所示:
v1版本的magic字段值为1。v1版本的attributes字段中的低3位和v0版本的一样,还是表示压缩类型,而第4个bit也被利用了起来:0表示timestamp类型为CreateTime,而1表示tImestamp类型为LogAppendTime,其他位保留。v1版本的最小消息(RECORD_OVERHEAD_V1)大小要比v0版本的要大8个字节,即22B。如果像v0版本介绍的一样发送一条key="key",value="value"的消息,那么此条消息在v1版本中会占用42B,具体测试步骤参考v0版的相关介绍。
Varints
kafka从0.11.0版本开始所使用的消息格式版本为v2,这个版本的消息相比于v0和v1的版本而言改动很大,同时还参考了Protocol Buffer而引入了变长整型(Varints)和ZigZag编码。为了更加形象的说明问题,首先我们来了解一下变长整型。
Varints是使用一个或多个字节来序列化整数的一种方法。数值越小,其所占用的字节数就越少。Varints中每个字节都有一个位于最高位的msb位(most significant bit),除了最后一个字节外其余msb位都设置为1,最后一个字节的msb位设置为0。这个msb位表示其后的字节是否和当前字节一起来表示同一个整数。除msb位之外,剩余的7位用于存储数据本身,这种表示类型又称之为Base 128。通常而言,一个字节8位可以表示256个值,所以称之为Base 256,而这里只能用7位表示,2的7次方即为128。Varints中采用小端字节序,即最小的字节放在最前面。
举个例子,比如数字1,它只占一个字节,所以msb位为0:
0000 0001
再举一个复杂点的例子,如数字300:
1010 1100 0000 0010
300的二进制表示原本为:0000 0001 0010 1100 = 256+32+8+4=300,那么为什么300的变长表示为上面的这种形式?
首先我们先去掉每个字节的msb位,表示如下:
1010 1100 0000 0010
-> 010 1100 000 0010
如前所述,varints是小端字节序的布局方式,所以这里两个字节的位置需要翻转一下:
010 1100 000 0010
-> 000 0010 010 1100 (翻转)
-> 000 0010 ++ 010 1100
-> 0000 0001 0010 1100 = 256+32+8+4=300
Varints可以用来表示int32、int64、uint32、uint64、sint32、sint64、bool、enum等类型。在实际使用过程中,如果当前字段可以表示为负数,那么对于int32/int64和sint32/sint64而言,它们在进行编码时存在着较大的区别。比如你使用int64表示一个负数,那么哪怕是-1,其编码后的长度始终为10个字节(可以通过下面的代码来测试长度),就如同对待一个很大的无符号长整型一样。为了使得编码更加的高效,Varints使用了ZigZag的编码方式。
publicintsizeOfLong(intv){
intbytes =1;
while((v &0xffffffffffffff80L) !=0L) {
bytes +=1;
v >>>=7;
}
returnbytes;
}
ZigZag编码以一种锯齿形(zig-zags)的方式来回穿梭于正负整数之间,以使得带符号整数映射为无符号整数,这样可以使得绝对值较小的负数仍然享有较小的Varints编码值,比如-1编码为1,1编码为2,-2编码为3,参考下表:
原值编码后的值
00
-11
12
-23
21474836474294967294
-21474836484294967295
对应的公式为:
(n <<1) ^ (n>> 31)
这是对于sint32而言的,sint64对应的公式为:
(n <<1) ^ (n>> 63)
以-1为例,其二进制表现形式为:1111 1111 1111 1111 1111 1111 1111 1111(补码)。
(n <<1) =11111111111111111111111111111110
(n>> 31) =00000000000000000000000000000001
(n <<1) ^ (n>> 31) =1
最终-1的Varints编码为0000 0001,这样原本用4字节表示的-1现在可以用1个字节来表示了。对于1而言就显得非常简单了,其二进制表现形式为:0000 0000 0000 0000 0000 0000 0000 0001。
(n <<1) =00000000000000000000000000000010
(n>> 31) =00000000000000000000000000000000
(n <<1) ^ (n>> 31) =2
最终1的Varints编码为0000 0010,也只占用一个字节。
前面说过Varints中的一个字节中只有7位是有效数值位,即只能表示128个数值,转变成绝对值之后其实质上只能表示64个数值。比如对于消息体长度而言,其值肯定是个大于等于0的正整数,那么一个字节长度的Varints最大只能表示63(从0开始计)。对于64而言,其二进制表示为:
0100 0000
经过ZigZag处理后为:
1000 0000 ^ 0000 0000 = 1000 0000
每个字节的低7位是有效数值位,所以1000 0000进一步转变为:
000 0001 000 0000
而Varints又是小端字节序,所以需要翻转一下位置:
000 0000 000 0001
设置非最后一个字节的msb位为1,最后一个字节的msb位为0,最终有:
1000 0000 0000 0001
所以最终64表示为:1000 0000 0000 0001,而63却表示为:0111 1110。
回顾一下kafka v0和v1版本的消息格式,如果消息本身没有key,那么key length字段为-1,int类型的需要4个字节来保存,而如果采用Varints来编码则只需要一个字节。根据Varints的规则可以推导出0-63之间的数字占1个字节,64-8191之间的数字占2个字节,8192-1048575之间的数字占3个字节。而kafka broker的配置message.max.bytes的默认大小为1000012(Varints编码占3个字节),如果消息格式中与长度有关的字段采用Varints的编码的话,绝大多数情况下都会节省空间,而v2版本的消息格式也正是这样做的。
不过需要注意的是Varints并非一直会省空间,一个int32最长会占用5个字节(大于默认的4字节),一个int64最长会占用10字节(大于默认的8字节)。下面代码展示如何计算一个int32占用的字节个数:
publicstaticintsizeOfVarint(intvalue){
intv = (value<<1) ^ (value>>31);
intbytes =1;
while((v &0xffffff80) !=0L) {
bytes +=1;
v >>>=7;
}
returnbytes;
}
v2版本
消息集从Message Set的代号转变成为Record Batch(参见下图),其中包含了1至多条消息(Record,参见下图中部和右部)。在消息压缩的情形下,Record Batch Header部分(参见下图左部,从first offset到records count字段)是不被压缩的,而被压缩的是records字段。
先来讲述一下消息格式Record的关键字段,可以看到内部字段大量采用了Varints,这样Kafka可以根据具体的值来确定需要几个字节来保存。v2版本的消息格式去掉了crc字段,另外增加了length(消息总长度)、timestamp delta(时间戳增量)、offset delta(位移增量)和headers信息,并且attributes被弃用了,笔者对此做如下分析(对于key、key length、value、value length字段和v0以及v1版本的一样,这里不再赘述):
length:消息总长度。
attributes:弃用,但是还是在消息格式中占据1B的大小,以备未来的格式扩展。
timestamp delta:时间戳增量。通常一个timestamp需要占用8个字节,如果像这里保存与RecordBatch的其实时间戳的差值的话可以进一步的节省占用的字节数。
offset delta:位移增量。保存与RecordBatch起始位移的差值,可以节省占用的字节数。
headers:这个字段用来支持应用级别的扩展,而不需要像v0和v1版本一样不得不将一些应用级别的属性值嵌入在消息体里面。Header的格式如上图最有,包含key和value,一个Record里面可以包含0至多个Header。具体可以参考以下KIP-82。
如果对于v1版本的消息,如果用户指定的timestamp类型是LogAppendTime而不是CreateTime,那么消息从发送端(Producer)进入broker端之后timestamp字段会被更新,那么此时消息的crc值将会被重新计算,而此值在Producer端已经被计算过一次;再者,broker端在进行消息格式转换时(比如v1版转成v0版的消息格式)也会重新计算crc的值。在这些类似的情况下,消息从发送端到消费端(Consumer)之间流动时,crc的值是变动的,需要计算两次crc的值,所以这个字段的设计在v0和v1版本中显得比较鸡肋。在v2版本中将crc的字段从Record中转移到了RecordBatch中。
v2版本对于消息集(RecordBatch)做了彻底的修改,参考上图左部,除了刚刚提及的crc字段,还多了如下字段:
first offset:表示当前RecordBatch的起始位移。
length:计算partition leader epoch到headers之间的长度。
partition leader epoch:用来确保数据可靠性,详细可以参考KIP-101。
magic:消息格式的版本号,对于v2版本而言,magic等于2。
attributes:消息属性,注意这里占用了两个字节。低3位表示压缩格式,可以参考v0和v1;第4位表示时间戳类型;第5位表示此RecordBatch是否处于事务中,0表示非事务,1表示事务。第6位表示是否是Control消息,0表示非Control消息,而1表示是Control消息,Control消息用来支持事务功能。
last offset delta:RecordBatch中最后一个Record的offset与first offset的差值。主要被broker用来确认RecordBatch中Records的组装正确性。
first timestamp:RecordBatch中第一条Record的时间戳。
max timestamp:RecordBatch中最大的时间戳,一般情况下是指最后一个Record的时间戳,和last offset delta的作用一样,用来确保消息组装的正确性。
producer id:用来支持幂等性,详细可以参考KIP-98。
producer epoch:和producer id一样,用来支持幂等性。
first sequence:和producer id、producer epoch一样,用来支持幂等性。
records count:RecordBatch中Record的个数。
这里我们再来做一个测试,在1.0.0的kafka中创建一个partition数和副本数都为1的topic,名称为“msg_format_v2”。然后同样插入一条key="key",value="value"的消息,查看日志结果如下:
[root@node1 kafka_2.12-1.0.0]# bin/kafka-run-class.shkafka.tools.DumpLogSegments
--files /tmp/kafka-logs/msg_format_v2-0/00000000000000000000.log --print-data-log
Dumping /tmp/kafka-logs/msg_format_v2-0/00000000000000000000.log
Starting offset:0
baseOffset:0lastOffset:0baseSequence:-1lastSequence:-1producerId:-1
producerEpoch:-1partitionLeaderEpoch:0isTransactional:falseposition:0
CreateTime:1524709879130isvalid:truesize:76magic:2compresscodec: NONE
crc:2857248333
可以看到size字段为76,我们根据上图中的v2版本的日志格式来验证一下,Record Batch Header部分共61B。Record部分中attributes占1B;timestamp delta值为0,占1B;offset delta值为0,占1B;key length值为3,占1B,key占3B;value length值为5,占1B,value占5B;headers count值为0,占1B, 无headers。Record部分的总长度=1B+1B+1B+1B+3B+1B+5B+1B=14B,所以Record的length字段值为14,编码为变长整型占1B。最后推到出这条消息的占用字节数=61B+14B+1B=76B,符合测试结果。同样再发一条key=null,value="value"的消息的话,可以计算出这条消息占73B。
这么看上去好像v2版本的消息比之前版本的消息占用空间要大很多,的确对于单条消息而言是这样的,如果我们连续往msg_format_v2中再发送10条value长度为6,key为null的消息,可以得到:
baseOffset: 2 lastOffset: 11 baseSequence: -1 lastSequence: -1 producerId: -1
producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false position: 149
CreateTime: 1524712213771 isvalid: true size: 191 magic: 2 compresscodec: NONE
crc: 820363253
本来应该占用740B大小的空间,实际上只占用了191B,如果在v0版本中这10条消息则需要占用320B的空间(v1版本则需要占用400B的空间),这样看来v2版本又节省了很多的空间,因为其将多个消息(Record)打包存放到单个RecordBatch中,又通过Varints编码极大的节省了空间。
有兴趣的同学可以自行测试一下在大批量消息的情况下,v2版本的消息占用大小和之前版本的对比,比如往msg_format_v0和msg_format_v2中各自发送100W条1KB的消息。具体的测试报告会在后面的文章中发出,但通过上面的陈述可知v2版本的消息不仅提供了更多的功能,比如事务、幂等性等,某些情况下还减少了消息的空间占用,总体提升很大。