Java NIO主要包含三个概念,即缓冲区(Buffer)、通道(Channel)和选择器(Selector)。前面的文章已经介绍了缓冲区和通道,本文则讲述最复杂的选择器Selector。
本文是本系列的第三篇文章,关于缓冲区Buffer可以看第一篇:
Java NIO之Buffer(缓冲区)
关于通道Channel可以看第二篇:
Java NIO 之 Channel(通道)
-
Selector涉及的三个概念
在理解了Buffer和Channel之后,终于来到了最终的解决方案面前,那就是使用Selector来实现单线程控制多路非阻塞IO。Selector是如此重要,可以说它就是NIO异步IO的核心控制器。Selector需要其他两种对象配合使用,即SelectionKey和SelectableChannel,它们之间的关系如下图所示:
image.png
SelectableChannel是一类可以与Selector进行配合的通道,例如Socket相关通道以及Pipe产生的通道都属于SelectableChannel。这类通道可以将自己感兴趣的操作(例如read、write、accept和connect)注册到一个Selector上,并在Selector的控制下进行IO相关操作。
Selector是一个控制器,它负责管理已注册的多个SelectableChannel,当这些通道的某些状态改变时,Selector会被唤醒(从select()方法的阻塞中),并对所有就绪的通道进行轮询操作。
SelectionKey是一个用来记录SelectableChannel和Selector之间关系的对象,它由SelectableChannel的register()方法返回,并存储在Selector的多个集合中。它不仅记录了两个对象的引用,还包含了SelectableChannel感兴趣的操作,即OP_READ,OP_WRITE,OP_ACCEPT和OP_CONNECT。
1.1 register方法
在展示例子代码之前,必须对一些概念和操作进行简要的介绍。首先是SelectableChannel的register方法,它的正式定义为:
SelectionKey register(Selector sel, int ops)
第一个参数指明要注册的Selector,第二个参数指明本通道感兴趣的操作,此参数的取值可以是SelectionKey.OP_ACCEPT等四个,以及它们的逻辑值,例如SelectionKey.OP_READ & SelectionKey.OP_WRITE。方法的返回值是一个SelectionKey,这个对象会被自动加入Selector的keys集合,因此不必特意保留这个SelectionKey的对象引用,需要时可以使用Selector的keys()方法得到所有的SelectionKey对象引用。
注册完成后,该通道就与Selector保持关联了。当通道的状态改变时,其改变会自动被Selector感知,并在Selector的三个集合中反应出来。
1.2 Selector的三个集合
如上图所示,Selector对象会维持三个SelectionKey集合,分别是keys集合,存储了所有与Selector关联的SelectionKey对象;selectedKeys集合,存储了在一次select()方法调用后,所有状态改变的通道关联的SelectionKey对象;cancelledKeys集合,存储了一轮select()方法调用过程中,所有被取消但还未从keys中删除的SelectionKey对象。
其中最值得关注的是selectedKeys集合,它使用Selector对象的selectedKeys()方法获得,并通常会进行轮询处理。
1.3 select方法
Selector类的select()方法是一个阻塞方法,它有两种形式:
int select()
int select(long timeout)
不带参数的方法会一直阻塞,直到至少有一个注册的通道状态改变,才会被唤醒;带有timeout参数的方法会一直阻塞,直到时间耗尽,或者有通道的状态改变。
1.4 轮询处理
在一次select()方法返回后,应对selectedKeys集合中的所有SelectionKey对象进行轮询操作,并在操作完成后手动将SelectionKey对象从selectedKeys集合中删除。
-
Selector代码实例
在展示具体的代码之前,先画一个从《Netty In Action》书上抄来的图:
image.png
服务端代码:
public class SelectorServer {
private static final int PORT = 1234;
private static ByteBuffer buffer = ByteBuffer.allocate(1024);
public static void main(String[] args) {
try {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(PORT));
ssc.configureBlocking(false);
//1.register()
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("REGISTER CHANNEL , CHANNEL NUMBER IS:" + selector.keys().size());
while (true) {
//2.select()
int n = selector.select();
if (n == 0) {
continue;
}
//3.轮询SelectionKey
Iterator<SelectionKey> iterator = (Iterator) selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
//如果满足Acceptable条件,则必定是一个ServerSocketChannel
if (key.isAcceptable()) {
ServerSocketChannel sscTemp = (ServerSocketChannel) key.channel();
//得到一个连接好的SocketChannel,并把它注册到Selector上,兴趣操作为READ
SocketChannel socketChannel = sscTemp.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("REGISTER CHANNEL , CHANNEL NUMBER IS:" + selector.keys().size());
}
//如果满足Readable条件,则必定是一个SocketChannel
if (key.isReadable()) {
//读取通道中的数据
SocketChannel channel = (SocketChannel) key.channel();
readFromChannel(channel);
}
//4.remove SelectionKey
iterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
private static void readFromChannel(SocketChannel channel) {
buffer.clear();
try {
while (channel.read(buffer) > 0) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
System.out.println("READ FROM CLIENT:" + new String(bytes));
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
首先注册了一个ServerSocketChannel,它用来监听1234端口上的连接;当监听到连接时,把连接上的SocketChannel再注册到Selector上,这些SocketChannel注册的是SelectionKey.OP_READ事件;当这些SocketChannel状态变为可读时,读取数据并显示。
客户端代码:
public class SelectorClient {
static class Client extends Thread {
private String name;
private Random random = new Random(47);
Client(String name) {
this.name = name;
}
@Override
public void run() {
try {
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
channel.connect(new InetSocketAddress(1234));
while (!channel.finishConnect()) {
TimeUnit.MILLISECONDS.sleep(100);
}
ByteBuffer buffer = ByteBuffer.allocate(1024);
for (int i = 0; i < 5; i++) {
TimeUnit.MILLISECONDS.sleep(100 * random.nextInt(10));
String str = "Message from " + name + ", number:" + i;
buffer.put(str.getBytes());
buffer.flip();
while (buffer.hasRemaining()) {
channel.write(buffer);
}
buffer.clear();
}
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.submit(new Client("Client-1"));
executorService.submit(new Client("Client-2"));
executorService.submit(new Client("Client-3"));
executorService.shutdown();
}
}
客户端创建了三个线程,每个线程创建一个SocketChannel通道,并连接到服务器,并向服务器发送5条消息。
- 小结
Selector是Java NIO的核心概念,以至于一些人直接将NIO称之为Selector-based IO。要学会Selector的使用首先是要明白其相关的多个概念,并多多动手去写。
至此《Java NIO编程实例》系列的三篇就写完了,接下来应该好好介绍一下Netty了,毕竟它才是在具体的Java服务端编程用得最多的框架。Netty克服了NIO中一些概念和设计上的不足之处,提供了更加优雅的解决方案。但是,要学好用好Netty,学习NIO是必经之路,有了NIO的基础,才能真正学好Netty。
参考:
https://blog.csdn.net/LogicTeamLeader/article/details/69666274