- Selector
Selector允许一个单一的线程来操作多个Channel。如果我们的应用程序使用了多个Channel,那么使用Selector很方便的实现这样的目的,但是因为在一个线程中使用了多个Channel,因此也会造成每个Channel传输效率的降低。
使用Selector的图解如下:
为了使用Selector,我们首先需要将Channel注册到Selector中,随后调用Selector的select()方法,这个方法会阻塞,直到注册在Selector中的Channel发送可读写事件。当这个方法返回后,当前的这个线程就可以处理Channel的事件了。
-
创建选择器
通过Selector.open()方法,我们可以创建一个选择器:Selector selector = Selector.open();
-
将Channel注册到选择器中
为了使用选择器管理Channel,我们需要将Channel注册到选择器中:channel.configureBlocking(false); SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
注意:如果一个Channel要注册到Selector中,那么这个Channel必须是非阻塞的,即channel.configureBlocking(false);因此Channel必须要是非阻塞的,因此FileChannel是不能够使用选择器的,因此FileChannel都是阻塞的。
在使用Channel.register()方法时,第二个参数指定了我们队Channel的什么类型的事件感兴趣,这些事件有:
Connect,即连接事件(TCP连接),对应于SelectionKey.OP_CONNECT
Accept,即确认时间,对应于SelectionKey.OP_ACCEPT
Read,即读事件,对应于SelectionKey.OP_READ,表示buffer可读
-
Write,即写事件,对应于SelectionKey.OP_WRITE,表示buffer可写
一个Channel发出的一个事件也可以称为对于某个时间,Channel准备好了。因此一个Channel成功连接到了另一个服务器也可以被成为connect ready.
我们可以使用或运算符|来组合多个事件,例如int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
注意,一个Channel仅仅可以被注册到一个Selector一次,如果将Channel注册到Selector多次,那么其实就实现相当于更新SelectionKey的interest set,例如:
channel.register(selector, SelectionKey.OP_READ);
channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
上面的channel注册到同一个Selector两次了,那么第二次的注册其实就是相当于更新这个Channel的interest set为SelectionKey.OP_READ | SelectionKey.OP_WRITE.
- 关于SelectionKey
如上所示,当我们使用register注册一个Channel时,会返回一个SelectionKey对象,这个对象包含了如下内容:
- interest set,即我们感兴趣的事件集,即在调用register注册channel时所设置的interest set
- ready set
- channel
- selector
- attached object,可选的附加对象
- interest set
我们可以通过如下方式获取interest set
int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
-
ready set
代表了Channel所准备好了的操作
我们可以像判断interest set一样操作ready set,但是我们还尅使用如下方法进行判断int readySet = selectionKey.readyOps(); selectionKey.isAcceptable(); selectionKey.isConnectable(); selectionKey.isReadable(); selectionKey.isWritable();
-
Channel和Selector
我们可以通过SelectionKey获取相对应的Channel和SelectorChannel channel = selectionKey.channel(); Selector selector = selectionKey.selector();
-
Attaching Object
我们可以在selectionKey中附加一个对象selectionKey.attach(theObject); Object attachedObj = selectionKey.attachment();
或者在注册时直接附加:
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
通过Selector选择Channel
我们可以通过Selector.select()方法获取对某件事件准备好了的Channel,即如果我们在注册Channel时,对其的可写事件感兴趣,那么当select()返回时,我们就可以获取Channel了
注意:select()方法返回的值表示有多少个Channel可操作。-
获取可操作的Channel
如果select()方法返回值表示有多少个Channel准备好了,那么我们可以通过Selected key set 访问这个ChannelSet<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectedKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if(key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) { // a connection was established with a remote server. } else if (key.isReadable()) { // a channel is ready for reading } else if (key.isWritable()) { // a channel is ready for writing } keyIterator.remove(); }
注意,在每次迭代时,我们都调用“keyIterator.remove()”将这个key从迭代器中删除,因为select()方法仅仅是简单的将就绪的IO操作放到selectedKeys集合中,因此如果我们从selectedKeys获取一个key,但是没有将他删除,那么下一次select时,这个key所对应的IO事件还在selectedKeys中。
例如此时我们收到OP_ACCEPT通知,然后我们进行相应处理,但是并没有将这个key从SelectedKeys中删除,那么下一次select()返回时,我们还可以在SelectedKeys中获取到OP_ACCEPT的key.
注意,我们还可以动态更改selectedKeys中的key 的interest set.例如在OP_ACCEPT中,我们可以降interest set更新为OP_READ,这样Selector就会将这个Channel的读IO就绪事件包含进来了。
Selector的基本使用流程
通过Selector.open()打开一个Selector
-
将Channel注册到Selector中,并设置需要监听的事件(interest set)
3.不断重复:- 调用select()方法
- 调用selector.selectedKey()获取selected keys
- 迭代每个selected key:
- 从selected key中获取对应的Channel和附加信息(如果有的话)
- 判断是那些IO事件已经就绪了,然后处理他们,如果是OP_ACCEPT事件,则调用“SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()”获取SocketChannel,并将他们设置为非阻塞的,然后将这个Channel注册到Selector中
- 根据需要更改selected key的监听事件
- 将已经处理过的key从selected keys集合中删除
关闭Selector
当调用了Selector.close()方法时,我们其实是关闭了Selector本身并且将所有的SelectionKey失效,但是并不会关闭Channel-
完整的Selector例子
public class NioEchoServer { private static final int BUF_SIZE = 256; private static final int TIMEOUT = 3000; public static void main(String[] args) throws IOException { //打开服务器Socket ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //打开Selector Selector selector = Selector.open(); //服务端socket监听8080端口,并配置为非阻塞模式 serverSocketChannel.socket().bind(new InetSocketAddress(8080)); serverSocketChannel.configureBlocking(false); //将channel注册到selector中 //通常我们都是先注册一个OP_ACCEPT事件,然后在OP_ACCEPT到来时,再将这个Channel的OP_READ注册到Selector中 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { //通过调用select方法,阻塞地等待Channel I/O可操作 if (selector.select(TIMEOUT) == 0) { System.out.println("."); continue; } //获取I/O操作就绪的SelectionKey,通过SelectionKey可以知道哪些Channel在那类I/O操作已经就绪 Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if (key.isAcceptable()) { //当OP_ACCEPT事件到来时,我们就有从ServerSocketChannel中获取一个SocketChannel,代表客户端的连接 //注意,在OP_ACCEPT事件中,从key.channel()返回的Channel是ServerSocketChannel, //而在OP_WRITE 和OP_READ中,从key.channel()返回的是SocketChannel SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept(); clientChannel.configureBlocking(false); //在OP_ACCEPT到来时,再将这个Channel的OP_READ注册到Selector中 //注意,这里我们如果没有设置OP_READ的话,即interest set仍然是OP_CONNECT的话,那么select方法会一直直接返回 clientChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(BUF_SIZE)); } if (key.isReadable()) { SocketChannel clientChannel = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clientChannel.read(buf); if (bytesRead == -1) { clientChannel.close(); } else if (bytesRead > 0) { key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); System.out.println("Get data length: " + bytesRead); } } if (key.isValid() && key.isWritable()) { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clientChannel = (SocketChannel) key.channel(); clientChannel.write(buf); if (!buf.hasRemaining()) { key.interestOps(SelectionKey.OP_READ); } buf.compact(); } } } } }