1. 消息/协议设计

一、什么是协议

协议的概念,来自百度百科的解释:

协议,网络协议的简称,网络协议是通信计算机双方必须共同遵从的一组约定。如怎么样建立连接、怎么样互相识别等。只有遵守这个约定,计算机之间才能相互通信交流。它的三要素是:语法、语义、时序。为了使数据在网络上从源到达目的,网络通信的参与方必须遵循相同的规则,这套规则称为协议(protocol),它最终体现为在网络上传输的数据包的格式。协议往往分成几个层次进行定义,分层定义是为了使某一层协议的改变不影响其他层次的协议。

注意,这儿有几个关键词:

1. 语法,就是我们常说的,协议的数据结构/格式,例如TCP协议的格式。

2. 语义,要做的事情的含义。例如三次握手中发出的SYNC消息,代表要客户端想要和服务端建立连接。

3. 时序,代表事件发生的顺序,例如TCP包的序号。

4. 分层,每一层都有自己的协议,不同层之间互不干扰。


下图是HTTP协议的数据,拆解到各层后,不同的层通过协议对数据进行包装。

二、协议的类型

因为数据在计算机的世界是以二进制流的方式传输的,全是0/1的形式,人类完全无法正常读取。我们需要确定的是一次有意义的传输内容在读到何时结束。而且数据在网络上的传输,存在粘包和半包的情况,能够应对这个问题的办法就是协议能够准确的识别,当粘包发生时不会多读,当半包发生时会继续读取。

协议一般的形式有三种:

1. 定长协议

定长的协议是指协议内容的长度是固定的,比如协议byte长度是50,当从网络上读取50个byte后,就进行decode解码操作。定长协议在读取或者写入时,效率比较高,因为数据缓存的大小基本都确定了,就好比数组一样,缺陷就是适应性不足,以RPC场景为例,很难估计出定长的长度是多少。

可以参考Netty的FixedLengthFrameDecoder

2. 特殊结束符

相比定长协议,如果能够定义一个特殊字符作为每个协议单元结束的标示,就能够以变长的方式进行通信,从而在数据传输和高效之间取得平衡,比如用特殊字符\n。例如,HTTP/1的header部分就是通过\r\n来表示结束的。

特殊结束符方式的问题是过于简单的思考了协议传输的过程,对于一个协议单元必须要全部读入才能够进行处理,除此之外必须要防止用户传输的数据不能同结束符相同,否则就会出现紊乱。

可以参考Netty的DelimiterBasedFrameDecoder

3. 协议头+PAYLOAD模式

一般是自定义协议,会以定长加不定长的部分组成,其中定长的部分需要描述不定长的内容长度。例如:dubbo/2、TCP协议、Kafka消息。

可以参考Netty的LengthFieldBasedFrameDecoder

三、各中间件常见协议

3.1 TCP/IP协议

TCP/IP模型是互联网的基础,它包含了:TCP(传输控制协议)和IP(网际协议)两种协议。

TCP/IP协议的关系如下图所示:

IP是TCP/IP中最为核心的协议,所有的TCP、UDP、ICMP等协议均以IP数据报的格式传输。IP协议提供不可靠、无连接的服务,它不保证数据报一定可以送达目的,也不保证数据报的先后次序。

IPv4首部格式为(IPv6的格式不太一样,可见这篇文章:IP协议详解及IPv4与IPv6协议的区别):

协议内容解析:

1. 版本(4位):版本占了4位,最大值为15,是协议版本号的定义,IPv4的话版本号就是4,IPv6的话版本号就是6。

2. 首部长度(4位):首部长度占了4位,最大值是15,不过这个数字代表的含义是在整个头部中含有的32位字长的数量,首部长度的最小值为5,最大值为15。由此根据计算可得整个头部的最小值为5 * 32位 = 5 * 4字节 = 20字节,最大值为 15 * 32位 = 15 * 4字节 = 60字节。

3. 区分服务(TOS)(4位):该字段定义上层协议对处理当前数据报所期望的服务质量,并对数据报按照重要性级别进行分配。前3位成为优先位,后面4位成为服务类型,最后1位没有定义。这些8位字段用于分配优先级、延迟、吞吐量以及可靠性。(想具体了解TOS的具体使用规则及位控制请移步文末扩展部分。)

4. 总长度(16位):该字段定义整个IP数据报的字节长度,包括协议头部和数据。其最大值为65535字节。以太网协议对能够封装在一个帧中的数据有最小值和最大值的限制(46~1500个字节)。过大的数据需要进行切片传输。链路层的帧格式中的数据字段的最大长度称为最大传送单元MTU(Maximum Transfer Unit)。当数据包封装成链路层的帧的时候,该数据报的头部+数据的长度不可以超过MTU的最大长度,超过的需要进行切片传输。

5. 标识(16位):该字段的值由发送到分配,如果该数据报需要进行切片分段的话,每一段的标识位的值都是一样的,方便接收端收到分段的数据后进行组装。

