概述
这次我们来谈谈网络IO模型,网络IO是系统底层网络操作的一环,不管做系统的哪一层,基本都会涉及到网络IO模型,从架构设计、中间件设计、分布式系统到代码排错,很可能都会涉及到网络IO模型。我打算先聊聊朴素的网络IO 模型里的各个模型,再探讨一下并发模型,然后我们来讨论一下高性能网络IO模型,高性能网络IO里我们仔细讨论一下随处可见的 reactor 模型。
网络IO模型
网络IO操作是操作系统的基本操作,基本体现在网络套接字、网络属性的操作API上,还有很多与文件操作等等的API也会经常用到。
网络IO可以笼统地概述为一个串行的过程:
- 初始化服务端地址、套接字等参数
- 绑定套接字与对应的服务端地址
- 建立服务端监听连接
- 若有客户端连接,接收客户端连接
- 通过客户端套接字与客户端进行读或写交互
- 关闭客户端连接
这个简单模型是同步的,即每个操作都是依次执行,实际上目前常见的开源软件如netty、nginx、swoole等的网络处理里,网络IO基本模型都是同步的,只是在整个服务端处理连接的流程中是异步的。如nginx会将accept事件和读写事件放到队列里,异步去处理读写操作。
下面我们分别来走进每个网络IO模型。
阻塞IO(blocking I/O)
阻塞的定义可能每个人的说法都不一样,一般来说阻塞是一个进程状态,若一个任务在执行过程中被操作系统剥夺了CPU控制权,就可以认为该任务被阻塞了。
上述笼统的网络IO流程就是一个阻塞的IO模型,我们可以通过一个简单的实例来描述一下:
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
void handleError(char *str) {
printf("%s, strerror: %s\n", str, strerror(errno));
exit(errno);
}
int main(int argc, char *argv[]) {
// 初始化参数
int serverFd;
int clientFd;
int len;
struct sockaddr_in serverAddr;
struct sockaddr_in clientAddr;
socklen_t addrSize;
char buf[BUFSIZ];
char writeBuf[1024];
// 设置服务端地址等参数
serverAddr.sin_family = AF_INET; // IP通信
serverAddr.sin_addr.s_addr = inet_addr("127.0.0.1"); // 服务器IP
serverAddr.sin_port = htons(8000); // 端口
// 初始化服务器端套接字
if ((serverFd = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
handleError("socket error");
}
// 设置 socket 属性为非阻塞方式
/*if (fcntl(serverFd, F_SETFL, O_NONBLOCK) == -1) {
handleError("fcntl error");
}*/
// 绑定套接字
if (bind(serverFd, (struct sockaddr *) &serverAddr, sizeof(struct sockaddr)) < 0) {
handleError("bind error");
}
// 监听连接请求-队列长5
if (listen(serverFd, 5) < 0) {
handleError("listen error");
}
do {
// 接收客户端连接
addrSize = sizeof(clientAddr);
clientFd = accept(serverFd, (struct sockaddr *) &clientAddr, &addrSize);
if (clientFd == -1) {
handleError("accept error");
/*sleep(1);
printf("处理其他事情…\n"); // 处理其他事情
continue;*/
}
printf("接收客户端 %s\n", inet_ntoa(clientAddr.sin_addr));
len = send(clientFd, "Welcome!\n", 9, 0);
// 设置读为阻塞
if (fcntl(clientFd, F_SETFL, O_RDWR) == -1) {
handleError("fcntl error");
}
// 读客户端,并写服务端响应
while ((len = recv(clientFd, buf, BUFSIZ, 0)) > 0) {
buf[len] = '\0';
printf("客户端请求:%s", buf);
memset(writeBuf, 0, strlen(writeBuf));
strcat(writeBuf, "服务端响应:");
strcat(writeBuf, buf);
strcat(writeBuf, "\0");
if (send(clientFd, writeBuf, strlen(writeBuf), 0) < 0) {
handleError("write error");
}
}
} while (1);
return 0;
}
可以看到对客户端的 accept 和 recv 都是阻塞的,当没有客户端连接或者当前客户端没有写数据的时候,进程会陷入系统调用,CPU 控制权是被剥夺的。
我们可以简单演示一下,服务端运行起来后:
可以看到服务端每一次操作后都会停住,也就是阻塞掉,第一个客户端的每次请求必须要等上一次处理完了才会处理当前请求,并且可以看到第二个客户端发起连接后服务端是无响应的,连接请求是暂时无法被服务端 accept 处理的。
关于阻塞IO,Java 有一个处理库叫 BIO,有时间我们可以了解一下,基本操作对程序员非常友好。
非阻塞IO(non-blocking I/O)
非阻塞就是无客户端请求的时候可以去处理的事情,轮训监听客户端连接。
将上述例子注释的代码稍作修改即为一个简单的同步非阻塞IO实例:
if (fcntl(serverFd, F_SETFL, O_NONBLOCK) == -1) {
handleError("fcntl error");
}
// handleError("accept error");
sleep(1);
printf("处理其他事情…\n"); // 处理其他事情
continue;
效果可参考:
信号驱动IO(signal-driven I/O)
在信号驱动式 IO 模型中,程序使用套接口进行信号驱动 I/O,并安装信号处理函数,不阻塞进程运行。信号驱动IO的使用似乎不太常见,我们不作讨论了。
异步IO
异步IO目前 Linux 下的支持并不太成熟,Window 下是支持的,但是似乎业界没有非常广泛的应用,异步IO我们就也不作过多讨论了。后面我们会介绍一下基于异步IO的Proactor模型。
IO多路复用
多路复用IO模型也叫事件通知模型,一般理解是操作系统提供的单线程或单进程同时监测若干个文件描述符是否可以执行IO操作的能力。内核一旦发现某个或某些IO条件准备读写,就会通过该能力通知用户进程。
比如我们现在有 n 个需要读或写的 fd ,我们通过某些系统 API 将这些 fd 通知给内核,内核一旦发现这些 fd 有读或写就会通过某些系统 API 通知进程,另外常用的操作是管理这个 fd 集合,如往 fd 集合里添加 fd 、删除 fd 等操作。
上述是对多路复用IO模型比较笼统的理解,我们知道常见的多路复用IO模型有 select、poll、epoll、queue 等等,每个IO模型都有其适用的场景,一般我们熟知的高性能IO模型里最常见的是 epoll ,优秀的开源高性能网“组件”如 nginx、swoole、netty、goroutine 调度模型都是 epoll 封装的。这里我们就不打算单纯得学习 select、epoll 系统调用API的使用了,一是因为 select 模型在实际使用中场景太少,单纯地了解这些系统调用 API 的使用意义不大,另外一个原因是我们用单进程简单实现的 select 或者 epoll 模型无法真实应用,实际场景漏考虑,也非高性能。对 select、epoll 系统 API 的使用我们翻查文档即可。
后面的 reactor 会用 Java 介绍 epoll 的一般常规使用,基于Java NIO 写的轮子稳定可用,完善后有实际使用意义。后面我们会站在 nginx 的肩膀上,研究一下 nginx 对 epoll 的使用。
并发模型
单纯地依靠操作系统网络IO模型的能力并不能写出高性能的网络IO模型,在写出高性能IO操作的路上,我们少不了的就是并发模型,下面我们来简单了解一下并发模型。
多进程/多线程
多任务处理我们应该都不会陌生,多任务处理里最常见的就是多进程或多线程了,PHP 使用的可能是多进程居多,Java 一般使用多线程。
多任务的处理逻辑里,比较麻烦的应该相互通信了,进程间由于是隔离的,所以一般的通信方式就是共享内存,当然还有管道(pipe)、FIFO(命名管道)、消息队列(详见sys/msg.h)、信号量(sys/sem.h)、进程通知信号,或者是网络通信等等。但是一般语言支持的进程通信组件不太成熟,所以逻辑写起来一般比较累人,如果场景需要频繁地通信,一般可能会用线程或者协程。
相比较进程,线程的通信会成熟很多,一个全局变量即可通信,Java 还有一些阻塞的队列操作,使得任务之间通信会比较简单。
函数式
函数式这里我们就不讨论了,网络IO领域里函数式比较少见,Haskell 对网络是支持好像也不太好学,后面我们在研究吧。
Actor 模型
暂不讨论。
CSP 模型
其实很多语言对 CSP 并发模型的实现就已经使用了 IO 模型了,如:
- go 的 goroutine 调度对 CSP 的底层实现就是使用 epoll,后面我们可以研究一下 GMP 模型的源码实现。
- swoole 的 Coroutine 调度也是对 CSP 模型的实现,底层也是使用 epoll,本身也是一个高性能的网络IO模型,实现应该也是 reactor。
这些巨人的肩膀,我们有机会都可以爬上去一览众山。
高性能网络模型
我们如果粗暴地将并发模型和网络IO模型进行一个简单的组合,就可以发现我们目前业界常用的高性能网络IO的雏形,但是也有一些例外,比如 CSP 模型就是使用 epoll 实现的并发模型,所以其本身就是一个高性能网络IO模型。
同步非阻塞IO+多进程
也叫 PPC (Process Per Connection),传统的或者是有名的有 Apache 的实现,也叫 Apache 模型(据了解 Apache 好像已经舍弃该实现)。
基本原理也简单明了,每个连接都会分到一个进程处理,在并发不高的情况下,也能实现较高的性能。但是显而易见的有两个问题:
- 一是受操作系统可用进程数的限制,在 LInux 就是可用 fd 的配置,如果系统支持可打开的文件句柄为6W,那么并发能力可以简单地认为是6W;
- 二是一旦进程数量多了,进程切换的性能损耗极大,进而影响系统的响应时间、吞吐量。
同步非阻塞IO+多线程
也叫 TPC (Thread Per Connection),与上述的 PPC 模型类似,其中有区别的是换为线程后,上下文切换开销会小很多。
- 我们可以自己使用java BIO 和线程池编写一个实例作实践,开发起来也是非常方便的,这里我们就不作展示了。
同步非阻塞IO+CSP模型(多协程)
上文说过,CSP 模型本身就已经是一个高性能IO模型。
在实际应用中,以 go 语言为例,由于 go 已经对 CSP 模型有实现了,所以一般用 go 编写的优秀的开源中间件里对网络IO模型的实现极为简单,不需要自己操作处理 epoll 的fd,只需要将处理客户端连接的协程启动即可,GMP 调度器会使用 epoll 调度该协程。
比如开源数据库中间件 kingshard 里对于网络IO的实现:
func (s *Server) Run() error {
s.running = true
// flush counter
go s.flushCounter()
for s.running {
conn, err := s.listener.Accept()
if err != nil {
golog.Error("server", "Run", err.Error(), 0)
continue
}
// 将处理连接协程交由 GMP 调度
go s.onConn(conn)
}
return nil
}
怎么样,很轻松吧。
多路复用IO+多线程/多进程
多路复用IO模型是我们日常见得最多的IO模型,一般会根据场景使用单进程、多进程或者使用线程实现功能,多路复用IO模型由于用的人太多,慢慢有人总结出来一个大家都认可的模型,这个模型就是 reactor 模型,我们详细地来探讨一下这个模型。我们每天都在使用的 nginx、redis、swoole、netty 都是使用这个模型。
Reactor模型
reactor 模型其实由三部分组成,reactor、acceptor、handler。
- reactor 负责监听和分发连接和读写事件,连接事件交由 acceptor 处理,读写事件会交给 handler 处理;
- acceptor 负责初始化服务端连接,接收新客户端连接;
- handler 负责处理客户端连接读和写;
当这三部分和多进程/多线程结合起来,我们自然就会遇到每个组件要起多少进程的问题,其实每个组件都可以起多个进程。
如遇到很多个服务端连接(如多个端口)的情况下,起多个 acceptor 是可以的,只是像诸如 nginx、redis 的服务端场景下,服务端连接一般不会很多,所以单独起多个 acceptor 对提升性能其实帮助不大,反而会增加上下文切换开销;
对于处理事件通知的 reactor ,多个 reactor 可以帮助 nginx 最大地提高服务端吞吐量,但是在像 redis 这些性能瓶颈是内存而非 CPU 的场景下,不用多个 reactor 也是没问题的;
因为 handler 是处理实际业务逻辑读写的,所以多进程或单进程需要看业务场景;
因此我们发现不能一概而论地敲定某个 reactor 模型的实现就是高性能的,最合适的其实还是要依据具体场景。但是因为我们主要关注网络IO的多路复用,其实就是 epoll 的使用,我们可以分别以单 reactor 和多 reactor 来讨论一下业界的实现。
单 reactor 模型
单进程的单 reactor 方案在实践中应用场景不多,只适用于业务处理非常快速的场景,但是其实一般并发不大的情况下,单进程单 reactor 还是没问题的。
我们常用的 redis 就是单进程单 reactor 的实现,可能因为redis的瓶颈不在 CPU,而在内存和其它指标。可以参考极客时间的专栏里的这个图:
多进程加单 reactor 的设计可以是多个 acceptor 进程或者多个处理业务读写的 handler 进程,如果我们遇到的场景是并发不高,但是业务读写比较耗时的场景,还是可以考虑单个 reactor +多个 handler 的设计的,至于 acceptor 根据情况判断,不单独起 acceptor 进程也是很常见的。可以参考下图:
至于单 reactor 其它设计就要看我们遇到的场景再作讨论了。如果我们使用 Java ,用 NIO 去实现还是很友好很方便的,我们可以自己实践不同的设计。
多 reactor 模型
接下来我们重点来研究一下多 reactor 下的实现,单进程多 reactor 的实现没什么意义(也就是一个进程处理多个 epoll ),我们就不讨论了。
关于多进程下的多 reactor,几乎每个开源组件对于 reactor 模型的实现几乎都不一样,我们不妨思考一个自己认为高效的模型,对于这种不同场景下有不同实现的模型,实践更能出真知。
我们这个实现对 Netty 和 nginx 的模型做了简单的参考,基本结构可以查看下图:
acceptor 独立出来的原因是更好维护,acceptor 的逻辑是接客户端新连接,并添加连接 fd 到响应的 reactor 里。这个 acceptor 可以再拓展为类似 netty 里的 mainReactor ,用于处理鉴权、分发请求至 subReactor 等逻辑;
对于 reactor 则根据配置启动若干个子线程,一般可以配置为 CPU 核数一样,具体逻辑是处理读写连接,“分发”至 handler 处理;
handler 笔者这里决定和 reactor 做到一个线程内处理读写逻辑,只是模块是分开的,没有做成一个 handler 线程池是因为这里会增加很多线程切换的性能损耗。其实即使这样,读写事件和创建新连接事件也是分开进程处理的,不需要担心连接事件处理会被阻塞在读写事件后。
代码量简单且不多,以供我们学习使用:
// Acceptor 代码:
package PlainReactorLoop;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.util.Random;
/**
* 连接事件就绪,处理连接事件
*/
class Acceptor implements Runnable {
private Logger logger = LoggerFactory.getLogger(Acceptor.class);
private ServerSocketChannel serverSocketChannel;
Acceptor() {
// 初始化服务端连接
initServerSocket();
}
public static void start() {
new Thread(new Acceptor()).start();
}
public void run() {
while (!Thread.interrupted()) {
try {
SocketChannel socketChannel = serverSocketChannel.accept();
logger.info("接收到客户端连接……");
if (socketChannel != null) {
register(socketChannel);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 初始化服务端连接
*/
private Acceptor initServerSocket() {
try {
// 打开监听信道
serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8888);
// 与本地端口绑定
serverSocketChannel.socket().bind(address);
// 设置为非阻塞模式
//serverSocketChannel.configureBlocking(false);
logger.info("初始化服务端连接完成……");
} catch (IOException e) {
e.printStackTrace();
}
return this;
}
/**
* 通知Reactor注册选择器, 在注册过程中指出该信道可以进行Read操作
*/
private void register(SocketChannel socketChannel) {
try {
socketChannel.configureBlocking(false);
// 唤起selector以防锁未释放
int selectorIndex = new Random().nextInt(Server.processNum);
Server.selector[selectorIndex].wakeup();
socketChannel.register(Server.selector[selectorIndex], SelectionKey.OP_READ);
logger.info("Acceptor注册Socket到Reactor Selector" + selectorIndex + "……");
} catch (IOException e) {
e.printStackTrace();
}
}
}
// Reactor 代码:
package PlainReactorLoop;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
/**
* 等待事件到来,分发事件处理
*/
class Reactor implements Runnable {
public static final Logger logger = LoggerFactory.getLogger(Reactor.class);
private Integer selectorIndex;
Reactor(Integer selectorIndex) {
this.selectorIndex = selectorIndex;
try {
// 初始化 selector
Server.selector[selectorIndex] = Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void start(Integer processNum) {
for (Integer selectorIndex = 0; selectorIndex < processNum; selectorIndex++) {
new Thread(new Reactor(selectorIndex)).start();
}
}
public void run() {
try {
logger.info("Reactor" + this.selectorIndex + " 运行中……");
while (!Thread.interrupted()) {
// 创建选择器
Server.selector[selectorIndex].select();
logger.info("Reactor" + this.selectorIndex + " 处理IO事件……");
Set<SelectionKey> selected = Server.selector[selectorIndex].selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
dispatch(it.next());
it.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 分发事件处理
*/
void dispatch(SelectionKey k) {
if (k.isReadable()) {
// 若是IO读写事件,调handler处理
logger.info("Reactor" + this.selectorIndex + " IO读写事件分发……");
SocketChannel socketChannel = (SocketChannel) k.channel();
new Handler(socketChannel).run();
}
}
}
// Handler 代码:
package PlainReactorLoop;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
/**
* 处理读写业务逻辑
*/
class Handler implements Runnable {
private SocketChannel socketChannel;
private static final Logger logger = LoggerFactory.getLogger(Handler.class);
public Handler(SocketChannel socket) {
socketChannel = socket;
}
public void run() {
try {
logger.info("处理业务逻辑……");
String question = read();
String resultMsg = process(question);
write(resultMsg);
} catch (Exception e) {
e.printStackTrace();
}
}
private String read() throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(1024);
int bytesRead = socketChannel.read(buffer);
if (bytesRead <= 0) {
throw new IOException("客户端数据为空");
}
buffer.flip();
// 将字节转化为为UTF-16的字符串
String receivedString = Charset.forName("utf-8").newDecoder().decode(buffer).toString();
// 控制台打印
logger.info("接收到来自" + socketChannel.socket().getRemoteSocketAddress() + "的信息:" + receivedString);
return receivedString;
}
private void write(String resultMsg) throws IOException {
socketChannel.write(ByteBuffer.wrap(resultMsg.getBytes()));
logger.info("回复" + socketChannel.socket().getRemoteSocketAddress() + "信息:" + resultMsg);
}
/**
* task 业务处理
*/
public String process(String question) {
String res = question.replace("?", ".");
return res;
}
}
实现效果一览:
更详细的代码可以参看:
https://github.com/varXinYuan/reactorLoop
关于 Netty 的参考,拜读了一下大神对于 NIO 和 netty 的介绍:
关于 Nginx 进程模型的参考,我们会在下一次具体探讨 nginx 源码里 reactor 模型的实现,敬请期待。
异步IO+多进程
Proactor 模型
Proactor 模型要依赖操作系统异步IO的支持,因此并未广泛应用,目前 Windows 下通过 IOCP 实现了真正的异步 I/O,而在 Linux 系统下的 AIO 并不完善,因此在 Linux 下实现高并发网络编程时都是以 Reactor 模式为主。
所以即使 Boost.Asio 号称实现了 Proactor 模型,其实它在 Windows 下采用 IOCP,而在 Linux 下是用 Reactor 模式(采用 epoll)模拟出来的异步模型。
Netty 在 Linux 下的 AIO 实现也是使用 epoll 模拟的。
结语
到这里我们把网络IO模型的各个分类、细节和实现都游览了一遍。下次我们会站在 nginx 源码的肩膀上再览群山,不见不散。