【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(附通信协议和代码)(上)

前提介绍

今天,我要向大家实现一个基于Netty实现的高性能远程通信框架!这个框架利用了 Netty 的强大功能,提供了快速、可靠的远程通信能力。

无论是构建大规模微服务架构还是实现分布式计算,这个分布式通信框架都是一个不可或缺的利器。

回顾Dubbo

相信大家都指导Dubbo(Dubbo3)这个非常著名的RPC框架对吧,如果你忘记了,那么我给您先垫垫底,可以看到下面就是Dubbo的借本架构图,当然Dubbo3会更加复杂,我们先按照基础的Dubbo架构进行回顾

image.png

无论是在分布式系统、微服务架构还是其他需要跨网络进行通信的场景下,这个框架都能够帮助你实现高效的数据传输和通信。它具备出色的性能和可扩展性,能够满足各种复杂的通信需求。

image.png

但是无论是从层次化和结构化而言,Dubbo/Dubbo3都过于的复杂了,我们起始未必会用到那么复杂以及扩展性那么强的功能,因此我们来实现一个属于我们自己的一个可靠且高性能的远程通信解决方案。

分布式通信框架

分布式通信框架是一种卓越的高性能远程通信解决方案,它基于 Netty 实现了 TCP 通信的底层细节,并对上层进行了封装,以提供简单易用和高度可扩展的能力。

image.png

这个框架能够帮助开发者轻松构建分布式系统,并实现可靠的跨网络通信。通过利用 Netty 的强大功能,该框架能够提供出色的性能和可靠性,同时还具备灵活的扩展性,可以满足各种复杂的通信需求。

组成元素

先介绍一下网络通信的两个最基本的元素和属性,如下所示。

image.png
  • Channel:可以理解为一个通道,即一条连接线路的概念。它承载着数据、信息或者信号的传输功能。

  • ChannelGroup:由多个通道组合而成的一个概念。它将多条通道有机地集合在一起,形成一个整体,以便更高效地进行数据、信息或者信号的传输。

程序执行流程

下图自上而下分别为boss接受连接、channel、dispatcher、event listener和service。


image.png

这五个部分各自承载着独特的任务,又彼此协作,形成了一个系统化、高效化的运行流程。


image.png
  • Boss线程:接受连接流程,主要负责接受外部请求,这些请求可能是来自用户的操作或是其他服务的调用。一旦接收到请求,boss会进行必要的处理,然后将请求分发给下面的线程池worker进行处理。

  • Worker线程:系统中的工作执行者,负责接收boss分发的任务,然后执行具体的业务逻辑。这些任务可能涉及到数据的处理、服务的调用等。线程池worker通过channel与boss进行通信,确保任务能够准确无误地传递。

  • dispatcher机制:在worker执行任务的过程中,需要有一个机制来调度和分配任务。这就是dispatcher的作用。

dispatcher根据一定的策略和规则,将任务分配给合适的worker线程进行处理。这一环节保证了系统的负载均衡和高效运行。

  • EventListener:基于在每个worker线程内部,eventListener发挥着关键作用。它负责监听和处理线程中的事件,比如任务的完成、异常等。通过eventListener,系统能够及时响应各种事件,进行必要的处理和反馈。

  • Service业务逻辑实现:它代表了整个系统的核心业务逻辑。service接收并处理来自worker线程的任务,完成具体的业务操作。这些操作可能涉及到数据的处理、服务的调用等。

消息协议设计

消息协议这里是指对消息编码和解码的规范的一种定义,通信内置的消息协议采用如下结构:其中包含了三个部分:ID、Length 和 Content。

image.png
  1. ID:

    • 长度:1 字节
    • 用途:表示 Content 部分是否被压缩,其中 1 表示 Content 部分被压缩,0 表示未被压缩。
  2. Length:

    • 长度:4 字节
    • 用途:表示 ID 和 Content 的总长度。这通常用于消息分片或分批传输,确保接收方可以正确地重新组装消息。
  3. Content:

    • 长度:不定(由 Length 字段决定)
    • 用途:真实的消息内容。根据 ID 的值,它可能是压缩的或未压缩的。

如果 ID 为 1,则 Content 部分可能会被某种算法(如gzip)压缩,以减少存储或传输的空间需求。Length 字段确保了数据的完整性,因为接收方可以根据这个长度字段正确地读取和重组数据。