6. 标志(3位):该字段占3位,但是只有前两位才有意义:

最低位为MF (More Fragment),当MF=0的时候,表明该数据报已经是分段传输的最后一段报文,当MF=1的时候说明后面还有分段报文需要拼接。中间位为DF(Don’t Fragment),当DF = 0的时候说明该报文不允许切片,=1的时候可以进行切片分段传输

7. 片位移(13位):标记了该段报文在原始报文中的位置,方便接收端进行拼接操作。片位移以8个字节为偏移单位,所以对数据包进行分段操作的时候一定是以8个字节为单位进行分段的。如果最后一个分段不足8个字节的话会进行填充。

8. 生存时间TTL(8位):该字段定义为路由器跳数,指的是在传输的过程中最多可以经过多少个路由器进行转发,最大值为255(2的8次方 - 1),每经过一次路由器跳转这个值就会被-1,当该值为0的时候路由器会将该数据报丢弃。该值存在的意义是防止无法交付的资源在路由里无限跳转占用网络资源。

9. 协议(8位):表示在传输层使用哪种协议进行封装的(比如17代表UDP,6代表TCP,1代表ICMP),这样接收方的网络层可以知道使用何种协议来对数据进行解析。

10. 首部校验和(16位):该字段帮助确保IP协议头的完整性。由于某些协议头字段的改变,这就需要对每个点重新计算和检验。计算过程是先将校验和字段置为0,然后将整个头部每16位划分为一部分,将各部分相加,再将计算结果取反码,插入到校验和字段中。当接收方接收到数据报后,也对其首部进行校验计算,如果结果与“首部校验和”的值不同,就丢弃收到的数据报。

11. 源地址(32位):源主机IP地址,在整个传输过程中保持不变,32位,IPv4。将IP地址看作是32位数值则需要将网络字节顺序转化为主机字节顺序。转化的方法是:将每4个字节首尾互换,将2、3字节互换。

12. 目的地址(32位):目的主机IP地址,在整个传输过程中保持不变,32位,IPv4。

13. 可选字段:长度可变,如果选项存在的话,它在IPv4分组中紧跟在基本IPv4头部之后。实际应用中用到的很少(想了解的看文末扩展吧)。

14. 填充:IP头是以32bit字长为单位,有时需要填充来实现。

15. 数据部分:从传输层封装过来的数据。



TCP数据是封装在IP数据报中,TCP协议如下图:

协议内容解析:

1. 16位源端口号:16位的源端口中包含初始化通信的端口。源端口和源IP地址的作用是标识报文的返回地址。

2. 16位目的端口号:16位的目的端口域定义传输的目的。这个端口指明报文接收计算机上的应用程序地址接口。

3. 32位序号:32位的序列号由接收端计算机使用,重新分段的报文成最初形式。当SYN出现,序列码实际上是初始序列码(Initial Sequence Number,ISN),而第一个数据字节是ISN+1。这个序列号(序列码)可用来补偿传输中的不一致。

4. 32位确认序号:32位的序列号由接收端计算机使用,重组分段的报文成最初形式。如果设置了ACK控制位,这个值表示一个准备接收的包的序列码。

5. 4位首部长度:4位包括TCP头大小,指示何处数据开始。

6. 保留(6位):6位值域,这些位必须是0。为了将来定义新的用途而保留。

7. 标志:6位标志域。表示为:紧急标志、有意义的应答标志、推、重置连接标志、同步序列号标志、完成发送数据标志。按照顺序排列是:URG、ACK、PSH、RST、SYN、FIN。

  URG:紧急标志。紧急标志为"1"表明该位有效。

  ACK:确认标志。表明确认编号栏有效。大多数情况下该标志位是置位的。TCP报头内的确认编号栏内包含的确认编号(w+1)为下一个预期的序列编号,同时提示远端系统已经成功接收所有数据。

  PSH:推标志。该标志置位时,接收端不将该数据进行队列处理,而是尽可能快地将数据转由应用处理。在处理Telnet或rlogin等交互模式的连接时,该标志总是置位的。

  RST:复位标志。用于复位相应的TCP连接。

  SYN:同步标志。表明同步序列编号栏有效。该标志仅在三次握手建立TCP连接时有效。它提示TCP连接的服务端检查序列编号,该序列编号为TCP连接初始端(一般是客户端)的初始序列编号。在这里,可以把TCP序列编号看作是一个范围从0到4,294,967,295的32位计数器。通过TCP连接交换的数据中每一个字节都经过序列编号。在TCP报头中的序列编号栏包括了TCP分段中第一个字节的序列编号。

  FIN:结束标志。

8. 16位窗口大小:用来表示想收到的每个TCP数据段的大小。TCP的流量控制由连接的每一端通过声明的窗口大小来提供。窗口大小为字节数,起始于确认序号字段指明的值,这个值是接收端正期望接收的字节。窗口大小是一个16字节字段,因而窗口大小最大为65535字节。

