Netty责任链模式的数据流处理策略

引言

前段时间完成了一个项目,需要开发一个服务器,其中一个功能要求用TCP/IP协议与下位机(使用STM32作为主控芯片)以特定的帧格式进行通信,这里在实验室某位技术实力过硬的师兄带领下,使用Spring Boot+Netty相结合的方式完成一个稳定、高效率的服务器,其中还包括多线程、缓存、高并发等技术。笔者对其中的数据流处理方法进行了学习,现自己对其做一个总结。

服务器总体框架

服务器总体框架如下图所示:


服务器总体框架.png

这里的下位机硬件设备其实并不是直接使用以太网与服务器相连,它实际使用的是CAN总线,再使用CAN转以太网模块才与服务器相连,因为下位机的通信与服务器无关,因此框架中的这个部分做了简化,可以直接认为通过以太网进行TCP通信。

数据帧格式

这里的数据帧是下位机设备整理各种信息之后,按照自定义的帧格式组合成数据帧发送给服务器,这里的帧格式如下:


数据主帧.png

这里将这样一条数据帧格式叫做一条数据主帧,它由帧头和主帧数据体组成。帧头中的内容更具项目需求可以分为帧识别位、业务1、主帧类型、业务2、备用位、主帧数据体长度6个部分,其中每个部分所占字节长度为固定值。这里的数据流处理策略就是要从Netty服务器的接收缓冲区中提取这些有效信息,组合成这个格式的帧的类实例给业务层进行另一步处理。主帧数据体由子帧构成,子帧的格式如下:


子帧格式.png

一条子帧由子帧头和数据体组成,子帧头有功能位(占1字节)和子帧长度(占2字节,代表数据体长度)。根据不同的业务功能可以定义功能位。主帧类型和子帧功能位共同决定了此主帧代表的业务功能。

责任链模式

关于责任链模式的解释和用法网上资源很多并且也很详细,这里就不再做介绍了,不过笔者还是最喜欢《大话设计模式》中用加薪这个情景来进行总结,通俗易懂,感兴趣的朋友可以去找这本书研读下。

Netty中的责任链模式

在《Netty实战》第六章中讲到,ChannelHandlerContext使得ChannelHandler能够通知其所属ChannelPipeline的下一个ChannelHandler,这就将它们连成了一条链,事件沿这条链传递,这就是责任链模式最好的体现,借用《Netty实战》中的图6-3:

图6-3 ChannelPipeline 和它的ChannelHandler.png

其中可以使用ChannelInitializer,它提供一个特殊ChannelInboundHandlerAdapter 子类,它定义的protected abstract void initChannel(C ch) throws Exception;方法是一种将多个ChannelHandler添加到一个ChannelPipeline中的简便方法。

数据流处理策略

这里将数据流解析模块分为三级:


数据流处理流程.png

第一级数据流解析成帧的handler,其代码如下:

/**
 * 帧识别 - 入站处理器 分割出并向后继处理器传递一个完整的主帧数据体及主帧信息 bean
 */
@Service
@ChannelHandler.Sharable
public class FrameRecognitionInBoundHandler extends SimpleChannelInboundHandler<ByteBuf> {
    private Logger logger = LoggerFactory.getLogger(getClass());
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
        while (true) {
            if (byteBuf.readableBytes() < FrameSetting.FRAME_HEAD_LENGTH) {
                return;
            }
            if (byteBuf.readByte() != FrameSetting.MAJOR_FRAME_HEAD_1
                    || byteBuf.readByte() != FrameSetting.MAJOR_FRAME_HEAD_2) {
                logger.warn("数据接收异常「帧头不匹配」");
                return;
            }
            int groupId = byteBuf.readByte() & 0xFF;//提取业务1,占1字节
            int msgId = byteBuf.readByte() & 0xFF;//提取主帧类型,占1字节
            int deviceId = byteBuf.readByte() & 0xFF;//提取业务2,占1字节
            int backupMsg = byteBuf.readByte() & 0xFF;//提取备用位,占1字节
            int dataLength = byteBuf.readShort() & 0xFFFF;//提取数据体长度,占2字节
            FrameMajorHeader headMsg = new FrameMajorHeader(msgId, groupId, deviceId, dataLength, backupMsg);
            ByteBuf subBuf = ctx.alloc().buffer(dataLength);
            byteBuf.readBytes(subBuf, dataLength);
            ctx.fireChannelRead(new FrameMajor(headMsg, subBuf));
        }
    }
}

