JavaNio-Selector

一、概览

在这篇文章中,我们将探索一下JavaNIO的Selector组件。selector提供了一个机制,该机制可以监视一个或多个NIO通道,当这些通道上的某些操作已就绪时,可以及时地识别到。

利用这种方式,单个线程就可以管理多个通道,因此也就可以管理多个网络连接。这也为编写高可用高扩展的网络服务器提供了技术保障。

二、 为什么要使用Selector

在Selector的帮助下,我们可以使用一个而非多个线程来管理多个通道。对于操作系统而言,线程间的上下文切换是十分昂贵的,而且使用的线程过多也极大地占据内存空间。

因此,使用的线程数越少越好。然而,需要记住的是,现代操作系统和cpu可以很好地处理多任务,因此,多线程的消耗也在随着时间而减少。

这里,我们将看下我们是如何利用selector来达到一个线程管理多个通道的。

注意: selector不仅可以帮助你读取数据,他们同样可以监听网络连接,并且提供在多个低速通道间写数据。

三、设置

要想使用selector的话,我们并不需要什么特殊的配置。我们所需要的类都在java.nio包下。我们只需要引入我们需要的包就可以啦。
之后,我们就可以把多个channel注册到此selector对象上了。当这些通道上有IO活动发生时,该selector自会提醒我们的。这也就解释了,我们何以能够在依赖于单线程完成从大量数据源中读写数据。

注册到selector上的channel必须是SelectableChannel的子类,因为只有这种类型的channel才能设置为非阻塞模式。

四、创建一个Selector

调用Selector类的open方法即可创建一个selector实例,此时它是用系统默认的selector提供者来创建一个新的selector的。

Selector selector = Selector.open();

五、注册selectable Channel

如果你想要让selector去监听某个通道的话,那么你首先需要把这些需要监听的channel都注册到该selector上。我们可以调用channel的register方法来完成注册。

但是在把某个channel注册到selector上之前,它必须被设置为非阻塞模式(non-blocking mode):

channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

当然了,这也就意味着,我们无法把FileChannel和selector放在一起使用,因为FileChannel无法切换到非阻塞模式。通常我们都是把socket channel和selector放在一起使用。

channel.register()方法的第一个参数是我们先前创建的Selector对象,第二个参数是我们所关注的该channel上发生的事件。

我们总共可监听的事件有四种,每一个都是由SelectionKey中的一个常量来表示的:

. Connect
- 当某个客户端试图连接服务器时,就会触发该事件。由SelectionKey.OP_CONNECT来表示
. Accept
- 当服务器接收某个客户端的连接时,就会触发该事件。由SelectionKey.OP_ACCEPT来表示
. Read
- 当服务器已准备好从通道中读取数据时,就会触发该事件。由SelectionKey.OP_READ来表示
. Write
- 当服务器已准备好向该通道中写数据时,就会触发该事件。由SelectionKey.OP_WRITE来表示

方法的返回值是一个SelectionKey对象,这个对象就代表了某个通道注册到selector之后的注册结果。

六、SelectionKey 对象

正如我们在上面所看的那样,当我们把某个channel注册到selector上之后,我们就会得到一个SelectionKey对象,该对象中存储了通道注册的信息。

6.1 事件集

事件集参数定义了我们希望该selector监听此通道上的哪些事件。它是一个integer值,我们可以通过以下方式获取相关信息。

首先,我们可以通过SelectionKey的interestOps方法获取到事件集,然后我们把Selectionkey和此值做"与运算",我们就可以得到一个布尔值来表明此事件是不是我们监听的事件。

```
  int interestSet = selectionKey.interestOps();

  boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
  boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
  boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
  boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;

```

6.2 就绪集

就续集定义了某个channel上有哪些事件是准备就绪的。它是一个整型数字。

我们有一种便捷的方式来获取的某个channel的就绪集:

```
  selectionKey.isAcceptable();
  selectionKey.isConnectable();
  selectionKey.isReadable();
  selectionKey.isWriteable();
```

6.3 通道

要想根据SelectionKey访问channel的话,有一个非常简单的方法:

Channel channel = key.channel();

6.4 Selector

从SelectionKey对象上获取对应的Selector对象也非常简单:

Selector selector = key.selector();

6.5 附属对象