9. 16位校验和:16位TCP头。源机器基于数据内容计算一个数值,收信息机要与源机器数值 结果完全一样,从而证明数据的有效性。检验和覆盖了整个的TCP报文段:这是一个强制性的字段,一定是由发送端计算和存储,并由接收端进行验证的。

10. 16位紧急指针:指向后面是优先数据的字节,在URG标志设置了时才有效。如果URG标志没有被设置,紧急域作为填充。加快处理标示为紧急的数据段。

11. 选项:长度不定,但长度必须为1个字节。如果没有选项就表示这个1字节的域等于0。

  最常见的可选字段是最长报文大小,又称为MSS(Maximum Segment Size)。每个连接方通常都在通信的第一个报文段(为建立连接而设置SYN标志的那个段)中指明这个选项。它指明本端所能接收的最大长度的报文段。

12. 数据:该TCP协议包负载的数据。

3.2 HTTP协议

HTTP是超文本形式定义的,通过TCP协议进行传输,本身分为HEADER和BODY两个部分,Header部分通过\r\n实现换行,通过 \r\n\r\n 标识Header部分结束,而Body部分通过Content-Length规定Body部分长度,如果Header里不包含Content-Length,而包含Transfer-Encoding: chunked,说明响应数据的长度不固定,结束符以\r\n0\r\n这5个字节为结束符。

3.3 DUBBO/2协议

3.3.1 协议

Dubbo 框架定义了私有的RPC协议,其中请求和响应协议的具体内容我们使用表格来展示。

- Magic - Magic High & Magic Low (16 bits)

- 标识协议版本号,Dubbo 协议:0xdabb

- Req/Res (1 bit)

标识是请求或响应。请求: 1; 响应: 0。

- 2 Way (1 bit)

仅在 Req/Res 为1(请求)时才有用,标记是否期望从服务器返回值。如果需要来自服务器的返回值,则设置为1。

- Event (1 bit)

标识是否是事件消息,例如,心跳事件。如果这是一个事件,则设置为1。

- Serialization ID (5 bit)

标识序列化类型:比如 fastjson 的值为6。

- Status (8 bits)

仅在 Req/Res 为0(响应)时有用,用于标识响应的状态。

    20 - OK

    30 - CLIENT_TIMEOUT

    31 - SERVER_TIMEOUT

    40 - BAD_REQUEST

    50 - BAD_RESPONSE

    60 - SERVICE_NOT_FOUND

    70 - SERVICE_ERROR

    80 - SERVER_ERROR

    90 - CLIENT_ERROR

    100 - SERVER_THREADPOOL_EXHAUSTED_ERROR

- Request ID (64 bits)

标识唯一请求。类型为long。

- Data Length (32 bits)

序列化后的内容长度(可变部分),按字节计数。int类型。

- Variable Part

被特定的序列化类型(由序列化 ID 标识)序列化后,每个部分都是一个 byte [] 或者 byte

    如果是请求包 ( Req/Res = 1),则每个部分依次为:

    Dubbo version

    Service name

    Service version

    Method name

    Method parameter types

    Method arguments

    Attachments

    如果是响应包(Req/Res = 0),则每个部分依次为:

    返回值类型(byte),标识从服务器端返回的值类型:

    返回空值:RESPONSE_NULL_VALUE 2

    正常响应值: RESPONSE_VALUE 1

    异常:RESPONSE_WITH_EXCEPTION 0

    返回值:从服务端返回的响应bytes

注意:对于(Variable Part)变长部分,当前版本的Dubbo 框架使用json序列化时,在每部分内容间额外增加了换行符作为分隔,请在Variable Part的每个part后额外增加换行符, 如:

Dubbo version bytes (换行符)

Service name bytes  (换行符)

...

3.3.2 Dubbo/2缺点

类似于 http 请求,通过 header 就可以确定要访问的资源,而 Dubbo 需要涉及到用特定序列化协议才可以将服务名、方法、方法签名解析出来,并且这些资源定位符是 string 类型或者 string 数组,很容易转成 bytes,因此可以组装到 header 中。类似于 http2 的 header 压缩,对于 rpc 调用的资源也可以协商出来一个int来标识,从而提升性能,如果在header上组装资源定位符的话,该功能则更易实现。

通过 req/res 是否是请求后,可以精细定制协议,去掉一些不需要的标识和添加一些特定的标识。比如status,twoWay标识可以严格定制,去掉冗余标识。还有超时时间是作为 Dubbo 的 attachment 进行传输的,理论上应该放到请求协议的header中,因为超时是网络请求中必不可少的。提到 attachment ,通过实现可以看到 attachment 中有一些是跟协议 content中已有的字段是重复的,比如 path和version等字段,这些会增大协议尺寸。

