nio之selector多线程编程模式

本文的多线程编程模式指的是将处理连接和处理数据读写分别在不同的线程执行,这样每个线程的职责分明,处理连接请求速度很快一般只需要一个线程就可以了,而处理读写请求的一般耗时较长,因而可以使用多个线程来处理。以下,我们的例子中处理连接请求的线程称为boss,处理读写请求的线程称之为worker。当boss接受一个连接后,便将其注册到worker,让worker去处理。

首先,来看下boss代码

class Boss implements Runnable {
    Selector selector;
    private static final int OP = SelectionKey.OP_ACCEPT;
    Worker[] workers;
    ServerSocketChannel ssc;
    private static final int PORT = 8899;

    public Boss(Worker[] workers) {
        this.workers = workers;
    }

    void init() throws IOException {
        selector = Selector.open();
        ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(PORT));
        ssc.register(selector, OP);
    }
    @Override
    public void run() {
        try {
            init();
            int acceptCount = 0;
            for (;;) {
                selector.select();
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    if (key.isAcceptable()) {
                        ssc = (ServerSocketChannel) key.channel();
                        workers[++acceptCount % workers.length].register(ssc.accept());
                    }
                    iterator.remove();
                }
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }
}

boss实现Runnable的好处是,boss本身也可以作为一个任务提交给线程池处理,且boss还持有一个Worker数组的引用,因此boss在接受连接请求后,需要将连接提交给worker进行处理。所以boss的任务只需要负责处理accept事件即可。

接下来,看下worker的代码

class Worker implements Runnable {
    Selector selector;
    private static final int OP = SelectionKey.OP_READ;
    private AtomicBoolean atomicBoolean = new AtomicBoolean(false);
    private Thread thread;
    private static final LinkedBlockingQueue<Runnable> QUEUE = new LinkedBlockingQueue<>();

    private void start() throws IOException {
        selector = Selector.open();
        thread = new Thread(this);
        thread.start();
        isStarted = true;
    }

    protected void register(SocketChannel sc) throws IOException {
        if (!atomicBoolean.get()) {
            atomicBoolean.compareAndSet(false, true);
            start();
        }
        sc.configureBlocking(false);
        QUEUE.offer(() -> {
            try {
                sc.register(selector, OP, null);
            } catch (ClosedChannelException e) {
                e.printStackTrace();
            }
        });
        selector.wakeup();
    }
    @Override
    public void run() {
        try {
            for (;;) {
                int selectCnt = selector.select();
                Runnable task = QUEUE.poll();
                if (task != null) {
                    task.run();
                }
                if (selectCnt == 0) {
                    continue;
                }
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    if (key.isReadable()) {
                        handleRead(key);
                    } else if (key.isWritable()) {
                        handleWrite(key);
                    }
                    iterator.remove();
                }
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    private void handleRead(SelectionKey key) {
    }

    private void handleWrite(SelectionKey key) {
    }
}

首先来看下worker为何要有isStarted和thread两个成员变量,其实这里也可以去掉这两个,但是就要在创建worker对象时便将其提交给线程或者线程池去执行,也就是说可能并没有boss提交连接让worker处理便开启了线程,浪费CPU资源,因而只需要在有boss提交任务给它后,再开启线程执行。但是每次提交不能都去start一个线程,所以才需要加一个是否started的标志位

然后worker还有一个队列,这个主要是因为boss线程和当前worker执行线程需要进行通讯,因为boss调用register方法时,可能worker的selector正在进行select操作,此时无法将channel注册到selector上,因此需要先将其注册的行为当做一个任务放在队列里面,等worker的selector执行select方法返回后,再从任务列表获取任务来注册channel

最后就是有可能boss调worker的register方法后,除了提交一个任务到队列中,还需要唤醒selector,避免worker的selector一直阻塞在select方法上,无法处理新的连接读取事件,因此需要唤醒,还有一种比较不推荐的方法是,worker的selector调用select方法是有带超时时间的,这样也能避免selector一直阻塞。

本文的的多线程模式只是一个非常简单的例子,只是了解下可以使用多线程来分离selector的职责而已,实际生产中可以参考下netty的使用方式

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容