一、概览
在这篇文章中,我们将探索一下JavaNIO的Selector组件。selector提供了一个机制,该机制可以监视一个或多个NIO通道,当这些通道上的某些操作已就绪时,可以及时地识别到。
利用这种方式,单个线程就可以管理多个通道,因此也就可以管理多个网络连接。这也为编写高可用高扩展的网络服务器提供了技术保障。
二、 为什么要使用Selector
在Selector的帮助下,我们可以使用一个而非多个线程来管理多个通道。对于操作系统而言,线程间的上下文切换是十分昂贵的,而且使用的线程过多也极大地占据内存空间。
因此,使用的线程数越少越好。然而,需要记住的是,现代操作系统和cpu可以很好地处理多任务,因此,多线程的消耗也在随着时间而减少。
这里,我们将看下我们是如何利用selector来达到一个线程管理多个通道的。
注意: selector不仅可以帮助你读取数据,他们同样可以监听网络连接,并且提供在多个低速通道间写数据。
三、设置
要想使用selector的话,我们并不需要什么特殊的配置。我们所需要的类都在java.nio包下。我们只需要引入我们需要的包就可以啦。
之后,我们就可以把多个channel注册到此selector对象上了。当这些通道上有IO活动发生时,该selector自会提醒我们的。这也就解释了,我们何以能够在依赖于单线程完成从大量数据源中读写数据。
注册到selector上的channel必须是SelectableChannel的子类,因为只有这种类型的channel才能设置为非阻塞模式。
四、创建一个Selector
调用Selector类的open方法即可创建一个selector实例,此时它是用系统默认的selector提供者来创建一个新的selector的。
Selector selector = Selector.open();
五、注册selectable Channel
如果你想要让selector去监听某个通道的话,那么你首先需要把这些需要监听的channel都注册到该selector上。我们可以调用channel的register方法来完成注册。
但是在把某个channel注册到selector上之前,它必须被设置为非阻塞模式(non-blocking mode):
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
当然了,这也就意味着,我们无法把FileChannel和selector放在一起使用,因为FileChannel无法切换到非阻塞模式。通常我们都是把socket channel和selector放在一起使用。
channel.register()方法的第一个参数是我们先前创建的Selector对象,第二个参数是我们所关注的该channel上发生的事件。
我们总共可监听的事件有四种,每一个都是由SelectionKey中的一个常量来表示的:
. Connect
- 当某个客户端试图连接服务器时,就会触发该事件。由SelectionKey.OP_CONNECT来表示
. Accept
- 当服务器接收某个客户端的连接时,就会触发该事件。由SelectionKey.OP_ACCEPT来表示
. Read
- 当服务器已准备好从通道中读取数据时,就会触发该事件。由SelectionKey.OP_READ来表示
. Write
- 当服务器已准备好向该通道中写数据时,就会触发该事件。由SelectionKey.OP_WRITE来表示
方法的返回值是一个SelectionKey对象,这个对象就代表了某个通道注册到selector之后的注册结果。
六、SelectionKey 对象
正如我们在上面所看的那样,当我们把某个channel注册到selector上之后,我们就会得到一个SelectionKey对象,该对象中存储了通道注册的信息。
6.1 事件集
事件集参数定义了我们希望该selector监听此通道上的哪些事件。它是一个integer值,我们可以通过以下方式获取相关信息。
首先,我们可以通过SelectionKey的interestOps方法获取到事件集,然后我们把Selectionkey和此值做"与运算",我们就可以得到一个布尔值来表明此事件是不是我们监听的事件。
```
int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
```
6.2 就绪集
就续集定义了某个channel上有哪些事件是准备就绪的。它是一个整型数字。
我们有一种便捷的方式来获取的某个channel的就绪集:
```
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWriteable();
```
6.3 通道
要想根据SelectionKey访问channel的话,有一个非常简单的方法:
Channel channel = key.channel();
6.4 Selector
从SelectionKey对象上获取对应的Selector对象也非常简单:
Selector selector = key.selector();
6.5 附属对象
我们可以向SelectionKey上附属一个对象,因为有时我们或许给某个channel分配一个自定义ID或者附属某个java对象,以便于可以跟踪该channel的行为情况:
我们可以使用SelectionKey的attach()方法来很方便地做到这一点:
key.attach(object);
Object object = key.attachment();
还有一种方式,就是在channel注册的时候,我们把需要附属的对象作为参数传递给register方法:
SelectionKey key = channel.register(
selector, SelectionKey.OP_ACCEPT, object);
七、channel key选择
到目前为止,我们已经看到了如何创建一个selector对象,如何向selector上注册channel,以及观察SelectionKey的属性信息。
这才进行到一半,现在我们需要持续地挑选就绪事件:
int channels = selector.select();
此方法是一个阻塞方法,它会一直阻塞住直到有就绪事件到来。方法的返回值代表的是处于就绪状态channel个数。
紧接着,我们通常会获取需要处理的selectionKey
Set<SelectionKey> selectionKeys = selector.selectedKeys();
后面,我们只需要遍历这个集合,获取到对应的channel,并执行相应的处理动作即可。
八、完整示例
我们这里有个简单的服务端和客户端的简单示例,用以演示一下,使用selector是如何编写网络程序的。
8.1 服务端程序
public class EchoServer {
private static final String POISION_PILL = "POISON_PILL";
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.bind(new InetSocketAddress("localhost",5454));
serverSocket.configureBlocking(false);
SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer buffer = ByteBuffer.allocate(256);
while (true){
int select = selector.select();// selecting the ready set,this method blocks until at least one channel is ready for an operation
Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 获取可用于处理的selected keys
//遍历这些已准备就绪的事件,并处理
Iterator<SelectionKey> iter = selectedKeys.iterator();
while (iter.hasNext()){
SelectionKey key = iter.next();
//逐一对这些已就绪事件进行处理
//如果是接收网络连接的事件
if(key.isAcceptable()){
register(selector,serverSocket);
}
//如果是可读事件
if(key.isReadable()){
answerWithEcho(buffer,key);
}
iter.remove(); //由此可以看出,使用迭代器进行遍历,在遍历时,可以执行移出动作
}
}
}
private static void register(Selector selector, ServerSocketChannel serverSocket) throws IOException {
//如果发现,该key是网络套接字接收事件,则接收此客户端连接,并把此客户端连接注册到selector上
SocketChannel client = serverSocket.accept();
client.configureBlocking(false);
client.register(selector,SelectionKey.OP_READ);
}
private static void answerWithEcho(ByteBuffer buffer, SelectionKey key) throws IOException {
SocketChannel client = (SocketChannel) key.channel(); //根据key取出对应的channel
client.read(buffer); //把客户端发来的数据读进buffer中,可以和之前的相比就可看出
//之前是一个字节一个字节的以流的形式读取,现在是批量地读进buffer中
String content = new String(buffer.array());
if(content.trim().equals(POISION_PILL)){
//如果收到了"POISION_PILL"字符串的话,则关闭此客户端
client.close();
System.out.println("Not accepting client messages anymore");
}else {
buffer.flip(); //
client.write(buffer);
buffer.clear(); //清空buffer
}
}
public static Process start() throws IOException,InterruptedException {
String javaHome = System.getProperty("java.home");
String javaBin = javaHome + File.separator + "bin" + File.separator + "java";
String classPath= System.getProperty("java.class.path");
String className= EchoServer.class.getCanonicalName();
ProcessBuilder builder = new ProcessBuilder(javaBin,"-cp", classPath,className);
return builder.start();
}
}
8.2 客户端程序
public class EchoClient {
private static SocketChannel client;
private static ByteBuffer buffer;
private static EchoClient instance;
public static EchoClient start(){
if(instance == null){
instance = new EchoClient();
}
return instance;
}
public static void stop()throws IOException{
client.close();
buffer = null;
}
private EchoClient(){
try{
client = SocketChannel.open(new InetSocketAddress("localhost",5454));
buffer = ByteBuffer.allocate(256);
}catch (IOException e){
e.printStackTrace();
}
}
public String sendMessage(String msg){
buffer = ByteBuffer.wrap(msg.getBytes());
String response = null;
try{
client.write(buffer);
buffer.clear();
client.read(buffer);
response = new String(buffer.array()).trim();
buffer.clear();
}catch (IOException e){
e.printStackTrace();
}
return response;
}
}
8.3 测试程序
public class EchoTest {
Process server;
EchoClient client;
@Before
public void setup()throws IOException,InterruptedException {
server = EchoServer.start();
client = EchoClient.start();
}
@Test
public void givenServerClient_whenServerEchosMessage_thenCorrect() {
String resp1 = client.sendMessage("hello");
String resp2 = client.sendMessage("world");
System.out.println(resp1);
System.out.println(resp2);
}
@Test
public void whenWakeUpCalledOnSelector_thenBlokedThreadReturns() throws Exception {
Pipe pipe = Pipe.open();
Selector selector = Selector.open();
SelectableChannel channel = pipe.source();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
List<String> invocationStepsTracker = Collections.synchronizedList(new ArrayList<>());
CountDownLatch latch = new CountDownLatch(1);
new Thread(()->{
invocationStepsTracker.add(">> Count down");
latch.countDown();
invocationStepsTracker.add(">> Count down");
latch.countDown();
try {
invocationStepsTracker.add(">> Start select");
selector.select();
invocationStepsTracker.add(">> End select");
} catch (IOException e) {
e.printStackTrace();
}
}).start();
invocationStepsTracker.add(">> Start await");
latch.await();
invocationStepsTracker.add(">> End await");
invocationStepsTracker.add(">> Wakeup thread");
selector.wakeup();
// clean up
channel.close();
System.out.println("============输出StepTracker中的内容================");
System.out.println(JSONObject.toJSONString(invocationStepsTracker));
}
@After
public void tearDown()throws IOException {
server.destroy();
EchoClient.stop();
}
}
九、Selector.wakeup()
前面,我们已经讲到了,当我们调用selector.select()方法时,此方法会把当前线程阻塞住直到被监听的channel上有就绪事件
发生。但是我们可以在其他线程中调用selector.wakeup()方法来唤醒被此selector阻塞的线程。
调用selector.wakeup()方法产生的结果是:不管是否有通道处于就绪状态,被阻塞的线程都会立即返回,而非继续等待。
我们可以使用CountDownLatch来演示一下,并跟踪一下代码的执行情况:
@Test
public void whenWakeUpCalledOnSelector_thenBlokedThreadReturns() throws Exception {
Pipe pipe = Pipe.open();
Selector selector = Selector.open();
SelectableChannel channel = pipe.source();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
List<String> invocationStepsTracker = Collections.synchronizedList(new ArrayList<>());
CountDownLatch latch = new CountDownLatch(1);
new Thread(()->{
invocationStepsTracker.add(">> Count down");
latch.countDown();
invocationStepsTracker.add(">> Count down");
latch.countDown();
try {
invocationStepsTracker.add(">> Start select");
selector.select();
invocationStepsTracker.add(">> End select");
} catch (IOException e) {
e.printStackTrace();
}
}).start();
invocationStepsTracker.add(">> Start await");
latch.await();
invocationStepsTracker.add(">> End await");
invocationStepsTracker.add(">> Wakeup thread");
selector.wakeup();
// clean up
channel.close();
System.out.println("============输出StepTracker中的内容================");
System.out.println(JSONObject.toJSONString(invocationStepsTracker));
}