Netty框架分析
Netty简介
Netty 是一个广受欢迎的异步事件驱动的Java开源网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。
本文基于 Netty 4.1 展开介绍相关理论模型,使用场景,基本组件、整体架构,知其然且知其所以然,希望给大家在实际开发实践、学习开源项目方面提供参考。
本文作者的另两篇《高性能网络编程(五):一文读懂高性能网络编程中的I/O模型》、《高性能网络编程(六):一文读懂高性能网络编程中的线程模型》也写的很好,有兴趣的读者可以一并看看。
相关资料
Netty源码在线阅读:
Netty-4.1.x地址是:http://docs.52im.net/extend/docs/src/netty4_1/
Netty-4.0.x地址是:http://docs.52im.net/extend/docs/src/netty4/
Netty-3.x地址是:http://docs.52im.net/extend/docs/src/netty3/
Netty在线API文档:
Netty-4.1.x API文档(在线版):http://docs.52im.net/extend/docs/api/netty4_1/
Netty-4.0.x API文档(在线版):http://docs.52im.net/extend/docs/api/netty4/
Netty-3.x API文档(在线版):http://docs.52im.net/extend/docs/api/netty3/
有关Netty的其它精华文章:
《有关“为何选择Netty”的11个疑问及解答》
《Netty 4.x学习(一):ByteBuf详解》
《Netty 4.x学习(二):Channel和Pipeline详解》
《Netty 4.x学习(三):线程模型详解》
《详解Netty的安全性:原理介绍、代码演示(上篇)》
《详解Netty的安全性:原理介绍、代码演示(下篇)》
《详解Netty的优雅退出机制和原理》
《NIO框架详解:Netty的高性能之道》
《Twitter:如何使用Netty 4来减少JVM的GC开销(译文)》
《绝对干货:基于Netty实现海量接入的推送服务技术要点》
《Netty干货分享:京东京麦的生产级TCP网关技术实践总结》
JDK 原生 NIO 程序的问题
JDK 原生也有一套网络应用程序 API,但是存在一系列问题,主要如下:
1) NIO 的类库和 API 繁杂,使用麻烦:你需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。
2) 需要具备其他的额外技能做铺垫:例如熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的 NIO 程序。
3) 可靠性能力补齐,开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等等。NIO 编程的特点是功能开发相对容易,但是可靠性能力补齐工作量和难度都非常大。
JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。官方声称在 JDK 1.6 版本的 update 18 修复了该问题,但是直到 JDK 1.7 版本该问题仍旧存在,只不过该 Bug 发生概率降低了一些而已,它并没有被根本解决
Netty的特点
Netty的对JDK自带的NIO的API进行封装,解决上述问题,主要特点有:
- 设计优雅适用于各种传输类型的统一API - 阻塞和非阻塞Socket基于灵活且可扩展的事件模型,可以清晰地分离关注点高度可定制的线程模型 - 单线程,一个或多个线程池真正的无连接数据报套接字支持(自3.1起)
- 使用方便详细记录的Javadoc,用户指南和示例没有其他依赖项,JDK5(Netty 3.x)或6(Netty 4.x)就足够了
- 高性能吞吐量更高,延迟更低减少资源消耗最小化不必要的内存复制
- 安全完整的SSL / TLS和StartTLS支持社区活跃,不断更新
- 社区活跃,版本迭代周期短,发现的BUG可以被及时修复,同时,更多的新功能会被加入
Netty 常见使用场景
Netty 常见的使用场景如下:
1)互联网行业:在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用。典型的应用有:阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信。
2)游戏行业:无论是手游服务端还是大型的网络游戏,Java 语言得到了越来越广泛的应用。Netty 作为高性能的基础通信组件,它本身提供了 TCP/UDP 和 HTTP 协议栈。
非常方便定制和开发私有协议栈,账号登录服务器,地图服务器之间可以方便的通过 Netty 进行高性能的通信。
3)大数据领域:经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨界点通信,它的 Netty Service 基于 Netty 框架二次封装实现。
有兴趣的读者可以了解一下目前有哪些开源项目使用了 Netty的Related Projects。
Netty 高性能设计
Netty 作为异步事件驱动的网络,高性能之处主要来自于其 I/O 模型</font>和<font color=#FF0000>线程处理模型,前者决定如何收发数据,后者决定如何处理数据。
I/O模型
用什么样的通道将数据发送给对方,BIO、NIO 或者 AIO,I/O 模型在很大程度上决定了框架的性能。
阻塞I/O(传统IO-BIO)
总结:
当进程调用recvfrom时,该函数直到①数据报到达且被复制到应用进程缓冲区;②或者发生错误(比如被信号中断)才返回。
所以,阻塞式IO的特点就是在I/O执行的两个阶段都被阻塞了——阻塞等待数据,阻塞拷贝数据。
特点如下:
- 每个请求都需要独立的线程完成数据 Read,业务处理,数据 Write 的完整操作问题。
- 当并发数较大时,需要创建大量线程来处理连接,系统资源占用较大。
- 连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在 Read 操作上,造成线程资源浪费。
非阻塞式I/O
说明:
当对一个非阻塞 socket 执行读操作时,如果内核中的数据还没有准备好,那么它并不会阻塞用户进程,而是立刻返回一个EWOULDBLOCK错误;如果内核中有数据准备好了,它会立即将数据拷贝到用户内存,并成功返回。
由于非阻塞I/O在没有数据时会立即返回,故用户进程通常需要循环调用recvfrom,不断地主动询问内核数据是否ready。
所以,非阻塞式IO的特点是在I/O执行的第一个阶段不会阻塞线程,但在第二阶段会阻塞。
I/O复用模型
IO复用(IO multiplexing),也称事件驱动IO(event-driven IO),就是在单个线程里同时监控多个套接字,通过 select 或 poll 轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。
总结:
可以看出,进程阻塞在select调用上,等待有套接字变为可读;当有套接字可读以后,调用recvfrom把数据报从内核复制到用户进程缓冲区,此时进程阻塞在IO执行的第二个阶段。
如上图整个用户进程其实是一直被阻塞的,但IO复用的优势在于可以等待多个描述符就绪。
所以,IO复用的特点是进行了两次系统调用,进程先是阻塞在 select/poll 上,再是阻塞在读操作的第二个阶段上。
信号驱动IO模型
信号驱动式IO(signal-driven IO),就是让内核在描述符就绪时发送SIGIO信号通知用户进程。
总结:
首先需要开启 socket 的信号驱动式IO功能,然后通过sigaction系统调用注册SIGIO信号处理函数 —— 该系统调用会立即返回。当数据准备好时,内核会为该进程产生一个SIGIO信号,这时就可以在信号处理函数中调用 recvfrom 读取数据了。
所以,信号驱动式IO的特点就是在等待数据ready期间进程不被阻塞,当收到信号通知时再阻塞并拷贝数据。
异步IO模型
异步IO(asynchronous IO)其实用得很少,在Linux 2.5 版本的内核中首次出现,在 2.6 版本的内核中才成为标准特性。
总结:
用户进程在发起aio_read操作后,该系统调用立即返回 —— 然后内核会自己等待数据ready,并自动将数据拷贝到用户内存。整个过程完成以后,内核会给用户进程发送一个信号,通知IO操作已完成。
异步IO与信号驱动式IO的主要区别是:信号驱动式IO是由内核通知我们何时启动一个IO操作,而异步IO是由内核通知我们IO操作何时完成。
所以,异步IO的特点是IO执行的两个阶段都由内核去完成,用户进程无需干预,也不会被阻塞。
五种IO模型的比较
总结:
如图可以看出,前4种模型的主要区别在于第一阶段,因为它们的第二阶段是一样的:都是阻塞于recvfrom调用,将数据从内核拷贝到用户进程缓冲区。
Netty的I/O模型
netty的I/O模型是基于多路复用I/O模型
多个Channel(Socket-read/write)以事件的方式可以注册到同一个Selector,从而达到用一个线程处理多个请求 成为可能
总结:
- Netty 的 IO 线程 NioEventLoop 由于聚合了多路复用器 Selector,可以同时并发处理成百上千个客户端连接。
- 当线程从某客户端 Socket 通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。
- 线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。
- 由于读写操作都是非阻塞的,这就可以充分提升 IO 线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂起。
- 一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。
基于Buffer
- 传统的 I/O 是面向字节流或字符流的,以流式的方式顺序地从一个 Stream 中读取一个或多个字节, 因此也就不能随意改变读取指针的位置。
- 在 NIO 中,抛弃了传统的 I/O 流,而是引入了 Channel 和 Buffer 的概念。在 NIO 中,只能从 Channel 中读取数据到 Buffer 中或将数据从 Buffer 中写入到 Channel。
- 基于 Buffer 操作不像传统 IO 的顺序操作,NIO 中可以随意地读取任意位置的数据。
线程模型
数据报如何读取?读取之后的编解码在哪个线程进行,编解码后的消息如何派发,线程模型的不同,对性能的影响也非常大。
事件驱动模型
通常,我们设计一个事件处理模型的程序有两种思路:
- 轮询方式:线程不断轮询访问相关事件发生源有没有发生事件,有发生事件就调用事件处理逻辑;
- 事件驱动方式:发生事件,主线程把事件放入事件队列,在另外线程不断循环消费事件列表中的事件,调用事件对应的处理逻辑处理事件。事件驱动方式也被称为消息通知方式,其实是设计模式中观察者模式的思路。
以 GUI 的逻辑处理为例,说明两种逻辑的不同:
- 轮询方式:线程不断轮询是否发生按钮点击事件,如果发生,调用处理逻辑。
- 事件驱动方式:发生点击事件把事件放入事件队列,在另外线程消费的事件列表中的事件,根据事件类型调用相关事件处理逻辑。
image.png
主要包括 4 个基本组件:
1)事件队列(event queue):接收事件的入口,存储待处理事件;
2)分发器(event mediator):将不同的事件分发到不同的业务逻辑单元;
3)事件通道(event channel):分发器与处理器之间的联系渠道;
4)事件处理器(event processor):实现业务逻辑,处理完成后会发出事件,触发下一步操作。
可以看出,相对传统轮询模式,事件驱动有如下优点:
1)可扩展性好:分布式的异步架构,事件处理器之间高度解耦,可以方便扩展事件处理逻辑;
2)高性能:基于队列暂存事件,能方便并行异步处理事件。
Reactor线程模型
Reactor 是反应堆的意思,Reactor 模型是指通过一个或多个输入同时传递给服务处理器的服务请求的事件驱动处理模式。
服务端程序处理传入多路请求,并将它们同步分派给请求对应的处理线程,Reactor(反应堆) 模式也叫 Dispatcher(调度者) 模式,即 I/O 多了复用统一监听事件,收到事件后分发(Dispatch 给某进程),是编写高性能网络服务器的必备技术之一。
Reactor 模型中有 2 个关键组成:
1)Reactor:Reactor 在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对 IO 事件做出反应。它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系人;
2)Handlers(处理者):处理程序执行 I/O 事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际官员。Reactor 通过调度适当的处理程序来响应 I/O 事件,处理程序执行非阻塞操作。
单线程单Reactor模型
Reactor 线程是个多面手,负责多路分离套接字,Accept 新连接,并分派请求到处理器链中。该模型 适用于处理器链中业务处理组件能快速完成的场景。不过,这种单线程模型不能充分利用多核资源,所 以实际使用的不多。
多线程(线程池)单Reactor模型
主从Reactor多线程模型
Netty主要基于主从Reactors多线程模型(如下图)做了一定的修改,其中主从Reactor多线程模型有多个Reactor:MainReactor和SubReactor:
- MainReactor负责客户端的连接请求,并将请求转交给SubReactor
- SubReactor负责相应通道的IO读写请求
- 非IO请求(具体逻辑处理)的任务则会直接写入队列,等待worker threads进行处理
特别说明的是: 虽然Netty的线程模型基于主从Reactor多线程,借用了MainReactor和SubReactor的结构,但是实际实现上,SubReactor和Worker线程在同一个线程池中:
//使用NioEventLoopGroup,表示使用NIO
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup, workerGroup)
//使用NioServerSocketChannel,表示使用NIO
.channel(NioServerSocketChannel.class)
上面代码中的bossGroup 和workerGroup是Bootstrap构造方法中传入的两个对象,这两个group均是线程池
- bossGroup线程池则只是在bind某个端口后,获得其中一个线程作为MainReactor,专门处理端口的accept事件,每个端口对应一个boss线程
- workerGroup线程池会被各个SubReactor和worker线程充分利用
异步处理
异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。
Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个 ChannelFuture。
调用者并不能立刻获得结果,而是通过 Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。
当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。
常见有如下操作:
1)通过 isDone 方法来判断当前操作是否完成;
2)通过 isSuccess 方法来判断已完成的当前操作是否成功;
3)通过 getCause 方法来获取已完成的当前操作失败的原因;
4)通过 isCancelled 方法来判断已完成的当前操作是否被取消;
5)通过 addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果 Future 对象已完成,则理解通知指定的监听器。
例如下面的代码中绑定端口是异步操作,当绑定操作处理完,将会调用相应的监听器处理逻辑:
serverBootstrap.bind(port).addListener(future -> {
if(future.isSuccess()) {
System.out.println(newDate() + ": 端口["+ port + "]绑定成功!");
} else{
System.err.println("端口["+ port + "]绑定失败!");
}
});
相比传统阻塞 I/O,执行 I/O 操作后线程会被阻塞住, 直到操作完成;异步处理的好处是不会造成线程阻塞,线程在 I/O 操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量。
Netty框架的架构设计
前面介绍完 Netty 相关一些理论,下面从功能特性、模块组件、运作过程来介绍 Netty 的架构设计。
功能特性
- 传输服务支持BIO和NIO
- 容器集成支持OSGI、JBossMC、Spring、Guice容器
- 协议支持HTTP、Protobuf、二进制、文本、WebSocket等一系列常见协议都支持。还支持通过实行编码解码逻辑来实现自定义协议
- Core核心可扩展事件模型、通用通信API、支持零拷贝的ByteBuf缓冲对象
模块组件
Bootstrap、ServerBootstrap
Bootstrap意思是引导,一个Netty应用通常由一个Bootstrap开始,主要作用是配置整个Netty程序,串联各个组件,Netty中Bootstrap类是客户端程序的启动引导类,ServerBootstrap是服务端启动引导类。
Future、ChannelFuture
正如前面介绍,在Netty中所有的IO操作都是异步的,不能立刻得知消息是否被正确处理,但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过Future和ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件。
Channel
Netty网络通信的组件,能够用于执行网络I/O操作。 Channel为用户提供:
- 当前网络连接的通道的状态(例如是否打开?是否已连接?)
- 网络连接的配置参数 (例如接收缓冲区大小)
- 提供异步的网络I/O操作(如建立连接,读写,绑定端口),异步调用意味着任何I / O调用都将立即返回,并且不保证在调用结束时所请求的I / O操作已完成。调用立即返回一个ChannelFuture实例,通过注册监听器到ChannelFuture上,可以I / O操作成功、失败或取消时回调通知调用方。
- 支持关联I/O操作与对应的处理程序
不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应,下面是一些常用的Channel类型:
- NioSocketChannel,异步的客户端TCP Socket连接
- NioServerSocketChannel,异步的服务器端TCP Socket连接
- NioDatagramChannel,异步的UDP连接
- NioSctpChannel,异步的客户端Sctp连接
- NioSctpServerChannel,异步的Sctp服务器端连接这些通道涵盖了UDP和TCP网络IO以及文件IO.
Selector
Netty基于Selector对象实现I/O多路复用,通过 Selector, 一个线程可以监听多个连接的Channel事件, 当向一个Selector中注册Channel 后,Selector 内部的机制就可以自动不断地查询(select) 这些注册的Channel是否有已就绪的I/O事件(例如可读, 可写, 网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel。
NioEventLoop
NioEventLoop中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用NioEventLoop的run方法,执行I/O任务和非I/O任务:
- I/O任务 即selectionKey中ready的事件,如accept、connect、read、write等,由processSelectedKeys方法触发。
- 非IO任务 添加到taskQueue中的任务,如register0、bind0等任务,由runAllTasks方法触发。
两种任务的执行时间比由变量ioRatio控制,默认为50,则表示允许非IO任务执行的时间与IO任务的执行时间相等。
ioRatio
什么事ioRatio呢?它表示的是此线程分配给IO操作所占的事件比(即运行processSelectedKeys)耗时在整个循环中所占用的时间)例如 ioRation默认是50,则表示IO操作和执行task的所占用的线程时间比1:1,当知道了IO操作时间和它所占用时间比,那么执行task的时间就可以很方便的计算出来了:
设IO操作时间为ioTime,ioTime占的时间比例为ioRatio,则:
ioTime/ioRatio=taskTime/taskRatio
taskRatio=100-ioRatio=>taskTime= ioTime * (100-ioRatio)/ioRatio
当ioRatio=100时,不考虑IO耗时占比,而分别调用processSelectedKeys()、runAllTasks();而ioRatio!=100时,则按照公式计算分别调用processSelectedKeys()、runAllTasks()对应的时间。
NioEventLoopGroup
NioEventLoopGroup,主要管理eventLoop的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程(NioEventLoop)负责处理多个Channel上的事件,而一个Channel只对应于一个线程
ChannelHandler
ChannelHandler是一个接口,处理I/O事件或拦截I/O操作,并将其转发到其ChannelPipeline(业务处理链)中的下一个处理程序。
ChannelHandler本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类:
- ChannelInboundHandler用于处理入站I/O事件
- ChannelOutboundHandler用于处理出站I/O操作
或者使用以下适配器类:
- ChannelInboundHandlerAdapter用于处理入站I/O事件
- ChannelOutboundHandlerAdapter用于处理出站I/O操作
- ChannelDuplexHandler用于处理入站和出站事件
ChannelHandlerContext
保存Channel相关的所有上下文信息,同时关联一个ChannelHandler对象
ChannelPipline
保存ChannelHandler的List,用于处理或拦截Channel的入站事件和出站操作。 ChannelPipeline实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及Channel中各个的ChannelHandler如何相互交互。
下图引用Netty的Javadoc4.1中ChannelPipline的说明,描述了ChannelPipeline中ChannelHandler通常如何处理I/O事件。 I/O事件由ChannelInboundHandler或ChannelOutboundHandler处理,并通过调用ChannelHandlerContext中定义的事件传播方法(例如ChannelHandlerContext.fireChannelRead(Object)和ChannelOutboundInvoker.write(Object))转发到其最近的处理程序。
I/O Request
via Channel or
ChannelHandlerContext
|
+---------------------------------------------------+---------------+
| ChannelPipeline | |
| \|/ |
| +---------------------+ +-----------+----------+ |
| | Inbound Handler N | | Outbound Handler 1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler N-1 | | Outbound Handler 2 | |
| +----------+----------+ +-----------+----------+ |
| /|\ . |
| . . |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
| [ method call] [method call] |
| . . |
| . \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 2 | | Outbound Handler M-1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 1 | | Outbound Handler M | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
+---------------+-----------------------------------+---------------+
| \|/
+---------------+-----------------------------------+---------------+
| | | |
| [ Socket.read() ] [ Socket.write() ] |
| |
| Netty Internal I/O Threads (Transport Implementation) |
+-------------------------------------------------------------------+
入站事件由自下而上方向的入站处理程序处理,如图左侧所示。 入站Handler处理程序通常处理由图底部的I / O线程生成的入站数据。 通常通过实际输入操作(例如SocketChannel.read(ByteBuffer))从远程读取入站数据。
出站事件由上下方向处理,如图右侧所示。 出站Handler处理程序通常会生成或转换出站传输,例如write请求。 I/O线程通常执行实际的输出操作,例如SocketChannel.write(ByteBuffer)。
在 Netty中每个Channel都有且仅有一个ChannelPipeline与之对应, 它们的组成关系如下:
一个 Channel 包含了一个 ChannelPipeline, 而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表, 并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler。入站事件和出站事件在一个双向链表中,入站事件会从链表head往后传递到最后一个入站的handler,出站事件会从链表tail往前传递到最前一个出站的handler,两种类型的handler互不干扰。
工作原理架构
初始化并启动Netty服务端过程如下:
public static void main(String[] args) {
// 创建mainReactor
NioEventLoopGroup boosGroup = new NioEventLoopGroup();
// 创建工作线程组
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
// 组装NioEventLoopGroup
.group(boosGroup, workerGroup)
// 设置channel类型为NIO类型
.channel(NioServerSocketChannel.class)
// 设置连接配置参数
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
// 配置入站、出站事件handler
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
// 配置入站、出站事件channel
ch.pipeline().addLast(...);
ch.pipeline().addLast(...);
}
});
// 绑定端口
int port = 8080;
serverBootstrap.bind(port).addListener(future -> {
if (future.isSuccess()) {
System.out.println(new Date() + ": 端口[" + port + "]绑定成功!");
} else {
System.err.println("端口[" + port + "]绑定失败!");
}
});
}
基本过程如下:
- 初始化创建2个NioEventLoopGroup,其中boosGroup用于Accetpt连接建立事件并分发请求, workerGroup用于处理I/O读写事件和业务逻辑
- 基于ServerBootstrap(服务端启动引导类),配置EventLoopGroup、Channel类型,连接参数、配置入站、出站事件handler
- 绑定端口,开始工作
结合上面的介绍的Netty Reactor模型,介绍服务端Netty的工作架构图:
server端包含1个Boss NioEventLoopGroup和1个Worker NioEventLoopGroup,NioEventLoopGroup相当于1个事件循环组,这个组里包含多个事件循环NioEventLoop,每个NioEventLoop包含1个selector和1个事件循环线程。
每个Boss NioEventLoop循环执行的任务包含3步:
- 轮询accept事件
- 处理accept I/O事件,与Client建立连接,生成NioSocketChannel,并将NioSocketChannel注册到某个Worker NioEventLoop的Selector上
- 处理任务队列中的任务,runAllTasks。任务队列中的任务包括用户调用eventloop.execute或schedule执行的任务,或者其它线程提交到该eventloop的任务。
每个Worker NioEventLoop循环执行的任务包含3步:
- 轮询read、write事件;
- 处I/O事件,即read、write事件,在NioSocketChannel可读、可写事件发生时进行处理
- 处理任务队列中的任务,runAllTasks。
其中任务队列中的task有3种典型使用场景
- 用户程序自定义的普通任务
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
//...
}
});
- 非当前reactor线程调用channel的各种方法 例如在推送系统的业务线程里面,根据用户的标识,找到对应的channel引用,然后调用write类方法向该用户推送消息,就会进入到这种场景。最终的write会提交到任务队列中后被异步消费。
- 用户自定义定时任务
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
}
}, 60, TimeUnit.SECONDS);