简单构建netty服务端与客户端

一、netty流程(抽象)
  • 定义核心处理类
  • 定义Initializer
  • 定义服务启动类
二、netty server流程
  1. 构造netty服务端处理器 -> NettyServerHandler
  @Slf4j
  public class NettyServerHandler extends ChannelInboundHandlerAdapter {
      /**
       *@Author wuxubiao
       *@Description 客户端连接触发
       *@Date 2021/7/23 11:17
       *@Param [ctx]
       *@return void
       **/
      @Override
      public void channelActive(ChannelHandlerContext ctx) throws Exception {
          log.info("channel connect active...");
      }
  
      /**
       *@Author wuxubiao
       *@Description 客户端发消息触发
       *@Date 2021/7/23 11:17
       *@Param [ctx, msg]
       *@return void
       **/
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
          log.info("server receive message is :{}", msg.toString());
          //回应消息
          ctx.write("hello client!");
          ctx.flush();
      }
  
      /**
       *@Author wuxubiao
       *@Description 发生异常触发
       *            当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
       *            在大部分情况下,捕获的异常应该被记录下来并且把关联的 channel 给关闭掉。然而这个方法的处理方式会在遇到不同异常的情况下有不同的实现,比如你可能想在关闭连接之前发送一个错误码的响应消息。
       *@Date 2021/7/23 11:17
       *@Param [ctx, cause]
       *@return void
       **/
      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
          cause.printStackTrace();
          ctx.close();
      }
  }
NettyServerHandler继承自 ChannelInboundHandlerAdapter,这个类实现了 ChannelInboundHandler接口,ChannelInboundHandler 提供了许多事件处理的接口方法,然后你可以覆盖这些方法。
常用:
    channelActive() 事件处理方法:每当新的客户端连接时该方法被调用
    chanelRead() 事件处理方法:每当从客户端收到新的数据时,这个方法会在收到消息时被调用
    exceptionCaught() 事件处理方法:当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时。在大部分情况下,捕获的异常应该被记录下来并且把关联的 channel 给关闭掉。然而这个方法的处理方式会在遇到不同异常的情况下有不同的实现,比如你可能想在关闭连接之前发送一个错误码的响应消息。
ChannelHandlerContext:ChannelHandlerContext 对象提供了许多操作,使你能够触发各种各样的 I/O 事件和操作。这里我们调用了 write(Object) 方法来逐字地把接受到的消息写入。
    ctx.write(Object) 方法不会使消息写入到通道上,他被缓冲在了内部,你需要调用 ctx.flush() 方法来把缓冲区中数据强行输出。或者你可以用更简洁的 cxt.writeAndFlush(msg) 以达到同样的目的。

此处有一个特殊说明的地方:

    1.处理器的职责是释放所有传递到处理器的引用计数对象
    2.msg即接收到的消息-接收到的消息的类型为ByteBuf,ByteBuf 是一个引用计数对象
    3.则这个对象必须显示地调用 release() 方法来释放
    4.我们调用write的时候并没有释放接受到的消息,这是因为当写入的时候 Netty 已经帮我们释放了
    5.write()方法内部也会调用release(),参考源码:AbstractChannelHandlerContext类下
  1. 构造netty服务初始化器 -> ServerChannelInitializer
/**
 * @Description: netty服务初始化器
 * @Author wuxubiao
 * @Date 2021/7/23 11:30
 * @Version V-1.0
 */
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //添加编解码
        socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
        socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
        //添加 netty 服务端处理器逻辑
        socketChannel.pipeline().addLast(new NettyServerHandler());
    }
}
ChannelInitializer 是一个特殊的处理类,他的目的是帮助使用者配置一个新的 Channel
也许你想通过增加一些处理类比如NettyServerHandler 来配置一个新的 Channel 或者其对应的ChannelPipeline 来实现你的网络程序。当你的程序变的复杂时,可能你会增加更多的处理类到 pipline 上,然后提取这些匿名类到最顶层的类上。
  1. 构造netty服务启动监听器 -> NettyServer
/**
 * @Description: 服务启动监听器
 * @Author wuxubiao
 * @Date 2021/7/23 11:35
 * @Version V-1.0
 */