在实际应用中,这种结构通常用于网络通信、文件存储或数据库存储等场景,其中需要对数据进行有效且紧凑的表示。

实现机制

Netty框架原生提供了一个处理器链,该链用于对事件进行处理。每个处理器都实现了 ChannelHandler 接口。ChannelHandler 接口是一个空接口,其中:ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter

image.png

我们主要关注这两个接口,因为它们被用于处理读取输入和写入输出的消息。

ChannelInboundHandlerAdapter

ChannelInboundHandlerAdapterNetty框架中用于处理从网络到应用程序的事件的组件。它是一种特殊的ChannelHandler,主要负责处理读取操作。

image.png

当网络通道接收到数据时,ChannelInboundHandlerAdapter会被触发,然后开发者可以通过重写其中的方法来执行需要的操作。常见的操作包括数据的解码、解压或反序列化等。

自定义事件处理

ChannelInboundHandlerAdapterNetty中实现业务逻辑的关键组件,它提供了丰富的方法来处理不同的事件,例如通道激活、数据读取和异常处理等,下面是对应的源码:

public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {

    /**
     * Calls {@link ChannelHandlerContext#fireChannelRegistered()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelUnregistered()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelActive()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelInactive()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelRead(Object)} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelReadComplete()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireUserEventTriggered(Object)} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

    /**
     * Calls {@link ChannelHandlerContext#fireChannelWritabilityChanged()} to forward
     * to the next {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }

    /**
     * Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)} to forward
     * to the next {@link ChannelHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    @SuppressWarnings("deprecation")
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

通过自定义ChannelInboundHandlerAdapter,开发者可以灵活地处理从网络到应用程序的数据传输过程,稍后回进行分析介绍。

ChannelOutboundHandlerAdapter

ChannelOutboundHandlerAdapter也是一种特殊的ChannelHandler,用于处理从应用程序到网络的事件,主要包括写出操作。它是Netty框架中的一个关键组件,负责将应用程序的数据写入网络通道中,以便发送给对应的接收端。

image.png

使用ChannelOutboundHandlerAdapter可以实现对写出事件的定制化处理,例如数据的编码、压缩或序列化等操作,以满足具体业务需求。它可以直接扩展ChannelOutboundHandlerAdapter类,并重写其中的方法来实现特定的功能。

package io.netty.channel;
import io.netty.channel.ChannelHandlerMask.Skip;
import java.net.SocketAddress;
/**
 * Skeleton implementation of a {@link ChannelOutboundHandler}. This implementation just forwards each method call via
 * the {@link ChannelHandlerContext}.
 */
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {

    /**
     * Calls {@link ChannelHandlerContext#bind(SocketAddress, ChannelPromise)} to forward
     * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        ctx.bind(localAddress, promise);
    }

    /**
     * Calls {@link ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)} to forward
     * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.connect(remoteAddress, localAddress, promise);
    }

    /**
     * Calls {@link ChannelHandlerContext#disconnect(ChannelPromise)} to forward
     * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
        ctx.disconnect(promise);
    }

    /**
     * Calls {@link ChannelHandlerContext#close(ChannelPromise)} to forward
     * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
        ctx.close(promise);
    }

    /**
     * Calls {@link ChannelHandlerContext#deregister(ChannelPromise)} to forward
     * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.deregister(promise);
    }

    /**
     * Calls {@link ChannelHandlerContext#read()} to forward
     * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        ctx.read();
    }

    /**
     * Calls {@link ChannelHandlerContext#write(Object, ChannelPromise)} to forward
     * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
    }

    /**
     * Calls {@link ChannelHandlerContext#flush()} to forward
     * to the next {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
     *
     * Sub-classes may override this method to change behavior.
     */
    @Skip
    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

编(解)码处理器

编码(解码)处理器、压缩(解压)处理器以及序列化(反序列化)处理器等都是直接或间接用于实现ChannelHandler的组件。

编码过程阶段

编码过程由三个Handler组合完成,分别为序列化,压缩数据以及编码处理。

image.png
ChannelOutboundHandlerAdapter序列化实现

当你需要实现序列化数据的发送时,可以基于ChannelOutboundHandlerAdapter接口进行实现。下面是一个简单的示例代码,展示了如何使用write方法将序列化后的数据发送到网络:

public class SerializationHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // 进行数据序列化操作,这里假设使用Java内置的序列化方式
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(msg);
        oos.flush();
        byte[] serializedData = bos.toByteArray();
        // 将序列化后的数据写入网络通道
        ByteBuf byteBuf = ctx.alloc().buffer();
        byteBuf.writeBytes(serializedData);
        ctx.write(byteBuf, promise);
    }
}

