注:本文依赖于kafka-0.10.0.1-src
kafka消息格式是经过多个版本的演变的,本文只说0.10.0.1版本的消息格式。
消息格式如图1所示:
CRC:用于校验消息内容。占4个字节
MAGIC:用于标识kafka版本,默认是1。占1个字节
ATTRIBUTES:用于存储消息压缩使用的编码以及Timestamp类型。这个版本仅支持 gzip、snappy、lz4三种压缩格式。后三位如果是000则表示没有使用压缩,如果是001则表示是gzip压缩,如果是010则是snappy压缩,如果是011则是snappy压缩。第4位(从右数)如果为0,代表使用create time,如果为1代表append time。其余位保留。占1个字节
TIMESTAMP:时间戳。占8个字节
KEY_SIZE:用于标识KEY内容的长度K。占用4个字节
KEY:存储的是KEY的具体内容。占用K个字节。
VALUE_SIZE:主要标识VALUE的内容的长度V。占用4个字节。
VALUE:消息的真实内容。占用V个字节
Record实际上是用Java NIO ByteBuffer存储的,那我们如何将一个消息写入到Record的ByteBuffer里面呢?
说简单点,其实就是把上面说的字段给Record传过来然后依次放入ByteBuffer中就可以了,但这中间利用了Compressor压缩器的压缩功能,我们后续会说一下Compressor压缩器,这里就先不介绍了。
还有一点,Record的写入是支持数据分包的,也就是一个完整的VALUE值可以通过valueOffset和valueSize来指定偏移和这次写入的数据大小来进行分包操作,这样就可以将一个完整的消息,分成多个Record。如果valueSize是负数,就表示从valueOffset开始到末尾的数据都写入。