前言——服务端处理网络请求
首先看看服务端处理网络请求的典型过程:
由上图可以看到,主要处理步骤包括:
- 获取请求数据,客户端与服务器建立连接发出请求,服务器接受请求(1-3)。
- 构建响应,当服务器接收完请求,并在用户空间处理客户端的请求,直到构建响应完成(4)。
- 返回数据,服务器将已构建好的响应再通过内核空间的网络 I/O 发还给客户端(5-7)。
设计服务端并发模型时,主要有如下两个关键点:
- 1、服务器如何管理连接,获取输入数据。
- 2、服务器如何处理请求。
以上两个关键点最终都与操作系统的 I/O 模型以及线程(进程)模型相关,下面详细介绍这两个模型。
前言——I/O多路复用
I/O多路复用是指使用一个线程来检查多个文件描述符(Socket)的就绪状态,比如调用select和poll函数,传入多个文件描述符,如果有一个文件描述符就绪,则返回,否则阻塞直到超时。得到就绪状态后进行真正的操作可以在同一个线程里执行,也可以启动线程执行(比如使用线程池)。
一般情况下,I/O 复用机制需要事件分发器。 事件分发器的作用,将那些读写事件源分发给各读写事件的处理者。
涉及到事件分发器的两种模式称为:Reactor和Proactor。 Reactor模式是基于同步I/O的,而Proactor模式是和异步I/O相关的。本文主要介绍的就是 Reactor模式相关的知识。
一、传统阻塞IO服务模型——BIO模式
模型特点
- 1、采用阻塞IO模式获取输入的数据。
- 2、每个连接都需要独立的线程完成数据的输入,业务处理,数据返回。
针对传统阻塞 I/O 服务模型的 2 个缺点,解决方案:
1、基于 I/O 复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。
2、基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。
Reactor 对应的叫法:1、反应器模式;2、分发者模式(Dispatcher);3、通知者模式(notifier)
二、Reactor模式简介
Netty是典型的Reactor模型结构,关于Reactor的详尽阐释,本文站在巨人的肩膀上,借助 Doug Lea(就是那位让人无限景仰的大爷)的“Scalable IO in Java”中讲述的Reactor模式。
“Scalable IO in Java”的地址是:http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
Reactor:是反应堆的意思,Reactor 模型是指通过一个或多个输入同时传递给服务处理器的服务请求的事件驱动处理模式。
服务端程序处理传入多路请求,并将它们同步分派给请求对应的处理线程,Reactor 模式也叫 Dispatcher 模式,即 I/O 多路复用统一监听事件,收到事件后分发(Dispatch 给某进程),是编写高性能网络服务器的必备技术之一,大多数IO相关组件如Netty、Redis在使用的IO模式。
Reactor 模型中有 2 个关键组成:
Reactor:Reactor 在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对 IO 事件做出反应。它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系人。
Handlers:处理程序执行 I/O 事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际官员。Reactor 通过调度适当的处理程序来响应 I/O 事件,处理程序执行非阻塞操作。
取决于 Reactor 的数量和 Hanndler 线程数量的不同,Reactor 模型有 3 个变种:
- 单 Reactor 单线程。
- 单 Reactor 多线程。
- 主从 Reactor 多线程。
三、多线程IO的致命缺陷
最最原始的网络编程思路就是服务器用一个while循环,不断监听端口是否有新的套接字连接,如果有,那么就调用一个处理函数处理,类似:
while(true){
socket = accept();
handle(socket)
}
这种方法的最大问题是无法并发,效率太低,如果当前的请求没有处理完,那么后面的请求只能被阻塞,服务器的吞吐量太低。
之后,想到了使用多线程,也就是很经典的connection per thread,每一个连接用一个线程处理,类似:
class BasicModel implements Runnable {
public void run() {
try {
ServerSocket serverSocket = new ServerSocket(8888);
while (!Thread.interrupted()) {
//创建新线程来handle
// or, single-threaded, or a thread pool
new Thread(new Handler(serverSocket.accept())).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
static class Handler implements Runnable {
private Socket socket;
public Handler(Socket socket) {
this.socket = socket;
}
public void run() {
try {
byte[] input = new byte[1024];
socket.getInputStream().read(input);
byte[] output = process(input);
socket.getOutputStream().write(output);
} catch (IOException e) {
e.printStackTrace();
}
}
private byte[] process(byte[] input) {
byte[] output=null;
//业务逻辑处理
return output;
}
}
}
对于每一个请求都分发给一个线程,每个线程中都独自处理上面的流程。
tomcat服务器的早期版本确实是这样实现的。
多线程并发模式,一个连接一个线程的优点是:
- 一定程度上极大地提高了服务器的吞吐量,因为之前的请求在read阻塞以后,不会影响到后续的请求,因为他们在不同的线程中。这也是为什么通常会讲“一个线程只能对应一个socket”的原因。另外有个问题,如果一个线程中对应多个socket连接不行吗?语法上确实可以,但是实际上没有用,每一个socket都是阻塞的,所以在一个线程里只能处理一个socket,就算accept了多个也没用,前一个socket被阻塞了,后面的是无法被执行到的。
多线程并发模式,一个连接一个线程的缺点是:
- 缺点在于资源要求太高,系统中创建线程是需要比较高的系统资源的,如果连接数太高,系统无法承受,而且,线程的反复创建-销毁也需要代价。
改进方法是:
- 采用基于事件驱动的设计,当有事件触发时,才会调用处理器进行数据处理。使用Reactor模式,对线程的数量进行控制,一个线程处理大量的事件。
四、单 Reactor 单线程
方案说明:
- 1、Select 是前面 I/O 复用模型介绍的标准网络编程 API,可以实现应用程序通过一个阻塞对象监听多路连接请求。
- 2、Reactor 对象通过 Select 监控客户端请求事件,收到事件后通过 Dispatch 进行分发。
- 3、如果是建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个 Handler 对象处理连接完成后的后续业务处理。
- 4、如果不是建立连接事件,则 Reactor 会分发调用连接对应的 Handler 来响应。
- 5、Handler 会完成 Read→业务处理→Send 的完整业务流程。
服务器端用一个线程通过多路复用搞定所有的 IO 操作(包括连接,读、写等),编码简单,清晰明了,但是如果客户端连接数量较多,将无法支撑,下面的NIO就属于这种模型。
Reactor模型的朴素原型
Java的NIO模式的Selector网络通讯,其实就是一个简单的Reactor模型。可以说是Reactor模型的朴素原型。
public class NIOServer {
public static void main() throws IOException {
// 1、获取Selector选择器
Selector selector = Selector.open();
// 2、获取通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 3.设置为非阻塞
serverSocketChannel.configureBlocking(false);
// 4、绑定连接
serverSocketChannel.bind(new InetSocketAddress(8888));
// 5、将通道注册到选择器上,并注册的操作为:“接收”操作
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 6、采用轮询的方式,查询获取“准备就绪”的注册过的操作
while (true) {
if(selector.select() == 0){
continue;
}
// 7、获取当前选择器中所有注册的选择键(“已经准备就绪的操作”)
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
// 8、获取“准备就绪”的时间
SelectionKey selectedKey = selectedKeys.next();
// 9、判断key是具体的什么事件
if (selectedKey.isAcceptable()) {
// 10、若接受的事件是“接收就绪” 操作,就获取客户端连接
SocketChannel socketChannel = serverSocketChannel.accept();
// 11、切换为非阻塞模式
socketChannel.configureBlocking(false);
// 12、将该通道注册到selector选择器上
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (selectedKey.isReadable()) {
// 13、获取该选择器上的“读就绪”状态的通道
SocketChannel socketChannel = (SocketChannel) selectedKey.channel();
// 14、读取数据
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int length = 0;
while ((length = socketChannel.read(byteBuffer)) != -1)
{
byteBuffer.flip();
System.out.println(new String(byteBuffer.array(), 0, length));
byteBuffer.clear();
}
socketChannel.close();
}
// 15、移除选择键
selectedKeys.remove();
}
}
}
}
实际上的Reactor模式,是基于Java NIO的,在他的基础上,抽象出来两个组件——Reactor和Handler两个组件:
1)Reactor:负责响应IO事件,当检测到一个新的事件,将其发送给相应的Handler去处理;新的事件包含连接建立就绪、读就绪、写就绪等。
2)Handler:将自身(handler)与事件绑定,负责事件的处理,完成channel的读入,完成处理业务逻辑后,负责将结果写出channel。
4.1、什么是单线程Reactor呢?
如下图所示:
这是最简单的单Reactor单线程模型。Reactor线程是个多面手,负责多路分离套接字,Accept新连接,并分派请求到Handler处理器中。
下面的图,来自于“Scalable IO in Java”,和上面的图的意思,差不多。Reactor和Hander 处于一条线程执行。
顺便说一下,可以将上图的accepter,看做是一种特殊的handler。
4.2、单线程Reactor的参考代码
“Scalable IO in Java”,实现了一个单线程Reactor的参考代码,Reactor的代码如下:
public class Handler implements Runnable {
private SocketChannel channel;
private SelectionKey selectionKey;
ByteBuffer input = ByteBuffer.allocate(1024);
ByteBuffer output = ByteBuffer.allocate(1024);
static final int READING = 0, SENDING = 1;
int state = READING;
public Handler(Selector selector, SocketChannel c) throws IOException {
this.channel = c;
c.configureBlocking(false);
// Optionally try first read now
this.selectionKey = channel.register(selector, 0);
//将Handler作为callback对象
this.selectionKey.attach(this);
//第二步,注册Read就绪事件
this.selectionKey.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
private boolean inputIsComplete() {
/* ... */
return false;
}
private boolean outputIsComplete() {
/* ... */
return false;
}
private void process() {
/* ... */
return;
}
public void run() {
try {
if (state == READING) {
read();
} else if (state == SENDING) {
send();
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void read() throws IOException {
channel.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// Normally also do first write now
//第三步,接收write就绪事件
selectionKey.interestOps(SelectionKey.OP_WRITE);
}
}
private void send() throws IOException {
channel.write(output);
//write完就结束了, 关闭select key
if (outputIsComplete()) {
selectionKey.cancel();
}
}
}
这两段代码,是建立在JAVA NIO的基础上的,这两段代码建议一定要看懂。可以在IDE中去看源码,这样直观感觉更佳。
4.3、单线程模式的缺点:
1、 当其中某个 handler 阻塞时, 会导致其他所有的 client 的 handler 都得不到执行, 并且更严重的是, handler 的阻塞也会导致整个服务不能接收新的 client 请求(因为 acceptor 也被阻塞了)。 因为有这么多的缺陷, 因此单线程Reactor 模型用的比较少。这种单线程模型不能充分利用多核资源,所以实际使用的不多。
2、因此,单线程模型仅仅适用于handler 中业务处理组件能快速完成的场景。
方案优缺点分析:
优点:模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成。
缺点:性能问题,只有一个线程,无法完全发挥多核 CPU 的性能。Handler 在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈。
缺点:可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障。
使用场景:客户端的数量有限,业务处理非常快速,比如 Redis在业务处理的时间复杂度 O(1) 的情况。
五、多线程的Reactor
方案说明:
- 1、Reactor 对象通过select 监控客户端请求事件,收到事件后,通过dispatch进行分发。
- 2、如果建立连接请求,则右Acceptor 通过accept 处理连接请求,然后创建一个Handler对象处理完成连接后的各种事件。
- 3、如果不是连接请求,则由reactor分发调用连接对应的handler 来处理。
- 4、handler 只负责响应事件,不做具体的业务处理, 通过read 读取数据后,会分发给后面的worker线程池的某个线程处理业务。
- 5、worker 线程池会分配独立线程完成真正的业务,并将结果返回给handler。
- 6、handler收到响应后,通过send 将结果返回给client。
方案优缺点分析:
- 优点:可以充分的利用多核cpu 的处理能力。
- 缺点:多线程数据共享和访问比较复杂, reactor 处理所有的事件的监听和响应,在单线程运行, 在高并发场景容易出现性能瓶颈。
5.1、基于线程池的改进
在线程Reactor模式基础上,做如下改进:
- 1、将Handler处理器的执行放入线程池,多线程进行业务处理。
- 2、而对于Reactor而言,可以仍为单个线程。如果服务器为多核的CPU,为充分利用系统资源,可以将Reactor拆分为两个线程。
一个简单的图如下:
5.2、改进后的完整示意图
下面的图,来自于“Scalable IO in Java”,和上面的图的意思,差不多,只是更加详细。Reactor是一条独立的线程,Hander 处于线程池中执行。
5.3、多线程Reactor的参考代码
“Scalable IO in Java”,的多线程Reactor的参考代码,是基于单线程做一个线程池的改进,改进的Handler的代码如下:
public class MThreadHandler implements Runnable {
private SocketChannel channel;
private SelectionKey selectionKey;
ByteBuffer input = ByteBuffer.allocate(1024);
ByteBuffer output = ByteBuffer.allocate(1024);
static final int READING = 0, SENDING = 1;
int state = READING;
ExecutorService pool = Executors.newFixedThreadPool(2);
static final int PROCESSING = 3;
public MThreadHandler(Selector selector, SocketChannel c) throws IOException {
this.channel = c;
c.configureBlocking(false);
// Optionally try first read now
this.selectionKey = this.channel.register(selector, 0);
//将Handler作为callback对象
this.selectionKey.attach(this);
//第二步,注册Read就绪事件
this.selectionKey.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
private boolean inputIsComplete() {
/* ... */
return false;
}
private boolean outputIsComplete() {
/* ... */
return false;
}
private void process() {
/* ... */
return;
}
public void run() {
try {
if (state == READING) {
read();
} else if (state == SENDING) {
send();
}
} catch (IOException e) {
e.printStackTrace();
}
}
private synchronized void read() throws IOException {
// ...
channel.read(input);
if (inputIsComplete()) {
state = PROCESSING;
//使用线程pool异步执行
pool.execute(new Processer());
}
}
private void send() throws IOException {
channel.write(output);
//write完就结束了, 关闭select key
if (outputIsComplete()) {
selectionKey.cancel();
}
}
private synchronized void processAndHandOff() {
process();
state = SENDING;
// or rebind attachment
//process完,开始等待write就绪
selectionKey.interestOps(SelectionKey.OP_WRITE);
}
private class Processer implements Runnable {
public void run() {
processAndHandOff();
}
}
}
Reactor 类没有大的变化,参考前面的代码。
六、主从 Reactor 多线程
针对单 Reactor 多线程模型中,Reactor 在单线程中运行,高并发场景下容易成为性能瓶颈,可以让 Reactor 在多线程中运行。
方案说明:
- 1、Reactor主线程 MainReactor 对象通过select 监听连接事件, 收到事件后,通过Acceptor 处理连接事件。
- 2、当 Acceptor 处理连接事件后,MainReactor 将连接分配给SubReactor。
- 3、subreactor 将连接加入到连接队列进行监听,并创建handler进行各种事件处理。
- 4、当有新事件发生时, subreactor 就会调用对应的handler处理。
- 5、handler 通过read 读取数据,分发给后面的worker 线程处理。
- 6、worker 线程池分配独立的worker 线程进行业务处理,并返回结果。
- 7、handler 收到响应的结果后,再通过send 将结果返回给client。
- 8、Reactor 主线程可以对应多个Reactor 子线程, 即MainRecator 可以关联多个SubReactor。
Scalable IO in Java 对 Multiple Reactors 的原理图解
对于多个CPU的机器,为充分利用系统资源,将Reactor拆分为两部分。代码如下:
public class MThreadReactor implements Runnable {
//subReactors集合, 一个selector代表一个subReactor
Selector[] selectors=new Selector[2];
int next = 0;
final ServerSocketChannel serverSocket;
private MThreadReactor(int port) throws IOException {
//Reactor初始化
selectors[0]=Selector.open();
selectors[1]= Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port));
//非阻塞
serverSocket.configureBlocking(false);
//分步处理,第一步,接收accept事件
SelectionKey selectionKey = serverSocket.register( selectors[0], SelectionKey.OP_ACCEPT);
//attach callback object, Acceptor
selectionKey.attach(new Acceptor());
}
public void run() {
try {
while (!Thread.interrupted()) {
for (int i = 0; i <2 ; i++) {
selectors[i].select();
Set<SelectionKey> selectionKeys = selectors[i].selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
//Reactor负责dispatch收到的事件
dispatch((SelectionKey) (iterator.next()));
}
selectionKeys.clear();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
//调用之前注册的callback对象
if (r != null) {
r.run();
}
}
class Acceptor { // ...
public synchronized void run() throws IOException {
//主selector负责accept
SocketChannel connection = serverSocket.accept();
if (connection != null) {
//选个subReactor去负责接收到的connection
new Handler(selectors[next], connection);
}
if (++next == selectors.length) {
next = 0;
}
}
}
}
方案优缺点说明:
- 优点:父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理。
- 优点:父线程与子线程的数据交互简单,Reactor 主线程只需要把新连接传给子线程,子线程无需返回数据。
- 缺点:编程复杂度较高。
这种模型在许多项目中广泛使用,包括 Nginx 主从 Reactor 多进程模型,Memcached 主从多线程,Netty 主从多线程模型的支持。
七、Reactor编程的优点和缺点
优点:
1、响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的;
2、编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;
3、可扩展性,可以方便的通过增加Reactor实例个数来充分利用CPU资源;
4、可复用性,reactor框架本身与具体事件处理逻辑无关,具有很高的复用性;
缺点:
1、相比传统的简单模型,Reactor增加了一定的复杂性,因而有一定的门槛,并且不易于调试。
2、Reactor模式需要底层的Synchronous Event Demultiplexer支持,比如Java中的Selector支持,操作系统的select系统调用支持,如果要自己实现Synchronous Event Demultiplexer可能不会有那么高效。
3、Reactor模式在IO读写数据时还是在同一个线程中实现的,即使使用多个Reactor机制的情况下,那些共享一个Reactor的Channel如果出现一个长时间的数据读写,会影响这个Reactor中其他Channel的相应时间,比如在大文件传输时,IO操作就会影响其他Client的相应时间,因而对这种操作,使用传统的Thread-Per-Connection或许是一个更好的选择,或则此时使用改进版的Reactor模式如Proactor模式。
参考:
https://www.cnblogs.com/crazymakercircle/p/9833847.html
https://www.cnblogs.com/549294286/p/11241357.html
https://www.ancii.com/aax4jlaz/
http://www.softtest.com/index.php?m=content&c=index&a=show&catid=94&id=13513