rocketmq rpc 通信(一) 编解码

  消息中间件的性能吞吐量如何,关键是看底层存储如何做到极限,另一个重要因素就是网络io通信。接下来将从消息编解码,rpc通信介绍rocketMq的网络io~

rocketmq 通信模块代码结构

  如图所示,rocketMq网络通信是基于netty实现的,大家需要对java的nio和各种buffer和多线程的知识有一定的了解。


image.png

RemotingCommand中重要的几个变量

name 类型 说明
code int 请求操作码,服务端拿到相应的操作码做不同的处理
language byte 语言
version int 版本号
opaque int 客户端请求id,因为netty是异步编程,服务端响应中做拓传到客户端中,找到相应的请求
flag int rpc标示,是同步、异步、oneway
remark String 额外存储信息的
extFields Map 存放header头信息中的成员变量
serializeTypeCurrentRPC SerializeType 表示使用json还是rocketMq协议序列化和反序列化头信息

rocketMq编解码说明

  先说明一下rocketMq的消息体组成如图所示,rocketMq提供了两种 消息头 序列化方式,一种是json,一种是rocketMq自定义协议序列化方式,之后会说明一下自定义协议序列化方式。

消息体组成

  rocketMq反序列化是如何知道是那种序列化方式呢,答案就在headerLength中。
headerLength数据类型是int类型,第一个字节存储了消息头的序列化方式,后3个字节存储了消息头的长度。

消息头序列化:
public static byte[] markProtocolType(int source, SerializeType type) {
        byte[] result = new byte[4];
        // SerializeType 是一个byte类型,0 表示json协议,1表示rocketMq协议
        result[0] = type.getCode();
        result[1] = (byte) ((source >> 16) & 0xFF);
        result[2] = (byte) ((source >> 8) & 0xFF);
        result[3] = (byte) (source & 0xFF);
        return result;
    }

消息头反序列化获取header类型:

public static SerializeType getProtocolType(int source) {
        // 因为第4个字节是header的类型,所以要右移24个比特位
        return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
    }

再来看下有header长度和header类型如何获取header体的长度

public static int getHeaderLength(int length) {
      // 0xFFFFFF转换成byte数组是 [0,-1,-1,-1] ,int的后三个字节比特位全是1
      // byte 最高位1表示负数,而 11111111 8个比特位全是1 是 (byte)-1 的补码
        return length & 0xFFFFFF;
    }

rocketMq作为一个消息中间件,消息头有很多种,所以单独抽象出一个CommandCustomHeader接口,消息头的子类实现这个接口即可,接下来看一下接口定义

public interface CommandCustomHeader {
   // 子类的成员变量必须是java的基本类型,这个方法就是校验子类成员变量是否为空的
    void checkFields() throws RemotingCommandException;
}

接下来具体看下rocketMq是如何encode消息的

public ByteBuffer encode() {
        // 1> header length size
        int length = 4;
        // 2> header data length
        byte[] headerData = this.headerEncode();
        length += headerData.length;
        // 3> body data length
        if (this.body != null) {
            length += body.length;
        }
        ByteBuffer result = ByteBuffer.allocate(4 + length);
        // length
        result.putInt(length);
        // header length
        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
        // header data
        result.put(headerData);
        // body data;
        if (this.body != null) {
            result.put(this.body);
        }
        result.flip();
        return result;
    }

①:length 代表消息的总长度,初始值为4表示了消息头的长度
②:将消息头转换为byte数组,分为json和rocketMq协议,会在下面具体分析
③:ByteBuffer.allocate(4 + length) 分配buffer堆内内存,前面length初始值为4,这里又加了4,因为有两个长度信息。
④:之后再把长度信息和header 、body push到内存中
⑤:之后在flip,转换为读模式,需要对ByteBuffer有一定的了解
说明:
java nio 中的ByteBuffer中有几个重要变量,limit、position、capacity。一开始new 一个buffer时,limit是等于capacity是处于写模式下,向buffer里put数据 position是逐渐递增的 不能超过limit的值。写了几个数据后我想从头开始读我写了哪些东西,这时做的一个操作是flip,其实就是将limit设置成了position的值,position 设置成0,get时position是递增的,直到小于limit。

// ByteBuffer的基本操作
ByteBuffer buffer = ByteBuffer.allocate(12);
        buffer.putInt(1);
        buffer.putInt(2);
        buffer.flip();
        System.out.println(buffer.getInt());
        System.out.println(buffer.getInt());
// flip方法也可以转换为如下方式
ByteBuffer buffer = ByteBuffer.allocate(12);
        buffer.putInt(1);
        buffer.putInt(2);
//        buffer.flip();
        buffer.limit(8);
        buffer.position(0);
        System.out.println(buffer.getInt());
        System.out.println(buffer.getInt());
rocketMq协议序列化头