@Slf4j
public class NettyServer {
    /**
    *@Author wuxubiao
    *@Description 启动
    *@Date 2021/7/23 11:37
    *@Param [socketAddress-socket地址包装类]
    *@return void
    **/
    public void start(InetSocketAddress socketAddress){
        // boss线程组(初始化一个线程)
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 工作线程组(初始化200线程)
        EventLoopGroup workGroup = new NioEventLoopGroup(200);
        //构造引导
        ServerBootstrap bootstrap = new ServerBootstrap()
                .group(bossGroup,workGroup)
                //根据场景不同选择合适得通信channel,继承自 Channel
                .channel(NioServerSocketChannel.class)
                .childHandler(new ServerChannelInitializer())
                .localAddress(socketAddress)
                // option() 是提供给NioServerSocketChannel 用来接收进来的连接
                //设置队列大小
                // ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数,函数listen(int socketfd,int backlog)用来初始化服务端可连接队列,服务端处理客户端连接请求是顺序处理的,
                //所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小
                .option(ChannelOption.SO_BACKLOG,1024)
                // childOption() 是提供给由父管道 ServerChannel 接收到的连接,在这个例子中也是 NioServerSocketChannel
                // 两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文
                .childOption(ChannelOption.SO_KEEPALIVE, true);
        try {
            // 绑定端口,开始接收进来的连接
            ChannelFuture future = bootstrap.bind(socketAddress).sync();
            log.info("server start begin to listen in port: {}",socketAddress.getPort());
            // 同步等待服务器  socket 关闭
            // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            // 关闭主线程组
            bossGroup.shutdownGracefully();
            // 关闭工作线程组
            workGroup.shutdownGracefully();
        }
    }
}
NioEventLoopGroup:用来处理I/O操作的多线程事件循环器,Netty 提供了许多不同的 EventLoopGroup 的实现用来处理不同的传输。在这个例子中我们实现了一个服务端的应用,因此会有2个 NioEventLoopGroup 会被使用。第一个经常被叫做‘boss’,用来接收进来的连接。第二个经常被叫做‘worker’,用来处理已经被接收的连接,一旦‘boss’接收到连接,就会把连接信息注册到‘worker’上。如何知道多少个线程已经被使用,如何映射到已经创建的 Channel上都需要依赖于 EventLoopGroup 的实现,并且可以通过构造函数来配置他们的关系。
ServerBootstrap:是一个启动 NIO 服务的辅助启动类。你可以在这个服务中直接使用 Channel,但是这会是一个复杂的处理过程,在很多情况下你并不需要这样做。
option 和 childOption配置:你可以设置这里指定的 Channel 实现的配置参数。我们正在写一个TCP/IP 的服务端,因此我们被允许设置 socket 的参数选项比如tcpNoDelay 和 keepAlive。请参考 ChannelOption 和详细的 ChannelConfig 实现的接口文档以此可以对ChannelOption 的有一个大概的认识
    option() 是提供给NioServerSocketChannel 用来接收进来的连接
    childOption() 是提供给由父管道 ServerChannel 接收到的连接,在这个例子中也是 NioServerSocketChannel
三、netty client流程
  1. 构造netty客户端处理器 -> NettyClientHandler
  2. 构造netty客户初始化器 -> NettyClientInitializer
  3. 构造netty客户启动监听器 -> NettyClient

在 Netty 中,编写服务端和客户端最大的并且唯一不同的使用了不同的BootStrap 和 Channel的实现

/**
 * @Description: 客户端处理器
 * @Author wuxubiao
 * @Date 2021/7/23 14:01
 * @Version V-1.0
 */
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("客户端Active .....");
        super.channelActive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("客户端收到消息: {}", msg.toString());
        // 在TCP/IP中,Netty 会把读到的数据放到 ByteBuf 的数据结构中
        ByteBuf m = (ByteBuf) msg;
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
        super.channelRead(ctx, msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
        super.exceptionCaught(ctx, cause);
    }
}
在TCP/IP中,Netty 会把读到的数据放到 ByteBuf 的数据结构中
@Slf4j
public class NettyClient {