Dubbo 会将服务名com.alibaba.middleware.hsf.guide.api.param.ModifyOrderPriceParam,转换为Lcom/alibaba/middleware/hsf/guide/api/param/ModifyOrderPriceParam;,理论上是不必要的,最后追加一个;即可。

Dubbo 协议没有预留扩展字段,没法新增标识,扩展性不太好,比如新增响应上下文的功能,只有改协议版本号的方式,但是这样要求客户端和服务端的版本都进行升级,对于分布式场景很不友好。

3.4 HTTP/2协议

HTTP/2 不再像 HTTP/1.1 里的纯文本形式的报文,而是全面采用了二进制格式。

HTTP/2 把报文划分成了两类帧(Frame),图中的 HEADERS(首部)和 DATA(消息负载) 是帧的类型,也就是说一条 HTTP 响应,划分成了两类帧来传输,并且采用二进制来编码。

那么帧(frame)的结构是怎样的呢?

帧头(Frame Header)很小,总计只有 9 个字节(72bits),可能包含帧数据长度、帧类型、标志位、和流标识符。

帧数据长度

帧开头的前 3 个字节表示帧数据(Frame Playload)的长度。

帧类型

HTTP/2 总共定义了 10 种类型的帧,一般分为数据帧和控制帧两类,如下表格:

标志位

可以保存 8 个标志位,用于携带简单的控制信息,比如:

END_HEADERS 表示头数据结束标志,相当于 HTTP/1 里头后的空行(“\r\n”);

END_Stream 表示单方向数据发送结束,后续不会再有数据帧。

PRIORITY 表示流的优先级;

流标识符

流标识符(Stream ID),最高位被保留不用,只有 31 位可以使用,因此流标识符的最大值是 2^31,大约是 21 亿,它的作用是用来标识该 Frame 属于哪个 Stream,接收方可以根据这个信息从乱序的帧里找到相同 Stream ID 的帧,从而有序组装信息。

帧数据

帧数据存放的是通过 HPACK 算法压缩过的 HTTP 头部和包体。因为表示帧数据长度的位置只有24个bit,因此一个帧的数据部分最多2^24-1个bits,换算后长度为2MB。因此一个帧数据部分最多2MB,超过就需要拆分多个帧。

不同类型的帧的数据结构不一样,是由上面的几个部分组成的。具体的帧类型对应的数据结构看这篇文章:https://halfrost.com/http2-http-frames-definitions/#toc-0

3.5 gRPC协议

3.5.1 概念

gRPC 是 Google 基于 HTTP/2 以及 protobuf 的,要了解 gRPC 协议,只需要知道 gRPC 是如何在 HTTP/2 上面传输就可以了。

gRPC 通常有四种模式,unary,client streaming,server streaming 以及 bidirectional streaming,对于底层 HTTP/2 来说,它们都是 stream,并且仍然是一套 request + response 模型。

3.5.2 请求与响应例子

下面是gRpc基于HTTP/2的请求和响应的协议,原文见此

一个请求由请求头、消息、EOS构成,如下:

Request → Request-Headers *Length-Prefixed-Message EOS

下面是一组Request/Response的例子:

Request

HEADERS (flags = END_HEADERS)

:method = POST

:scheme = http

:path = /google.pubsub.v2.PublisherService/CreateTopic

:authority = pubsub.googleapis.com

grpc-timeout = 1S

content-type = application/grpc+proto

grpc-encoding = gzip

authorization = Bearer y235.wef315yfh138vh31hv93hv8h3v

DATA (flags = END_STREAM)

<Length-Prefixed Message>

Response

HEADERS (flags = END_HEADERS)

:status = 200

grpc-encoding = gzip

content-type = application/grpc+proto

DATA

<Length-Prefixed Message>

HEADERS (flags = END_STREAM, END_HEADERS)

grpc-status = 0 # OK

trace-proto-bin = jher831yy13JHy3hc

3.5.3 Request-Header、Response-Header和Length-Prefixed Message

Request-Header

Request-Header部分,由Call-Definition和Metadata组成。

Request-Headers → Call-Definition *Custom-Metadata

其中Call-Definition定义如下:

Call-Definition → Method Scheme Path TE [Authority] [Timeout] Content-Type [Message-Type] [Message-Encoding] [Message-Accept-Encoding] [User-Agent]

Method → ":method POST"

Scheme → ":scheme " ("http" / "https")

Path → ":path" "/" Service-Name "/" {method name} # But see note below.

Service-Name → {IDL-specific service name}

Authority → ":authority" {virtual host name of authority}

TE → "te" "trailers" # Used to detect incompatible proxies

Timeout → "grpc-timeout" TimeoutValue TimeoutUnit

TimeoutValue → {positive integer as ASCII string of at most 8 digits}

TimeoutUnit → Hour / Minute / Second / Millisecond / Microsecond / Nanosecond

    Hour → "H"

    Minute → "M"

    Second → "S"

    Millisecond → "m"

    Microsecond → "u"

    Nanosecond → "n"