前面说了rocketMq是有头信息的,头信息里面是java的基本变量,其实就是根据反射将变量信息先放在map中,然后在将map里面的k v 转换成byte 放到buffer中

// 存放header成员变量k v 值的map
private HashMap<String, String> extFields;
private transient CommandCustomHeader customHeader;
// 这个方法就是通过反射先把customHeader中的成员变量放到map中
public void makeCustomHeaderToNet() {
        if (this.customHeader != null) {
            Field[] fields = getClazzFields(customHeader.getClass());
            if (null == this.extFields) {
                this.extFields = new HashMap<String, String>();
            }
            for (Field field : fields) {
                if (!Modifier.isStatic(field.getModifiers())) {
                    String name = field.getName();
                    if (!name.startsWith("this")) {
                        Object value = null;
                        try {
                            field.setAccessible(true);
                            value = field.get(this.customHeader);
                        } catch (Exception e) {
                            log.error("Failed to access field [{}]", name, e);
                        }
                        if (value != null) {
                            this.extFields.put(name, value.toString());
                        }
                    }
                }
            }
        }
    }

下面说明一下rocketMq形式的encode如何将header转换成byte~,直接上代码

public static byte[] rocketMQProtocolEncode(RemotingCommand cmd) {
        // String remark
        byte[] remarkBytes = null;
        int remarkLen = 0;
        if (cmd.getRemark() != null && cmd.getRemark().length() > 0) {
            remarkBytes = cmd.getRemark().getBytes(CHARSET_UTF8);
            remarkLen = remarkBytes.length;
        }
        // HashMap<String, String> extFields
        byte[] extFieldsBytes = null;
        int extLen = 0;
        if (cmd.getExtFields() != null && !cmd.getExtFields().isEmpty()) {
            extFieldsBytes = mapSerialize(cmd.getExtFields());
            extLen = extFieldsBytes.length;
        }
        int totalLen = calTotalLen(remarkLen, extLen);
        ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen);
        // int code(~32767)
        headerBuffer.putShort((short) cmd.getCode());
        // LanguageCode language
        headerBuffer.put(cmd.getLanguage().getCode());
        // int version(~32767)
        headerBuffer.putShort((short) cmd.getVersion());
        // int opaque
        headerBuffer.putInt(cmd.getOpaque());
        // int flag
        headerBuffer.putInt(cmd.getFlag());
        // String remark
        if (remarkBytes != null) {
            headerBuffer.putInt(remarkBytes.length);
            headerBuffer.put(remarkBytes);
        } else {
            headerBuffer.putInt(0);
        }
        // HashMap<String, String> extFields;
        if (extFieldsBytes != null) {
            headerBuffer.putInt(extFieldsBytes.length);
            headerBuffer.put(extFieldsBytes);
        } else {
            headerBuffer.putInt(0);
        }
        return headerBuffer.array();
    }

①:将remark转换成字节,remark就是RemotingCommand的一个成员变量
②:下面重点来了,将customHeader通过序列化放到extFields转换成byte数组,接下来会说明
③:接下来计算header的长度,公式为,里面的变量意思详见上面表格

private static int calTotalLen(int remark, int ext) {
        // int code(~32767)
        int length = 2
            // LanguageCode language
            + 1
            // int version(~32767)
            + 2
            // int opaque
            + 4
            // int flag
            + 4
            // String remark
            + 4 + remark
            // HashMap<String, String> extFields
            + 4 + ext;
        return length;
    }

④:初始化一个buffer,然后一个一个将值put进去。
接下来看下rocketMq如何将HashMap序列化成byte数组

public static byte[] mapSerialize(HashMap<String, String> map) {
        // keySize+key+valSize+val
        if (null == map || map.isEmpty())
            return null;
        int totalLength = 0;
        int kvLength;
        Iterator<Map.Entry<String, String>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            if (entry.getKey() != null && entry.getValue() != null) {
                kvLength =
                    // keySize + Key
                    2 + entry.getKey().getBytes(CHARSET_UTF8).length
                        // valSize + val
                        + 4 + entry.getValue().getBytes(CHARSET_UTF8).length;
                totalLength += kvLength;
            }
        }
        ByteBuffer content = ByteBuffer.allocate(totalLength);
        byte[] key;
        byte[] val;
        it = map.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            if (entry.getKey() != null && entry.getValue() != null) {
                key = entry.getKey().getBytes(CHARSET_UTF8);
                val = entry.getValue().getBytes(CHARSET_UTF8);
                content.putShort((short) key.length);
                content.put(key);
                content.putInt(val.length);
                content.put(val);
            }
        }
        return content.array();
    }

说明:其实就是拿到map的迭代器,把key和value按顺序放入到byte数组中,key的长度为short类型,value的长度为int类型,先计算出总长度,然后一个一个的放到buffer里面。

