NIO简介
*NIO到底是什么的简称?有人喜欢称之为New IO,因为它相对于以前的IO是新增的,所以官方称之为New IO。但是,由于之前的IO类库是阻塞的,New IO就是要让Java能够支持非阻塞IO,所以,也有人喜欢称之为Non-block IO。 *
1.缓冲区Buffer
Buffer 是一个对象, 它包含一些要写入或者刚读出的数据。 在 NIO 中加入 Buffer 对象,体现了新库与原 I/O 的一个重要区别。在面向流的 I/O 中,您将数据直接写入或者将数据直接读到 Stream 对象中。
在 NIO 库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的。在写入数据时,它是写入到缓冲区中的。任何时候访问 NIO 中的数据,您都是将它放到缓冲区中。
缓冲区实质上是一个数组。通常它是一个字节数组,但是也可以使用其他种类的数组。但是一个缓冲区不 仅仅 是一个数组。缓冲区提供了对数据的结构化访问,而且还可以跟踪系统的读/写进程。
最常用的缓冲区类型是 ByteBuffer。一个 ByteBuffer 可以在其底层字节数组上进行 get/set 操作(即字节的获取和设置)。
2.通道Channel
Channel是一个通道,可以通过它读取与写入数据,它就像自来水管一样,网络数据通过Channel读取和写入。通道与流的不同之处在于通道是双向的,流只是在一个方向上移动(一个流必须是 InputStream 或者 OutputStream 的子类),而且通道可以用于读、写或者同时用于读写。
因为Channel是全双工的,所以它可以比流更好的映射底层操作系统的API。特别是在UNIX网络编程模型中,底层操作系统的通道都是全双工的,同时支持读写操作。
3.多路复用器Selector
多路复用器提供选择已经就绪的任务的能力。简单来讲,Selector会不断地轮询注册在其上的Channel,如果某个channel上有新的TCP连接接入、读和写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。
一个多路复用器Selector可以同时轮询多个Channel,由于JDK使用epool()代替传统的select实现,所以它并没有最大连续句柄1024/2048的限制。这也就意味着只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端。
NIO服务端序列图
一般流程
打开ServerSocketChannel,用于监听客户端的链接
ServerSocketChannel acceptor = ServerSocketChannel.open();
绑定监听端口,设置连接为非阻塞模式
int port = 8080;
acceptor.socket().bind(new InetSocketAddress(InetAddress.getByName("IP"),port));
acceptor.configureBlocking(false);
创建Reactor线程,创建多路复用器并启动线程
Selector selector = Selector.open();
new Thread(new ReactorTask()).start();
将ServerSocketChannel注册到Reactor线程的多路复用器Selcetor上
SelectionKey key = acceptor.register(selector,SelectionKey.OP_ACCEPT,ioHandler);
轮询
int num = selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keys = selectedKeys.iterator();
while(keys.hasNext()){
SelectionKey key = keys.next();
//doWhat
}
新的客户端接入
SocketChannel sc = serverChannel.accept();
设置位非阻塞模式
sc.configureBlocking(false);
sc.socket().setReuseAddress(true);
将新接入的客户端连接注册到Reactor上的多路复用器
SelectionKey key = sc.register(selector,SelectionKey.OP_READ);
异步读取客户端消息到缓冲区
int number = sc.read(receivedBuffer);
最后读取bytebuffer
while(buffer.hasRemain){
writeBuffer();
}
TimeServer示例
MultiplexerTimeServer.class
/**
* used to test nio
* Created by spark on 10/14/16.
*/
public class MultiplexerTimeServer implements Runnable {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private volatile boolean stop;
/**
* 初始化多路复用器,绑定监听端口
* @param port
*/
public MultiplexerTimeServer(int port) {
try {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
//设为异步非阻塞
serverSocketChannel.configureBlocking(false);
//backlog设为1024
serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("The time server is start in port:" + port);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
public void stop() {
this.stop = true;
}
/**
* 根据key的操作位获取网络事件的类型 TCP三次握手过程
* @param key
* @throws IOException
*/
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
if (key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
}
if(key.isReadable()){
SocketChannel sc = (SocketChannel) key.channel();
//通过ByteBuffer读取客户端的请求信息 开辟1K的缓冲区
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if(readBytes > 0){
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes,"UTF-8");
System.out.println("The time server received order : " + body );
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
doWrite(sc,currentTime);
}else if(readBytes < 0){
key.cancel();
sc.close();
}else{
}
}
}
}
/**
* 通过ByteBuffer将应答消息异步发送给客户端
* @param socketChannel
* @param response
* @throws IOException
*/
private void doWrite(SocketChannel socketChannel,String response) throws IOException {
if(response != null && response.trim().length() > 0){
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
socketChannel.write(writeBuffer);
}
}
@Override
public void run() {
//遍历selector,间隔为1s
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
//有就绪状态的Channel时,selector返回就绪状态的Channel的SelectionKey集合,通过对就绪状态的Channel集合进行迭代,进行异步读写操作
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (IOException e) {
if(key != null){
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
TimeServer.class
public class TimeServer {
public static void main(String[] args) {
int port = 8080;
if(args != null && args.length > 0){
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start();
}
}
NIO客户端序列图
一般流程
打开SocketChannel,绑定客户端本地地址
SocketChannel clientChannel = SocketChannel.open();
设置SocketChannel为非阻塞模式
clientChannel.configureBlocking(false);
clientChannel.socket().setReuseAddress(true);
clientChannel.socket().setReceiveBufferSize(BUFFER_SIZE);
clientChannel.socket().setSendBufferSize(BUFFER_SIZE);
异步连接服务端
boolean connected = clientChannel.connect(new InetSocketAddress("ip",port));
判断 注册
if(connected){
clientChannel.register(selector,SelectionKey.OP_READ,ioHandler);
}else{
clientChannel.register(selector,SelectionKey.OP_CONNECT,ioHandler);
}
创建Reactor线程,创建多路复用器并启动线程
Selector selector = Selector.open();
new Thread(new ReactorTask()).start();
轮询
int num = selector.select();
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keys = selectedKeys.iterator();
while(keys.hasNext()){
SelectionKey key = keys.next();
}
接受connect事件进行处理
if(key.isConnectable()){
//handleConnect
}
连接成功,注册读事件
if(clientChannel.finishConnect()) registerRead();
异步读和消息读取
int number = sc.read(receivedBuffer);
while(buffer.hasRemain){
}
TimeClient示例
TimeClientHandler.class
public class TimeClientHandler implements Runnable {
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
public TimeClientHandler(int port, String host) {
this.port = port;
this.host = host == null ? "127.0.0.1" : host;
try {
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
private void handleInput(SelectionKey key) throws IOException {
if(key.isValid()){
SocketChannel sc = (SocketChannel) key.channel();
if(key.isConnectable()){
if(sc.finishConnect()){
sc.register(selector,SelectionKey.OP_READ);
doWrite(sc);
}else{
System.exit(1);
}
}
if(key.isReadable()){
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if(readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("Now is " + body);
this.stop = true;
}else if(readBytes < 0){
key.cancel();
sc.close();
}else{
}
}
}
}
private void doWrite(SocketChannel sc) throws IOException {
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
sc.write(writeBuffer);
if(!writeBuffer.hasRemaining()){
System.out.println("Send order 2 server succeed.");
}
}
private void doConnect() throws IOException {
if(socketChannel.connect(new InetSocketAddress(host,port))){
socketChannel.register(selector,SelectionKey.OP_READ);
doWrite(socketChannel);
}else{
socketChannel.register(selector,SelectionKey.OP_CONNECT);
}
}
@Override
public void run() {
try {
doConnect();
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (IOException e) {
if(key != null){
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
if(selector != null){
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
TimeClient.class
public class TimeClient {
public static void main(String[] args) {
int port = 8080;
if(args != null && args.length > 0){
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
new Thread(new TimeClientHandler(port,"127.0.0.1"),"TimeClient-001").start();
}
}
AIO
NIO2.0引入了新的异步通道概念,并提供了异步文件通道和异步套接字通道的实现。
异步通道提供两种方式获取操作结果
- 通过java.util.concurrent.Feature类来表示异步操作的结果;
- 在执行异步操作的时候传入一个java.nio.channels。
CompletionHandler接口的实现类作为操作完成的回调。
AsyncTimeServerHandler.class
public class AsyncTimeServerHandler implements Runnable {
private int port;
CountDownLatch latch;
AsynchronousServerSocketChannel asynchronousServerSocketChannel;
public AsyncTimeServerHandler(int port) {
this.port = port;
try {
asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open(); // 创建一个异步服务端通道。
asynchronousServerSocketChannel.bind(new InetSocketAddress(port));// bind 一个监听端口
System.out.println("The time server is start in port : " + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
latch = new CountDownLatch(1); // 在完成一组正在执行的操作之前,允许当前的线程一直阻塞。
doAccept();
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void doAccept() {
asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler());// 处理接受消息的通知。
}
}
AcceptCompletionHandler.class
public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,AsyncTimeServerHandler> {
@Override
public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) {
attachment.asynchronousServerSocketChannel.accept(attachment, this);
ByteBuffer buffer = ByteBuffer.allocate(1024);
result.read(buffer, buffer, new ReadCompletionHandler(result));
}
@Override
public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
exc.printStackTrace();
attachment.latch.countDown();
}
}
ReadCompletionHandler.class
public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel channel;
public ReadCompletionHandler(AsynchronousSocketChannel channel) {
if (this.channel == null)
this.channel = channel;
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
byte[] body = new byte[attachment.remaining()];
attachment.get(body);
try {
String req = new String(body, "UTF-8");
System.out.println("The time server receive order : " + req);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req) ? new java.util.Date(
System.currentTimeMillis()).toString() : "BAD ORDER";
doWrite(currentTime);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
this.channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private void doWrite(String currentTime) {
if (currentTime != null && currentTime.trim().length() > 0) {
byte[] bytes = (currentTime).getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer, writeBuffer,
new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
// 如果没有发送完成,继续发送
if (buffer.hasRemaining())
channel.write(buffer, buffer, this);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
// ingnore on close
}
}
});
}
}
}
TimeServer.class
public class TimeServer {
public static void main(String[] args) throws IOException {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
AsyncTimeServerHandler timeServer = new AsyncTimeServerHandler(port);
new Thread(timeServer, "AIO-AsyncTimeServerHandler-001").start();
}
}
AsyncTimeClientHandler.class
public class AsyncTimeClientHandler implements CompletionHandler<Void, AsyncTimeClientHandler>, Runnable{
private AsynchronousSocketChannel client;
private String host;
private int port;
private CountDownLatch latch;
public AsyncTimeClientHandler(String host, int port) {
this.host = host;
this.port = port;
try {
client = AsynchronousSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void completed(Void result, AsyncTimeClientHandler attachment) {
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
client.write(writeBuffer, writeBuffer,
new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
client.write(buffer, buffer, this);
} else {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
client.read(
readBuffer,
readBuffer,
new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String body;
try {
body = new String(bytes, "UTF-8");
System.out.println("Now is : " + body);
latch.countDown();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
// ingnore on close
}
}
});
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
client.close();
latch.countDown();
} catch (IOException e) {
// ingnore on close
}
}
});
}
@Override
public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
exc.printStackTrace();
try {
client.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
latch = new CountDownLatch(1);
client.connect(new InetSocketAddress(host, port), this, this);
try {
latch.await();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
TimeClient.class
public class TimeClient {
public static void main(String[] args) {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
new Thread(new AsyncTimeClientHandler("127.0.0.1", port), "AIO-AsyncTimeClientHandler-001").start();
}
}