其中FrameMajorHeader是主帧的帧头类,FrameMajor是完整的主帧类。ctx.fireChannelRead(new FrameMajor(headMsg, subBuf));将此主帧类的实例传递给第二级。
第二级处理handler,代码:

/**
* 从第一级中获得了一个主帧实例,第二级获取子帧,处理并交给第三级
*/
@Service
@ChannelHandler.Sharable
public class ParsedMessageInBoundHandler extends SimpleChannelInboundHandler<FrameMajor> {
    /**  第三级处理数据流模块    */
    private final TcpPresenter server;
    @Autowired
    public ParsedMessageInBoundHandler(TcpPresenter server) {
        this.server = server;
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FrameMajor msg) throws Exception {
        FrameMajorHeader head = msg.getHead();
        ByteBuf byteBuf = msg.getData();
        while (byteBuf.readableBytes() >= FrameSetting.SUB_FRAME_HEAD_LENGTH) {
            int subMsgId = byteBuf.readByte() & 0xFF;
            byte[] data = new byte[byteBuf.readShort()];
            byteBuf.readBytes(data);
            server.decodeAndHuntMessage(head, subMsgId, data, ctx.channel());
        }
    }
}

第三级处理代码:

 /**
     * 「内部使用」将 TCP 帧解码并转换为消息对象,而后传递至 Server 该方法可保证解析出的消息对象的设备号正确
     *
     * @param head     TCP 帧头
     * @param subMsgId TCP 子帧功能位
     * @param data     TCP 子帧数据体
     * @param channel  该消息帧的容器 Channel
     */
    public void decodeAndHuntMessage(FrameMajorHeader head, int subMsgId, byte[] data, Channel channel) {
        BaseMsg msg = msgProcessor.decode(head, subMsgId, data);
        if (msg == null) {
            logger.warn("帧解析出错");
            return;
        }
        if (msg.getGroupId() > DeviceSetting.MAX_GROUP_ID && msg.getDeviceId() > DeviceSetting.MAX_DEVICE_ID) {
            logger.warn("设备号出错「GroupId:" + msg.getGroupId() + "; DeviceId:" + msg.getDeviceId() + "」");
            return;
        }
        switch (msg.getJointMsgFlag()) {
            case JointMsgType.replyWorkStatus:
            case JointMsgType.replyChargeStatus:
                server.huntDeviceStatusMsg((MsgReplyDeviceStatus) msg);
                break;
            case JointMsgType.replyHeartBeat:
                // 收到心跳包,设备组已激活
                int groupId = msg.getGroupId();
                //设定设备时间校对计划任务
                ScheduledFuture future = channel.eventLoop().scheduleAtFixedRate(
                        () -> sendMessageToTcp(MsgCodecTimestamp.create(groupId)), 5, DeviceSetting.TIMESYNC_INTERVAL, TimeUnit.SECONDS);
                tcpRepository.accessChannelSuccessful(msg, channel, future);
                break;
            default:
                if (msg instanceof MsgReplyNormal) {
                    tcpRepository.touchNormalReplyMsg((MsgReplyNormal) msg);
                    server.touchNormalReplyMsg((MsgReplyNormal) msg);
                } else {
                    server.huntMessage(msg);
                }
        }
    }

到这里数据流的处理基本上就完成了,再往上的功能牵涉到缓存、多线程等技术,这些暂时还不会,以后会了再写吧。

后记

其实这是笔者第一次写这种文章,写完之后发现其实这并没有什么技术含量,感觉像是在写论文,而且自己在写代码的时候知道该这样写,但是写文章想表达出来的时候却发现不知道用什么词汇和方式来表达自己的想法,所以很多地方干脆就不写文字性的东西了。不过本人深刻知道学习过程中写文章总结的必要性,今天算是走出了第一步吧,以后一定努力,多花时间多钻研。Thanks!!!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,063评论 6 510
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,805评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,403评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,110评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,130评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,877评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,533评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,429评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,947评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,078评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,204评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,894评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,546评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,086评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,195评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,519评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,198评论 2 357

推荐阅读更多精彩内容