一、BIO
BIO(Blocking IO)阻塞IO,在读入输入流或者输出流时,在读写动作完成之前,线程会一直阻塞在那里。socket编程就是BIO,看个示例。
服务端:
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
public class IOServer {
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(8000);
// (1) 接收新连接线程
new Thread(() -> {
while (true) {
try {
// (1) 阻塞方法获取新的连接
Socket socket = serverSocket.accept();
System.out.println("get a connection");
// (2) 每一个新的连接都创建一个线程,负责读取数据
new Thread(() -> {
try {
byte[] data = new byte[1024];
InputStream inputStream = socket.getInputStream();
while (true) {
int len;
// (3) 按字节流方式读取数据
while ((len = inputStream.read(data)) != -1) {
System.out.println(new String(data, 0, len));
}
}
} catch (IOException e) {
}
}).start();
} catch (IOException e) {
}
}
}).start();
}
}
客户端:
import java.io.IOException;
import java.net.Socket;
import java.util.Date;
public class IOClient {
public static void main(String[] args) {
new Thread(() -> {
try {
Socket socket = new Socket("127.0.0.1", 8000);
while (true) {
try {
socket.getOutputStream().write((new Date() + ": hello world").getBytes());
socket.getOutputStream().flush();
Thread.sleep(2000);
} catch (Exception e) {
}
}
} catch (IOException e) {
}
}).start();
}
}
可以看到服务端是一个线程处理一个socket,socket.getInputStream()操作等待数据时会阻塞线程,具体是执行SocketImputStream的socketRead0方法时挂起线程了。大量被阻塞的线程就带来了资源的浪费,所以就需要一种一个线程处理多个连接的解决方式。
二、NIO
NIO(Nonblocking IO)就是用的IO多路复用的思想,即一个线程监听多个文件描述符,处理多个连接的能力。同样我们先看看jdk里的NIO栗子。
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
public class NIOServer {
public static void main(String[] args) throws IOException {
Selector serverSelector = Selector.open();
Selector clientSelector = Selector.open();
new Thread(() -> {
try {
// 对应IO编程中服务端启动
ServerSocketChannel listenerChannel = ServerSocketChannel.open();
listenerChannel.socket().bind(new InetSocketAddress(8000)); // 端口
listenerChannel.configureBlocking(false); // 非阻塞
listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT);
while (true) {
// 监测是否有新的连接,这里的1指的是阻塞的时间为1ms
if (serverSelector.select(1) > 0) {
Set<SelectionKey> set = serverSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = set.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) {
try {
// (1) 每来一个新连接,不需要创建一个线程,而是直接注册到clientSelector
SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
clientChannel.configureBlocking(false);
clientChannel.register(clientSelector, SelectionKey.OP_READ);
} finally {
keyIterator.remove();
}
}
}
}
}
} catch (IOException ignored) {
}
}).start();
new Thread(() -> {
try {
while (true) {
// (2) 批量轮询是否有哪些连接有数据可读,这里的1指的是阻塞的时间为1ms
if (clientSelector.select(1) > 0) {
Set<SelectionKey> set = clientSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = set.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isReadable()) {
try {
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// (3) 读取数据以块为单位批量读取
clientChannel.read(byteBuffer);
byteBuffer.flip();
System.out.println(Charset.defaultCharset().newDecoder().decode(byteBuffer)
.toString());
} finally {
keyIterator.remove();
key.interestOps(SelectionKey.OP_READ);
}
}
}
}
}
} catch (IOException ignored) {
}
}).start();
}
}
我们可以看到这个server端只用了2个线程来监听和处理连接。第一个线程将ServerSocketChannel注册到serverSelector里,然后serverSelector不断轮询去发现新的连接。一旦有新的连接(serverSelector.select(1) > 0),就把SelectionKey(连接的标志符)注册到clientSelector。在另一个线程里clientSelector不断轮询发现新的这些连接里有没有就绪的数据可读(clientSelector.select(1) > 0),有的话就轮询这些连接标志符SelectionKey,拿出channel,获取并处理数据。
可能有同学不太清楚channel的概念,我们java里的流可以分为输入流(InputStream)和输出流(OutputStream),输入流只有读操作,输出流只有写操作。这里的channel相当于结合了输入流和输出流,既能读又能写的一种流,不过Channel接口本身没提供读和写的方法,需要自己加。
Selector轮询器是jdk NIO的实现关键,它的select方法底层是用的系统调用select、poll或者epoll,对文件描述符(fd)的监听。windows下我们看jdk里有一个WindowsSelectorImpl,它的doSelect方法里是调用SubSelector的poll方法。openJDK里也有windows下的epoll实现,有兴趣可以看看。
三、select、poll和epoll
这三个是linux下的NIO模型,其实都是将需要监听的fd和需要监听的事件传递给内核,然后不断去获取那些已经就绪的fd。
3.1 select
select函数:
int select (int maxfd1,fd_set *readset,fd_set *writeset, fd_set *exceptset,const struct timeval * timeout);
即需要传给内核:
- 文件描述符1024长度的数组
- 需要监听的描述符状态(读、写、异常)
- 我们需要等待的时间
struct timeval {
long tv_sec; // seconds
long tv_usec; // microseconds
}
等待时间 timeout == null,等待无限长时间,即阻塞;
tv_sec == 0 && tv_usec == 0,不等待直接返回,即非阻塞;
否则就是等待指定时长。
过程:
- 进来N个连接,服务端创建N个ClientSocket
- 建立连接后,进程A创建fdset
- 进程A将fdset从用户态堆栈拷到内核态,然后阻塞,CPU运行进程B
- 客户端发送数据到网卡,DMA将数据从网卡拷贝到网卡缓冲区,网卡发起硬中断IRQ
- CPU响应中断,保存进程B的用户态堆栈信息,修改CPU寄存器,将堆栈指针指向内核态堆栈(切换到内核态),根据IRQ找到网卡中断处理程序,并执行
- 响应中断程序将数据从网卡缓冲区拷到socket缓冲区,进程A出队,到运行队列
- 进程A将select返回数据从内核态拷到用户态堆栈(切换到用户态)
3.2 poll
函数:
int poll(struct pollfd fds[], nfds_t nfds, int timeout)
struct pollfd {
int fd; /*文件描述符*/
short events; /* 等待的需要测试事件 */
short revents; /* 实际发生了的事件,也就是返回结果 */
};
有水平触发的特点。
类似select,只是fd长度不受限,不多BB。
3.3 epoll
select和poll的缺点是文件描述符需要在用户空间和内核空间之间来回拷贝,epoll解决了这个问题,他在内核创建一块空间,然后直接操作内核空间设置需要监听的fd。
他有三个函数:
- 系统函数int epoll_create(int size);
在内核开辟一块空间,返回epoll的文件描述符编号 - int epoll_ctl(int epfd, int op, int fd, struct epoll_event * event);是epoll的事件注册函数。
参数:
epfd:epoll结构编号
op: 请求类型EPOLL_CTL_ADD、EPOLL_CTL_MOD、EPOLL_CTL_DEL。
fd:需要监听的文件描述符,一般指socket_fd。
event: 感兴趣的事件EPOLLIN、EPOLLOUT、EPOLLPRI、EPOLLERR等
epoll_event结构:
struct epoll_event {
_uint32_t events; // Epoll events
epoll_data_t data; // User data variable
}
- int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout) 等待事件的产生,
参数:
epfd:epoll编号
events: 函数返回时,内核把就绪状态的数据拷到该数组里
maxevents: events最多能接受的数据量。
timeout: 0:立即返回; -1: 阻塞直到事件就绪; >0: 等待指定时间。
返回: 本次就绪的fd个数。
工作模式:
LT(水平触发):时间就绪后,用户可以选择处理或不处理,如果不处理,下次调用epoll_wait时会将本次未处理的事件打包给你。
ET(边缘触发):时间就绪后,用户必须处理,内核把就绪时间给你后,就把对应的就绪事件清理掉了。减小了epoll时间重复触发的次数,因此效率比LE高。
过程:
- 进来N个连接,服务端创建N个ClientSocket
- 建立连接后,进程A创建fdset
- 调用epoll_create创建eventpoll
- 调用epoll_ctl将关心的socket加入到eventpoll内(以红黑树的结构存储)
- 调用epoll_wait挂起进程A,进程A跑到eventpoll的等待队列里
- 客户端发送数据到网卡,DMA将数据从网卡拷贝到网卡缓冲区,网卡发起硬中断IRQ
- CPU响应中断,保存进程B的用户态堆栈信息,修改CPU寄存器,将堆栈指针指向内核态堆栈(切换到内核态),根据IRQ找到网卡中断处理程序,并执行
- 响应中断程序将数据从网卡缓冲区拷到socket缓冲区,进程A从eventpoll的等待队列里出队,到运行队列
- 进程A将epoll返回数据从内核态拷到用户态堆栈(切换到用户态)
**对比: **
- select的缺点在于
- 单个进程所打开的FD是有一定限制的,它由FD_SETSIZE设置,默认值是1024,这点在poll中解决了。
- 对socket进行扫描时是线性扫描,即采用轮询的方法,效率较低。
- 要维护一个用来存放大量fd的数据结构,这样会使得用户空间和内核空间在传递该结构时复制开销大。
- epoll 优点
没有最大并发连接的限制,能打开的FD的上限远大于1024(1G的内存上能监听约10万个端口)。
效率提升,不是轮询的方式,不会随着FD数目的增加效率下降。只有活跃可用的FD才会调用callback函数;即Epoll最大的优点就在于它只管你“活跃”的连接,而跟连接总数无关,因此在实际的网络环境中,Epoll的效率就会远远高于select和poll。
内存拷贝,利用mmap()文件映射内存加速与内核空间的消息传递;即epoll使用mmap减少复制开销。