IO多路复用

一、BIO

BIO(Blocking IO)阻塞IO,在读入输入流或者输出流时,在读写动作完成之前,线程会一直阻塞在那里。socket编程就是BIO,看个示例。
服务端:

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class IOServer {
    public static void main(String[] args) throws Exception {

        ServerSocket serverSocket = new ServerSocket(8000);

        // (1) 接收新连接线程
        new Thread(() -> {
            while (true) {
                try {
                    // (1) 阻塞方法获取新的连接
                    Socket socket = serverSocket.accept();
                    System.out.println("get a connection");
                    // (2) 每一个新的连接都创建一个线程,负责读取数据
                    new Thread(() -> {
                        try {
                            byte[] data = new byte[1024];
                            InputStream inputStream = socket.getInputStream();
                            while (true) {
                                int len;
                                // (3) 按字节流方式读取数据
                                while ((len = inputStream.read(data)) != -1) {
                                    System.out.println(new String(data, 0, len));
                                }
                            }
                        } catch (IOException e) {
                        }
                    }).start();

                } catch (IOException e) {
                }

            }
        }).start();
    }
}

客户端:

import java.io.IOException;
import java.net.Socket;
import java.util.Date;

public class IOClient {

    public static void main(String[] args) {
        new Thread(() -> {
            try {
                Socket socket = new Socket("127.0.0.1", 8000);
                while (true) {
                    try {
                        socket.getOutputStream().write((new Date() + ": hello world").getBytes());
                        socket.getOutputStream().flush();
                        Thread.sleep(2000);
                    } catch (Exception e) {
                    }
                }
            } catch (IOException e) {
            }
        }).start();
    }
}

可以看到服务端是一个线程处理一个socket,socket.getInputStream()操作等待数据时会阻塞线程,具体是执行SocketImputStream的socketRead0方法时挂起线程了。大量被阻塞的线程就带来了资源的浪费,所以就需要一种一个线程处理多个连接的解决方式。

二、NIO

NIO(Nonblocking IO)就是用的IO多路复用的思想,即一个线程监听多个文件描述符,处理多个连接的能力。同样我们先看看jdk里的NIO栗子。

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

public class NIOServer {
    public static void main(String[] args) throws IOException {
        Selector serverSelector = Selector.open();
        Selector clientSelector = Selector.open();

        new Thread(() -> {
            try {
                // 对应IO编程中服务端启动
                ServerSocketChannel listenerChannel = ServerSocketChannel.open();
                listenerChannel.socket().bind(new InetSocketAddress(8000));         // 端口
                listenerChannel.configureBlocking(false);                           // 非阻塞
                listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT);

                while (true) {
                    // 监测是否有新的连接,这里的1指的是阻塞的时间为1ms
                    if (serverSelector.select(1) > 0) {
                        Set<SelectionKey> set = serverSelector.selectedKeys();
                        Iterator<SelectionKey> keyIterator = set.iterator();

                        while (keyIterator.hasNext()) {
                            SelectionKey key = keyIterator.next();

                            if (key.isAcceptable()) {
                                try {
                                    // (1) 每来一个新连接,不需要创建一个线程,而是直接注册到clientSelector
                                    SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
                                    clientChannel.configureBlocking(false);
                                    clientChannel.register(clientSelector, SelectionKey.OP_READ);
                                } finally {
                                    keyIterator.remove();
                                }
                            }

                        }
                    }
                }
            } catch (IOException ignored) {
            }

        }).start();


        new Thread(() -> {
            try {
                while (true) {
                    // (2) 批量轮询是否有哪些连接有数据可读,这里的1指的是阻塞的时间为1ms
                    if (clientSelector.select(1) > 0) {
                        Set<SelectionKey> set = clientSelector.selectedKeys();
                        Iterator<SelectionKey> keyIterator = set.iterator();

                        while (keyIterator.hasNext()) {
                            SelectionKey key = keyIterator.next();

                            if (key.isReadable()) {
                                try {
                                    SocketChannel clientChannel = (SocketChannel) key.channel();
                                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                                    // (3) 读取数据以块为单位批量读取
                                    clientChannel.read(byteBuffer);
                                    byteBuffer.flip();
                                    System.out.println(Charset.defaultCharset().newDecoder().decode(byteBuffer)
                                            .toString());
                                } finally {
                                    keyIterator.remove();
                                    key.interestOps(SelectionKey.OP_READ);
                                }
                            }

                        }
                    }
                }
            } catch (IOException ignored) {
            }
        }).start();
    }
}

我们可以看到这个server端只用了2个线程来监听和处理连接。第一个线程将ServerSocketChannel注册到serverSelector里,然后serverSelector不断轮询去发现新的连接。一旦有新的连接(serverSelector.select(1) > 0),就把SelectionKey(连接的标志符)注册到clientSelector。在另一个线程里clientSelector不断轮询发现新的这些连接里有没有就绪的数据可读(clientSelector.select(1) > 0),有的话就轮询这些连接标志符SelectionKey,拿出channel,获取并处理数据。

可能有同学不太清楚channel的概念,我们java里的流可以分为输入流(InputStream)和输出流(OutputStream),输入流只有读操作,输出流只有写操作。这里的channel相当于结合了输入流和输出流,既能读又能写的一种流,不过Channel接口本身没提供读和写的方法,需要自己加。

Selector轮询器是jdk NIO的实现关键,它的select方法底层是用的系统调用select、poll或者epoll,对文件描述符(fd)的监听。windows下我们看jdk里有一个WindowsSelectorImpl,它的doSelect方法里是调用SubSelector的poll方法。openJDK里也有windows下的epoll实现,有兴趣可以看看。

三、select、poll和epoll