Content-Type → "content-type" "application/grpc" [("+proto" / "+json" / {custom})]

Content-Coding → "identity" / "gzip" / "deflate" / "snappy" / {custom}

Message-Encoding → "grpc-encoding" Content-Coding

Message-Accept-Encoding → "grpc-accept-encoding" Content-Coding *("," Content-Coding)

User-Agent → "user-agent" {structured user-agent string}

Message-Type → "grpc-message-type" {type name for message schema}

Custom-Metadata → Binary-Header / ASCII-Header

    Binary-Header → {Header-Name "-bin" } {base64 encoded value}

    ASCII-Header → Header-Name ASCII-Value

    Header-Name → 1*( %x30-39 / %x61-7A / "_" / "-" / ".") ; 0-9 a-z _ - .

    ASCII-Value → 1*( %x20-%x7E ) ; space and printable ASCII

Response-Header

Response-Header部分,有两种可能:

Response-Headers *Length-Prefixed-Message Trailers

Response-Headers → HTTP-Status [Message-Encoding] [Message-Accept-Encoding] Content-Type *Custom-Metadata

    HTTP-Status → ":status 200"

    Status → "grpc-status" 1*DIGIT ; 0-9

    Status-Message → "grpc-message" Percent-Encoded

    Percent-Encoded → 1*(Percent-Byte-Unencoded / Percent-Byte-Encoded)

    Percent-Byte-Unencoded → 1*( %x20-%x24 / %x26-%x7E ) ; space and VCHAR, except %

    Percent-Byte-Encoded → "%" 2HEXDIGIT ; 0-9 A-F

Trailers → Status [Status-Message] * Custom-Metadata

Trailers-Only

Trailers-Only → HTTP-Status Content-Type Trailers

Length-Prefixed-Message

Length-Prefixed-Message是实际传输的数据格式,包含:Compressed-Flag(1字节)、Message-Length(4字节)、消息本身。

Compressed-Flag → 0 / 1 , 1 字节无符号整数。

Message-Length → 消息的长度,为 4 字节无符号整数(大端)

消息→ *{二进制八位字节}

3.6 DUBBO/3(Triple)协议

Dubbo/3兼容 gRPC ,以 HTTP2 作为传输层构建新的协议,命名为Triple

容器化应用程序和微服务的兴起促进了针对负载内容优化技术的发展。 客户端中使用的传统通信协议( RESTFUL或其他基于 HTTP 的自定义协议)难以满足应用在性能、可维护性、扩展性、安全性等方便的需求。一个跨语言、模块化的协议会逐渐成为新的应用开发协议标准。自从 2017 年 gRPC 协议成为 CNCF 的项目后,包括 k8s、etcd 等越来越多的基础设施和业务都开始使用 gRPC 的生态,作为云原生的微服务化框架, Dubbo 的新协议也完美兼容了 gRPC。并且,对于 gRPC 协议中一些不完善的部分,Triple 也将进行增强和补充。

性能上: Triple 协议采取了 metadata 和 payload 分离的策略,这样就可以避免中间设备,如网关进行 payload 的解析和反序列化,从而降低响应时间。

路由支持上,由于 metadata 支持用户添加自定义 header ,用户可以根据 header 更方便的划分集群或者进行路由,这样发布的时候切流灰度或容灾都有了更高的灵活性。

安全性上,支持双向TLS认证(mTLS)等加密传输能力。

易用性上,Triple 除了支持原生 gRPC 所推荐的 Protobuf 序列化外,使用通用的方式支持了 Hessian / JSON 等其他序列化,能让用户更方便的升级到 Triple 协议。对原有的 Dubbo 服务而言,修改或增加 Triple 协议 只需要在声明服务的代码块添加一行协议配置即可,改造成本几乎为 0。

协议内容,基于 grpc 协议进行进一步扩展,具体如下:

Service-Version → “tri-service-version” {Dubbo service version}

Service-Group → “tri-service-group” {Dubbo service group}

Tracing-ID → “tri-trace-traceid” {tracing id}

Tracing-RPC-ID → “tri-trace-rpcid” {_span id _}

Cluster-Info → “tri-unit-info” {cluster infomation}

协议解析:

其中 Service-Version 跟 Service-Group 分别标识了 Dubbo 服务的 version 跟 group 信息,因为grpc的 path 申明了 service name 跟 method name,相比于 Dubbo 协议,缺少了version 跟 group 信息;

Tracing-ID、Tracing-RPC-ID 用于全链路追踪能力,分别表示 tracing id 跟 span id 信息;

Cluster-Info 表示集群信息,可以使用其构建一些如集群划分等路由相关的灵活的服务治理能力。

3.7 Kafka消息协议

Kafka的数据在底层存储上是按Topic-Partition/CommitLog-seq的文件目录存储的。每个文件大小约512MB,长度不定的原因是一条消息不会分割到两个文件中存储。

