reactor
Understanding Reactor Pattern with Java NIO
reactor-demo
以下内容原版可参看英文文章。
什么是Reactor模式
Reactor 模式是一种事件驱动架构的实现技术
The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.
我们知道Reactor模式首先是事件驱动的,有一个或多个并发输入源,有一个Service Handler,有多个Request Handlers;这个Service Handler会同步的将输入的请求(Event)多路复用的分发给相应的 Request Handler。
Handle
Handle代表操作系统管理的资源,包括:网络链接,打开的文件,计时器,同步对象等等。在我们的web系统中,Handle代表与客户端连接的套接字,Synchronous Event Demultiplexer在这些套接字上等待事件的发生。
Synchronous Event Demultiplexer
在一个Handle集合上等待事件的发生。这里常用系统调用select[1],UNIX和WIN32平台都支持这个系统调用。select的返回结果说明handle上发生情况,需要被处理。
Initiation Dispatcher
提供接口:注册,删除和派发Event Handler。上面的Synchronous Event Demultiplexer等待事件的发生,当检测到新的事件,就把事件交给Initiation Dispatcher,它去回调Event Handler。事件种类一般有:接受到连接,数据输入,数据输出,超时。
Event Handler
定义一个抽象接口,包含一个钩子方法,实现特定服务的派发操作。这个方法实现了与特定应用相关的服务。
Concrete Event Handler
继承上面的类,实现钩子方法。应用把Concrete Event Handler注册到Initiation Dispatcher,等待被处理的事件。当事件发生,这些方法被回调。
Reactor pattern in Java NIO
我们将会基于java nio I/O 复用模型 实现reactor模式的demo
Java NIO 是为了弥补传统 I/O 工作模式的不足而研发的,NIO 的工具包提出了基于 Selector(选择器)、Buffer(缓冲区)、Channel(通道)的新模式;Selector、Channel和 SelectionKey(选择键)配合起来使用,可以实现并发的非阻塞型 I/O 能力。
让我们将java reactor的实现与标准意义对应起来:
Selector (demultiplexer)
Selector is the Java building block, which is analogous to the demultiplexer in the Reactor pattern. Selector is where you register your interest in various I/O events and the objects tell you when those events occur.
Reactor/initiation dispatcher
We should use the Java NIO Selector in the Dispatcher/Reactor. For this, we can introduce our own Dispatcher/Reactor implementation called ‘Reactor’. The reactor comprises java.nio.channels.Selector and a map of registered handlers. As per the definition of the Dispatcher/Reactor, ‘Reactor’ will call the Selector.select() while waiting for the IO event to occur.
Handle
对应 SelectionKey.
Event
不同的IO events 如 SlectionKey.OP_READ , SelectionKey.OP_ACCEPT,SelectionKey.OP_WRITE,SelectionKey.OP_CONNECT
Handler
事件处理器,A handler is often implemented as runnable or callable in Java.
code time
https://github.com/kasun04/rnd/tree/master/nio-reactor
在我们的代码里通过一个map来管理我们关心的事件和对应的事件handler。
当我们通过Selector.select
来轮询到来的事件。并遍历Set<SelectionKey>
.
while (true) { // Loop indefinitely
demultiplexer.select();// 是阻塞的
Set<SelectionKey> readyHandles =
demultiplexer.selectedKeys();
Iterator<SelectionKey> handleIterator =
readyHandles.iterator();
while (handleIterator.hasNext()) {
SelectionKey handle = handleIterator.next();
if (handle.isAcceptable()) {
EventHandler handler =
registeredHandlers.get(SelectionKey.OP_ACCEPT);
handler.handleEvent(handle);
// 需要将此次事件key移除,避免重复处理
handleIterator.remove();
}
if (handle.isReadable()) {
EventHandler handler =
registeredHandlers.get(SelectionKey.OP_READ);
handler.handleEvent(handle);
handleIterator.remove();
}
if (handle.isWritable()) {
EventHandler handler =
registeredHandlers.get(SelectionKey.OP_WRITE);
handler.handleEvent(handle);
handleIterator.remove();
}
}
}
我们在handleEvent里处理事件,并且注册新的关心的事件,比如在AcceptEventHandler处理完后,注册SelectionKey.OP_READ,将事件抛给下一个handler。并可以在
public void handleEvent(SelectionKey handle) throws Exception {
System.out.println("===== Accept Event Handler =====");
ServerSocketChannel serverSocketChannel =
(ServerSocketChannel) handle.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
socketChannel.configureBlocking(false);
socketChannel.register(
demultiplexer, SelectionKey.OP_READ);
}
}
我们看到上面轮询的代码其实是单线程的每一个handler的handleEvent都是在Reactor这个一个里的,为什么不做成多线程呢?
NIO由原来的BIO的阻塞读写(占用线程)变成了单线程轮询事件,找到可以进行读写的网络描述符进行读写。除了事件的轮询是阻塞的(没有可干的事情必须要阻塞),剩余的I/O操作都是纯CPU操作,没有必要开启多线程。
我们看read事件的handler,为了提高效率,业务代码开线程,将io处理与业务代码处理分离。
public void handleEvent(SelectionKey handle) throws Exception {
System.out.println("===== Read Event Handler =====");
SocketChannel socketChannel =
(SocketChannel) handle.channel();
// 从socket读取数据
byte[] buffer=read(socketChannel);
// 此处执行业务操作,最好单线程或者多线程,将io处理与业务代码处理分离
doBusiness(buffer);
// Rewind the buffer to start reading from the beginning
// Register the interest for writable readiness event for
// this channel in order to echo back the message
socketChannel.register(
demultiplexer, SelectionKey.OP_WRITE, inputBuffer);
}
BIO与NIO的对比
美团技术博客--Java NIO浅析
关键差别:
BIO accept,read,write 都是阻塞的。
NIO select 阻塞,read,write是非阻塞的。
bio 经典模型
{
ExecutorService executor = Excutors.newFixedThreadPollExecutor(100);//线程池
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(8088);
while(!Thread.currentThread.isInturrupted()){//主线程死循环等待新连接到来
Socket socket = serverSocket.accept();//阻塞
//为新的连接创建新的线程
executor.submit(new ConnectIOnHandler(socket));
}
class ConnectIOnHandler extends Thread{
private Socket socket;
public ConnectIOnHandler(Socket socket){
this.socket = socket;
}
public void run(){
while(!Thread.currentThread.isInturrupted()&&!socket.isClosed()){死循环处理读写事件
String someThing = socket.read()....//读取数据
if(someThing!=null){
......//处理数据
socket.write()....//写数据
}
}
}
}
这是一个经典的每连接每线程的模型,之所以使用多线程,主要原因在于socket.accept()、socket.read()、socket.write()三个主要函数都是同步阻塞的,当一个连接在处理I/O的时候,系统是阻塞的,如果是单线程的话必然就挂死在那里(比如阻塞在read,那么就无法accept到其他请求了);但CPU是被释放出来的,开启多线程,就可以让CPU去处理更多的事情。
NIO reactor模型
如上设计和分析:
由于read,write的非阻塞,所以可以不用多线程,并且由于线程的节约,连接数大的时候因为线程切换带来的问题也随之解决,进而为处理海量连接提供了可能。
单线程处理I/O的效率确实非常高,没有线程切换,只是拼命的读、写、选择事件。但现在的服务器,一般都是多核处理器,如果能够利用多核心进行I/O,无疑对效率会有更大的提高。
限制:
Java的Selector对于Linux系统来说,有一个致命限制:同一个channel的select不能被并发的调用。因此,如果有多个I/O线程,必须保证:一个socket只能属于一个IoThread,而一个IoThread可以管理多个socket。