对比着BIO来看,这篇主要是流程整理,了解一个过程
1)初始化
服务端配置加载后创建endpoint
endpoint=new NioEndpoint();
在endpoint初始化方法init中调用了bind方法,有三行比较重要
serverSock = ServerSocketChannel.open();//创建sock文件
serverSock.socket().bind(addr,getBacklog());//创建了sock后绑定监听
selectorPool.open();//Selector.open()实际是不同系统的实现,Linux是EPollSelectorProvider创建的EPollSelectorImpl
需要注意的是,我们运行不同版本不同系统的jvm这个ServerSocketChannel里面加载的东西可能不一样,跨平台的代码有变动很正常
2)启动startInternal
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}
startAcceptorThreads();
启动了接收器和两个pollers
poller是一个异步线程来执行轮询请求队列
3)接收accept()
与bio差距不大,在调用accept0本地方法阻塞到有连接后返回一个socket
其中有一个方法有点意思,后来发现这就是nio的中断机制
protected final void begin() {
if (interruptor == null) {
interruptor = new Interruptible() {
public void interrupt(Thread target) {
synchronized (closeLock) {
if (closed)
return;
closed = true;
interrupted = target;
try {
AbstractInterruptibleChannel.this.implCloseChannel();
} catch (IOException x) { }
}
}};
}
//1.因为这里实际是想调用 Thread的blockedOn方法,但是这个方法是package-private需要 invoked via sun.misc.SharedSecrets from java.nio code
//2.设置好打断方法后会在调用线程的interrupt方法时进行对 blocker 调用 interrupt方法
blockedOn(interruptor);//调用 sun.misc.SharedSecrets.get.blockedOn方法解决
//下面这行防止在设置过程中线程就发生了打断
Thread me = Thread.currentThread();
if (me.isInterrupted())
interruptor.interrupt(me);
}
4)处理
setSocketOptions就是处理的地方
NioChannel channel = nioChannels.poll();//nioChannels就是一个ConcurrentLinkedQueue,作为channel的缓存池返回队列中的第一个
channel = new NioChannel(socket, bufhandler);//新的请求会创建NioChannel,旧的会channel.reset();
getPoller0().register(channel);//getPoller0获取当前endpoint的poller数,最大是2,循环返回1和2
注册的东西是sock包装后的channel
public void register(final NioChannel socket) {
socket.setPoller(this);
KeyAttachment key = keyCache.poll();//poll出来的是已经处理过后的
final KeyAttachment ka = key!=null?key:new KeyAttachment(socket);//KeyAttachment构造中也是传入的channel
ka.reset(this,socket,getSocketProperties().getSoTimeout());//相当于bio中的wrapper
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());
PollerEvent r = eventCache.poll();
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
addEvent(r);//往events填PollerEvent
}
public void addEvent(Runnable event) {
events.offer(event);
if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();//第一次进来会调用selector.wakeup
}
5)稍加整理了一张处理过程概要图
图中有一个细节没有画出来,在Poller#events方法中,循环PollerEvent调用其run方法处理SelectionKey
6)nio的关键地方
selector.select//判断有多少连接有数据变化
selector.selectedKeys//返回了变化的连接
selector是poller的属性在其构造时创建好的
selector是spi调用不同的实现类,在linux的情况下要看select方法要到EPollSelectorImpl中
select方法调用到了EPollSelectorImpl的doSelect
protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
processDeregisterQueue();//清除已经被取消掉的SelectionKey,数据在processKey中添加的
try {
begin();
pollWrapper.poll(timeout);
} finally {
end();
}
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys();//会将变化的key记录到selectedKeys在下一个方法返回
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
pollWrapper.poll方法会判断已经注册的fd文件哪些有变化,fd是一个数组,在epollWait调用后将变化的数据放在了pollArrayAddress中
其实现的方式最底层都是epoll_create/epoll_wait/epoll_ctl这些linux系统的方法
(linux的后面补了再上)
再看selectedKeys方法,实际并未调用到底层操作,返回的是上一步select存储在本地的publicSelectedKeys数据
所以整个请求实际上是sockServer监听到了请求,就返回一个sock,然后sock封装为channel再包装到pollerEvent中进行注册,在注册时使用selectedKeys记录对应的请求,后续判断有请求的channel进行处理
然后将有请求的数据进行轮询调用多线程,将有数据的请求才放入线程池执行,这样就提高了线程池的利用率
再有一点就是nio的处理模式,用到了reactor模式,我看的这个版本是7.0.103,用到的是主从reactor模式,acceptor就是主reactor,poller就是从reactor,acceptor负责接收创建sock,监听connect事件,poller负责监听读写事件