Kafka的消息格式经过了多次迭代,我们现在使用的基本都是最新版的,如果不感兴趣,老版的消息部分可以跳过。

3.7.1 老版的消息

一开始的时候,消息(Message)并不包括它的offset。Kafka的log是由一条一条的记录构成的,Kafka并没有给这种记录起个专门的名字,但是需要记住的是这个“记录”并不等于"Message"。Offset、MessageSize、Message加在一起,构成一条记录

MessageAndOffset => Offset MessageSize Message

  Offset => int64

  MessageSize => int32

  Message =>

而老版的Message的结构如下:

Message => Crc MagicByte Attributes Key Value

  Crc => int32

  MagicByte => int8

  Attributes => int8

  Key => bytes

  Value => bytes

后面小小更新了一版(supported since 0.10.0),添加了时间戳,添加的原因详情可见对应的KIP-32。添加后的格式如下:

Message => Crc MagicByte Attributes Timestamp Key Value

  Crc => int32

  MagicByte => int8

  Attributes => int8

  Timestamp => int64

  Key => bytes

  Value => bytes

这些字段解释如下:

Field中文解释

Offset

这是 kafka 中用作日志序列号的偏移量。当生产者发送非压缩消息时,它可以将偏移量设置为任何值。当生产者发送压缩消息时,为避免服务器端重新压缩,每个压缩消息的偏移量应从 0 开始,并为压缩消息中的每个内部消息增加 1。(请参阅下面有关 Kafka 中压缩消息的更多详细信息)

CrcCRC 是剩余消息字节的 CRC32。这用于检查消息在Broker和消费者上的完整性。

MagicByte这是一个版本 ID,用于允许消息二进制格式的向后兼容演变。当前值为 1。

Attributes此字节包含有关消息的元数据属性。

最低 3 位包含用于消息的压缩编解码器。

第四低位表示时间戳类型。0 代表 CreateTime,1 代表 LogAppendTime。生产者应始终将此位设置为 0。(自 0.10.0 起)

所有其他位应设置为 0。

Timestamp这是消息的时间戳。时间戳类型在属性中指示。单位是自纪元开始以来的毫秒数(1970 年 1 月 1 日午夜 (UTC))。

Keykey是用于分区分配的可选消息,可以为空。(常说的分区键)

Value该值是作为不透明字节数组的实际消息内容。Kafka 支持递归消息,在这种情况下,它本身可能包含一个消息集。消息可以为空。

3.7.2 新版的消息

在 Kafka 0.11 中,“MessageSet”和“Message”的结构发生了显着变化。不仅添加了新字段以支持新功能,例如 exactly once 语义和记录标头,而且消除了以前版本消息格式的递归性质,以支持平面结构。“MessageSet”现在称为“RecordBatch”,它包含一个或多个“记录”(而不是“消息”)。启用压缩后,RecordBatch 标头保持未压缩状态,但 Records 会一起压缩。此外,“记录”中的多个字段采用 varint 编码,这可以为较大的批次节省大量空间。

原本的批量消息改为了RecordBatch,结构如下:

RecordBatch =>

  FirstOffset => int64

  Length => int32

  PartitionLeaderEpoch => int32

  Magic => int8

  CRC => int32

  Attributes => int16

    bit 0~2:

        0: no compression

        1: gzip

        2: snappy

        3: lz4

    bit 3: timestampType

    bit 4: isTransactional (0 means not transactional)

    bit 5: isControlBatch (0 means not a control batch)

    bit 6~15: unused

  LastOffsetDelta => int32

  FirstTimestamp => int64

  MaxTimestamp => int64

  ProducerId => int64

  ProducerEpoch => int16

  FirstSequence => int32

  Records => [Record]

字段的解释如下:

Field中文解释

FirstOffset

表示 RecordBatch 中的第一个偏移量。批次中每个记录的“offsetDelta”将相对于此 FirstOffset 进行计算。特别地,Batch 中每个 Record 的偏移量是它的 'OffsetDelta' + 'FirstOffset'。

Length批量数据的长度。

PartitionLeaderEpoch与KIP-101一起引入,这是由Broker在收到生产请求时设置的,用于确保在日志截断导致领导者更改时不会丢失数据。客户端开发人员无需担心设置此值。

MagicByte这是一个版本 ID,当前值为 2。

CrcCRC 会覆盖从属性到批处理结束的数据, (即 CRC 后的所有字节数据). CRC 位于 magic 之后,这意味着,在决定如何解释批次的长度和 magic 类型之前,客户端需要解析 magic 类型.CRC 计算不包括分区 learder epoch 字段,是为了避免 broker 收到每个批次的数据时 需要重新分配计算 CRC . CRC-32C (Castagnoli) 多项式用于计算.

Attributes此字节包含有关消息的元数据属性。

最低 3 位包含用于消息的压缩编解码器。

第四低位表示时间戳类型。0 代表 CreateTime,1 代表 LogAppendTime。生产者应始终将此位设置为 0。(自 0.10.0 起)

