同步与异步的概念
同步I/O
每个请求必须逐个地被处理,一个请求的处理可能导致整个流程的暂时等待,这些事件无法并发地执行。用户线程发起I/O请求后需要等待或者轮询内核I/O操作完成后才能继续执行
举个栗子:
一堆人排在等着看病,医生只能一个一个的看,前面的病人没看完,后面的病人只能等着。如果正在看病的病人需要去做CT,这个医生也会一直等待着,直到病人做完CT回来,诊断完毕才会给下一个病人看病。
异步I/O
多个请求可以并发地执行,一个请求或者任务的执行不会导致整个流程的暂时等待。用户线程发起I/O请求后仍然继续执行,当内核I/O操作完成后会通知用户线程,或者调用用户线程注册的回调函数
举个栗子:
还是看病的例子。医生发现,这样死板的工作方式效率比较低,于是决定改变这种方式。当看病的人需要去做CT的时候,给下一个等待的病人看病。前一个病人拍完CT后,让护士通知他,他再继续给他诊断,这样就充分的利用工作时间
阻塞与非阻塞
阻塞
某个请求发出后,由于该请求操作需要的条件不满足,请求操作一直阻塞,不会返回,直到条件满足
非阻塞
请求发出后,若该请求需要的条件不满足,则立即返回一个标志信息告知条件不满足,而不会一直等待。一般需要通过循环判断请求条件是否满足来获取请求结果。
同步/IO、异步I/O与阻塞、非阻塞是IO模型中两个不同维度的描述。
同步和异步描述的是多个任务执行过程中,后发起的任务是否必须等待先发起的任务完成之后再进行。而不管先发起的任务请求是阻塞等待完成,还是立即返回通过循环等待成功。
而阻塞和非阻塞描述的是请求的方法是否立即返回(或者说是否在条件不满足时被阻塞)
Unix下五中I/O模型
- 阻塞I/O
- 非阻塞I/O
- I/O多路复用(select和poll)
- 信号驱动I/O(SIGIO)
- 异步I/O(Posix.1的aio_系统函数)
通常一个socket上的读操作包含两个阶段:
1:等待数据准备好
2:将数据从内核拷贝到进程中
阻塞I/O
如前面所述,阻塞请求无法立即完成则保持阻塞。默认情况下,Linux下的所有socket都是阻塞I/O。阻塞I/O分为如下两个阶段
- 阶段1:等待数据就绪。网络I/O的情况就是等待远端数据陆续抵达,磁盘I/O的情况就是等待磁盘数据从磁盘上读取到内核态内存中
- 阶段2:数据拷贝。出于系统安全,用户态的程序没有权限直接读取内核态内存,因此内核负责把内核态内存中的数据拷贝一份到用户态内存中
非阻塞I/O
非阻塞I/O请求包含以下三个阶段
- sockect设置NONBLOCK(非阻塞)就是告诉内核,当所请求的I/O操作无法完成时,不要讲线程睡眠耳塞返回一个错误码(EWOULDBLOCK),这样请求就不会阻塞了
- I/O操作函数将不断的测试数据是否已经准备好,一直测试,直到数据准备好。整个I/O请求的过程中,虽然用户线程每次发起I/O请求后可以立即返回,但是为了等到数据,扔需要不断地轮旋、重复请求,消耗了大量的Cpu资源
- 数据准备好了,从内核拷贝到用户空间
一般很少直接使用这种模型,而是在其他I/O模型中使用非阻塞I/O 这一特性。这种方式对单个I/O 请求意义不大,但给I/O多路复用提供了条件
I/O多路复用(异步阻塞I/O)
I/O多路复用会用到select或者poll函数,这两个函数也会使线程阻塞,但是和阻塞I/O所不同的是,这两个函数可以同属阻塞多个I/O操作。而且可以同时对多个读操作,多个写操作的I/O函数进行检测,直到有数据可读或可写时,才真正调用I/O操作函数。
从流程上来看,使用select函数进行I/O请求和同步阻塞模型没有太大区别,甚至还多了添加监视Channel,以及调用select函数的额外操作,增加了额外工作。但是,使用select后最大的优势是用户可以在一个线程内同时处理多个Channel的I/O请求。用户可以注册多个Channel,然后不断地调用select读取被激活的Channel,即可达到在同一个线程内同时处理多个I/O请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的
使用select/poll该方法用一个用户态线程负责轮询多个Channel,直到某个阶段1的数据就绪,再通知实际用户线程执行阶段2的拷贝。通过一个专职的用户态线程执行非阻塞I/O轮询,模拟实现了阶段一的异步化
信号驱动I/O(SIGIO)
通过调用sigaction注册信号函数,等内核数据准备好的时候系统中断当前程序,执行信号函数(在这里面调用recv)
异步I/O
调用aio_read函数,告诉内核描述字,缓冲区指针,缓冲区大小,文件偏移以及通知方式,然后立即返回。当内核数据拷贝到缓冲区后,再通知应用程序。所以异步I/O模式下,阶段1和阶段2全包由内核完成,不需要用户线程参与
总结
IO分两阶段:
- 1.数据准备阶段
- 2.内核空间复制回用户进程缓冲区阶段
一般来讲:阻塞IO模型、非阻塞IO模型、IO复用模型(select/poll/epoll)、信号驱动IO模型都属于同步IO,因为阶段2是阻塞的(尽管时间很短)。只有异步IO模型是符合POSIX异步IO操作含义的,不管在阶段1还是阶段2都可以干别的事
Java中四种I/O模型
除了信号驱动I/O外,Java对其他四中I/O模型都有支持。
- Bio(阻塞I/O)
- Nio(非阻塞I/O,Reactor模式即是多路复用I/O)
- Aio(Proactor模式的异步I/O)
阻塞I/O服务器的实现
单线程,逐条处理请求的模式
public class MyServer {
public static void main(String[] args) throws IOException {
ServerSocket server = new ServerSocket(8008); //创建一个服务端且端口为8008
while (true){ //循环监听
Socket client = server.accept(); //服务端监听到一个客户端请求
System.out.println(client.getRemoteSocketAddress()+"地址的客户端连接成功!");
//创建字符缓存输入流
BufferedReader bufferedReader = null;
//创建字符写入流
PrintWriter printWriter = null;
try {
bufferedReader = new BufferedReader(new InputStreamReader(client.getInputStream()));
printWriter = new PrintWriter(client.getOutputStream(),true);
String inputLine = null;
while((inputLine = bufferedReader.readLine()) != null) {
printWriter.println(inputLine);
}
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
bufferedReader.close();
printWriter.close();
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
为每个请求创建一个线程
上面的模式,同一个时间只能处理一个请求,等待IO的时候浪费了Cpu,无法充分利用多Cpu的优势。下面使用多线程的方式对阻塞I/O模式进行改变,每个请求交给一个线程处理
public class IOServerMultiThread {
public static void main(String[] args) throws IOException {
ServerSocket server = new ServerSocket(8008); // 创建一个服务端且端口为8008
while (true) { // 循环监听
Socket client = server.accept(); // 服务端监听到一个客户端请求
new Thread() {
@Override
public void run() {
System.out.println(client.getRemoteSocketAddress() + "地址的客户端连接成功!");
// 创建字符缓存输入流
BufferedReader bufferedReader = null;
// 创建字符写入流
PrintWriter printWriter = null;
try {
bufferedReader = new BufferedReader(new InputStreamReader(client.getInputStream()));
printWriter = new PrintWriter(client.getOutputStream(), true);
String inputLine = null;
while ((inputLine = bufferedReader.readLine()) != null) {
printWriter.println(inputLine);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
bufferedReader.close();
printWriter.close();
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}.start();
}
}
}
使用线程池
上面的例子虽然通过多线程增加了性能,但是每个请求会创建一个线程,创建线程会耗资源,这样无限制的创建线程,不仅是资源浪费,更会导致系统崩溃。我们这次使用线程池来进行线程的管理及复用
public class MultThreadsServer {
//创建一个线程池
private static ExecutorService executorService = Executors.newCachedThreadPool();
//一旦有新的客户端请求,创建这个线程进行处理
private static class HandleMsg implements Runnable{
Socket client;
public HandleMsg(Socket client) {
this.client = client;
}
public void run() {
//创建字符缓存输入流
BufferedReader bufferedReader = null;
//创建字符写入流
PrintWriter printWriter = null;
try {
bufferedReader = new BufferedReader(new InputStreamReader(client.getInputStream()));
printWriter = new PrintWriter(client.getOutputStream(),true);
String inputLine = null;
long a = System.currentTimeMillis();
while((inputLine = bufferedReader.readLine()) != null) {
printWriter.println(inputLine);
}
long b = System.currentTimeMillis();
System.out.println("此线程花费了:"+(b-a)+"秒!");
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
bufferedReader.close();
printWriter.close();
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
};
public static void main(String[] args) throws IOException {
ServerSocket server = new ServerSocket(8686); //创建一个服务端且端口为8686
Socket client = null;
while (true){ //循环监听
client = server.accept(); //服务端监听到一个客户端请求
System.out.println(client.getRemoteSocketAddress()+"地址的客户端连接成功!");
executorService.submit(new HandleMsg(client)); //将该客户端请求通过线程池放入HandlMsg线程中进行处理
}
}
}
Reactor模式
精典Reactor模式
在Reactor模式中,包含如下角色
- Reactor 将I/O事件发派给对应的Handler
- Acceptor 处理客户端连接请求
- Handlers 执行非阻塞读/写
最简单的Reactor模式实现代码如下所示
public class NioServer {
public static void main(String[] args) {
try {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.bind(new InetSocketAddress("127.0.0.1", 8008));
Selector selector = Selector.open();
serverChannel.configureBlocking(false);
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true) {
int key = selector.select();
if(key == 0) {
continue;
}
Iterator<SelectionKey> ite =selector.selectedKeys().iterator();
while(ite.hasNext()) {
SelectionKey sk = ite.next();
ite.remove();
if(sk.isAcceptable()) {
ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) sk.channel();
SocketChannel channel = acceptServerSocketChannel.accept();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
}else if(sk.isReadable()){
SocketChannel channel = (SocketChannel)sk.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
long bytesRead = channel.read(byteBuffer);
while (bytesRead > 0) {
byteBuffer.flip();
byte[] data = byteBuffer.array();
String info = new String(data).trim();
System.out.println("从客户端发送过来的消息是:" + info);
byteBuffer.clear();
bytesRead = channel.read(byteBuffer);
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
从上示代码中可以看到,多个Channel可以注册到同一个Selector对象上,实现了一个线程同时监控多个请求状态(Channel)。同时注册时需要指定它所关注的事件,例如上示代码中socketServerChannel对象只注册了OP_ACCEPT事件,而socketChannel对象只注册了OP_READ事件。
selector.select()是阻塞的,当有至少一个通道可用时该方法返回可用通道个数。同时该方法只捕获Channel注册时指定的所关注的事件
从上示代码中可以看到,多个Channel可以注册到同一个Selector对象上,实现了一个线程同时监控多个请求状态(Channel)。同时注册时需要指定它所关注的事件,例如上示代码中socketServerChannel对象只注册了OP_ACCEPT事件,而socketChannel对象只注册了OP_READ事件。
selector.select()是阻塞的,当有至少一个通道可用时该方法返回可用通道个数。同时该方法只捕获Channel注册时指定的所关注的事件
使用多线程处理的Reactor模式
将处理器的执行放入线程池,多线程进行业务处理。但Reactor仍为单个线程
public class MulThreadsServer {
private static ExecutorService executorService = Executors.newCachedThreadPool();
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(new InetSocketAddress("127.0.0.1", 8008));
server.register(selector, SelectionKey.OP_ACCEPT);
while(true) {
int k = selector.select();
if(k == 0) {
continue;
}
Iterator<SelectionKey> ite = selector.selectedKeys().iterator();
while(ite.hasNext()) {
SelectionKey key = ite.next();
ite.remove();
if(key.isAcceptable()) {
ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = acceptServerSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}else if(key.isReadable()){
//提交線程池處理
executorService.submit(()->{
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel channel = (SocketChannel) key.channel();
int count = channel.read(buffer);
if (count < 0) {
channel.close();
key.cancel();
return null;
} else if (count == 0) {
return null;
}
System.out.println("从客户端发送过来的消息是:"+new String(buffer.array()));
return null;
});
}
}
}
}
}
多Reactor
继续改进:对于多个CPU的机器,为充分利用系统资源,将Reactor拆分为两部分。
public class MyNioServer {
private Selector selector;
private final static int port = 8008;
private MyProcessor processor;
private void initServer() throws Exception {
this.selector = Selector.open();
ServerSocketChannel channel = ServerSocketChannel.open();
channel.configureBlocking(false);
channel.socket().bind(new InetSocketAddress("127.0.0.1", port));
channel.register(selector, SelectionKey.OP_ACCEPT);
processor = new MyProcessor();
processor.start();
while(true) {
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while(iterator.hasNext()) {
SelectionKey key = iterator.next();
if(key.isAcceptable()) {
doAccept(key);
}
iterator.remove();
}
}
}
public void doAccept(SelectionKey key) throws IOException {
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
System.out.println("ServerSocketChannel正在循环监听");
SocketChannel clientChannel = serverChannel.accept();
clientChannel.configureBlocking(false);
clientChannel.socket().setTcpNoDelay(true);
clientChannel.socket().setKeepAlive(true);
processor.accept(clientChannel);
// clientChannel.register(key.selector(),SelectionKey.OP_READ);
}
public static void main(String[] args) throws IOException {
MyNioServer myNioServer = new MyNioServer();
try {
myNioServer.initServer();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class MyProcessor extends Thread{
private final static int BUF_SIZE = 10240;
private Queue<SocketChannel> newConnections = new ConcurrentLinkedQueue<SocketChannel>();
private Queue<SocketChannel> connections = new ConcurrentLinkedQueue<SocketChannel>();
private Selector selector;
private final Queue<String> writeRequestQueue = new LinkedBlockingDeque<String>();
public MyProcessor() {
try {
selector = Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
}
public void accept(SocketChannel socketChannel) throws IOException {
newConnections.add(socketChannel);
selector.wakeup();
}
@Override
public void run() {
while(true) {
try {
// System.out.println("MyProcessor 正在待命");
configureNewConnections();
updateTrafficMask();
int k = selector.select(1000);
if(k == 0) {
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while(iterator.hasNext()) {
SelectionKey key = iterator.next();
System.out.println(key.interestOps());
iterator.remove();
if(key.isReadable()) {
doRead(key);
}else if(key.isWritable()) {
doWrite(key);
}else if(key.isConnectable()) {
System.out.println("连接成功!");
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void doRead(SelectionKey key) throws IOException {
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE);
long bytesRead = clientChannel.read(byteBuffer);
if (bytesRead>0){
byteBuffer.flip();
byte[] data = byteBuffer.array();
String info = new String(data).trim();
System.out.println("从客户端发送过来的消息是:"+info);
byteBuffer.clear();
bytesRead = clientChannel.read(byteBuffer);
// clientChannel.register(selector, SelectionKey.OP_WRITE);
this.writeRequestQueue.add(info);
this.connections.add(clientChannel);
}
if (bytesRead==-1){
clientChannel.close();
}
}
public void updateTrafficMask() {
if(!this.writeRequestQueue.isEmpty()) {
try {
this.connections.poll().register(selector, SelectionKey.OP_READ|SelectionKey.OP_WRITE);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
}
}
public void doWrite(SelectionKey key) throws IOException {
if(this.writeRequestQueue.isEmpty()) {
return ;
}
ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE);
byte[] bts = this.writeRequestQueue.poll().getBytes();
System.out.println(bts.length);
byteBuffer.put(bts);
byteBuffer.flip();
SocketChannel clientChannel = (SocketChannel) key.channel();
clientChannel.write(byteBuffer);
clientChannel.register(selector, SelectionKey.OP_READ);
}
public void configureNewConnections() {
while(!newConnections.isEmpty()) {
SocketChannel socketChannel = newConnections.poll();
try {
System.out.println("注册OP_READ事件");
socketChannel.register(selector, SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
}
}
}