重写了write方法,在该方法中进行了数据的序列化操作。具体来说,我们使用Java内置的序列化方式将msg对象序列化为字节数组serializedData,然后将序列化后的数据写入网络通道。

ChannelOutboundHandlerAdapter压缩实现

要通过数据压缩进行处理,基于ChannelOutboundHandlerAdapter接口实现一个压缩处理器。使用DeflaterOutputStream进行数据压缩并发送到网络:

public class CompressionHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // 创建输出流,使用DeflaterOutputStream进行数据压缩
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DeflaterOutputStream dos = new DeflaterOutputStream(bos);
        
        // 压缩数据
        dos.write((byte[]) msg);
        dos.finish();
        
        // 获取压缩后的数据
        byte[] compressedData = bos.toByteArray();
        
        // 将压缩后的数据写入网络通道
        ByteBuf byteBuf = ctx.alloc().buffer();
        byteBuf.writeBytes(compressedData);
        ctx.write(byteBuf, promise);
    }
}

同样也是重写了write方法,在该方法中进行了数据的压缩操作。我们使用DeflaterOutputStream将原始数据(byte[]) msg进行压缩,然后将压缩后的数据写入网络通道。

LengthBasedEncoder编码器

要实现Netty中的编码器,你可以自定义一个类并实现MessageToByteEncoder接口。展示了如何编写一个基于字符串的编码器:

public class LengthBasedEncoder extends MessageToByteEncoder<String> {

    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
        byte[] data = msg.getBytes(StandardCharsets.UTF_8);
        int length = data.length;
        out.writeInt(length);
        out.writeBytes(data);
    }
}

encode方法中,我们首先将字符串转换为字节数组,使用UTF-8字符集进行编码。然后,我们获取字节数组的长度,并将其写入输出ByteBuf。最后,我们将字节数组写入输出缓冲区。

注意,上述示例中使用的是字符串编码器,你可以根据实际需求替换成其他类型的编码器。同时,也请确保在创建ByteBuf对象时使用适当的Allocator,以获取更高效的内存分配和释放。

通过将自定义的编码器StringEncoder添加到NettyChannelPipeline中,作为ChannelOutboundHandler使用,你就可以在数据发送前将字符串编码为字节并写入网络通道中了。

解码过程阶段

解码的代码和编码的代码就是一个镜像操作和处理,在这里就进行赘余了,相信小伙伴都可以实现,如果真的有不会实现的,可以评论区留言告诉我,我把完整代码给你们。

image.png

对于 TCP 通信而言,粘包是很正常的现象,因此 decoder 必须处理粘包问题。LengthFrameDecoder 是一个支持粘包处理的decoder 类抽象,可基于基于长度的解码器的实现方式进行控制。

处理器链的建立

通过处理器链,Netty框架可以非常灵活地处理不同类型的事件,在Netty中,我们可以通过ChannelPipeline来建立处理器链。ChannelPipeline是一个用于管理和执行处理器的容器,它负责处理入站和出站的事件,并将这些事件传递给适当的处理器。

创建ChannelPipeline对象

ChannelPipeline pipeline = channel.pipeline();

ChannelPipeline中添加处理器

pipeline.addLast("handler1", new Handler1());
pipeline.addLast("handler2", new Handler2());

这里的 "handler1""handler2" 是处理器的名称,可以根据需要进行命名。

添加的顺序形成处理器链

数据将按照顺序在处理器之间传递。最后一个添加的处理器将是数据的出站处理器,第一个添加的处理器将是数据的入站处理器。

ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new MyHandler());

通过建立处理器链,可以根据需要按照一定的顺序和逻辑处理数据。

未完待续

由于篇幅过长,本文就到这里为止。下一篇文章将继续介绍《【分布式技术专题】「探索高性能远程通信」基于Netty的分布式通信框架实现(Dispatcher和EventListener)(下)》,并详细说明剩下的内容。敬请期待!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351

推荐阅读更多精彩内容