NIO核心类库
1. 缓冲区Buffer
– 是一块连续的内存块。
– 是 NIO 数据读或写的中转地。
Buffer
是个抽象类。内部包含的重要属性有:position, limit, capacity
提供的基本方法包括:
flip()
: 写模式转换成读模式
rewind()
:将 position 重置为 0 ,一般用于重复读。
clear()
:清空 buffer ,准备再次被写入 (position 变成 0 , limit 变成 capacity) 。
compact()
: 将未读取的数据拷贝到 buffer 的头部位。
mark()、reset()
:mark 可以标记一个位置, reset 可以重置到该位置。
基本类型均有对应的Buffer
类型。
ByteBuffer
、 MappedByteBuffer
、 CharBuffer
、 DoubleBuffer
、 FloatBuffer
、 IntBuffer
、 LongBuffer
、ShortBuffer
。每种类型内部都包含一个该基本类型的数组。如 IntBuffer
,
final int[] hb;
2. 通道Channel
– 数据的源头或者数据的目的地
– 用于向 buffer 提供数据或者读取 buffer 数据 ,buffer 对象的唯一接口。
– 异步 I/O 支持
可以想象成自来水管。与流不同的是,通道是双向的。业务包容性增大,减少频繁创建管道。
每个客户端都有一个通道。
Channel
的几种类型:
-
FileChannel
从文件中读写数据。与Selector
一起使用时,Channel
必须处于非阻塞模式下。这意味着不能将FileChannel
与Selector
一起使用。 -
DatagramChannel
能通过UDP读写网络中的数据。 -
SocketChannel
能通过TCP读写网络中的数据。 -
ServerSocketChannel
可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。
3. 多路复用器Selector
多个通道可以注册到一个Selector之上,Selector通过不断轮询这些通道,来监听它们的事件,然后将事件分发出去,或基于事件触发相应的响应。
SelectionKey可以获取就绪通道的集合。
底层使用Epoll()取代了传统select实现,优势在于没有最大句柄1024/2048的限制。
只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端接入。
主要方法:
open()
|打开一个选择器。
select()
|选择一组键,其相应的通道已为 I/O 操作准备就绪。
selectedKeys()
|返回此选择器的已选择键集。
SelectionKey四个重要常量,其实也是Selector监听SocketChannel的四种不同类型的事件。
OP_ACCEPT
| 用于套接字接受操作的操作集位。
OP_CONNECT
| 用于套接字连接操作的操作集位。
OP_READ
|用于读取操作的操作集位。
OP_WRITE
|用于写入操作的操作集位。
实例:使用NIO实现一个服务器端,接收多个客户端消息,并给出响应。
(注意当中的关键方法调用)
public class NIOServer {
private static ServerSocketChannel server;
private static int port = 8080;
//多路复用器
private static Selector selector;
ByteBuffer rBuffer = ByteBuffer.allocate(1024);
ByteBuffer wBuffer = ByteBuffer.allocate(1024);
Map<SelectionKey, String> sessionMsg = new HashMap<>();
public NIOServer(int port) throws IOException {
this.port = port;
server = ServerSocketChannel.open();
server.socket().bind(new InetSocketAddress(port));
server.configureBlocking(false);
selector = Selector.open();
server.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("NIO服务已经启动,且监听端口是"+this.port);
}
//监听请求方法
public void listen() throws IOException {
while(true) {
//通过selector来看有没有注册事件
int eventCount = selector.select();
if(eventCount<=0) {
//继续轮询 NIO内部机制:不断轮询注册到selector上的多个Channel
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while(iter.hasNext()) {
//处理客户端事件
process(iter.next());
//处理完的事情移除
iter.remove();
}
}
}
//详细处理客户端事件
private void process(SelectionKey key) {
SocketChannel client = null;
try {
if (key.isValid() && key.isAcceptable()) {
client = server.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_READ);
} else if(key.isValid() && key.isReadable()) {
//读到缓冲区
rBuffer.clear();
client = (SocketChannel)key.channel();
int len = client.read(rBuffer);
if(len > 0) {
String msg = new String(rBuffer.array(), 0, len);
sessionMsg.put(key, msg);
//读完,下次可以写了
client.register(selector, SelectionKey.OP_WRITE);
}
} else if(key.isValid() && key.isWritable()) {
if(!sessionMsg.containsKey(key)) {
return;
}
//服务器回客户端消息
client = (SocketChannel)key.channel();
wBuffer.clear();
wBuffer.put((new String(sessionMsg.get(key) + "请求完成")).getBytes());
//设置读取位
wBuffer.flip();
client.write(wBuffer);
client.register(selector, SelectionKey.OP_READ);
}
} catch (IOException e) {
try {
key.cancel();
client.socket().close();
client.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
public static void main(String[] args) {
try {
new NIOServer(port).listen();
} catch (IOException e) {
e.printStackTrace();
}
}
}
引用
JAVA NIO 简介