基础概念
同步和异步
描述的方法跟调用者间通信的方式,如果不需要调用者主动等待,调用者调用后立即返回,然后方法本身通过回调,消息通知等方式通知调用者结果,就是异步的。如果调用方法后一直需要调用者一直等待方法返回结果,那么就是同步的。
阻塞和非阻塞
描述的是调用者调用方法后的状态,比如:线程A调用了B方法,A线程处于阻塞状态。
下面用一个案例说明:小明烧开水
第一版烧水: 当小明将水倒入烧水壶之后,需要坐在沙发看着水是否烧开。因此该场就是同步阻塞的,小明无法去做其他事情,而且得一直看着水是否开了。
第二版烧水:当小明将水倒入烧水壶之后,小明就去干其他事情了,每个2分钟回来看一下水是否烧开,因此该场就是同步非阻塞的,小明处于非阻塞状态,但是他需要主动去观察水是否烧开
第三版烧水:当小明将水倒入烧水壶之后,小明就去干其他事情了,当水开时,烧水壶会发出尖锐的叫声,这是小明就回来处理了。因此该场景就是异步非阻塞的,小明可以忙自己的事情,因此他是非阻塞的。而且小明也不需要自己主动的观察水是否烧开,而是烧水壶提醒小明水开了,因此是异步的。
java 中的IO 模型
BIO 模型
同步并阻塞(传统阻塞模型),服务器实现模式为一个连接一个线程,即客户端有连接请求服务端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销。
在客户端和服务器建立连接后,服务器启动一个线程去监听客户端的输入流,当客户端未输入任何数据,服务端这个线程处于阻塞状态,而且服务器是主动的等待客户端信息,所以是同步进行的。
NIO 模型
同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求就进行处理。
服务器线程不会阻塞等待某个客户端传输内容,而是轮询的进行处理,因此是非阻塞的。由于客户端的请求需要服务端主动去查询,因此是同步的。
NIO有三个核心部分:Channel(通道),Buffer(缓冲区), Selector(选择器)
-
Buffer
一个Buffer对象是固定数量的数据的容器。其作用是一个存储器,或者分段运输区,在这里数据可被存储并在之后用于检索。
-
Channel
通道(Channel)可以理解为数据传输的管道。通道与流不同的是,流只是在一个方向上移动(一个流必须是inputStream或者outputStream的子类),而通道可以用于读、写或者同时用于读写。
-
Selector
选择器提供选择执行已经就绪的任务的能力,这使得多元 I/O 成为可能,就绪选择和多元执行使得单线程能够有效率地同时管理多个 I/O 通道(channels)。
每个channel都会对应一个Buffer
Selector对应一个线程,一个线程对应多个channel(连接)
该图反应了有三个channel注册到了 selector
程序切换到哪个channel是由事件决定的
Selector会根据不同的事件,在各个通道上切换
Buffer就是一个内存块,底层是有一个数组
数据的读取写入是通过Buffer,这个和BIO不同,BIO中要么是输入流或者是输出流,不能双向,但是NIO的Buffer是可以读也可以写,需要flip方法切换
channel是双向的,可以反映底层操作系统的情况。比如Linux,底层操作系统通道就是双向的。
AIO 模型(NIO.2)
异步非阻塞,AIO引入异步通道的概念,采用了Proactor模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后,才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用
下面用一个实例展示下NIO客户端和服务器,具体功能:客户端发送hello world, 服务端打印内容
服务器代码:
package nio.demo2;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
/**
* @author huangyichun
* @date 2020/9/11
*/
@Slf4j
public class NIOServer {
public static void main(String[] args) throws IOException {
//创建ServerSocketChannel ->ServerSocket
final ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//得到一个selector对象
final Selector selector = Selector.open();
//绑定一个端口 6666,在服务器监听
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
//设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
//把serverSocketChannel注册到selector,关心事件为OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
log.info("注册后的selectionKey数量={}", selector.keys().size());
//循环等待
while(true) {
//这里我们等待1秒,如果没有事件发生,返回
if (selector.select(1000) == 0) {
log.info("服务器等待了1秒,无连接");
continue;
}
//如果返回的大于0,获取到相关的selectKey集合
//1.如果返回的大于0,标识已经获取到关注的时间
//2. selector.selectedKeys()返回关注事件的集合
final Set<SelectionKey> selectionKeys = selector.selectedKeys();
//遍历selectionKeys
selectionKeys.forEach(key -> {
//获取到selectionKey,根据key对应的通道发生的事件做相应的处理
if(key.isAcceptable()) {//如果是OP_ACCEPT,有新的客户端连接
//给该客户端生成一个socketChannel
try {
SocketChannel socketChannel = serverSocketChannel.accept();
log.info("客户端连接成功,socketChannel={}", socketChannel.hashCode());
socketChannel.configureBlocking(false);
//将客户端channel注册到selector上,关注时间为OP_READ,同时给该channel关联一个Buffer
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
log.info("客户端连接后,注册的selectionKey数量={}", selector.keys().size());
} catch (IOException e) {
e.printStackTrace();
}
}else if(key.isReadable()) {//发送OP_READ
//通过key反向获取到对应的channel
final SocketChannel channel = (SocketChannel)key.channel();
//获取到该channel关联的buffer
log.info("客户端连接成功,socketChannel={}", channel.hashCode());
ByteBuffer byteBuffer = (ByteBuffer)key.attachment();
try {
channel.read(byteBuffer);
log.info("客户端发送的数据是:{}", new String(byteBuffer.array()));
} catch (IOException e) {
e.printStackTrace();
}
}
//从集合中移除当前的selectionKey,防止重复操作
selectionKeys.remove(key);
});
}
}
}
客户端代码:
package nio.demo2;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* @author huangyichun
* @date 2020/9/14
*/
@Slf4j
public class NIOClient {
public static void main(String[] args) throws IOException {
//得到一个通道
SocketChannel socketChannel = SocketChannel.open();
//设置非阻塞
socketChannel.configureBlocking(false);
//提供服务端的ip和端口
final InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
//连接服务器
if (!socketChannel.connect(inetSocketAddress)) {
while (!socketChannel.finishConnect()) {
log.info("因为连接需要时间,客户端不会阻塞,可以做其他工作...");
}
}
//连接成功,就发送数据
String str = "hello world";
ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
//发送数据,将buffer数据写入channel
socketChannel.write(buffer);
System.in.read();
}
}