阻塞和非阻塞的几点区别
在开始了解NIO之前,先看阻塞和非阻塞IO的几点区别
缓冲区
NIO中有一个缓冲区(以下统一称buffer)的概念,不论是读信息还是写信息,都是在buffer上进行操作;相比之下,BIO使用的是流(以下统一称stream)的概念,stream必须分读和写两种,通过装饰器模式生成stream后,数据直接写入stream或从stream中读取。
所以NIO是面向buffer的,而BIO是面向stream的;但socket总是面向stream的。
通道
NIO使用通道(以下统一称channel)来建立客户端和服务端之间的联系;BIO依然是stream(也就是说BIO没有区分管道和数据的不同)。Channel是双向的,既可以写也可以读,而stream还是必须分读或者写。
这里特别注意,由于channel是双向的,后面注册到多路复用器时,客户端channel(也就是SocketChannel对象)是作为READ类型注册,但其实也可以写操作。
Channel的类继承结构比较复杂,如下图所示。其中
-
InterruptibleChannel
是一个能够异步的关闭和捕获的channel(接口); -
SelectableChannel
是一个能够被多路复用器选择的channel(抽象类); -
ServerSocketChannel
是一个基于stream的监听socket的可被选择的channel; -
SocketChannel
是一个基于stream的连接socket的可被选择的channel。
注:#3是一个服务于监听socket的channel,#4是一个服务于连接socket的channel。
整体机制
个人理解,非阻塞IO的核心,是采用了一种和BIO完全不同的机制。服务端生成一个监听socket,以及客户端生成一个连接socket,都会新new一个线程(因为只需要一个线程即可,所以不用线程池),这个线程将当前的channel注册到一个地方。通过客户端或服务端的该线程(注意是独立的)的轮询,检查注册的socket,哪个socket准备好,比如可以读到数据或者可以连接,就做相应的操作。
这样做的好处是服务端或客户端都不会产生阻塞,由独立线程注册和处理监听事件,而原来的进程就不会一直等待,可以继续其它的操作,等待ready再由独立线程执行相关操作。
独立线程中的注册点称为多路复用器(以下统一称Selector),通过轮询注册其上的SelectionKey,查看当前连接是否可用,并做相关操作。所以Selector是NIO的核心。
NIO - Server端的实现
以下通过代码分析NIO对于同样业务逻辑的实现
Server的创建
try {
// selector是一个多路复用器实例
selector = Selector.open();
// servChannel是一个ServerSocketChannel对象,代表server通道
servChannel = ServerSocketChannel.open();
servChannel.configureBlocking(false);
// 服务器channel启动一个socket绑定端口
servChannel.socket().bind(new InetSocketAddress(port), 1024);
// 这个服务端socket通道注册到selector上,监听可接受连接事件(这步最为关键!!!)
servChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("The time server is start in port : " + port);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
Server端的Selector的多路选择算法
这是一个单独的线程,实际是一个Selector的运行内容。其核心是获取选择器上所有的注册的key,看key的状态并做相应的操作。
while (!stop) {
try {
// 循环前休眠1秒
selector.select(1000);
// 获取所有注册到该selector上的key
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
// 轮询所有的key
while (it.hasNext()) {
key = it.next();
it.remove();
try {
// 根据key的类型做不同的处理
handleInput(key);
} catch (Exception e) {
// ...
}
}
} catch (Throwable t) {
t.printStackTrace();
}
}
以下为handleInput的处理算法。
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
// 该方法测试这个key的channel是否已经准备好接收新的socket连接
if (key.isAcceptable()) {
// 接受新的连接,注意此处key的channel一定是一个服务端socket通道
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
// 接受后会返回当前和server通信的客户端socket通道
// 注意此处的channel也是服务端角度获取的客户端的socket,会从此channel获得客户端的request
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// 将这个客户端socket通道注册到selector上,并监听可读事件
sc.register(selector, SelectionKey.OP_READ);
}
// 该方法测试这个key的channel是否已经可读
if (key.isReadable()) {
// 读取数据,注意此处key的channel是从服务端角度获取的客户端的socket
// 从下面的代码可以看出,从这个通道读取的内容,是客户端发给服务端的request
// 注意此处一定要理解清楚!!!
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
// 读取结果分3种情况,只有大于0表示读取到信息
if (readBytes > 0) {
// 缓冲区的limit设为0,相当于一次性读所有的内容
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
// 转码后获取request,即body的内容
System.out.println("The time server receive order : " + body);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER";
// doWrite(sc, currentTime);
// doWrite方法用于将结果作为response返回给客户端,直接给出该方法内容
// doWrite method start
if (currentTime != null && currentTime.trim().length() > 0) {
byte[] bytes = currentTime.getBytes();
// 声明buffer并放入信息
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
// 设置limit为0,相当于一次性写所有的内容
writeBuffer.flip();
// 注意SocketChannel是既可以读,也可以写的
sc.write(writeBuffer);
}
// doWrite method end
} else if (readBytes < 0) {
// 对端链路关闭
key.cancel();
sc.close();
} else
; // 读到0字节,忽略
}
}
}
NIO - Client端的实现
Client的创建
try {
// 创建多路复用器
selector = Selector.open();
// 创建SocketChannel
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
// 这个socket还没有连接,所以不需要注册到selector上
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
Client连接Server
try {
// doConnect(); 以下直接给出
// doConnect method start
// 尝试连接服务端,如果成功返回true
if (socketChannel.connect(new InetSocketAddress(host, port))) {
// 将这个客户端socket通道注册到selector上,并监听可读事件
socketChannel.register(selector, SelectionKey.OP_READ);
// doWrite(socketChannel); 以下直接给出
// doWrite method start
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
// 构造request,并写入缓冲区
socketChannel.write(writeBuffer);
if (!writeBuffer.hasRemaining()){
System.out.println("Send order 2 server succeed.");
}
// doWrite method end
} else {
// 没有连接成功,并不是连接失败,只是没有获得握手信息
// 将这个客户端socket通道注册到selector上,并监听连接事件,当服务器应答后,即可继续操作
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
// doConnect method end
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
Client端的Selector的多路选择算法
在连接之后,该线程中的selector采用和server同样的策略,轮询SelectionKey,并在handleInput中进行处理,如下。
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
// 同server端的检查
SocketChannel sc = (SocketChannel) key.channel();
// 此处即对应connect方法返回false的情况(监听connect事件)
if (key.isConnectable()) {
// true表示握手过程完成
if (sc.finishConnect()) {
// 重新注册,监听读事件
sc.register(selector, SelectionKey.OP_READ);
doWrite(sc);
} else
System.exit(1);// 连接失败,进程退出
}
if (key.isReadable()) {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
// 通过读缓冲区,获取服务端的response,同样是3中情况,大于0为读到信息
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("Now is : " + body);
// 此处会通知selector的轮询结束
this.stop = true;
} else if (readBytes < 0) {
// 对端链路关闭
key.cancel();
sc.close();
} else
; // 读到0字节,忽略
}
}
}
【未完待续,下一部分开始Netty的实现及针对4.x的改造】