这三个是linux下的NIO模型,其实都是将需要监听的fd和需要监听的事件传递给内核,然后不断去获取那些已经就绪的fd。

3.1 select

select函数:
int select (int maxfd1,fd_set *readset,fd_set *writeset, fd_set *exceptset,const struct timeval * timeout);
即需要传给内核:

  1. 文件描述符1024长度的数组
  2. 需要监听的描述符状态(读、写、异常)
  3. 我们需要等待的时间
struct timeval {
  long tv_sec; // seconds
  long tv_usec; // microseconds
}

等待时间 timeout == null,等待无限长时间,即阻塞;
tv_sec == 0 && tv_usec == 0,不等待直接返回,即非阻塞;
否则就是等待指定时长。

过程:

  1. 进来N个连接,服务端创建N个ClientSocket
  2. 建立连接后,进程A创建fdset
  3. 进程A将fdset从用户态堆栈拷到内核态,然后阻塞,CPU运行进程B
  4. 客户端发送数据到网卡,DMA将数据从网卡拷贝到网卡缓冲区,网卡发起硬中断IRQ
  5. CPU响应中断,保存进程B的用户态堆栈信息,修改CPU寄存器,将堆栈指针指向内核态堆栈(切换到内核态),根据IRQ找到网卡中断处理程序,并执行
  6. 响应中断程序将数据从网卡缓冲区拷到socket缓冲区,进程A出队,到运行队列
  7. 进程A将select返回数据从内核态拷到用户态堆栈(切换到用户态)

3.2 poll

函数:

int poll(struct pollfd fds[], nfds_t nfds, int timeout)

struct pollfd {

    int fd; /*文件描述符*/

    short events; /* 等待的需要测试事件 */

    short revents; /* 实际发生了的事件,也就是返回结果 */

};

有水平触发的特点。
类似select,只是fd长度不受限,不多BB。

3.3 epoll

select和poll的缺点是文件描述符需要在用户空间和内核空间之间来回拷贝,epoll解决了这个问题,他在内核创建一块空间,然后直接操作内核空间设置需要监听的fd。
他有三个函数:

  1. 系统函数int epoll_create(int size);
    在内核开辟一块空间,返回epoll的文件描述符编号
  2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event * event);是epoll的事件注册函数。
    参数:
    epfd:epoll结构编号
    op: 请求类型EPOLL_CTL_ADD、EPOLL_CTL_MOD、EPOLL_CTL_DEL。
    fd:需要监听的文件描述符,一般指socket_fd。
    event: 感兴趣的事件EPOLLIN、EPOLLOUT、EPOLLPRI、EPOLLERR等
    epoll_event结构:
  struct epoll_event {
      _uint32_t events;   // Epoll events
      epoll_data_t data;  // User data variable
  }
  1. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout) 等待事件的产生,
    参数:
    epfd:epoll编号
    events: 函数返回时,内核把就绪状态的数据拷到该数组里
    maxevents: events最多能接受的数据量。
    timeout: 0:立即返回; -1: 阻塞直到事件就绪; >0: 等待指定时间。
    返回: 本次就绪的fd个数。
    工作模式:
    LT(水平触发):时间就绪后,用户可以选择处理或不处理,如果不处理,下次调用epoll_wait时会将本次未处理的事件打包给你。
    ET(边缘触发):时间就绪后,用户必须处理,内核把就绪时间给你后,就把对应的就绪事件清理掉了。减小了epoll时间重复触发的次数,因此效率比LE高。

过程:

  1. 进来N个连接,服务端创建N个ClientSocket
  2. 建立连接后,进程A创建fdset
  3. 调用epoll_create创建eventpoll
  4. 调用epoll_ctl将关心的socket加入到eventpoll内(以红黑树的结构存储)
  5. 调用epoll_wait挂起进程A,进程A跑到eventpoll的等待队列里
  6. 客户端发送数据到网卡,DMA将数据从网卡拷贝到网卡缓冲区,网卡发起硬中断IRQ
  7. CPU响应中断,保存进程B的用户态堆栈信息,修改CPU寄存器,将堆栈指针指向内核态堆栈(切换到内核态),根据IRQ找到网卡中断处理程序,并执行
  8. 响应中断程序将数据从网卡缓冲区拷到socket缓冲区,进程A从eventpoll的等待队列里出队,到运行队列
  9. 进程A将epoll返回数据从内核态拷到用户态堆栈(切换到用户态)

**对比: **

  • select的缺点在于
  1. 单个进程所打开的FD是有一定限制的,它由FD_SETSIZE设置,默认值是1024,这点在poll中解决了。
  2. 对socket进行扫描时是线性扫描,即采用轮询的方法,效率较低。
  3. 要维护一个用来存放大量fd的数据结构,这样会使得用户空间和内核空间在传递该结构时复制开销大。
  • epoll 优点
  1. 没有最大并发连接的限制,能打开的FD的上限远大于1024(1G的内存上能监听约10万个端口)。

  2. 效率提升,不是轮询的方式,不会随着FD数目的增加效率下降。只有活跃可用的FD才会调用callback函数;即Epoll最大的优点就在于它只管你“活跃”的连接,而跟连接总数无关,因此在实际的网络环境中,Epoll的效率就会远远高于select和poll。

  3. 内存拷贝,利用mmap()文件映射内存加速与内核空间的消息传递;即epoll使用mmap减少复制开销。

参考资料:

B站-IO多路复用底层原理全解
聊聊IO多路复用之select、poll、epoll详解

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,039评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,426评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,417评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,868评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,892评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,692评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,416评论 3 419
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,326评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,782评论 1 316
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,957评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,102评论 1 350
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,790评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,442评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,996评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,113评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,332评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,044评论 2 355

推荐阅读更多精彩内容