基本概念
Java NIO(Native IO) 是JDK1.4 开始提供的新的API。为所有原始类型提供Buffer缓存,字符集编码等解决方案。它通过一个本地的DirectBuffer来直接为APP分配内存,避免JVM参与其中。通过这样的方式来提高效率
NIO模型
NIO是一个同步非阻塞的线程模型,同步是指线程不断轮询IO事件是否就绪,非阻塞是指线程在等待IO就绪之前,可以做其他的任务。同步核心为Selector
,Selector 代替了线程对IO事件的轮询,避免了阻塞同时减少了线程的消耗。非阻塞的核心是通道(Channel)和缓冲区(Buffer),当IO事件就绪时,就可以将数据写进缓冲区,再由缓冲区交给线程。从而保证
IO的成功
NIO的三个核心组件
- Channels : 通道是I/O传输过程中需要通过的入口,缓冲区是这些数据的传输的来源或者目的地
- 在NIO中如果想要将Data传输到目的地的Buffer中,则首先需要传输到目的地的Channel中。
- 然后目的地Buffer再从属于自己的Channel中取出数据放置到Buffer中
- Buffers:
- 缓冲区,可以理解为DirectBuffer区域向线程输出数据的缓冲地带。
- 当IO可用时而且数据到达时,可以预先写入缓冲区。通过这样的方式,线程就不需要特意的等待IO
- Selectors: 顾名思义就是用来调度各个IO事件,Selector允许单线程处理多个Channel
- 首先向Selector注册Channel
- 调用其select方法。这个方法会阻塞到某个注册的通道有时间就绪(轮询)
- 坚挺Selector感兴趣的四个事件
OP_ACCEPT
,OP_CONNECT
,OP_READ
,OP_WRITE
虽然Java NIO 中除此之外还有很多类和组件,但在我看来,Channel,Buffer 和 Selector 构成了核心的API。其它组件,如Pipe和FileLock,只不过是与三个核心组件共同使用的工具类。
基本使用
基于NIO的一个Server
Server 端
public static void main(String[] args){
try {
//开启一个Selector
Selector selector = Selector.open();
//Server端的Channel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost",8083);
//监听 localhost:8083 端口
serverSocketChannel.bind(inetSocketAddress);
//设置为非阻塞的模式
serverSocketChannel.configureBlocking(false);
//serverSocket 只能接收 新的连接,所以这个只会返回 {@link SelectionKey#OP_ACCEPT}.
int ops = serverSocketChannel.validOps();
//将Channel注册到selector 上,返回的SelectionKey 标识着这个Channel的状态,感兴趣的Event,可用的event等
SelectionKey selectionKey = serverSocketChannel.register(selector,ops);
while (true){
System.out.println("Waiting for the Connections come");
//将 准备进行IO操作的Channel集合返回。这是个阻塞的方法,当至少有一个Channel ready的时候才会返回!
selector.select();
//
Set<SelectionKey> selectionKeys = selector.selectedKeys();
selectionKeys.stream().forEach(key ->{
try {
//如果IO的Connector是可用的,那么
if(key.isAcceptable()){
//接收一个来自客户端的请求,
SocketChannel socketClientChannel = serverSocketChannel.accept();
//因为这个已经设置了非阻塞式的,所以这里的连接可能会是null
if (socketClientChannel != null){
socketClientChannel.configureBlocking(false);
//将selector的OP_READ 注册为clientSocket
socketClientChannel.register(selector, SelectionKey.OP_READ);
}
}else if(key.isReadable()){
//因为我们前面将ClientChannel 注册到OP_READ上面,所有这里的SelectorKey的channel一定是Client
SocketChannel crunchifyClient = (SocketChannel) key.channel();
if (crunchifyClient == null){
log("no connections valid sleep 100ms ");
Thread.sleep(100);
}
ByteBuffer crunchifyBuffer = ByteBuffer.allocate(1024);
//将Channel将Channel 中的数据写入到Buffer中。。
crunchifyClient.read(crunchifyBuffer);
String result = new String(crunchifyBuffer.array()).trim();
log("Message received: " + result);
crunchifyClient.close();
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
} catch (IOException e) {
e.printStackTrace();
}
}
private static void log(String s) {
System.out.println(s);
}
}
Cilent 端
public class NIOClient implements Runnable {
public NIOClient(String connectID) {
this.connectID = connectID;
}
private String connectID;
public static void main(String[] args) {
for (int count = 0; count < 50;count++){
new Thread(new NIOClient(""+count)).start();
}
}
public static void log(String msg) {
System.out.println(msg);
}
@Override
public void run() {
try {
Selector selector = Selector.open();
InetSocketAddress socketAddress = new InetSocketAddress("localhost", 8083);
//
// Thread.sleep(1000); //每10ms发出一个
SocketChannel socketChannel = SocketChannel.open(socketAddress);
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_CONNECT);
for (int index = 0; index < 100; index++) {
byte[] bytes = new String("connectID" +connectID + "this is the " + index + "data").getBytes();
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
socketChannel.write(byteBuffer);
byteBuffer.clear();
Thread.sleep(2000);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}