消息中间件的性能吞吐量如何,关键是看底层存储如何做到极限,另一个重要因素就是网络io通信。接下来将从消息编解码,rpc通信介绍rocketMq的网络io~
rocketmq 通信模块代码结构
如图所示,rocketMq网络通信是基于netty实现的,大家需要对java的nio和各种buffer和多线程的知识有一定的了解。
RemotingCommand中重要的几个变量
name | 类型 | 说明 |
---|---|---|
code | int | 请求操作码,服务端拿到相应的操作码做不同的处理 |
language | byte | 语言 |
version | int | 版本号 |
opaque | int | 客户端请求id,因为netty是异步编程,服务端响应中做拓传到客户端中,找到相应的请求 |
flag | int | rpc标示,是同步、异步、oneway |
remark | String | 额外存储信息的 |
extFields | Map | 存放header头信息中的成员变量 |
serializeTypeCurrentRPC | SerializeType | 表示使用json还是rocketMq协议序列化和反序列化头信息 |
rocketMq编解码说明
先说明一下rocketMq的消息体组成如图所示,rocketMq提供了两种 消息头 序列化方式,一种是json,一种是rocketMq自定义协议序列化方式,之后会说明一下自定义协议序列化方式。
rocketMq反序列化是如何知道是那种序列化方式呢,答案就在headerLength中。
headerLength数据类型是int类型,第一个字节存储了消息头的序列化方式,后3个字节存储了消息头的长度。
消息头序列化:
public static byte[] markProtocolType(int source, SerializeType type) {
byte[] result = new byte[4];
// SerializeType 是一个byte类型,0 表示json协议,1表示rocketMq协议
result[0] = type.getCode();
result[1] = (byte) ((source >> 16) & 0xFF);
result[2] = (byte) ((source >> 8) & 0xFF);
result[3] = (byte) (source & 0xFF);
return result;
}
消息头反序列化获取header类型:
public static SerializeType getProtocolType(int source) {
// 因为第4个字节是header的类型,所以要右移24个比特位
return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
}
再来看下有header长度和header类型如何获取header体的长度
public static int getHeaderLength(int length) {
// 0xFFFFFF转换成byte数组是 [0,-1,-1,-1] ,int的后三个字节比特位全是1
// byte 最高位1表示负数,而 11111111 8个比特位全是1 是 (byte)-1 的补码
return length & 0xFFFFFF;
}
rocketMq作为一个消息中间件,消息头有很多种,所以单独抽象出一个CommandCustomHeader接口,消息头的子类实现这个接口即可,接下来看一下接口定义
public interface CommandCustomHeader {
// 子类的成员变量必须是java的基本类型,这个方法就是校验子类成员变量是否为空的
void checkFields() throws RemotingCommandException;
}
接下来具体看下rocketMq是如何encode消息的
public ByteBuffer encode() {
// 1> header length size
int length = 4;
// 2> header data length
byte[] headerData = this.headerEncode();
length += headerData.length;
// 3> body data length
if (this.body != null) {
length += body.length;
}
ByteBuffer result = ByteBuffer.allocate(4 + length);
// length
result.putInt(length);
// header length
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// header data
result.put(headerData);
// body data;
if (this.body != null) {
result.put(this.body);
}
result.flip();
return result;
}
①:length 代表消息的总长度,初始值为4表示了消息头的长度
②:将消息头转换为byte数组,分为json和rocketMq协议,会在下面具体分析
③:ByteBuffer.allocate(4 + length) 分配buffer堆内内存,前面length初始值为4,这里又加了4,因为有两个长度信息。
④:之后再把长度信息和header 、body push到内存中
⑤:之后在flip,转换为读模式,需要对ByteBuffer有一定的了解
说明:
java nio 中的ByteBuffer中有几个重要变量,limit、position、capacity。一开始new 一个buffer时,limit是等于capacity是处于写模式下,向buffer里put数据 position是逐渐递增的 不能超过limit的值。写了几个数据后我想从头开始读我写了哪些东西,这时做的一个操作是flip,其实就是将limit设置成了position的值,position 设置成0,get时position是递增的,直到小于limit。
// ByteBuffer的基本操作
ByteBuffer buffer = ByteBuffer.allocate(12);
buffer.putInt(1);
buffer.putInt(2);
buffer.flip();
System.out.println(buffer.getInt());
System.out.println(buffer.getInt());
// flip方法也可以转换为如下方式
ByteBuffer buffer = ByteBuffer.allocate(12);
buffer.putInt(1);
buffer.putInt(2);
// buffer.flip();
buffer.limit(8);
buffer.position(0);
System.out.println(buffer.getInt());
System.out.println(buffer.getInt());
rocketMq协议序列化头
前面说了rocketMq是有头信息的,头信息里面是java的基本变量,其实就是根据反射将变量信息先放在map中,然后在将map里面的k v 转换成byte 放到buffer中
// 存放header成员变量k v 值的map
private HashMap<String, String> extFields;
private transient CommandCustomHeader customHeader;
// 这个方法就是通过反射先把customHeader中的成员变量放到map中
public void makeCustomHeaderToNet() {
if (this.customHeader != null) {
Field[] fields = getClazzFields(customHeader.getClass());
if (null == this.extFields) {
this.extFields = new HashMap<String, String>();
}
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String name = field.getName();
if (!name.startsWith("this")) {
Object value = null;
try {
field.setAccessible(true);
value = field.get(this.customHeader);
} catch (Exception e) {
log.error("Failed to access field [{}]", name, e);
}
if (value != null) {
this.extFields.put(name, value.toString());
}
}
}
}
}
}
下面说明一下rocketMq形式的encode如何将header转换成byte~,直接上代码
public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {
// String remark
byte[] remarkBytes = null;
int remarkLen = 0;
if (cmd.getRemark() != null && cmd.getRemark().length() > 0) {
remarkBytes = cmd.getRemark().getBytes(CHARSET_UTF8);
remarkLen = remarkBytes.length;
}
// HashMap<String, String> extFields
byte[] extFieldsBytes = null;
int extLen = 0;
if (cmd.getExtFields() != null && !cmd.getExtFields().isEmpty()) {
extFieldsBytes = mapSerialize(cmd.getExtFields());
extLen = extFieldsBytes.length;
}
int totalLen = calTotalLen(remarkLen, extLen);
ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen);
// int code(~32767)
headerBuffer.putShort((short) cmd.getCode());
// LanguageCode language
headerBuffer.put(cmd.getLanguage().getCode());
// int version(~32767)
headerBuffer.putShort((short) cmd.getVersion());
// int opaque
headerBuffer.putInt(cmd.getOpaque());
// int flag
headerBuffer.putInt(cmd.getFlag());
// String remark
if (remarkBytes != null) {
headerBuffer.putInt(remarkBytes.length);
headerBuffer.put(remarkBytes);
} else {
headerBuffer.putInt(0);
}
// HashMap<String, String> extFields;
if (extFieldsBytes != null) {
headerBuffer.putInt(extFieldsBytes.length);
headerBuffer.put(extFieldsBytes);
} else {
headerBuffer.putInt(0);
}
return headerBuffer.array();
}
①:将remark转换成字节,remark就是RemotingCommand的一个成员变量
②:下面重点来了,将customHeader通过序列化放到extFields转换成byte数组,接下来会说明
③:接下来计算header的长度,公式为,里面的变量意思详见上面表格
private static int calTotalLen(int remark, int ext) {
// int code(~32767)
int length = 2
// LanguageCode language
+ 1
// int version(~32767)
+ 2
// int opaque
+ 4
// int flag
+ 4
// String remark
+ 4 + remark
// HashMap<String, String> extFields
+ 4 + ext;
return length;
}
④:初始化一个buffer,然后一个一个将值put进去。
接下来看下rocketMq如何将HashMap序列化成byte数组
public static byte[] mapSerialize(HashMap<String, String> map) {
// keySize+key+valSize+val
if (null == map || map.isEmpty())
return null;
int totalLength = 0;
int kvLength;
Iterator<Map.Entry<String, String>> it = map.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, String> entry = it.next();
if (entry.getKey() != null && entry.getValue() != null) {
kvLength =
// keySize + Key
2 + entry.getKey().getBytes(CHARSET_UTF8).length
// valSize + val
+ 4 + entry.getValue().getBytes(CHARSET_UTF8).length;
totalLength += kvLength;
}
}
ByteBuffer content = ByteBuffer.allocate(totalLength);
byte[] key;
byte[] val;
it = map.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, String> entry = it.next();
if (entry.getKey() != null && entry.getValue() != null) {
key = entry.getKey().getBytes(CHARSET_UTF8);
val = entry.getValue().getBytes(CHARSET_UTF8);
content.putShort((short) key.length);
content.put(key);
content.putInt(val.length);
content.put(val);
}
}
return content.array();
}
说明:其实就是拿到map的迭代器,把key和value按顺序放入到byte数组中,key的长度为short类型,value的长度为int类型,先计算出总长度,然后一个一个的放到buffer里面。
json协议序列化header信息
json协议序列化就十分简单了,直接把序列化后的json字符串获取byte返回。
// body 和 customHeader 被 transient修饰,json序列化和反序列话不会包括这俩字段
private transient byte[] body;
private transient CommandCustomHeader customHeader;
RemotingSerializable.encode(RemotingCommand.this);
// encode 调用这个方法
public static byte[] encode(final Object obj) {
final String json = toJson(obj, false);
if (json != null) {
return json.getBytes(CHARSET_UTF8);
}
return null;
}
rocketMq反序列化
接下来看一下反序列化如何实现的
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
// 整个消息体的长度,netty的LengthFieldBasedFrameDecoder处理粘包拆包时,已经把消息的总长度4个字节跳过
int length = byteBuffer.limit();
// 带上协议的header长度
int oriHeaderLen = byteBuffer.getInt();
// 去掉协议的header长度
int headerLength = getHeaderLength(oriHeaderLen);
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
cmd.body = bodyData;
return cmd;
}
// 下面是headerDecode 的相关代码,只是说明rocketMq header协议的代码
public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) {
RemotingCommand cmd = new RemotingCommand();
ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray);
// int code(~32767)
cmd.setCode(headerBuffer.getShort());
// LanguageCode language
cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get()));
// int version(~32767)
cmd.setVersion(headerBuffer.getShort());
// int opaque
cmd.setOpaque(headerBuffer.getInt());
// int flag
cmd.setFlag(headerBuffer.getInt());
// String remark
int remarkLength = headerBuffer.getInt();
if (remarkLength > 0) {
byte[] remarkContent = new byte[remarkLength];
headerBuffer.get(remarkContent);
cmd.setRemark(new String(remarkContent, CHARSET_UTF8));
}
// HashMap<String, String> extFields
int extFieldsLength = headerBuffer.getInt();
if (extFieldsLength > 0) {
byte[] extFieldsBytes = new byte[extFieldsLength];
headerBuffer.get(extFieldsBytes);
// 重点看下如何将byte反序列化成map
cmd.setExtFields(mapDeserialize(extFieldsBytes));
}
return cmd;
}
public static HashMap<String, String> mapDeserialize(byte[] bytes) {
if (bytes == null || bytes.length <= 0)
return null;
HashMap<String, String> map = new HashMap<String, String>();
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
short keySize;
byte[] keyContent;
int valSize;
byte[] valContent;
while (byteBuffer.hasRemaining()) {
keySize = byteBuffer.getShort();
keyContent = new byte[keySize];
byteBuffer.get(keyContent);
valSize = byteBuffer.getInt();
valContent = new byte[valSize];
byteBuffer.get(valContent);
map.put(new String(keyContent, CHARSET_UTF8), new String(valContent, CHARSET_UTF8));
}
return map;
}
说明:rocketmq使用netty自带的LengthFieldBasedFrameDecoder来解码,其底层帮我们处理好了粘包和拆包的问题,而在NettyDecoder的构造方法中把 消息总长度的length去掉了 如代码所示
public NettyDecoder() {
// 第一个0 表示 lengthField 的offset,第一个4表示lengthField的length,最后面的4 表示跳过4个字节
super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
}
将extFields转换成map其实就是encode的一个逆过程,当buffer里面有数据时,先获取k v的长度,然后在声明一个长度的byte数组把对应的value取出来,之后在put到map中。
总结
rocketMq的rpc通信使用netty,需要了解下netty底层的线程模型java nio 和 netty ByteBuf的相关知识,编解码其实就是定义好一个通信协议,根据协议将对象转换成byte,byte转换成对象的过程,在接下来的一篇文章里,介绍一下如何使用netty做 sync 、async、oneway形式的rpc通信的。以上就是通过看rocketMq rpc模块编解码相关代码总结出来的,如有错误,欢迎指出讨论~