kafka 协议分析 (一) 基础篇
kafka 协议分析 (二) Produce API
kafka 协议分析 (三) Fetch API
上一篇文章主要介绍了kafka协议的基础概念,本篇将介绍一下如何将消息写入Kafka
一个最简单的消息生产流程如下图:
- 先检查服务器支持的API版本,并从中选择一个合适的使用。ApiVersions API
- 获取目标Topic的MetaData,主要包括Topic的partition数量和leader地址。Metadata API
- 向leader写入消息。Produce API
ApiVersions API (Key: 18)
ApiVersions API用来获取服务器支持的版本,version 0只需要包含统一的请求头Size - Api Key - Api Version - Correlation ID - Client ID即可,不需要其它信息。
一个网络包示例如下:
域 | 值 | 描述 |
---|---|---|
Size | 12 | 消息长度 |
Api Key | 18 | Api |
Api Version | 0 | Api版本 |
Correlation ID | 1 | 请求ID,服务器会将这个ID原样返回 |
Client ID | wd | 客户端ID |
Metadata API (Key: 3)
Metadata API 获取目标Topic的MetaData,主要包括Topic的partition数量和leader地址。除了请求头Size - Api Key - Api Version - Correlation ID - Client ID外,还需要目标Topic信息。
一个网络包示例如下:
域 | 值 | 描述 |
---|---|---|
Size | 24 | 消息长度 |
Api Key | 3 | Api |
Api Version | 0 | Api版本 |
Correlation ID | 1 | 请求ID,服务器会将这个ID原样返回 |
Client ID | wd | 客户端ID |
Topic Len | 1 | topic数组大小 |
Topic | w1 | topic名字 |
Produce API (Key: 0)
Produce API写入消息API,负责将消息写入kafka。由 Common Header - Ack - Timeout - Topic - Partition - Records组成
域 | 值 | 描述 |
---|---|---|
Size | 24 | 消息长度 |
Api Key | 3 | Api |
Api Version | 0 | Api版本 |
Correlation ID | 1 | 请求ID,服务器会将这个ID原样返回 |
Client ID | wd | 客户端ID |
Ack | 1 | 0 无,1 leader,2 ISR |
Timeout | wd | 超时 |
Topic Len | 1 | topic数组大小 |
Topic | w1 | topic名字 |
Partition Len | 1 | partition数组大小 |
Partition | 0 | partition |
Records set | 消息数组 |