json协议序列化header信息

  json协议序列化就十分简单了,直接把序列化后的json字符串获取byte返回。

// body 和 customHeader 被 transient修饰,json序列化和反序列话不会包括这俩字段
private transient byte[] body;
private transient CommandCustomHeader customHeader;

RemotingSerializable.encode(RemotingCommand.this);
// encode 调用这个方法
public static byte[] encode(final Object obj) {
        final String json = toJson(obj, false);
        if (json != null) {
            return json.getBytes(CHARSET_UTF8);
        }
        return null;
    }

rocketMq反序列化

接下来看一下反序列化如何实现的

public static RemotingCommand decode(final ByteBuffer byteBuffer) {
        // 整个消息体的长度,netty的LengthFieldBasedFrameDecoder处理粘包拆包时,已经把消息的总长度4个字节跳过
        int length = byteBuffer.limit();
        // 带上协议的header长度
        int oriHeaderLen = byteBuffer.getInt();
       // 去掉协议的header长度
        int headerLength = getHeaderLength(oriHeaderLen);
        byte[] headerData = new byte[headerLength];
        byteBuffer.get(headerData);
        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
        int bodyLength = length - 4 - headerLength;
        byte[] bodyData = null;
        if (bodyLength > 0) {
            bodyData = new byte[bodyLength];
            byteBuffer.get(bodyData);
        }
        cmd.body = bodyData;
        return cmd;
    }

 // 下面是headerDecode 的相关代码,只是说明rocketMq header协议的代码
public static RemotingCommand rocketMQProtocolDecode(final byte[] headerArray) {
        RemotingCommand cmd = new RemotingCommand();
        ByteBuffer headerBuffer = ByteBuffer.wrap(headerArray);
        // int code(~32767)
        cmd.setCode(headerBuffer.getShort());
        // LanguageCode language
        cmd.setLanguage(LanguageCode.valueOf(headerBuffer.get()));
        // int version(~32767)
        cmd.setVersion(headerBuffer.getShort());
        // int opaque
        cmd.setOpaque(headerBuffer.getInt());
        // int flag
        cmd.setFlag(headerBuffer.getInt());
        // String remark
        int remarkLength = headerBuffer.getInt();
        if (remarkLength > 0) {
            byte[] remarkContent = new byte[remarkLength];
            headerBuffer.get(remarkContent);
            cmd.setRemark(new String(remarkContent, CHARSET_UTF8));
        }
        // HashMap<String, String> extFields
        int extFieldsLength = headerBuffer.getInt();
        if (extFieldsLength > 0) {
            byte[] extFieldsBytes = new byte[extFieldsLength];
            headerBuffer.get(extFieldsBytes);
            // 重点看下如何将byte反序列化成map
            cmd.setExtFields(mapDeserialize(extFieldsBytes));
        }
        return cmd;
    }

public static HashMap<String, String> mapDeserialize(byte[] bytes) {
        if (bytes == null || bytes.length <= 0)
            return null;
        HashMap<String, String> map = new HashMap<String, String>();
        ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
        short keySize;
        byte[] keyContent;
        int valSize;
        byte[] valContent;
        while (byteBuffer.hasRemaining()) {
            keySize = byteBuffer.getShort();
            keyContent = new byte[keySize];
            byteBuffer.get(keyContent);
            valSize = byteBuffer.getInt();
            valContent = new byte[valSize];
            byteBuffer.get(valContent);
            map.put(new String(keyContent, CHARSET_UTF8), new String(valContent, CHARSET_UTF8));
        }
        return map;
    }

说明:rocketmq使用netty自带的LengthFieldBasedFrameDecoder来解码,其底层帮我们处理好了粘包和拆包的问题,而在NettyDecoder的构造方法中把 消息总长度的length去掉了 如代码所示

public NettyDecoder() {
       // 第一个0 表示 lengthField 的offset,第一个4表示lengthField的length,最后面的4 表示跳过4个字节
        super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
    }

将extFields转换成map其实就是encode的一个逆过程,当buffer里面有数据时,先获取k v的长度,然后在声明一个长度的byte数组把对应的value取出来,之后在put到map中。

总结

  rocketMq的rpc通信使用netty,需要了解下netty底层的线程模型java nio 和 netty ByteBuf的相关知识,编解码其实就是定义好一个通信协议,根据协议将对象转换成byte,byte转换成对象的过程,在接下来的一篇文章里,介绍一下如何使用netty做 sync 、async、oneway形式的rpc通信的。以上就是通过看rocketMq rpc模块编解码相关代码总结出来的,如有错误,欢迎指出讨论~

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,451评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,172评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,782评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,709评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,733评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,578评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,320评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,241评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,686评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,878评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,992评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,715评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,336评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,912评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,040评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,173评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,947评论 2 355

推荐阅读更多精彩内容