第五位表示是否事务,0 表示 RecordBatch 不是事务性的,而 1 表示它是。

第六位表示当前消息是否是控制消息,1代表是控制消息,0则反之;

其他位未使用,全部为0。

关于事务(第五、六位):

为支持事务机制,KAFKA 将底层日志文件的格式进行了扩展:

日志中除了普通的消息,还有一种消息专门用来标志事务的状态,它就是控制消息 control batch;

控制消息跟其他正常的消息一样,都被存储在日志中,但控制消息不会被返回给 consumer 客户端;

控制消息共有两种类型:commit 和 abort,分别用来表征事务已经成功提交或已经被成功终止;

RecordBatch 中 attributes 字段的第5位用来标志当前消息是否处于事务中,1代表消息处于事务中,0则反之;(A record batch is a container for records. )

RecordBatch 中 attributes 字段的第6位用来标识当前消息是否是控制消息,1代表是控制消息,0则反之;

由于控制消息总是处于事务中,所以控制消息对应的RecordBatch 的 attributes 字段的第5位和第6位都被置为1;

LastOffsetDeltaRecordBatch 中最后一条消息的偏移量。Broker使用它来确保正确的行为,即使在批次中的记录被压缩时也是如此。

FirstTimestamp批次中第一个记录的时间戳。RecordBatch 中每个 Record 的时间戳是它的 'TimestampDelta' + 'FirstTimestamp'。

MaxTimestamp批次中最后一条记录的时间戳。Broker使用它来确保正确的行为,即使批次中的记录被压缩。

ProducerId在KIP-98的 0.11.0.0 中引入,这是由“InitProducerId”请求接收到的Broker分配的 producerId。希望支持幂等消息传递和事务的客户端必须设置此字段。

ProducerEpoch在 0.11.0.0 中由KIP-98引入,这是由“InitProducerId”请求接收到的Broker分配的 producerEpoch。希望支持幂等消息传递和事务的客户端必须设置此字段。

FirstSequence在0.11.0.0 中由KIP-98引入,这是生产者分配的序列号,broker使用它来删除重复消息。希望支持幂等消息传递和事务的客户端必须设置此字段。RecordBatch 中每个 Record 的序列号是它的 OffsetDelta + FirstSequence。

Records记录数据。

压缩:

不同于旧的消息格式, magic v2 及以上版本在清理日志时保留原始日志中首次及最后一次 offset/sequence .这是为了能够在日志重新加载时恢复生产者的状态.例如,如果我们不保留最后一次序列号,当分区 learder 失败以后,生产者会报 OutOfSequence 的错误.必须保留基础序列号来做重复检查(broker 通过检查生产者该批次请求中第一次及最后一次序列号是否与上一次的序列号相匹配来判断是否重复).因此,当批次中所有的记录被清理但批次数据依然保留是为了保存生产者最后一次的序列号,日志中可能有空的数据.不解的是在压缩中时间戳可能不会被保留,所以如果批次中的第一条记录被压缩,时间戳也会改变。

Message改为Record,结构如下:

Record =>

  Length => varint

  Attributes => int8

  TimestampDelta => varint

  OffsetDelta => varint

  KeyLen => varint

  Key => data

  ValueLen => varint

  Value => data

  Headers => [Header]


Header => HeaderKey HeaderVal

  HeaderKeyLen => varint

  HeaderKey => string

  HeaderValueLen => varint

  HeaderValue => data

字段的解释如下:

Field中文解释

Length本条消息的长度

Attributes记录级属性目前未使用。

TimestampDelta与第一条消息的时间戳偏移量

OffsetDelta与第一条消息的offset偏移量

KeyLenkey的长度

Keykey数据

ValueLenvalue的长度

Value传输的数据本身

Headers在 0.11.0.0 中由KIP-82引入,Kafka 现在支持应用程序级记录级标头。生产者和消费者 API 已相应更新以写入和读取这些标头。

3.8 RocketMQ消息协议

RocketMQ采用的是混合型的存储结构,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储。

RocketMQ的混合型存储结构针对Producer和Consumer分别采用了数据和索引部分相分离的存储结构,Producer发送消息至Broker端,然后Broker端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中。只要消息被刷盘持久化至磁盘文件CommitLog中,那么Producer发送的消息就不会丢失。正因为如此,Consumer也就肯定有机会去消费这条消息。

当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker允许等待30s的时间,只要这段时间内有新消息到达,将直接返回给消费端。

这里,RocketMQ的具体做法是,使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。

下图是RocketMQ的存储架构设计:

因此,在rocketMQ的底层存储上,会涉及到CommitLog/ConsumerQueue/IndexFile三个文件的设计。下面我们来一一展开。

3.8.1 CommitLog的消息设计

为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ消息队列自定义了通信协议并在Netty的基础之上扩展了通信模块。

下图是RocketMq的通信协议格式:

可见传输内容主要可以分为以下4部分:

消息长度:总长度,四个字节存储,占用一个int类型;

序列化类型:一个字节长度

消息头长度:三个字节表示消息头长度;

消息头数据:经过序列化后的消息头数据;

消息主体数据:消息主体的二进制字节数据内容;

消息头可能的内容如下:

td {white-space:pre-wrap;border:1px solid #dee0e3;}Header字段类型Request说明Response说明

codeint请求操作码,应答方根据不同的请求码进行不同的业务处理应答响应码。0表示成功,非0则表示各种错误

languageLanguageCode请求方实现的语言应答方实现的语言

versionint请求方程序的版本应答方程序的版本

opaqueint相当于reqeustId,在同一个连接上的不同请求标识码,与响应消息中的相对应应答不做修改直接返回

flagint区分是普通RPC还是onewayRPC得标志区分是普通RPC还是onewayRPC得标志

remarkString传输自定义文本信息传输自定义文本信息

extFieldsHashMap请求自定义扩展信息响应自定义扩展信息

3.8.2 ConsumerQueue的设计

ConsumeQueue每条记录采用定长设计,每条记录20个字节,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M。

ConsumeQueue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。

其消息结构设计如下:

分为三个部分:

CommitLog Offset:8个字节,对应commitLog文件消息的offset

Message Size:4个字节,对应当前这条消息的长度,这样在commitLog文件上读取的时候,可以从CommitLog Offset开始,读取Size长度的数据,代表一条消息。

MessageTag HashCode:8个字节,对应消息Tag的hash值。

消息按Tag过滤的机制:

Consumer端在订阅消息时除了指定Topic还可以指定TAG,如果一个消息有多个TAG,可以用||分隔。

其中,Consumer端会将这个订阅请求构建成一个 SubscriptionData,发送一个Pull消息的请求给Broker端。Broker端从RocketMQ的文件存储层—Store读取数据之前,会用这些数据先构建一个MessageFilter,然后传给Store。

Store从 ConsumeQueue读取到一条记录后,会用它记录的消息tag hash值去做过滤,由于在服务端只是根据hashcode进行判断,无法精确对tag原始字符串进行过滤,故在消息消费端拉取到消息后,还需要对消息的原始tag字符串进行比对,如果不同,则丢弃该消息,不进行消息消费。

3.8.3 IndexFile的设计

IndexFile索引文件为用户提供通过“按照Message Key查询消息”的消息索引查询服务,其结构类似JDK中HashMap的实现。索引文件的具体结构如下:

图中含义:

共计固定槽位500W个。每个槽位占4个字节,存放的是,单向链表的表头IndexOffset。每个链表有4个Node,因此,共计有2000W个Node。

每个Node是定长设计,20个字节。包含了KeyHash、CommitLogOffset、Timestamp、NextIndexOffset

KeyHash和CommitQueue里面的KeyHash是一个含义。

CommitLog Offset对应消息在CommitLog的位移。

Timestamp记录的是消息storeTimestamp(IndexFile文件命名是按创建时间命名的)之间的差,并不是一个绝对的时间。

NextIndex offset 即前面读出来的 slotValue,如果有 hash冲突,就可以用这个字段将所有冲突的索引用链表的方式串起来了。

每个Index文件有40个字节的固定Header部分,因此,每个Index文件的大小是固定的,等于40+500W*4+2000W*20= 420000040个字节大小,约等于400.5MB。

IndexFile文件的存储位置是:$HOME\store\index${fileName},文件名fileName是以创建时的时间戳命名的。

如果消息的properties中设置了UNIQ_KEY这个属性,就用 topic + “#” + UNIQ_KEY的value作为 key 来做写入操作。如果消息设置了KEYS属性(多个KEY以空格分隔),也会用 topic + “#” + KEY 来做索引。

按照Message Key查询消息的方式,RocketMQ的具体做法是,主要通过Broker端的QueryMessageProcessor业务处理器来查询,读取消息的过程就是用topic和key找到IndexFile索引文件中的一条记录,根据其中的commitLog offset从CommitLog文件中读取消息的实体内容。

四、总结

通过上面8种协议的学习,我们可以得到以下几点:

1. 协议是通信双方互相沟通的基础,好的协议设计通常都有预留扩展(Dubbo/2没有,所以升级很困难),方便后续升级。

2. 协议最终的落脚点是信息传输/信息存储。因此要尽量简单,高效。

3. 二进制是网络传输的基础,无论什么协议,最终在网络传输中都要转为二进制,因此最好协议设计是基于二进制的,这样效率最高。

4. 协议设计是需要分层的,每层的载荷是有大小限制的,上层的内容在下层传输时通常需要被拆分,因此协议设计中,要考虑到传输过程的时序、异常,防止数据重复和丢失。

飞书原文:https://mk6hf73z4f.feishu.cn/docx/Qt92duGUDorKDPxpX74cxQTqn7g

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容