1、Java NIO 介绍
NIO是java New IO的简称,在jdk1.4里提供的新api。Sun 官方标榜的特性如下:
1、为所有的原始类型提供(Buffer)缓存支持。
2、字符集编码解码解决方案
3、Channel:一个新的原始I/O操作
4、字符锁和内存映射文件的文件访问接口
5、提供多路(non-bloking)非阻塞式高伸缩性网络I/O
NIO的创建目的是为了让Java程序员可以实现高速I/O而无需编写自定义的本机代码。NIO将最耗时的I/O操作(即提供和提取缓冲区)转移回操作系统,因而可以极大的提高速度。
流与块的比较
原来的I/O库(在java.io.*中)与NIO最重要的区别是数据打包和传输的方式。正如前面提到的,原来I/O以流的方式处理数据,而NIO以块的方式处理数据。
面向流的I/O系统一次一个字节地处理数据。一个输入流产生一个字节的数据,一个输出流消费一个字节的数据。为流式数据创建过滤器非常容易。链接几个过滤器,以便每个过滤器只负责单个复杂处理机制的一部分,这样也是相对简单的。不利的一面是,面向流的I/O通常相当慢。
一个面向块的I/O系统以块的形式处理数据。每一个操作都在一步中产生或者消费一个数据块。按块处理数据比按(流式的)字节处理数据快得多。但是面向块的I/O缺少一些面向流的I/O所具有的优雅性和简单性。
2、Java NIO 的核心组件
Java NIO的核心组件包括:Channel(通道),Buffer(缓冲区),Selector(选择器),其中Channel和Buffer比较好理解
简单来说 NIO是面向通道和缓冲区的,意思就是:数据总是从通道中读到buffer缓冲区内,或者从buffer写入到通道中。
3、Java NIO Buffer(缓冲区)
概念上,缓冲区是包在一个对象内的基本数据元素数据。Buffer类相比于一个简单数组的优点是它将关于数据的数据内容和信息包含在一个单一的对象中。Buffer类以及它专有的子类定义了一个用于处理数据缓存区的API。
缓冲区的工作与通道紧密联系。通道是I/O传输发生时通过的入口,而缓冲区是这些数据传输的来源或目标。对于离开缓冲区的传输,待传递出去的数据被置于一个缓冲区,被传送到通道;待传回的缓冲区的传输,一个通道将数据放置在所提供的缓冲区中。这种在协同对象之间进行的缓冲区数据传递时高效数据处理的关键。
3.1、属性
所有的缓冲区都具有四个属性来提供关于其所包含的数据元素的信息,它们是:
属 性 | 作 用 |
---|---|
capacity | 容量,指缓冲区能够容纳的数据元素的最大数量,这一容量在缓冲区创建时被设定,并且永远不能被改变 |
limit | 上界,指缓冲区的第一个不能被读或写的元素,或者说是,缓冲区中现存元素的计数 |
position | 位置,指下一个要被读或写的元素的索引,位置会自动由相应的get()和put()函数更新 |
mark | 标记,指一个备忘位置,调用mark()来设定mark=position,调用reset()来设定postion=mark,标记未设定前是未定义的 |
这四个属性总是遵循以下的关系:0 <= mark <= position <= limit <= capacity
public class CharBufferDemo {
public static void main(String[] args) {
/**
* 创建容量为10类型为CharBuffer,由于其为抽象类,不能直接实例化.
*/
CharBuffer charBuffer = CharBuffer.allocate(10);
System.out.println("position:"+charBuffer.position());
System.out.println("limit:"+charBuffer.limit());
System.out.println("capacity:"+charBuffer.capacity());
}
}
position:0
limit:10
capacity:10
allocate 实现,可以看出实际返回HeapCharBuffer(CharBuffer子类)
public static CharBuffer allocate(int capacity) {
if (capacity < 0)
throw new IllegalArgumentException();
return new HeapCharBuffer(capacity, capacity);
}
3.2、方法
下面看一下如何使用一个缓冲区,Buffer中提供了以下的一些方法:
方 法 | 作 用 |
---|---|
Object array() | 返回此缓冲区的底层实现数组 |
int arrayOffset() | 返回此缓冲区的底层实现数组中第一个缓冲区还俗的偏移量 |
int capacity() | 返回此缓冲区的容量 |
Buffer clear() | 清除此缓冲区 |
Buffer flip() | 反转此缓冲区 |
boolean hasArray() | 告知此缓冲区是否具有可访问的底层实现数组 |
boolean hasRemaining() | 告知在当前位置和限制之间是否有元素 |
boolean isDirect() | 告知此缓冲区是否为直接缓冲区 |
boolean isReadOnly() | 告知此缓冲区是否为只读缓存 |
int limit() | 返回此缓冲区的上界 |
Buffer limit(int newLimit) | 设置此缓冲区的上界 |
Buffer mark() | 在此缓冲区的位置设置标记 |
int position() | 返回此缓冲区的位置 |
Buffer position(int newPosition) | 设置此缓冲区的位置 |
int remaining() | 返回当前位置与上界之间的元素数 |
Buffer reset() | 将此缓冲区的位置重置为以前标记的位置 |
Buffer rewind() | 重绕此缓冲区 |
public class CharBufferDemo {
public static void main(String[] args) {
CharBuffer charBuffer = CharBuffer.allocate(10);
/** 往缓冲区中写数据,可以写单个字符或者数组
* charBuffer.put('h')
* charBuffer.put('e')
* charBuffer.put('l')
* charBuffer.put('l')
* charBuffer.put('o')
*/
charBuffer.put("hello".toCharArray());
/**
* 缓冲区翻转,如果翻转的话,position为下一个要写的位置
*/
charBuffer.flip();
/**
*hasRemaining() 判断是否已经读到了上界
*/
while (charBuffer.hasRemaining()) {
//读取数据
System.out.println(charBuffer.get());
}
/**
* 缓冲区绕回,可以重读
*/
charBuffer.rewind();
System.out.println("重读");
while (charBuffer.hasRemaining()) {
System.out.println(charBuffer.get());
}
}
}
h
e
l
l
o
重读
h
e
l
l
o
关于这个API有一点值得注意的,像clear()这类函数,通常应当返回的是void而不是Buffer引用。这些函数将引用返回到它们在(this)上被引用的对象,这是一个允许级联调用的类设计方法。级联调用允许这种类型的代码:
buffer.mark();
buffer.position(5);
buffer.reset();
被简写成:
buffer.mark().position(5).reset();
4、Java NIO Channel(通道)
通道(Channel)是Java.nio的第二个主要创新。它们既不是一个扩展也不是一项增强,而是全新、极好的Java I/O示例,提供与I/O服务的直接连接。Channel用于在字节缓冲区和位于通道另一侧的实体(通常是一个文件或套接字)之间有效地传输数据。
通道是一种途径,借助该途径,可以用最小的总开销来访问操作系统本身的I/O服务。缓冲区则是通道内部用来发送和接收数据的端点。
通道是访问I/O服务的管道。I/O可以分为广义的两大类别:File I/O
和Stream I/O
。那么相应地有两种类型的通道也就不足为怪了,它们是文件(file)通道和套接字(socket)通道。你会发现有一个FileChannel
类和三个socket
通道类:SocketChannel
、ServerSocketChannel
和DatagramChannel
。
通道可以以多种方式创建。Socket通道有可以直接创建新socket通道的工厂方式。但是有一个FileChannel
对象却只能通过在一个打开的RandomAccessFile
、FileInputStream
或FileOutputStream
对象上调用getChannel()
方法来获取。你不能直接创建一个FileChannel
对象。
使用通道:
通道可以是单向(unidirectional
)或者双向的(bidirectional
)一个channel
类可能实现定义read()
方法的ReadableByteChannel
接口,而另一个channel
类也许实现WritableByteChannel
接口以提供write()
方法。实现这两种接口其中之一的类都是单向的,只能在一个方向上传输数据。如果一个类同时实现这两个接口,那么它是双向的,可以双向传输数据。
通常会连接一个特定I/O服务且通道实例(channel instance
)的性能受它所连接的I/O服务的特征限制,记住这很重要。一个连接到只读文件的Channel实例不能进行写操作,即时该实例属性的类可能有write()方法。基于此,程序员需要知道通达是如何打开的,避免试图尝试一个底层I/O服务不允许的操作。
通道可以阻塞(bolcking
)或非阻塞(nonblocking
)模式运行。非阻塞模式的通道永远不会让调用的线程休眠。请求的操作要么立即完成,要么返回一个结果表明未进行任何操作。只有面向流(stream-oriented
)的通道,如sockets
和pipes
才能使用非阻塞模式。
关闭通道:
通过调用通道的close方法进行关闭,但是可能会导致关闭底层I/O服务时发生阻塞(非阻塞模式和阻塞模式都一样)
通过isopen
方法来测试通道的开放状态,如果返回true,那么说明通道可以使用。反之,说明通道已经关闭,不能使用。
1.FileChannel类
文件通道总是阻塞式的,因此不能被置于非阻塞模式。现代操作系统都有复杂的缓存和预取机制,使得本地磁盘I/O
操作延迟很少。网络文件系统一般而言延迟会多些,不过却也因该优化而受益。面向流的I/O
的非阻塞范例对于面向文件的操作并无多大意义,这是由文件I/O本质上的不同性质造成的。对于文件I/O
,最强大之处在于异步I/O(asynchronous I/O)
,它允许一个进程可以从操作系统请求一个或多个I/O
操作而不必等待这些操作的完成。发起请求的进程之后会收到它请求的I/O
操作已完成的通知。
2.Socket通道
新的socket
通道类可以运行非阻塞模式并且是可选择的。这两个性能可以激活大程序(如网络服务器和中间件组件)巨大的可伸缩性和灵活性。本节中我们会看到,再也没有为每个socket
连接使用一个线程的必要了,也避免了管理大量线程所需的上下文交换总开销。借助新的NIO
类,一个或几个线程就可以管理成百上千的活动socket
连接了并且只有很少甚至可能没有性能损失。所有的socket
通道类(DatagramChannel、SocketChannel
和ServerSocketChannel
)都继承了位于java.nio.channels.spi
包中的AbstractSelectableChannel
。这意味着我们可以用一个Selector对象来执行socket通道的就绪选择(readiness selection
)。
请注意DatagramChannel
和SocketChannel
实现定义读和写功能的接口而ServerSocketChannel
不实现。ServerSocketChannel
负责监听传入的连接和创建新的SocketChannel
对象,它本身从不传输数据。
在我们具体讨论每一种socket
通道前,您应该了解socket
和socket
通道之间的关系。之前的章节中有写道,通道是一个连接I/O
服务导管并提供与该服务交互的方法。就某个socket
而言,它不会再次实现与之对应的socket
通道类中的socket
协议API
,而java.net
中已经存在的socket
通道都可以被大多数协议操作重复使用。
全部socket
通道类(DatagramChannel
、SocketChannel
和ServerSocketChannel
)在被实例化时都会创建一个对等socket对象。这些是我们所熟悉的来自java.net
的类(Socket
、ServerSocket
和DatagramSocket
),它们已经被更新以识别通道。对等socket
可以通过调用socket( )
方法从一个通道上获取。此外,这三个java.net
类现在都有getChannel( )
方法。
Socket
通道将与通信协议相关的操作委托给相应的socket
对象。socket
的方法看起来好像在通道类中重复了一遍,但实际上通道类上的方法会有一些新的或者不同的行为。
要把一个socket
通道置于非阻塞模式,我们要依靠所有socket通道类的公有超级类:SelectableChannel
。就绪选择(readiness selection
)是一种可以用来查询通道的机制,该查询可以判断通道是否准备好执行一个目标操作,如读或写。非阻塞I/O和可选择性是紧密相连的,那也正是管理阻塞模式的API代码要在SelectableChannel
超级类中定义的原因。
设置或重新设置一个通道的阻塞模式是很简单的,只要调用configureBlocking( )
方法即可,传递参数值为true则设为阻塞模式,参数值为false值设为非阻塞模式。真的,就这么简单!您可以通过调用isBlocking( )
方法来判断某个socket通道当前处于哪种模式。
非阻塞socket
通常被认为是服务端使用的,因为它们使同时管理很多socket
通道变得更容易。但是,在客户端使用一个或几个非阻塞模式的socket
通道也是有益处的,例如,借助非阻塞socket
通道,GUI程序可以专注于用户请求并且同时维护与一个或多个服务器的会话。在很多程序上,非阻塞模式都是有用的。
偶尔地,我们也会需要防止socket通道的阻塞模式被更改。API中有一个blockingLock( )
方法,该方法会返回一个非透明的对象引用。返回的对象是通道实现修改阻塞模式时内部使用的。只有拥有此对象的锁的线程才能更改通道的阻塞模式。
public class ServerSocketChannelApp {
private static final String MSG = "hello, I must be going \n";
public static void main(String[] args) throws Exception {
int port = 8989;
ServerSocketChannel ssc = ServerSocketChannel.open();
ServerSocket ss = ssc.socket();
ss.bind(new InetSocketAddress(port));
// set no blocking
ssc.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.wrap(MSG.getBytes());
while (true) {
// System.out.println("wait for connection ……");
SocketChannel sc = ssc.accept();
if (sc == null) {
// no connections, snooze a while ...
Thread.sleep(1000);
} else {
System.out.println("Incoming connection from " + sc.socket().getRemoteSocketAddress());
ByteBuffer readerBuffer = ByteBuffer.allocate(1024);
sc.read(readerBuffer);
readerBuffer.flip();
//output get
out(readerBuffer);
buffer.rewind();
sc.write(buffer);
sc.close();
}
}
}
private static void out(ByteBuffer readerBuffer) {
StringBuffer sb = new StringBuffer();
for (int i = 0; i < readerBuffer.limit(); i++) {
char c = (char) readerBuffer.get();
sb.append(new String(new char[]{c}));
}
System.out.println(sb.toString());
}
}
public class SocketChannelApp {
public static void main(String[] args) throws Exception {
InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 8989);
SocketChannel sc = SocketChannel.open();
sc.connect(addr);
sc.configureBlocking(false);
while (!sc.finishConnect()) {
doSomethings();
}
//Do something with the connected socket
ByteBuffer buffer = ByteBuffer.wrap(new String("Hello server!").getBytes());
sc.write(buffer);
sc.close();
}
private static void doSomethings() {
System.out.println("do something useless!");
}
}
先运行Server 后运行Client
Incoming connection from /127.0.0.1:50129
Hello server!
4、Java NIO Selector
为什么要使用选择器
通道处于就绪状态后,就可以在缓冲区之间传送数据。可以采用非阻塞模式来检查通道是否就绪,但非阻塞模式还会做别的任务,当有多个通道同时存在时,很难将检查通道是否就绪与其他任务剥离开来,或者说是这样做很复杂,即使完成了这样的功能,但每检查一次通道的就绪状态,就至少有一次系统调用,代价十分昂贵。当你轮询每个通道的就绪状态时,刚被检查的一个处于未就绪状态的通道,突然处于就绪状态,在下一次轮询之前是不会被察觉的。操作系统拥有这种检查就绪状态并通知就绪的能力,因此要充分利用操作系统提供的服务。在JAVA中,Selector类提供了这种抽象,拥有询问通道是否已经准备好执行每个I/0操作的能力,所以可以利用选择器来很好地解决以上问题。
4.1、 Selector简介
选择器提供选择执行已经就绪的任务的能力.从底层来看,Selector提供了询问通道是否已经准备好执行每个I/O操作的能力。Selector 允许单线程处理多个Channel。仅用单个线程来处理多个Channels的好处是,只需要更少的线程来处理通道。事实上,可以只用一个线程处理所有的通道,这样会大量的减少线程之间上下文切换的开销。
在开始之前,需要回顾一下Selector、SelectableChannel和SelectionKey:
选择器(Selector)
Selector选择器类管理着一个被注册的通道集合的信息和它们的就绪状态。通道是和选择器一起被注册的,并且使用选择器来更新通道的就绪状态。当这么做的时候,可以选择将被激发的线程挂起,直到有就绪的的通道。
可选择通道(SelectableChannel)
SelectableChannel这个抽象类提供了实现通道的可选择性所需要的公共方法。它是所有支持就绪检查的通道类的父类。因为FileChannel类没有继承SelectableChannel因此是不是可选通道,而所有socket通道都是可选择的,包括从管道(Pipe)对象的中获得的通道。SelectableChannel可以被注册到Selector对象上,同时可以指定对那个选择器而言,那种操作是感兴趣的。一个通道可以被注册到多个选择器上,但对每个选择器而言只能被注册一次。
选择键(SelectionKey)
选择键封装了特定的通道与特定的选择器的注册关系。选择键对象被SelectableChannel.register()返回并提供一个表示这种注册关系的标记。选择键包含了两个比特集(以整数的形式进行编码),指示了该注册关系所关心的通道操作,以及通道已经准备好的操作。
4.2、 Selector的使用
(1)创建Selector
Selector对象是通过调用静态工厂方法open()来实例化的,如下:
Selector Selector=Selector.open();
类方法open()实际上向SPI1发出请求,通过默认的SelectorProvider对象获取一个新的实例。
(2)将Channel注册到Selector
要实现Selector管理Channel,需要将channel注册到相应的Selector上,如下:
channel.configureBlocking(false);
SelectionKey key= channel.register(selector,SelectionKey,OP_READ);
通过调用通道的register()方法会将它注册到一个选择器上。与Selector一起使用时,Channel必须处于非阻塞模式下,否则将抛出IllegalBlockingModeException异常,这意味着不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式,而套接字通道都可以。另外通道一旦被注册,将不能再回到阻塞状态,此时若调用通道的configureBlocking(true)将抛出BlockingModeException异常。
register()方法的第二个参数是“interest集合”,表示选择器所关心的通道操作,它实际上是一个表示选择器在检查通道就绪状态时需要关心的操作的比特掩码。比如一个选择器对通道的read和write操作感兴趣,那么选择器在检查该通道时,只会检查通道的read和write操作是否已经处在就绪状态。
它有以下四种操作类型:
- Connect 连接
- Accept 接受
- Read 读
- Write 写
需要注意并非所有的操作在所有的可选择通道上都能被支持,比如ServerSocketChannel支持Accept,而SocketChannel中不支持。我们可以通过通道上的validOps()方法来获取特定通道下所有支持的操作集合。
Java中定义了四个常量来表示这四种操作类型:
SelectionKey.OP_CONNECT
SelectionKey.OP_ACCEPT
SelectionKey.OP_READ
SelectionKey.OP_WRITE
如果Selector对通道的多操作类型感兴趣,可以用“位或”操作符来实现:int interestSet=SelectionKey.OP_READ|SelectionKey.OP_WRITE;
当通道触发了某个操作之后,表示该通道的某个操作已经就绪,可以被操作。因此,某个SocketChannel成功连接到另一个服务器称为“连接就绪”(OP_CONNECT)。一个ServerSocketChannel准备好接收新进入的连接称为“接收就绪”(OP_ACCEPT)。一个有数据可读的通道可以说是“读就绪”(OP_READ)。等待写数据的通道可以说是“写就绪”(OP_WRITE)。
我们注意到register()方法会返回一个SelectionKey对象,我们称之为键对象。该对象包含了以下四种属性:
- interest集合
- read集合
- Channel
- Selector
interest集合是Selector感兴趣的集合,用于指示选择器对通道关心的操作,可通过SelectionKey对象的interestOps()获取。最初,该兴趣集合是通道被注册到Selector时传进来的值。该集合不会被选择器改变,但是可通过interestOps()改变。我们可以通过以下方法来判断Selector是否对Channel的某种事件感兴趣:
int interestSet=selectionKey.interestOps();
boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
read集合是通道已经就绪的操作的集合,表示一个通道准备好要执行的操作了,可通过SelctionKey对象的readyOps()来获取相关通道已经就绪的操作。它是interest集合的子集,并且表示了interest集合中从上次调用select()以后已经就绪的那些操作。(比如选择器对通道的read,write操作感兴趣,而某时刻通道的read操作已经准备就绪可以被选择器获知了,前一种就是interest集合,后一种则是read集合。)。JAVA中定义以下几个方法用来检查这些操作是否就绪:
//int readSet=selectionKey.readOps();
selectionKey.isAcceptable();//等价于selectionKey.readyOps()&SelectionKey.OP_ACCEPT
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
需要注意的是,通过相关的选择键的readyOps()方法返回的就绪状态指示只是一个提示,底层的通道在任何时候都会不断改变,而其他线程也可能在通道上执行操作并影响到它的就绪状态。另外,我们不能直接修改read集合。
取出SelectionKey所关联的Selector和Channel
通过SelectionKey访问对应的Selector和Channel:
Channel channel =selectionKey.channel();
Selector selector=selectionKey.selector();
关于取消SelectionKey对象的那点事
我们可以通过SelectionKey对象的cancel()方法来取消特定的注册关系。该方法调用之后,该SelectionKey对象将会被”拷贝”至已取消键的集合中,该键此时已经失效,但是该注册关系并不会立刻终结。在下一次select()时,已取消键的集合中的元素会被清除,相应的注册关系也真正终结。
(3)为SelectionKey绑定附加对象
可以将一个或者多个附加对象绑定到SelectionKey上,以便容易的识别给定的通道。通常有两种方式:
1 在注册的时候直接绑定:
SelectionKey key=channel.register(selector,SelectionKey.OP_READ,theObject);
2 在绑定完成之后附加:
selectionKey.attach(theObject);//绑定
绑定之后,可通过对应的SelectionKey取出该对象:
selectionKey.attachment();。
如果要取消该对象,则可以通过该种方式:
selectionKey.attach(null).
需要注意的是如果附加的对象不再使用,一定要人为清除,因为垃圾回收器不会回收该对象,若不清除的话会成内存泄漏。
一个单独的通道可被注册到多个选择器中,有些时候我们需要通过isRegistered()方法来检查一个通道是否已经被注册到任何一个选择器上。 通常来说,我们并不会这么做。
(4)通过Selector选择通道
我们知道选择器维护注册过的通道的集合,并且这种注册关系都被封装在SelectionKey当中。接下来我们简单的了解一下Selector维护的三种类型SelectionKey集合:
已注册的键的集合(Registered key set)
所有与选择器关联的通道所生成的键的集合称为已经注册的键的集合。并不是所有注册过的键都仍然有效。这个集合通过keys()方法返回,并且可能是空的。这个已注册的键的集合不是可以直接修改的;试图这么做的话将引发java.lang.UnsupportedOperationException。
已选择的键的集合(Selected key set)
已注册的键的集合的子集。这个集合的每个成员都是相关的通道被选择器(在前一个选择操作中)判断为已经准备好的,并且包含于键的interest集合中的操作。这个集合通过selectedKeys()方法返回(并有可能是空的)。
不要将已选择的键的集合与ready集合弄混了。这是一个键的集合,每个键都关联一个已经准备好至少一种操作的通道。每个键都有一个内嵌的ready集合,指示了所关联的通道已经准备好的操作。键可以直接从这个集合中移除,但不能添加。试图向已选择的键的集合中添加元素将抛出java.lang.UnsupportedOperationException。
已取消的键的集合(Cancelled key set)
已注册的键的集合的子集,这个集合包含了cancel()方法被调用过的键(这个键已经被无效化),但它们还没有被注销。这个集合是选择器对象的私有成员,因而无法直接访问。
在刚初始化的Selector对象中,这三个集合都是空的。通过Selector的select()方法可以选择已经准备就绪的通道(这些通道包含你感兴趣的的事件)。比如你对读就绪的通道感兴趣,那么select()方法就会返回读事件已经就绪的那些通道。下面是Selector几个重载的select()方法:
select():阻塞到至少有一个通道在你注册的事件上就绪了。
select(long timeout):和select()一样,但最长阻塞事件为timeout毫秒。
selectNow():非阻塞,只要有通道就绪就立刻返回。
select()方法返回的int值表示有多少通道已经就绪,是自上次调用select()方法后有多少通道变成就绪状态。之前在select()调用时进入就绪的通道不会在本次调用中被记入,而在前一次select()调用进入就绪但现在已经不在处于就绪的通道也不会被记入。例如:首次调用select()方法,如果有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。如果对第一个就绪的channel没有做任何操作,现在就有两个就绪的通道,但在每次select()方法调用之间,只有一个通道就绪了。
一旦调用select()方法,并且返回值不为0时,则可以通过调用Selector的selectedKeys()方法来访问已选择键集合。如下:
Set selectedKeys=selector.selectedKeys();
进而可以放到和某SelectionKey关联的Selector和Channel。如下所示:
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
关于Selector执行选择的过程
我们知道调用select()方法进行通道,现在我们再来深入一下选择的过程,也就是select()执行过程。当select()被调用时将执行以下几步:
- 首先检查已取消键集合,也就是通过cancle()取消的键。如果该集合不为空,则清空该集合里的键,同时该集合中每个取消的键也将从已注册键集合和已选择键集合中移除。(一个键被取消时,并不会立刻从集合中移除,而是将该键“拷贝”至已取消键集合中,这种取消策略就是我们常提到的“延迟取消”。)
- 再次检查已注册键集合(准确说是该集合中每个键的interest集合)。系统底层会依次询问每个已经注册的通道是否准备好选择器所感兴趣的某种操作,一旦发现某个通道已经就绪了,则会首先判断该通道是否已经存在在已选择键集合当中,如果已经存在,则更新该通道在已注册键集合中对应的键的ready集合,如果不存在,则首先清空该通道的对应的键的ready集合,然后重设ready集合,最后将该键存至已注册键集合中。这里需要明白,当更新ready集合时,在上次select()中已经就绪的操作不会被删除,也就是ready集合中的元素是累积的,比如在第一次的selector对某个通道的read和write操作感兴趣,在第一次执行select()时,该通道的read操作就绪,此时该通道对应的键中的ready集合存有read元素,在第二次执行select()时,该通道的write操作也就绪了,此时该通道对应的ready集合中将同时有read和write元素。
深入已注册键集合的管理
到现在我们已经知道一个通道的的键是如何被添加到已选择键集合中的,下面我们来继续了解对已选择键集合的管理 。首先要记住:选择器不会主动删除被添加到已选择键集合中的键,而且被添加到已选择键集合中的键的ready集合只能被设置,而不能被清理。如果我们希望清空已选择键集合中某个键的ready集合该怎么办?我们知道一个键在新加入已选择键集合之前会首先置空该键的ready集合,这样的话我们可以人为的将某个键从已注册键集合中移除最终实现置空某个键的ready集合。被移除的键如果在下一次的select()中再次就绪,它将会重新被添加到已选择的键的集合中。这就是为什么要在每次迭代的末尾调用keyIterator.remove()。
(5)停止选择
选择器执行选择的过程,系统底层会依次询问每个通道是否已经就绪,这个过程可能会造成调用线程进入阻塞状态,那么我们有以下三种方式可以唤醒在select()方法中阻塞的线程。
- 通过调用Selector对象的wakeup()方法让处在阻塞状态的select()方法立刻返回
该方法使得选择器上的第一个还没有返回的选择操作立即返回。如果当前没有进行中的选择操作,那么下一次对select()方法的一次调用将立即返回。- 通过close()方法关闭Selector**
该方法使得任何一个在选择操作中阻塞的线程都被唤醒(类似wakeup()),同时使得注册到该Selector的所有Channel被注销,所有的键将被取消,但是Channel本身并不会关闭。- 调用interrupt()
调用该方法会使睡眠的线程抛出InterruptException异常,捕获该异常并在调用wakeup()
上面有些人看到“系统底层会依次询问每个通道”时可能在想如果已选择键非常多是,会不会耗时较长?答案是肯定的。但是我想说的是通常你可以选择忽略该过程,至于为什么,后面再说。
5、实例
NIOServer
public class NIOServer {
/* 标识数字 */
private int flag = 1;
/* 缓冲区大小 */
private int blockSize = 4096;
/* 接受数据缓冲区 */
private ByteBuffer sendBuffer = ByteBuffer.allocate(blockSize);
/* 发送数据缓冲区 */
private ByteBuffer receiveBuffer = ByteBuffer.allocate(blockSize);
private Selector selector;
public NIOServer(int port) throws IOException {
//打开服务器套接字通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//服务器配置为阻塞
serverSocketChannel.configureBlocking(false);
//检索与此通道关联的服务器套接字
ServerSocket serverSocket = serverSocketChannel.socket();
//进行服务的绑定
serverSocket.bind(new InetSocketAddress(port));
//通过open()方法接到Selector
selector = Selector.open();
//注册selector,等待连接
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Server start--->" + port);
}
public static void main(String[] args) throws IOException, InterruptedException {
int port = 9000;
NIOServer nioServer = new NIOServer(port);
nioServer.listen();
}
//监听
public void listen() throws IOException, InterruptedException {
while (true) {
//选择一组件,并且相应的通道已经打开
selector.select();
//返回此选择器的已选择键集
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
//具体业务逻辑
handleKey(selectionKey);
}
sleep(1000);
}
}
//处理请求
public void handleKey(SelectionKey selectionKey) throws IOException {
//接收请求
ServerSocketChannel server = null;
SocketChannel client = null;
String receiveText;
String sendText;
int count = 0;
// 测试此键的通道是否已经准备好接收新的套接字连接
if (selectionKey.isAcceptable()) {
//返回为之创建此键的通道
server = (ServerSocketChannel) selectionKey.channel();
//接受到此通道套接字的连接
//此方法返回的套接字通道(如果有)将处于阻塞模式。
client = server.accept();
//配置为阻塞
client.configureBlocking(false);
//注册selector,等待连接
client.register(selector, SelectionKey.OP_WRITE);
} else if (selectionKey.isReadable()) {
client = (SocketChannel) selectionKey.channel();
// 将缓冲区清空以备下次读取
receiveBuffer.clear();
// 读取哭护短发送来的数据到缓冲区中
count = client.read(receiveBuffer);
if (count > 0) {
receiveText = new String(receiveBuffer.array(), 0, count);
System.out.println("服务端接收到的客户信息:" + receiveText);
client.register(selector, SelectionKey.OP_WRITE);
}
}else if (selectionKey.isWritable()) {
// 将缓冲区清空以备下次写入
sendBuffer.clear();
client = (SocketChannel) selectionKey.channel();
//发送数据
sendText = "msg send to client:" + flag++;
// 向缓冲区中输入数据
sendBuffer.put(sendText.getBytes());
// 将缓冲区各标志复位,因为向里面put了数据标志被改变要想从中读取数据发向哭护短,就要复位
sendBuffer.flip();
// 输出到通道
client.write(sendBuffer);
System.out.println("服务端发送数据给客户端:" + sendText);
client.register(selector, SelectionKey.OP_WRITE);
}
}
}
NIOClient
public class NIOClient {
private final static InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", 9000);
/* 表示数字 */
private static int flag = 1;
/* 缓冲区大小 */
private static int blockSize = 4096;
/* 接收缓冲区 */
private static ByteBuffer sendBuffer = ByteBuffer.allocate(blockSize);
/* 发送缓存区 */
private static ByteBuffer receiveBuffer = ByteBuffer.allocate(blockSize);
public static void main(String[] args) throws IOException {
//打开socket通道
SocketChannel socketChannel = SocketChannel.open();
//设置为阻塞方式
socketChannel.configureBlocking(false);
//打开选择器
Selector selector = Selector.open();
//注册连接服务端socket动作
socketChannel.register(selector, SelectionKey.OP_CONNECT);
//连接
socketChannel.connect(serverAddress);
Set<SelectionKey> selectionKeys;
Iterator<SelectionKey> iterator;
SelectionKey selectionKey;
SocketChannel client;
String receiveTest;
String sendText;
int count = 0;
while (true) {
// 连接一组键,其相应的通道已为I/O操作准备就绪
// 此方法执行处于阻塞模式的选择操作
selector.select();
// 返回此选择器的已选集键集
selectionKeys = selector.selectedKeys();
iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
selectionKey = iterator.next();
if (selectionKey.isConnectable()) {
System.out.println("client connect");
client = (SocketChannel) selectionKey.channel();
// 判断此通道上是否正在进行连接操作
// 完成套接字通道的连接过程。
if (client.isConnectionPending()) {
client.finishConnect();
System.out.println("客户端完成连接操作!");
sendBuffer.clear();
sendBuffer.put("hello,Server".getBytes());
sendBuffer.flip();
client.write(sendBuffer);
}
client.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
client = (SocketChannel) selectionKey.channel();
// 读取服务器发送来的数据到缓存区中
receiveBuffer.clear();
count = client.read(receiveBuffer);
if (count > 0) {
receiveTest = new String(receiveBuffer.array(), 0, count);
System.out.println("客户端接收到服务端的数据:" + receiveTest);
client.register(selector, SelectionKey.OP_READ);
}
} else if (selectionKey.isWritable()) {
sendBuffer.clear();
client = (SocketChannel) selectionKey.channel();
sendText = "Msg send to Server->" + flag++;
sendBuffer.put(sendText.getBytes());
sendBuffer.flip();
client.write(sendBuffer);
System.out.println("客户端发送数据给服务器端");
client.register(selector, SelectionKey.OP_READ);
}
}
selectionKeys.clear();
}
}
}