http://www.jianshu.com/p/191041073919 , 主要讲解了 NIO 中的每个部分(没有完全看完)
要求:
nio 是什么,三个组件 selector, channel, buffer 分别担任了什么角色,起到了什么作用。
数据的请求流程是什么
核心组件
Java NIO 由以下几个核心部分组成:
Channel
Buffer
Selector
Channel 和 Buffer
基本上,所有的 IO 在NIO 中都从一个Channel 开始。Channel 有点象流。 数据可以从Channel读到Buffer中,也可以从Buffer 写到Channel中。这里有个图示:
NIO 和 IO 的对比
IO 和 NIO 的区别主要体现在三个方面:
- IO 基于流(Stream oriented), 而 NIO 基于 Buffer (Buffer oriented)
- IO 操作是阻塞的, 而 NIO 操作是非阻塞的
- IO 没有 selector 概念, 而 NIO 有 selector 概念.
selector
selector 是 NIO 中才有的概念, 它是 Java NIO 之所以可以非阻塞地进行 IO 操作的关键。
通过 Selector, 一个线程可以监听多个 Channel 的 IO 事件, 当我们向一个 Selector 中注册了 Channel 后, Selector 内部的机制就可以自动地为我们不断地查询(select) 这些注册的 Channel 是否有已就绪的 IO 事件(例如可读, 可写, 网络连接完成等). 通过这样的 Selector 机制, 我们就可以很简单地使用一个线程高效地管理多个 Channel 了。
Channel
通常来说, 所有的 NIO 的 I/O 操作都是从 Channel 开始的. 一个 channel 类似于一个 stream
java Stream 和 NIO Channel 对比
我们可以在同一个 Channel 中执行读和写操作, 然而同一个 Stream 仅仅支持读或写
Channel 可以异步地读写, 而 Stream 是阻塞的同步读写
Channel 总是从 Buffer 中读取数据, 或将数据写入到 Buffer 中
Channel 类型有:
FileChannel:文件操作
DatagramChannel:UDP 操作
SocketChannel:TCP 操作
ServerSocketChannel:TCP 操作, 使用在服务器端
这些通道涵盖了 UDP 和 TCP网络 IO以及文件 IO
FileChannel
FileChannel 是操作文件的Channel, 我们可以通过 FileChannel 从一个文件中读取数据, 也可以将数据写入到文件中
注意:FileChannel 不能设置为非阻塞模式
SocketChannel
SocketChannel 是一个客户端用来进行 TCP 连接的 Channel
创建一个 SocketChannel 的方法有两种:
- 打开一个 SocketChannel, 然后将其连接到某个服务器中
- 当一个 ServerSocketChannel 接受到连接请求时, 会返回一个 SocketChannel 对象
打开 SocketChannel
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("http://example.com", 80));
读取数据
ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = socketChannel.read(buf);
如果 read()返回 -1, 那么表示连接中断了
写入数据
String newData = "New String to write to file..." + System.currentTimeMillis();
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
while(buf.hasRemaining()) {
channel.write(buf);
}
// 非阻塞模式
// 我们可以设置 SocketChannel 为异步模式, 这样我们的 connect, read, write 都是异步的了.
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("http://example.com", 80));
while(! socketChannel.finishConnect() ){
//wait, or do something else...
}
非阻塞模式
在非阻塞模式下, accept()是非阻塞的, 因此如果此时没有连接到来, 那么 accept()方法会返回null:
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);
while(true){
SocketChannel socketChannel =
serverSocketChannel.accept();
if(socketChannel != null){
//do something with socketChannel...
}
}
Buffer
当我们需要与 NIO Channel 进行交互时, 我们就需要使用到 NIO Buffer, 即数据从 Buffer读取到 Channel 中, 并且从 Channel 中写入到 Buffer 中
实际上, 一个 Buffer 其实就是一块内存区域, 我们可以在这个内存区域中进行数据的读写. NIO Buffer 其实是这样的内存块的一个封装, 并提供了一些操作方法让我们能够方便地进行数据的读写
Buffer 类型有:
- ByteBuffer
- CharBuffer
- DoubleBuffer
- FloatBuffer
- IntBuffer
- LongBuffer
- ShortBuffer
这些 Buffer 覆盖了能从 IO 中传输的所有的 Java 基本数据类型
基本使用
- 将数据写入到 Buffer 中
- 调用 Buffer.flip()方法, 将 NIO Buffer 转换为读模式
- 从 Buffer 中读取数据
- 调用 Buffer.clear() 或 Buffer.compact()方法, 将 Buffer 转换为写模式
缓冲区四个属性值一定遵循capacity ≥ limit ≥ position ≥ mark ≥ 0,put()时,若 position 超过 limit 则会抛出 BufferOverflowException;get() 时,若 position 超过 limit 则会抛出 BufferUnderflowException。
一旦读取了所有的 Buffer 数据, 那么我们必须清理 Buffer, 让其从新可写, 清理 Buffer 可以调用 Buffer.clear() 或 Buffer.compact()。
Buffer 属性
一个 Buffer 有三个属性: capacity, position, limit。
其中 position 和 limit 的含义与 Buffer 处于读模式或写模式有关, 而 capacity 的含义与 Buffer 所处的模式无关
Capacity
一个内存块会有一个固定的大小, 即容量(capacity), 我们最多写入capacity 个单位的数据到 Buffer 中, 例如一个 DoubleBuffer, 其 Capacity 是100, 那么我们最多可以写入100个 double 数据
Position
当从一个 Buffer 中写入数据时, 我们是从 Buffer 的一个确定的位置(position)开始写入的. 在最初的状态时, position 的值是0. 每当我们写入了一个单位的数据后, position 就会递增一
当我们从 Buffer 中读取数据时, 我们也是从某个特定的位置开始读取的. 当我们调用了 filp()方法将 Buffer 从写模式转换到读模式时, position 的值会自动被设置为0, 每当我们读取一个单位的数据, position 的值递增1
position 表示了读写操作的位置指针
举例说明
下图是新创建的一个容量为10的字节缓冲区的内存图:
向缓冲区 put() 四次后的缓冲区内存示意图:
执行buff.flip()将缓冲区翻转后的内存示意图
执两次get()后的缓冲区内存示意图
关于 Direct Buffer 和 Non-Direct Buffer 的区别
Direct Buffer:
- 所分配的内存不在 JVM 堆上, 不受 GC 的管理.(但是 Direct Buffer 的 Java 对象是由 GC 管理的, 因此当发生 GC, 对象被回收时, Direct Buffer 也会被释放)
- 因为 Direct Buffer 不在 JVM 堆上分配, 因此 Direct Buffer 对应用程序的内存占用的影响就不那么明显(实际上还是占用了这么多内存, 但是 JVM 不好统计到非 JVM 管理的内存.)
- 申请和释放 Direct Buffer 的开销比较大. 因此正确的使用 Direct Buffer 的方式是在初始化时申请一个 Buffer, 然后不断复用此 buffer, 在程序结束后才释放此 buffer.
- 使用 Direct Buffer 时, 当进行一些底层的系统 IO 操作时, 效率会比较高, 因为此时 JVM 不需要拷贝 buffer 中的内存到中间临时缓冲区中.
Non-Direct Buffer:
- 直接在 JVM 堆上进行内存的分配, 本质上是 byte[] 数组的封装.
- 因为 Non-Direct Buffer 在 JVM 堆中, 因此当进行操作系统底层 IO 操作中时, 会将此 buffer 的内存复制到中间临时缓冲区中. 因此 Non-Direct Buffer 的效率就较低.
Buffer 的比较
我们可以通过 equals() 或 compareTo() 方法比较两个 Buffer, 当且仅当如下条件满足时, 两个 Buffer 是相等的:
- 两个 Buffer 是相同类型的
- 两个 Buffer 的剩余的数据个数是相同的
- 两个 Buffer 的剩余的数据都是相同的.
通过上述条件我们可以发现, 比较两个 Buffer 时, 并不是 Buffer 中的每个元素都进行比较, 而是比较 Buffer 中剩余的元素.
flip, rewind 和 clear 的区别
flip:当从写模式变为读模式时, 原先的 写 position 就变成了读模式的 limit。
rewind: 这个方法仅仅是将 position 置为0。
clear:将 positin 设置为0, 将 limit 设置为 capacity。
Selector
Selector 允许一个单一的线程来操作多个 Channel. 如果我们的应用程序中使用了多个 Channel, 那么使用 Selector 很方便的实现这样的目的, 但是因为在一个线程中使用了多个 Channel, 因此也会造成了每个 Channel 传输效率的降低。
为了使用 Selector, 我们首先需要将 Channel 注册到 Selector 中, 随后调用 Selector 的 select()方法, 这个方法会阻塞, 直到注册在 Selector 中的 Channel 发送可读写事件. 当这个方法返回后, 当前的这个线程就可以处理 Channel 的事件了。
注意:
- 如果一个 Channel 要注册到 Selector 中, 那么这个 Channel 必须是非阻塞的, 即channel.configureBlocking(false)。因此 FileChannel 不可以注册到 Selector 上,因为 FileChannel 是阻塞的。
- 一个 Channel 仅仅可以被注册到一个 Selector 一次, 如果将 Channel 注册到 Selector 多次, 那么其实就是相当于更新 SelectionKey 的 interest set
注意到, 在使用 Channel.register()方法时, 第二个参数指定了我们对 Channel 的什么类型的事件感兴趣, 这些事件有:
- Connect, 即连接事件(TCP 连接), 对应于SelectionKey.OP_CONNECT
- Accept, 即确认事件, 对应于SelectionKey.OP_ACCEPT
- Read, 即读事件, 对应于SelectionKey.OP_READ, 表示 buffer 可读
- Write, 即写事件, 对应于SelectionKey.OP_WRITE, 表示 buffer 可写
关于 SelectionKey
包含如下内容:
- interest set, 即我们感兴趣的事件集, 即在调用 register 注册 channel 时所设置的 interest set
- ready set,代表了 Channel 所准备好了的操作
- channel
- selector
- attached object, 可以在 selectionKey 中附加一个对象
获取可操作的 Channel
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
注意, 在每次迭代时, 我们都调用 "keyIterator.remove()" 将这个 key 从迭代器中删除, 因为 select() 方法仅仅是简单地将就绪的 IO 操作放到 selectedKeys 集合中, 因此如果我们从 selectedKeys 获取到一个 key, 但是没有将它删除, 那么下一次 select 时, 这个 key 所对应的 IO 事件还在 selectedKeys 中。
例如此时我们收到 OP_ACCEPT 通知, 然后我们进行相关处理, 但是并没有将这个 Key 从 SelectedKeys 中删除, 那么下一次 select() 返回时 我们还可以在 SelectedKeys 中获取到 OP_ACCEPT 的 key。
Selector 的基本使用流程
- 通过 Selector.open() 打开一个 Selector.
- 将 Channel 注册到 Selector 中, 并设置需要监听的事件(interest set)
- 不断重复:
- 调用 select() 方法
- 调用 selector.selectedKeys() 获取 selected keys
- 迭代每个 selected key:
- 从 selected key 中获取 对应的 Channel 和附加信息(如果有的话)
- 判断是哪些 IO 事件已经就绪了, 然后处理它们。如果是 OP_ACCEPT 事件, 则调用
SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()
获取 SocketChannel, 并将它设置为 非阻塞的, 然后将这个 Channel 注册到 Selector 中。 - 根据需要更改 selected key 的监听事件
- 将已经处理过的 key 从 selected keys 集合中删除
关闭 Selector
当调用了 Selector.close()方法时, 我们其实是关闭了 Selector 本身并且将所有的 SelectionKey 失效, 但是并不会关闭 Channel。
NIO 代码示范:hello world
客户端代码
package com.study.nio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NIOClient {
/*标识数字*/
private static int flag = 0;
/*缓冲区大小*/
private static int BLOCK = 4096;
/*接受数据缓冲区*/
private static ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);
/*发送数据缓冲区*/
private static ByteBuffer receivebuffer = ByteBuffer.allocate(BLOCK);
/*服务器端地址*/
private final static InetSocketAddress SERVER_ADDRESS = new InetSocketAddress(
"localhost", 9988);
public static void main(String[] args) throws IOException {
// TODO Auto-generated method stub
// 打开socket通道
SocketChannel socketChannel = SocketChannel.open();
// 设置为非阻塞方式
socketChannel.configureBlocking(false);
// 打开选择器
Selector selector = Selector.open();
// 注册连接服务端socket动作
socketChannel.register(selector, SelectionKey.OP_CONNECT);
// 连接
socketChannel.connect(SERVER_ADDRESS);
// 分配缓冲区大小内存
Set<SelectionKey> selectionKeys;
Iterator<SelectionKey> iterator;
SelectionKey selectionKey;
SocketChannel client;
String receiveText;
String sendText;
int count=0;
while (true) {
//选择一组键,其相应的通道已为 I/O 操作准备就绪。
//此方法执行处于阻塞模式的选择操作。
selector.select();
//返回此选择器的已选择键集。
selectionKeys = selector.selectedKeys();
//System.out.println(selectionKeys.size());
iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
selectionKey = iterator.next();
if (selectionKey.isConnectable()) {
System.out.println("client connect");
client = (SocketChannel) selectionKey.channel();
// 判断此通道上是否正在进行连接操作。
// 完成套接字通道的连接过程。
if (client.isConnectionPending()) {
client.finishConnect();
System.out.println("客户端完成连接!");
sendbuffer.clear();
sendbuffer.put("Hello,Server".getBytes());
sendbuffer.flip();
client.write(sendbuffer);
}
client.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
client = (SocketChannel) selectionKey.channel();
//将缓冲区清空以备下次读取
receivebuffer.clear();
//读取服务器发送来的数据到缓冲区中
count=client.read(receivebuffer);
if(count>0){
receiveText = new String( receivebuffer.array(),0,count);
System.out.println("客户端接受服务器端数据--:"+receiveText);
client.register(selector, SelectionKey.OP_WRITE);
}
} else if (selectionKey.isWritable()) {
sendbuffer.clear();
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
client = (SocketChannel) selectionKey.channel();
System.out.println("enter a world");
while ((sendText = br.readLine()) != null) {
sendbuffer.put(sendText.getBytes());
//将缓冲区各标志复位,因为向里面put了数据标志被改变要想从中读取数据发向服务器,就要复位
sendbuffer.flip();
client.write(sendbuffer);
System.out.println("客户端向服务器端发送数据--:"+sendText);
client.register(selector, SelectionKey.OP_READ);
System.out.println("enter a world");
}
}
}
selectionKeys.clear();
}
}
}
服务端代码
package com.study.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NIOServer {
/*标识数字*/
private int flag = 0;
/*缓冲区大小*/
private int BLOCK = 4096;
/*接受数据缓冲区*/
private ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);
/*发送数据缓冲区*/
private ByteBuffer receivebuffer = ByteBuffer.allocate(BLOCK);
private Selector selector;
public NIOServer(int port) throws IOException {
// 打开服务器套接字通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 服务器配置为非阻塞
serverSocketChannel.configureBlocking(false);
// 检索与此通道关联的服务器套接字
ServerSocket serverSocket = serverSocketChannel.socket();
// 进行服务的绑定
serverSocket.bind(new InetSocketAddress(port));
// 通过open()方法找到Selector
selector = Selector.open();
// 注册到selector,等待连接
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Server Start----" + port);
}
// 监听
private void listen() throws IOException {
while (true) {
// 选择一组键,并且相应的通道已经打开
selector.select();
// 返回此选择器的已选择键集。
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
handleKey(selectionKey);
}
}
}
// 处理请求
private void handleKey(SelectionKey selectionKey) throws IOException {
// 接受请求
ServerSocketChannel server = null;
SocketChannel client = null;
String receiveText = "";
String sendText = "";
int count=0;
// 测试此键的通道是否已准备好接受新的套接字连接。
if (selectionKey.isAcceptable()) {
// 返回为之创建此键的通道。
server = (ServerSocketChannel) selectionKey.channel();
// 接受到此通道套接字的连接。
// 此方法返回的套接字通道(如果有)将处于阻塞模式。
client = server.accept();
System.out.println("客户端连接成功!" + client.getLocalAddress());
// 配置为非阻塞
client.configureBlocking(false);
// 注册到selector,等待连接
client.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
//将缓冲区清空以备下次读取
receivebuffer.clear();
// 返回为之创建此键的通道。
client = (SocketChannel) selectionKey.channel();
//读取服务器发送来的数据到缓冲区中
count = client.read(receivebuffer);
if (count > 0) {
receiveText = new String( receivebuffer.array(),0,count);
System.out.println("服务器端接受客户端数据--:"+receiveText);
client.register(selector, SelectionKey.OP_WRITE);
}
} else if (selectionKey.isWritable()) {
//将缓冲区清空以备下次写入
sendbuffer.clear();
// 返回为之创建此键的通道。
client = (SocketChannel) selectionKey.channel();
sendText="message from server--" + receiveText.toUpperCase();
//向缓冲区中输入数据
sendbuffer.put(sendText.getBytes());
//将缓冲区各标志复位,因为向里面put了数据标志被改变要想从中读取数据发向服务器,就要复位
sendbuffer.flip();
//输出到通道
client.write(sendbuffer);
System.out.println("服务器端向客户端发送数据--:"+sendText);
client.register(selector, SelectionKey.OP_READ);
}
}
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
new NIOServer(9988).listen();
}
}