我们可以向SelectionKey上附属一个对象,因为有时我们或许给某个channel分配一个自定义ID或者附属某个java对象,以便于可以跟踪该channel的行为情况:
我们可以使用SelectionKey的attach()方法来很方便地做到这一点:

key.attach(object);

Object object = key.attachment();

还有一种方式,就是在channel注册的时候,我们把需要附属的对象作为参数传递给register方法:

SelectionKey key = channel.register(
  selector, SelectionKey.OP_ACCEPT, object);

七、channel key选择

到目前为止,我们已经看到了如何创建一个selector对象,如何向selector上注册channel,以及观察SelectionKey的属性信息。

这才进行到一半,现在我们需要持续地挑选就绪事件:

int channels = selector.select();

此方法是一个阻塞方法,它会一直阻塞住直到有就绪事件到来。方法的返回值代表的是处于就绪状态channel个数。

紧接着,我们通常会获取需要处理的selectionKey

 Set<SelectionKey>  selectionKeys = selector.selectedKeys();

后面,我们只需要遍历这个集合,获取到对应的channel,并执行相应的处理动作即可。

八、完整示例

我们这里有个简单的服务端和客户端的简单示例,用以演示一下,使用selector是如何编写网络程序的。

8.1 服务端程序

public class EchoServer {

    private static final String POISION_PILL = "POISON_PILL";


    public static void main(String[] args) throws IOException {

        Selector selector   = Selector.open();

        ServerSocketChannel  serverSocket = ServerSocketChannel.open();
        serverSocket.bind(new InetSocketAddress("localhost",5454));
        serverSocket.configureBlocking(false);
        SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        ByteBuffer  buffer = ByteBuffer.allocate(256);

        while (true){

            int select = selector.select();// selecting the ready set,this method blocks until at least one channel is ready for an operation

            Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 获取可用于处理的selected keys
            //遍历这些已准备就绪的事件,并处理
            Iterator<SelectionKey> iter = selectedKeys.iterator();
            while (iter.hasNext()){

                SelectionKey key = iter.next();
                //逐一对这些已就绪事件进行处理

                //如果是接收网络连接的事件
                if(key.isAcceptable()){
                    register(selector,serverSocket);
                }

                //如果是可读事件
                if(key.isReadable()){
                    answerWithEcho(buffer,key);
                }
                iter.remove(); //由此可以看出,使用迭代器进行遍历,在遍历时,可以执行移出动作
            }
        }

    }



    private static void register(Selector selector, ServerSocketChannel serverSocket) throws IOException {

        //如果发现,该key是网络套接字接收事件,则接收此客户端连接,并把此客户端连接注册到selector上
        SocketChannel  client = serverSocket.accept();
        client.configureBlocking(false);
        client.register(selector,SelectionKey.OP_READ);
    }

    private static void answerWithEcho(ByteBuffer buffer, SelectionKey key) throws IOException {

        SocketChannel   client = (SocketChannel) key.channel();  //根据key取出对应的channel
        client.read(buffer); //把客户端发来的数据读进buffer中,可以和之前的相比就可看出
                             //之前是一个字节一个字节的以流的形式读取,现在是批量地读进buffer中


        String content = new String(buffer.array());
        if(content.trim().equals(POISION_PILL)){
            //如果收到了"POISION_PILL"字符串的话,则关闭此客户端
            client.close();
            System.out.println("Not accepting client messages anymore");
        }else {
            buffer.flip(); //
            client.write(buffer);
            buffer.clear(); //清空buffer
        }

    }


    public static Process  start() throws IOException,InterruptedException {

        String javaHome = System.getProperty("java.home");
        String  javaBin = javaHome + File.separator + "bin"  + File.separator + "java";
        String classPath= System.getProperty("java.class.path");
        String className= EchoServer.class.getCanonicalName();

        ProcessBuilder  builder = new ProcessBuilder(javaBin,"-cp", classPath,className);

        return builder.start();
    }

}


8.2 客户端程序

public class EchoClient {


    private static SocketChannel  client;
    private static ByteBuffer buffer;
    private static EchoClient instance;



    public static  EchoClient start(){

        if(instance == null){
            instance = new EchoClient();
        }
        return instance;
    }

    public static void stop()throws IOException{
        client.close();
        buffer = null;
    }


    private EchoClient(){

        try{

            client = SocketChannel.open(new InetSocketAddress("localhost",5454));
            buffer = ByteBuffer.allocate(256);
        }catch (IOException e){
            e.printStackTrace();
        }
    }


