tomcat-NIoEndpoint

对比着BIO来看,这篇主要是流程整理,了解一个过程

1)初始化

服务端配置加载后创建endpoint

endpoint=new NioEndpoint();

在endpoint初始化方法init中调用了bind方法,有三行比较重要

serverSock = ServerSocketChannel.open();//创建sock文件

serverSock.socket().bind(addr,getBacklog());//创建了sock后绑定监听

selectorPool.open();//Selector.open()实际是不同系统的实现,Linux是EPollSelectorProvider创建的EPollSelectorImpl

需要注意的是,我们运行不同版本不同系统的jvm这个ServerSocketChannel里面加载的东西可能不一样,跨平台的代码有变动很正常

2)启动startInternal

pollers = new Poller[getPollerThreadCount()];

for (int i=0; i

pollers[i] = new Poller();

Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);

pollerThread.setPriority(threadPriority);

pollerThread.setDaemon(true);

pollerThread.start();

}

startAcceptorThreads();

启动了接收器和两个pollers

poller是一个异步线程来执行轮询请求队列

3)接收accept()

与bio差距不大,在调用accept0本地方法阻塞到有连接后返回一个socket

其中有一个方法有点意思,后来发现这就是nio的中断机制

    protected final void begin() {
        if (interruptor == null) {
            interruptor = new Interruptible() {
                    public void interrupt(Thread target) {
                        synchronized (closeLock) {
                            if (closed)
                                return;
                            closed = true;
                            interrupted = target;
                            try {
                                AbstractInterruptibleChannel.this.implCloseChannel();
                            } catch (IOException x) { }
                        }
                    }};
        }

//1.因为这里实际是想调用 Thread的blockedOn方法,但是这个方法是package-private需要 invoked via sun.misc.SharedSecrets from java.nio code

//2.设置好打断方法后会在调用线程的interrupt方法时进行对 blocker 调用 interrupt方法

blockedOn(interruptor);//调用 sun.misc.SharedSecrets.get.blockedOn方法解决

//下面这行防止在设置过程中线程就发生了打断

Thread me = Thread.currentThread();

if (me.isInterrupted())

interruptor.interrupt(me);

}

4)处理

setSocketOptions就是处理的地方

NioChannel channel = nioChannels.poll();//nioChannels就是一个ConcurrentLinkedQueue,作为channel的缓存池返回队列中的第一个

channel = new NioChannel(socket, bufhandler);//新的请求会创建NioChannel,旧的会channel.reset();

getPoller0().register(channel);//getPoller0获取当前endpoint的poller数,最大是2,循环返回1和2

注册的东西是sock包装后的channel

public void register(final NioChannel socket) {

socket.setPoller(this);

KeyAttachment key = keyCache.poll();//poll出来的是已经处理过后的

final KeyAttachment ka = key!=null?key:new KeyAttachment(socket);//KeyAttachment构造中也是传入的channel

ka.reset(this,socket,getSocketProperties().getSoTimeout());//相当于bio中的wrapper

ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());

ka.setSecure(isSSLEnabled());

PollerEvent r = eventCache.poll();

ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.

if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);

else r.reset(socket,ka,OP_REGISTER);

addEvent(r);//往events填PollerEvent

}

public void addEvent(Runnable event) {

events.offer(event);

if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();//第一次进来会调用selector.wakeup

}

5)稍加整理了一张处理过程概要图

image.png

图中有一个细节没有画出来,在Poller#events方法中,循环PollerEvent调用其run方法处理SelectionKey

6)nio的关键地方

selector.select//判断有多少连接有数据变化

selector.selectedKeys//返回了变化的连接

selector是poller的属性在其构造时创建好的

selector是spi调用不同的实现类,在linux的情况下要看select方法要到EPollSelectorImpl中

select方法调用到了EPollSelectorImpl的doSelect

protected int doSelect(long timeout) throws IOException {

        if (closed)

            throw new ClosedSelectorException();

        processDeregisterQueue();//清除已经被取消掉的SelectionKey,数据在processKey中添加的

        try {

            begin();

            pollWrapper.poll(timeout);

        } finally {

            end();

        }

        processDeregisterQueue();

        int numKeysUpdated = updateSelectedKeys();//会将变化的key记录到selectedKeys在下一个方法返回

        if (pollWrapper.interrupted()) {

            // Clear the wakeup pipe

            pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);

            synchronized (interruptLock) {

                pollWrapper.clearInterrupted();

                IOUtil.drain(fd0);

                interruptTriggered = false;

            }

        }

        return numKeysUpdated;

    }

pollWrapper.poll方法会判断已经注册的fd文件哪些有变化,fd是一个数组,在epollWait调用后将变化的数据放在了pollArrayAddress中

其实现的方式最底层都是epoll_create/epoll_wait/epoll_ctl这些linux系统的方法

(linux的后面补了再上)

再看selectedKeys方法,实际并未调用到底层操作,返回的是上一步select存储在本地的publicSelectedKeys数据

所以整个请求实际上是sockServer监听到了请求,就返回一个sock,然后sock封装为channel再包装到pollerEvent中进行注册,在注册时使用selectedKeys记录对应的请求,后续判断有请求的channel进行处理

然后将有请求的数据进行轮询调用多线程,将有数据的请求才放入线程池执行,这样就提高了线程池的利用率

再有一点就是nio的处理模式,用到了reactor模式,我看的这个版本是7.0.103,用到的是主从reactor模式,acceptor就是主reactor,poller就是从reactor,acceptor负责接收创建sock,监听connect事件,poller负责监听读写事件

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容