    public void start(){
        // 如果你只指定了一个 EventLoopGroup,那他就会即作为一个 boss group ,也会作为一个 workder group,尽管客户端不需要使用到 boss worker
        EventLoopGroup group = new NioEventLoopGroup();
        // BootStrap 和 ServerBootstrap 类似,不过他是对非服务端的 channel 而言,比如客户端或者无连接传输模式的 channel
        Bootstrap bootstrap = new Bootstrap()
                .group(group)
                // 代替NioServerSocketChannel的是NioSocketChannel,这个类在客户端channel 被创建时使用
                .channel(NioSocketChannel.class)
                // 该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输(不进行数据包组装为更大的帧然后进行发送)
                // 不像在使用 ServerBootstrap 时需要用 childOption() 方法,因为客户端的 SocketChannel 没有父亲
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new NettyClientInitializer());

        try {
            ChannelFuture future = bootstrap.connect("127.0.0.1", 8090).sync();
            log.info("client connect successful...");
            //发送消息
            future.channel().writeAndFlush("send message!");
            // 同步等待连接被关闭
            // 主线程执行到这里就 wait 子线程结束,子线程才是真正监听和接受请求的,closeFuture()是开启了一个channel的监听器,负责监听channel是否关闭的状态,如果监听到channel关闭了,子线程才会释放,syncUninterruptibly()让主线程同步等待子线程结果
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            group.shutdownGracefully();
        }
    }
}
区别于服务端程序
    1.BootStrap 和 ServerBootstrap 类似,不过他是对非服务端的 channel 而言,比如客户端或者无连接传输模式的 channel
    2.如果你只指定了一个 EventLoopGroup,那他就会即作为一个 boss group ,也会作为一个 workder group,尽管客户端不需要使用到 boss worker
    3.代替NioServerSocketChannel的是NioSocketChannel,这个类在客户端channel 被创建时使用
    4.不像在使用 ServerBootstrap 时需要用 childOption() 方法,因为客户端的 SocketChannel 没有父亲
    5.我们用 connect() 方法代替了 bind() 方法
四、ChannelOption参数详解

1、ChannelOption.SO_BACKLOG
ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数,函数listen(int socketfd,int backlog)用来初始化服务端可连接队列,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小

2、ChannelOption.SO_REUSEADDR
ChanneOption.SO_REUSEADDR对应于套接字选项中的SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口,比如,某个服务器进程占用了TCP的80端口进行监听,此时再次监听该端口就会返回错误,使用该参数就可以解决问题,该参数允许共用该端口,这个在服务器程序中比较常使用,比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置SO_REUSEADDR就无法正常使用该端口。

3、ChannelOption.SO_KEEPALIVE
Channeloption.SO_KEEPALIVE参数对应于套接字选项中的SO_KEEPALIVE,该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文

4、ChannelOption.SO_SNDBUF和ChannelOption.SO_RCVBUF
ChannelOption.SO_SNDBUF参数对应于套接字选项中的SO_SNDBUF,ChannelOption.SO_RCVBUF参数对应于套接字选项中的SO_RCVBUF这两个参数用于操作接收缓冲区和发送缓冲区的大小,接收缓冲区用于保存网络协议站内收到的数据,直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成功。

5、ChannelOption.SO_LINGER
ChannelOption.SO_LINGER参数对应于套接字选项中的SO_LINGER,Linux内核默认的处理方式是当用户调用close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证会发生剩余的数据,造成了数据的不确定性,使用SO_LINGER可以阻塞close()的调用时间,直到数据完全发送

6、ChannelOption.TCP_NODELAY
ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关,Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到了,组装成大的数据包进行发送,虽然该方式有效提高网络的有效负载,但是却造成了延时,而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输,于TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。

7、IP_TOS
IP参数,设置IP头部的Type-of-Service字段,用于描述IP包的优先级和QoS选项。

8、ALLOW_HALF_CLOSURE
Netty参数,一个连接的远端关闭时本地端是否关闭,默认值为False。值为False时,连接自动关闭;为True时,触发ChannelInboundHandler的userEventTriggered()方法,事件为ChannelInputShutdownEvent。

谢谢大家关注,点个赞呗~
如需转载请标明出处,谢谢~~

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,133评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,682评论 3 390
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,784评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,508评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,603评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,607评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,604评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,359评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,805评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,121评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,280评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,959评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,588评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,206评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,193评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,144评论 2 352

推荐阅读更多精彩内容