0. 概述
NIO的全称是NoneBlocking IO,非阻塞IO,区别于BIO,BIO的全称是Blocking IO,阻塞IO。那这个阻塞是什么意思呢?例如传统的多线程服务器是BlockingIO模式的,从头到尾所有的线程都是阻塞的,接收请求和处理的过程中:
- Accept是阻塞的,只有新连接来了,Accept才会返回,主线程才能继
- Read是阻塞的,只有请求消息来了,Read才能返回,子线程才能继续处理
- Write是阻塞的,只有客户端把消息收了,Write才能返回,子线程才能继续读取下一个请求
在IO处理被阻塞的时候处理线程就需要等在那里,占用了操作系统的调度资源,什么事也不干,是非常大的性能浪费。
NIO并不是Java独有的概念,NIO代表的一个词汇叫着IO多路复用。它是由操作系统提供的系统调用,早期这个操作系统调用的名字是select,但是性能低下,后来渐渐演化成了Linux下的epoll和Mac里的kqueue。
1.Java NIO
NIO并不是java独有概念,很早在操作系统层面已经提出。
Java从1.4版本引入NIO(New IO/Non-blocking IO)系列接口作为IO包的替代模块,用于替代Java IO和Java Networking 接口。与标准IO接口相比,JavaNIO提供了一种不同的方式来处理IO操作。
学习使用Java NIO需要理解下面几个概念:
- Channels和Buffers:在标准IO接口中我们最常用的是字节流(byte strams)和字符流(character streams)。在NIO接口中我们需要使用Channel和Buffer进行IO操作,Channel模拟了流的概念,但是又有不同。数据总是从一个Channel读到一个buffer中,或者从一个buffer中写到channel中。
- Non-blocking IO: Java NIO接口的核心就是提供了非阻塞IO的能力(Non-blocking IO)。例如:一个线程可以请求channel读取数据到buffer中,在channel读取数据的过程中,线程可以处理其他的事情,一旦数据已经读取到buffer中,线程可以继续处理buffer中的数据;对于将buffer中的数据写到channel中道理是一样的。
- Selectors:Java NIO包含了Selectors的设计,Selector通过事件驱动多个Channel的对象,Selector可以实现让一个线程管理使用多个数据的Channel。
Java NIO包含了大量的类和组件,但是Channel,Buffer和Selector组成了NIO接口的核心,其余的一些组件例如Pipe、FileLock等更像是这三个组件结合使用的一些工具类。因此要学习使用Java NIO的使用必须首先掌握这三个模块。
Channel和Buffer通常是共同使用的,一般来讲,所有的IO和NIO操作都从一个channel开始,channel有点像stream,数据可以通过channel读取到buffer里;也可以将数据从buffer写到channel中Java NIO提供了很多种channel和buffer类型;Channel接口主要实现类如下:
* FileChannel
* DatagramChannel
* SocketChannel
* ServerSocketChannel
这些实现类覆盖了 UDP + TCP 网络IO以及常用的文件IO操作,这些实现类里还要一些比较有趣的接口,这里先简单了解一下,后边会详细介绍。Buffer的是要实现类:
* ByteBuffer
* CharBuffer
* DoubleBuffer
* FloatBuffer
* IntBuffer
* LongBuffer
* ShortBuffer
这些Buffer的实现类涵盖了可以通过IO读写的所有基本类型:byte,short,int,long,float,double和字符(char)。Java NIO还包含了一个MappedByteBuffer用于使用内存映射读取文件,可以以内存的速度快速访问文件内容。
2.Channel
JavaNIO Channels和流有一些相似,但是又有些不同:
- 你可以同时读和写Channels,流Stream只支持单向的读或写(InputStream/OutputStream)
- Channels可以异步的读和写,流Stream是同步的
- Channels总是读取到buffer或者从buffer中写入
下面分别介绍一下Channel最重要的一些实现类:
- FileChannel : 可以读写文件中的数据
- DatagramChannel:可以通过UDP协议读写数据
- SocketChannel:可以通过TCP协议读写数据
- ServerSocketChannel:允许我们像一个web服务器那样监听TCP链接请求,为每一个链接请求创建一个SocketChannel
下面是一个基本的使用FileChannel读取数据到buffer的例子:
public class FileChannelExam {
public static void main(String[] args){
try {
String path = FileChannelExam.class.getResource("/data/nio-data.txt").getPath();
// 创建一个文件通道
RandomAccessFile file = new RandomAccessFile(path, "rw");
FileChannel channel = file.getChannel();
// 创建一个字节buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 读取数据到buffer
int len = channel.read(buffer);
while (len != -1){
System.out.println("Read " + len);
// 将写模式转变为读模式,
// 将写模式下的buffer内容最后位置设为读模式下的limit位置,作为读越界位,同时将读位置设为0
// 表示转换后重头开始读,同时消除写模式的mark标记
buffer.flip();
// 判断当前读取位置是否到达越界位(position < limit)
while (buffer.hasRemaining()){
// 读取当前position的字节(position++)
System.out.println(buffer.get());
}
// 清空当前buffer内容
buffer.clear();
len = channel.read(buffer);
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
需要注意buffer.flip()方法,首先我们从Channel读取数据写入到Buffer,然后调用flip将切换到读模式,才能从buffer中读取数据。
Channel到Channel的数据传输
在Java NIO中我们可以直接将数据从一个Channel传输到另一个Channel中,比如FileChannel中有transferTo()和transferFrom()方法。
transferFrom()
transferFrom()方法可以将一个源channel中的数据传输到一个FileChannel中
String fromPath = FileChannelExam2.class.getResource("/data/nio-data.txt").getPath();
String toPath = FileChannelExam2.class.getResource("/data/nio-data-to.txt").getPath();
RandomAccessFile fromFile = new RandomAccessFile(fromPath, "rw");
FileChannel fromChannel = fromFile.getChannel();
RandomAccessFile toFile = new RandomAccessFile(toPath, "rw");
FileChannel toChannel = toFile.getChannel();
long position = 0;
long count = fromChannel.size();
toChannel.transferFrom(fromChannel, position, count);
transferFrom()有三个参数,源channel,position,count;position定义目标channel写入的起始位置,count定义写入数据的容量,如果源channel中的数据量小于count,只会写入源channel数据的量。
另外,在SocketChannel的实现中,当前SocketChannel已经读取一部分数据,稍后仍会读取更多数据情况下,并不一定能将完整的数据读取到FileChannel中。
transferTo()
transferTo()方法可以将FileChannel中的数据传输到其他channel中
String fromPath = FileChannelExam2.class.getResource("/data/nio-data.txt").getPath();
String toPath = FileChannelExam2.class.getResource("/data/nio-data-to.txt").getPath();
RandomAccessFile fromFile = new RandomAccessFile(fromPath, "rw");
FileChannel fromChannel = fromFile.getChannel();
RandomAccessFile toFile = new RandomAccessFile(toPath, "rw");
FileChannel toChannel = toFile.getChannel();
long position = 0;
long count = fromChannel.size();
fromChannel.transferTo(position, count, toChannel);
上边两个例子有些相似,唯一的区别就是调用的方法和调用方法的对象。这个方法和SocketChannel也会存在和transferFrom同样的问题。
3.Buffer
在Java NIO中各类Buffer主要用于和NIO Channel进行交互,数据从Channel中读取到Buffer中,从Buffer写入到Channel中。
我们可以将Buffer看做内存中的一块区域,我们可以在这块区域上写数据,然后在从中读取。这块内存区域被包装成NIO Buffer对象,提供了一系列的方法使我们操作这块内存变得更简单一些。
Buffer的基本使用
使用Buffer进行读写数据一般会通过下边四个步骤处理:
- 将数据写到Buffer中
- 调用buffer.flip()切换为读模式
- 从Buffer中读取数据
- 调用buffer.clear()或者buffer.compact()清空或压缩buffer
下边是个简单的Buffer使用的例子
public class FileChannelExam {
public static void main(String[] args){
try {
String path = FileChannelExam.class.getResource("/data/nio-data.txt").getPath();
// 创建一个文件通道
RandomAccessFile file = new RandomAccessFile(path, "rw");
FileChannel channel = file.getChannel();
// 创建一个字节buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 读取数据到buffer
int len = channel.read(buffer);
while (len != -1){
System.out.println("Read " + len);
// 将写模式转变为读模式,
// 将写模式下的buffer内容最后位置设为读模式下的limit位置,作为读越界位,同时将读位置设为0
// 表示转换后重头开始读,同时消除写模式的mark标记
buffer.flip();
// 判断当前读取位置是否到达越界位(position < limit)
while (buffer.hasRemaining()){
// 读取当前position的字节(position++)
System.out.println(buffer.get());
}
// 清空当前buffer内容
buffer.clear();
len = channel.read(buffer);
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
当我们将数据写入buffer时,buffer会记录我们写入了多少数据,当需要读取数据的时候,需要调用flip()方法将buffer从写模式切换到读模式,在读模式下,buffer允许用户读取已经写入buffer的所有数据。
一旦我们已经读取了buffer中的所有数据,我们需要清空buffer以便写一次写入数据。我们可以使用两种方法达到这个目的:
- 调用clear()方法:清空整个buffer;
- 调用compact()方法:仅清空已经读取的数据,未读取的数据移动到buffer的起始位置,新写入的数据会放到未读取数据的后边。
Buffer的 capacity, position 和limit
Buffer对象使用capacity,position,limit三个属性来保存内存状态以便灵活操作内存,了解这三个属性的作用是理解Buffer工作原理的关键。position和limit决定了Buffer可以读写的区域(position <= x < limit),capacity 表示读写的最大容量
下图模拟了Buffer在读、写模式下capacity、position、limit的状态。
- capacity
作为一块内存,buffer必须有一个固定容量,这就是buffer的capacity。你最多只能写入capacity容量的数据到buffer中,一旦buffer中被写满数据,在你写入新的数据之前需要置空buffer(通过读取数据或直接清空)。
- position
当写入buffer数据的时候需要明确写入的位置,这就是position,buffer初始化的时候position为0;当你写入一个字节或者整型数字后,position指针会移动到已经写入数据的内存的下一个内存位置,position的最大值为capacity-1;
当读取数据的时候,你也可以给定 一个position,当你调用filp()方法将一个buffer从写模式切换到读模式的时候,position会重置为0,你将会从0位置开始读取数据,读取数据后position也会移动到已读取数据的下一个位置。
- limit
在写数据的时候,limit限制了写入数据的最大容量即position的最大值(position < limit).在写模式下,limit=capacity;
从上边代码可以看到当调用flip()切换到读模式时,limit被设置为已写入数据的position值,限制你能读取数据的容量,也就是说你最多能读取你写入的所有的数据。
Buffer常用方法
1.申请一个Buffer
在使用Buffer之前,你必须为它申请一块内存空间,每个Buffer的实现类都实现了它自己的allocate()方法来完成内存申请的工作,下面的代码展示了如何创建一个Buffer对象。
// 创建一个1024字节的ByteBuffer对象
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 创建一个1024字符的CharBuffer对象
CharBuffer charBuffer = CharBuffer.allocate(1024);
2.写入数据到buffer中
向buffer写入数据有两种方法:
- 通过Channel向Buffer中写入数据
- 直接写入数据到Buffer
// 通过Channel写入,即将Channel数据读取到buffer中
int len = channel.read(buffer);
// 直接写入,调用put方法
buffer.put(127);
需要注意的是,put()方法有多重实现,你可以使用不同的方式写入数据,例如:写入到特定的位置,写入一个字节数组等。
3.flip()写切换到读
flip()方法是将buffer由写模式切换到读模式的方法,flip()方法将position重置为0,将limit设置为已经写入的最大位置,也就是position从标记写入位置改变为标记都区位置;源码中flip()方法的实现如下:
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
4.从buffer中读取数据
从buffer中读取数据同样有两种方法:
- 通过Channel从Buffer中读取数据
- 直接从Buffer中读取数据
// 使用Channel读取数据,即将数据写入Channel
int len = channel.write(buffer);
// 直接读取数据
byte data = buffer.get();
同样get()方法也有很多重载实现,允许我们使用不同的方法读取数据,可以参考Buffer实现类文档查看更多细节。
5.倒回rewind()
rewind()倒回方法只是将position重置为0,limit仍保持原值;一般在读模式下使用可以让我们重复读取buffer中的数据;在写模式下则会导致重新写入数据(类似于置空了buffer)。源码:
public final Buffer rewind() {
position = 0;
mark = -1;
return this;
}
6.clear()和compact()
一旦完成读操作,我们需要让buffer重新改变为写模式,以便可以重新向buffer写入新的数据,buffer通过clear()和compact()来完成。
当调用clear的时候position会重置为0,limit设置为capacity,虽然buffer中的数据未被擦除,但逻辑上相当于buffer被清空了,因为新写入的数据会覆盖旧数据,如果buffer中还有未被读取的数据,这些数据依然会被覆盖!
clear源码实现,可以和rewind的比较一下,看有什么区别:
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
如果希望保留buffer中还未读取的数据,只是清理已读取的数据来腾出写入空间,则可以通过compact()方法实现;compact()方法会拷贝未读入的数据到buffer内存空间的起始位置,然后将position设置到未读取数据元素的最后位置,limit值仍然为buffer的capacity,现在buffer就有了更多的空间供写入数据。我们可以看一下HeapByteBuffer的源代码实现:
public ByteBuffer compact() {
//复制数据
System.arraycopy(hb, ix(position()), hb, ix(0), remaining());
// 重置position位置
position(remaining());
// limit设置为capacity
limit(capacity());
discardMark();
return this;
}
7.mark()和reset()
mark和reset方法是配合使用的一组方法,你可以通过mark()方法标记buffer中的一个位置,经过读写操作后position位置会改变,然后你就可以使用reset()方法使position位置回到mark()方法标记的位置。
buffer.mark();
...; // 读或写操作
buffer.reset(); // 回到标记位置
8.equals()
可以通过equals和compareTo()方法来比较两个buffer,equals判断条件:
1. 两个buffer是否同一类型;
2. 是否持有相同数量的数据;
3. 持有的数据是否每个元素都相同。
9.Scatter和Gather
Java NIO内置支持分散(Scatter)和聚集(Gather),Scatter和Gather是用于读取和写入Channel的概念。
Scatter是指从一个Channel中分散读取数据到一个或多个Buffer的操作,因此Channel将数据分散到多个Buffer中;
Gather是指将一个或多个Buffer中的数据写入一个Channel的操作,一次Channel可以从多个Buffer中收集数据。
Scatter和Gather在解决传输数据拥有多个部分需要进行分离的场景下有很大的用处;比如,一个消息数据中包含消息头(header)和消息体(body)两部分,我们就可以将消息头和消息体分别读入不同的Buffer保存,使得消息的分离处理更加方便。
- Scatter操作
将Channel中的数据读取到多个Buffer
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
ByteBuffer[] buffers = { header, body };
channel.read(buffers);
当Channel的read()方法传入参数为buffer数据的时候,read()方法会按照顺序将数据写入到传入的多个buffer中,当一个buffer写满后便会写入下一个buffer直到写满所有的buffer;因为分离读取的时候,Channel写入buffer的数据是按顺序的,Scatter操作并不适合动态长度的数据传输,也就意味着传输数据的每一部分都是固定长度时,Scatter才能发挥它的作用。
- Gather操作
Gather操作将多个buffer的数据写入到同一个Channel
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body = ByteBuffer.allocate(1024);
// 写入数据
ByteBuffer[] bufferArray = { header, body };
channel.write(bufferArray);
channel的write()方法可以接受buffer数据作为参数,write()方法会按照顺序将多个buffer中的数据依次写入channel。需要注意的是,write()操作只会写入buffer中已写入的数据,即position到limit之间的数据;例如一个buffer的容量为128字节,但buffer中只写入了28字节的数据,只有这28个字节会写入channel中,因此Gather操作和Scatter相反非常适合动态长度数据写入。
3.Selector
Selector是Java NIO中用于管理一个或多个Channel的组件,控制决定对哪些Channel进行读写;通过使用Selector让一个单线程可以管理多个Channel甚至多个网络连接。
使用Selector最大的优势就是可以在较少的线程中控制更多的Channel。事实上我们可以使用一个线程控制需要使用的所有Channel。操作系统线程的运行和切换需要一定的开销,使用的线程越小,系统开销也就越少;因此使用Selector可以节省很多系统开销。下图展示了一个线程使用Selector控制三个Channel的情形。
1.创建Selector
Selector selector = Selector.open();
2.注册Channel
想要通过Selector中控制Channel,必须将Channel注册到Selector中,通过SelectableChannel.register()方法实现。
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
需要注意的是注册到Selector的Channel必须是非阻塞模式的(non-blocking),FileChannel是无法使用的因为FileChannel无法切换到非阻塞模式,SocketChannel非常适合配合Selector使用。
register方法的第二个参数是监听设置,用于设置注册的channel通过Selector监听的操作事件类型,总共有四类事件可以监听:
- Connect
- Accept
- Read
- Write
JavaNIO中在SelectionKey中有四个静态变量表示这四类事件:
- SelectionKey.OP_CONNECT
- SelectionKey.OP_ACCEPT
- SelectionKey.OP_READ
- SelectionKey.OP_WRITE
如果在注册Channel的时候希望监听多个事件可以使用“|”连接静态变量
SelectionKey key = channel.register(selector,
SelectionKey.OP_READ|SelectionKey.OP_WRITE);
3.SelectionKey对象
Channel注册到Selector后会返回一个SelectionKey对象,这个对象包含了下面一些重要属性:
- 事件监听集合(interest set)
监听集合(interest set)是channel在selector监听的事件类型的集合,可以同SelectionKey读写这个配置。
int interestSet = selectionKey.interestOps();
// 通过 & 操作判断监听配置是否包含某类事件
boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
- 就绪结合(ready set)
就绪集合(ready set)是channel已经就绪的操作的集合,我们主要在一个selection操作后访问就绪集合。
int readySet = selectionKey.readyOps();
// 可以使用和interest set 同样的方法测试集合中是否包含某类事件,
// 也可以通过调用下边的一些方法进行判断:
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
- Channel对象
Channel channel = selectionKey.channel();
- Selector对象
Selector selector = selectionKey.selector();
- 一个可选附属对象(an attached object (optional) )
可以给SelectionKey添加一个附加对象,通常用来标记Channel或者Channel的特征信息。例如,我们可以将和Channel配合使用的Buffer附加到SelectionKey上。
// 附加对象
selectionKey.attach(theObject);
// 获取附加对象
Object attachedObj = selectionKey.attachment();
// 还可以再注册channel的时候直接添加附加对象
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
4.通过Selector选择Channel
将多个Channel注册到Selector后,我们就可以通过调用select()方法选择监听了特定事件(connect,accept,read,write)并且已经就绪的Channel。换种说法就是,如果你已经注册了一个监听read事件的channel,它就会通过select()方法接收到read事件。
select方法有几种不同的重载:
- int select():阻塞直到至少有一个channel对监听的事件操作准备就绪
- int select(long timeout):和select()方法一样,但只会阻塞到指定的超时时间;
- int selectNow():不会阻塞,无论是否有就绪的channel都会立即返回。
三个方法的返回值是最后一次调用select()后就绪的channel的数量,如果你调用select()返回1,表示调用select()后有一个channel准备就绪了;当你再次调用sleect()时再返回1,表示这次又有一个channel就绪了,如果对第一次调用就绪的channel没有做任何操作,这时总共有两个已经准备就绪的channel,在两次调用中都只有一个channel变为就绪状态。
5.selectionKey()
调用select()方法返回就绪channel个数后,可以调用selectedKeys()方法获取就绪channel的SelectionKey集合
Set<SelectionKey> selectedKeys = selector.selectedKeys();
我们可以通过这个集合访问已经就绪的channel
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
上边代码演示了遍历每一个SelectionKey并判断SelectionKey持有的channel引用就绪的事件。
注意循环中最后keyInterator.remove()方法,这里并不是将SelectionKey对象从selector中移除,只是从就绪集合中移除,对channel操作后必须调用这个方法,当下一次channel就绪后,它的SelectionKey还会被加入到就绪集合中。
6.wakeUp()
一个线程调用select()后可以通过再次调用select()离开阻塞状态;也可以通过其他线程调用wakeUp()方法是阻塞在select()的Selecor立即返回。
7.close()
使用完Selector后可以使用close()方法关闭它,这会关闭Selector和清除注册到Selector的SelecionKey对象,但Channel本身并不会关闭。
8.完整流程(伪代码)
Selector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
while(true) {
int readyChannels = selector.select();
if(readyChannels == 0) continue;
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
}