    public String sendMessage(String msg){

        buffer = ByteBuffer.wrap(msg.getBytes());
        String response = null;

        try{
            client.write(buffer);
            buffer.clear();
            client.read(buffer);
            response = new String(buffer.array()).trim();
            buffer.clear();
        }catch (IOException e){
            e.printStackTrace();
        }
        return response;
    }

}


8.3 测试程序

public class EchoTest {

    Process  server;

    EchoClient  client;


    @Before
    public void setup()throws IOException,InterruptedException {
        server = EchoServer.start();
        client = EchoClient.start();
    }



    @Test
    public void givenServerClient_whenServerEchosMessage_thenCorrect() {

        String resp1 = client.sendMessage("hello");
        String resp2 = client.sendMessage("world");
        System.out.println(resp1);
        System.out.println(resp2);
    }


    @Test
    public void  whenWakeUpCalledOnSelector_thenBlokedThreadReturns() throws Exception {

        Pipe pipe = Pipe.open();
        Selector selector = Selector.open();
        SelectableChannel channel = pipe.source();
        channel.configureBlocking(false);
        channel.register(selector, SelectionKey.OP_READ);

        List<String> invocationStepsTracker = Collections.synchronizedList(new ArrayList<>());

        CountDownLatch latch = new  CountDownLatch(1);

        new Thread(()->{

            invocationStepsTracker.add(">> Count down");
            latch.countDown();

            invocationStepsTracker.add(">> Count down");
            latch.countDown();

            try {

                invocationStepsTracker.add(">> Start select");
                selector.select();
                invocationStepsTracker.add(">> End select");
            } catch (IOException e) {
                e.printStackTrace();
            }

        }).start();

        invocationStepsTracker.add(">> Start await");
        latch.await();
        invocationStepsTracker.add(">> End await");
        invocationStepsTracker.add(">> Wakeup thread");
        selector.wakeup();
        // clean up
        channel.close();

        System.out.println("============输出StepTracker中的内容================");
        System.out.println(JSONObject.toJSONString(invocationStepsTracker));
    }

    @After
    public void tearDown()throws IOException {
        server.destroy();
        EchoClient.stop();
    }
}


九、Selector.wakeup()

前面,我们已经讲到了,当我们调用selector.select()方法时,此方法会把当前线程阻塞住直到被监听的channel上有就绪事件
发生。但是我们可以在其他线程中调用selector.wakeup()方法来唤醒被此selector阻塞的线程。

调用selector.wakeup()方法产生的结果是:不管是否有通道处于就绪状态,被阻塞的线程都会立即返回,而非继续等待。

我们可以使用CountDownLatch来演示一下,并跟踪一下代码的执行情况:

@Test
   public void  whenWakeUpCalledOnSelector_thenBlokedThreadReturns() throws Exception {

       Pipe pipe = Pipe.open();
       Selector selector = Selector.open();
       SelectableChannel channel = pipe.source();
       channel.configureBlocking(false);
       channel.register(selector, SelectionKey.OP_READ);

       List<String> invocationStepsTracker = Collections.synchronizedList(new ArrayList<>());

       CountDownLatch latch = new  CountDownLatch(1);

       new Thread(()->{

           invocationStepsTracker.add(">> Count down");
           latch.countDown();

           invocationStepsTracker.add(">> Count down");
           latch.countDown();

           try {

               invocationStepsTracker.add(">> Start select");
               selector.select();
               invocationStepsTracker.add(">> End select");
           } catch (IOException e) {
               e.printStackTrace();
           }

       }).start();

       invocationStepsTracker.add(">> Start await");
       latch.await();
       invocationStepsTracker.add(">> End await");
       invocationStepsTracker.add(">> Wakeup thread");
       selector.wakeup();
       // clean up
       channel.close();

       System.out.println("============输出StepTracker中的内容================");
       System.out.println(JSONObject.toJSONString(invocationStepsTracker));
   }

参考文献

  1. baeldung-JavaNIO-selector

  2. Oracle文档Selector

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,039评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,223评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,916评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,009评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,030评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,011评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,934评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,754评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,202评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,433评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,590评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,321评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,917评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,568评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,738评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,583评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,482评论 2 352

推荐阅读更多精彩内容