一、内核接收数据流程
1.
网卡发现 MAC 地址符合,就将包收进来;发现 IP 地址符合,根据 IP 头中协议项,知道上一层是 TCP 协议;2.
DMA把TCP数据包copy到内核缓冲区;3.
触发CPU中断,中断程序摘除TCP头通过socket五要素(源IP/PORT、目的IP/PORT、协议)找到对应的socket文件,并把原始二进制数据报copy到socket接收缓冲区;4.
中断程序唤醒被阻塞的内核线程;5.
内核线程切换到用户线程把数据从socket接口缓冲区copy到应用内存;
二、中断处理流程
I/O发出的信号的异常代码,拿到异常代码之后,CPU就会触发异常处理的流程。计算机在内存里会保存中断向量表,用来存放不同的异常代码对应的异常处理程序所在的地址。
CPU在拿到了异常码之后,会先把当前的程序执行的现场,保存到程序栈里面,然后根据异常码查询,找到对应的异常处理程序,最后把后续指令执行的指挥权,交给这个异常处理程序。
异常处理程序结束之后返回到原来指令执行的位置继续执行;
三、阻塞不占用 cpu
网卡何时接收到数据是依赖发送方和传输路径的,这个延迟通常都很高,是毫秒(ms)级别的。而应用程序处理数据是纳秒(ns)级别的。也就是说整个过程中,内核态等待数据,处理协议栈是个相对很慢的过程。这么长的时间里,用户态的进程是无事可做的,因此用到了“阻塞(挂起)”。
阻塞是进程调度的关键一环,指的是进程在等待某事件发生之前的等待状态。请看下表,在 Linux 中,进程状态大致有 7 种(在 include/linux/sched.h 中有更多状态):
从说明中可以发现,“可运行状态”会占用 CPU 资源,另外创建和销毁进程也需要占用 CPU 资源(内核)。重点是,当进程被"阻塞/挂起"时,是不会占用 CPU 资源的。
为了支持多任务,Linux 实现了进程调度的功能(CPU 时间片的调度)。而这个时间片的切换,只会在“可运行状态”的进程间进行。因此“阻塞/挂起”的进程是不占用 CPU 资源的。
四、工作队列和等待队列
工作队列:
为了方便时间片的调度,所有“可运行状态”状态的进程组成的队列;fd文件列表:
内核打开的文件句柄,Linux一切皆文件,用户线程执行创建Socket时内核就会创建一个由文件系统管理的sock对象;sock:
socket内核中的数据结构,主要包含发送缓冲区、接收缓冲区、等待队列;
struct sock {
__u32 daddr; /* 外部IP地址 */
__u32 rcv_saddr; /* 绑定的本地IP地址 */
__u16 dport; /* 目标端口 */
__u16 sport; /* 源端口 */
unsigned short family; /* 地址簇 */
int rcvbuf; /* 接受缓冲区长度(单位:字节) */
struct sk_buff_head receive_queue; /* 接收包队列 */
int sndbuf; /* 发送缓冲区长度(单位:字节) */
struct sk_buff_head write_queue; /* 包发送队列 */
wait_queue_head_t *sleep; /* 等待队列,通常指向socket的wait域 */
......
}
等待队列:
等待当前socket的线程;
工作队列中线程执行到阻塞操作等待socket时,会从工作队列中移除,移动到该socket的等待队列中;当socket接收到数据后,操作系统将该socket等待队列上的进程重新放回到工作队列,该进程变成运行状态,继续执行代码。
五、BIO
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(9000);
while (true) {
// 没有连接-阻塞
Socket socket = serverSocket.accept();
byte[] bytes = new byte[1024];
InputStream inputStream = socket.getInputStream();
while (true) {
// 没有数据-阻塞
int read = inputStream.read(bytes);
if (read != -1) {
System.out.println(new String(bytes, 0, read));
} else {
break;
}
}
socket.close();
}
}
BIO模式存在两个阻塞点,一个时accept阻塞等待客户端连接,一个是阻塞等待socket请求数据;简单跟以下源码就会发现
new ServerSocket(9000)
最终通过
-
int newfd = socket0(stream, false /*v6 Only*/);
调用Linuxint socket(int domain, int type, int protocol);
创建服务端socket; -
bind0(nativefd, address, port, exclusiveBind);
调用Linuxint bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
绑定端口; -
listen0(nativefd, backlog);
调用Linuxint listen(int sockfd, int backlog);
设置监听;
接下来serverSocket.accept()
最终通过
-
newfd = accept0(nativefd, isaa);
调用Linuxint accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
阻塞等待客户端连接,会创建一个socket用于跟该客户端进行通信,返回socket文件描述符fd;
最后inputStream.read(bytes);
调用Linux ssize_t recv(int sockfd, void *buf, size_t len, int flags);
阻塞从socket读缓冲区读取客户端请求数据;
可以通过 man 命令查看Linux 系统调用方法具体描述;
通过传统BIO的操作方式可以看出一个请求必须要创建一个内核线程进行处理,recv
只能监视单个socket,当并发比较高时就会消耗大量系统资源,也就是所谓的C10K问题;那么如何解决这个问题呢?后面出现的多路复用 select / poll / epoll思路都是使用一个线程来处理若干个连接(监视若干个socket),类似餐厅服务员的角色。
六、select
select 方案是用一个 fd_set 结构体来告诉内核同时监控多个socket,当其中有socket的状态发生变化或超时,则调用返回。之后应用使用 FD_ISSET 来逐个查看是哪个socket的状态发生了变化。
6.1 select原理
以下面select伪代码为例:
int s = socket(AF_INET, SOCK_STREAM, 0);
bind(s, ...)
listen(s, ...)
//接受客户端连接
int c = accept(s, ...)
int readSet[] = 存放需要监听的socket文件描述符
while(1){
int n = select(..., readSet, ...)
for(int i=0; i < readSet.count; i++){
if(FD_ISSET(readSet[i], ...)){
//fds[i]的数据处理
}
}
}
select 系统调用函数
# maxfd:表示的是待测试的描述符基数,它的值是待测试的最大描述符加 1
# readset:读描述符集合
# writeset:写描述符集合
# exceptset:错误描述符集合
# timeout:超时时间
int select(int maxfd, fd_set *readset, fd_set *writeset, fd_set *exceptset, const struct timeval *timeout);
上面示例代码中,先准备一个数组 readSet 存放着所有需要监视读事件的socket。然后调用select,如果 readSet 中的所有socket都没有数据,select会阻塞,直到有一个socket接收到数据,select返回,唤醒线程。用户可以遍历 readSet,通过FD_ISSET (对这个数组进行检测,判断出对应socket的元素 readSet[fd]是 0 还是 1)判断具体哪个 socket 收到数据,然后做出处理。
6.2 select流程
线程 A 调用 select readSet 数组元素为sock1、sock2、sock3 ,此时3个socket 都没有数据可读,就把线程A 从工作队列中移除,并分别添加到3个sock 的等待队列中;
当3个sock中任意一个有数据可读时,中断程序都会把线程A 从所有sock等待队列中移除并重新加入工作队列,等待cpu时间片继续执行(即从select中返回)。但是此时线程A 并不知道哪个socket有数据,于是还要遍历readSet使用 FD_ISSET 来逐个查看是哪个socket的有数据可读。
6.3 select的缺点
1、
每次调用select都需要将线程加入到所有监视socket的等待队列,每次唤醒都需要从每个队列中移除。这里涉及了两次遍历,而且每次都要将整个fd_set列表传递给内核,有一定的开销。正是因为遍历操作开销大,出于效率的考量,才会规定select的最大监视数量,默认只能监视1024个socket。
2、
由于只有一个字段记录关注和发生事件,每次调用之前要重新初始化 fd_set 结构体。
3、
线程被唤醒后,程序并不知道哪些socket收到数据,还需要遍历一次。
七、poll
总结以下 select 的缺点就是句柄上限+重复初始化+逐个排查所有文件句柄状态效率不高。poll 主要解决 select 的前两个问题:通过改变跟内核交互的数据结构突破了文件描述符的限制,同时使用不同字段分别标注关注事件和发生事件,来避免重复初始化。
int poll(struct pollfd *fds, unsigned long nfds, int timeout);
# fd:描述符 fd
# events:描述符上待检测的事件类型events,注意这里的 events 可以表示多个不同的事件,
# 具体的实现可以通过使用二进制掩码位操作来完成,例如,POLLIN 和 POLLOUT 可以表示读和写事件
# revents:
struct pollfd {
int fd; /* file descriptor */
short events; /* events to look for */
short revents; /* events returned */
};
和 select 非常不同的地方在于:
- poll 每次检测之后的结果不会修改原来的传入值,而是将结果保留在 revents 字段中,这样就不需要每次检测完都得重置待检测的描述字和感兴趣的事件。
- 在 select 里面,文件描述符的个数已经随着 fd_set 的实现而固定,没有办法对此进行配置;而在 poll 函数里,可以控制 pollfd 结构的数组大小,这意味着可以突破原来 select 函数最大描述符的限制,在这种情况下,应用程序调用者需要分配 pollfd 数组并通知 poll 函数该数组的大小。
八、epoll
epoll 使用 eventpoll 作为中间层,线程不用加入在被监视的每一个 socket 阻塞队列中,也不用再遍历 socket 列表查看哪些有事件发生。
epoll提供了三个函数:
# 创建了一个 epoll 实例 epollevent
int epoll_create(int size);
# 往这个 epoll 实例增加或删除监控的事件
# epfd:epoll_create 创建的 epoll 实例句柄
# op:增加还是删除一个监控事件
# fd:注册的事件的文件描述符
# event:注册的事件类型,并且可以在这个结构体里设置用户需要的数据
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
# 类似之前的 poll 和 select 函数,调用者进程被挂起,在等待内核 I/O 事件的分发
# epfd:实例描述字,也就是 epoll 句柄
# events:用户空间需要处理的 I/O 事件,这是一个数组,数组的大小由 epoll_wait 的返回值决定,这个数组的每个元素都是一个需要待处理的 I/O 事件,其中 events 表示具体的事件类型,事件类型取值和 epoll_ctl 可设置的值一样,这个 epoll_event 结构体里的 data 值就是在 epoll_ctl 那里设置的 data,也就是用户空间和内核空间调用时需要的数据
# maxevents:大于 0 的整数,表示 epoll_wait 可以返回的最大事件值
# timeout:阻塞调用的超时值,如果这个值设置为 -1,表示不超时;如果设置为 0 则立即返回,即使没有任何 I/O 事件发生
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
8.1 代码示例
public class ServerConnect {
private static final int BUF_SIZE = 1024;
private static final int PORT = 8080;
private static final int TIMEOUT = 3000;
public static void main(String[] args) {
selector();
}
public static void selector() {
Selector selector = null;
ServerSocketChannel ssc = null;
try {
// 打开一个Channel
ssc = ServerSocketChannel.open();
// 将Channel绑定端口
ssc.socket().bind(new InetSocketAddress(PORT));
// 设置Channel为非阻塞,如果设置为阻塞,其实和BIO差不多了。
ssc.configureBlocking(false);
// 打开一个Slectore
selector = Selector.open();
// 向selector中注册Channel和感兴趣的事件
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// selector监听事件,select会被阻塞,直到selector监听的channel中有事件发生或者超时,会返回一个事件数量
//TIMEOUT就是超时时间,selector初始化的时候会添加一个用于主动唤醒的pipe,待会源码分析会说
if (selector.select(TIMEOUT) == 0) {
System.out.println("==");
continue;
}
/**
* SelectionKey的组成是selector和Channel
* 有事件发生的channel会被包装成selectionKey添加到selector的publicSelectedKeys属性中
* publicSelectedKeys是SelectionKey的Set集合
*下面这一部分遍历,就是遍历有事件的channel
*/
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isAcceptable()) {
handleAccept(key);
}
if (key.isReadable()) {
handleRead(key);
}
if (key.isWritable() && key.isValid()) {
handleWrite(key);
}
if (key.isConnectable()) {
System.out.println("isConnectable = true");
}
//每次使用完,必须将该SelectionKey移除,否则会一直存储在publicSelectedKeys中
//下一次遍历又会重复处理
iter.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (selector != null) {
selector.close();
}
if (ssc != null) {
ssc.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void handleAccept(SelectionKey key) throws IOException {
ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
SocketChannel sc = ssChannel.accept();
sc.configureBlocking(false);
sc.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocateDirect(BUF_SIZE));
}
public static void handleRead(SelectionKey key) throws IOException {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buf = (ByteBuffer) key.attachment();
long bytesRead = sc.read(buf);
while (bytesRead > 0) {
buf.flip();
while (buf.hasRemaining()) {
System.out.print((char) buf.get());
}
System.out.println();
buf.clear();
bytesRead = sc.read(buf);
}
if (bytesRead == -1) {
sc.close();
}
}
public static void handleWrite(SelectionKey key) throws IOException {
ByteBuffer buf = (ByteBuffer) key.attachment();
buf.flip();
SocketChannel sc = (SocketChannel) key.channel();
while (buf.hasRemaining()) {
sc.write(buf);
}
buf.compact();
}
}
调用到内核的流程如下:
1、
ssc = ServerSocketChannel.open() -> EPollSelectorProvider -> int socket
(int domain, int type, int protocol);
2、
ssc.socket().bind(newInetSocketAddress(PORT)); -> int bind
(int sockfd, const struct sockaddr addr, socklen_t addrlen);
3、
getImpl().listen(backlog); -> int listen
(int sockfd, int backlog);
4、
ssc.configureBlocking(false); -> int fcntl
(int fd, int cmd, ... / arg */ );
5、
selector=Selector.open(); -> EPollSelectorImpl epollCreate() -> int epoll_create
(int size);
6、
ssc.register(selector,SelectionKey.OP_ACCEPT); -> int epoll_ctl
(int epfd, int op, int fd, struct epoll_event *event);
while
7、
selector.select(TIMEOUT) -> int epoll_wait
(int epfd, struct epoll_event *events, int maxevents, int timeout);
8.2 epoll流程
如图,eventpoll 主要包含3个结构
-
监视队列:
epoll_ctl添加或删除所要监听的socket。例如:图中通过epoll_ctl添加sock1、sock2的监视,内核会将eventpoll添加到这socket的等待队列中(图中虚线)。为了方便的添加和移除,还要便于搜索,以避免重复添加,使用红黑树结构进行存储(搜索、插入和删除时间复杂度都是O(log(N)),效率较好。 -
等待队列:
eventpoll也是一个文件句柄,因此也有等待队列,记录阻塞的线程,例如:图中线程A 执行了 epoll_wait 操作,被从工作队列中移除,然后被eventpoll 的等待队列引用; -
就绪队列:
当某个socket有事件发生时,中断处理程序就会把该socket加入到就绪队列,同时唤醒eventpoll 阻塞队列中的线程,此时线程只需要遍历就绪队列就可以知道哪个socket有事件发生,例如:图中sock2有数据到来时,中断处理程序先把sock2 放入就绪队列中,然后唤醒等待队列中的线程A,这时线程A 被重新加入工作队列中,等到CPU时间片轮询到线程A时,遍历就绪队列中的socket进行处理。
九、总结
select,poll,epoll都是IO多路复用机制,即可以同时监视多个文件描述符,一旦某个描述符就绪(读或写就绪),能够通知程序进行相应读写操作
- select,poll需要多次遍历文件描述符集合 fd_set,把阻塞线程放到每一个socket中,当某个描述符就绪时再把这些线程从socket阻塞队列中移除;而epoll 通过eventpoll作为中介,只需要把eventpoll放到socket 的阻塞队列中,当有描述符就绪时只需要遍历就绪队列,而不需要遍历所有fd;
- select,poll每次调用都要把整个fd_set集合从用户态往内核态拷贝一次,而epoll只要一次拷贝,这也能节省不少的开销。