上一节在Netty server启动的过程中,我们已经清楚了server启动时会为每个Channel分配一个NioEventLoop,NioEventLoop中有一个run方法主要用来监听事件和执行队列中的任务。本篇文章我们主要关心这个run方法是怎么监听事件,监听完事件后又是如何处理的。
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
} catch (Throwable t) {
}
}
}
一些细节我们省去,主要分析processSelectedKeys这个方法是怎么监听事件的,看下这个方法的底层调用。
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
selectedKeys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
}
}
processSelectedKeysOptimized这个方法里有一个for循环来遍历注册到Selector上的SelectionKey,通过SelectionKey拿到注册到Selector上的Channel,本文分析的是accept连接过程,因此这个SelectionKey上的channel类型对应的其实就是NioServerSocketChannel。拿到这个channel之后继续调用processSelectedKey这个方法。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
....
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
通过读取SelectionKey的readyOps方法获取selector上监听的事件类型,我们看到最后一个if条件判断其实就是当监听到的事件类型是OP_READ或者OP_ACCEPT时就调用unsafe.read()这个方法,所以当收到一个连接请求时,就会调用这个方法来处理,首先看下unsafe是在哪初始化的,看下面这句代码。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
....
}
ch的类型的NioServerSocketChannel,通过追溯代码,发现是在server启动初始化创建NioServerSocketChannel对象时初始化的,这个unsafe的具体实现类是NioMessageUnsafe类,我们看下这个类中read方法的具体实现。
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
}
}
}
看下doReadMessages这个方法。
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
第一句就是调用Channel的accept方法,因为调用这个方法时已经收到客户端的连接请求了,所以调用accept方法后会返回一个SocketChannel对象,而不是一直阻塞等待客户端连接,这也就是NIO线程模型的好处,最后将这个SocketChannel对象放在buf这个list集合中。继续回到上面NioMessageUnsafe的read方法实现。
@Override
public void read() {
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
}
}
}
调用doReadMessages将SocketChannel对象放到readBuf这个list集合后,遍历这个list集合,调用pipeline.fireChannelRead(readBuf.get(i))这个方法,看下这个方法接下来会发生什么。
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
首先调用DefaultChannelPipeline的fireChannelRead方法,然后在这个方法里调用AbstractChannelHandlerContext.invokeChannelRead(head, msg),这里的head是一个AbstractChannelHandlerContext对象,msg是调用accept方法返回的一个SocketChannel对象,此后进入AbstractChannelHandlerContext对象中的invokeChannelRead方法。
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
这个方法里继续调用下面这个方法。
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
@Override
public ChannelHandler handler() {
return handler;
}
其中,try语句中的handler()这个方法返回的是一个ChannelHandler对象,这个handler是在之前server启动初始化时的init方法中完成初始化的。
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
这个handler对应的就是ServerBootstrapAcceptor这个handler,继续回到上面那个方法,拿到这个handler后调用ServerBootstrapAcceptor这个类的channelRead方法。
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
child这个Channel就是调用accept方法返回的一个NioSocketChannel对象,然后将childHandler放到这个Channel对象的pipeline中,并对这个Channel对象完成参数初始化。
这个方法中比较重要的是childGroup.register(child)这个方法,这个方法和之前在将server启动初始化那一节作用一样,这里是将NioSocketChannel这个Channel注册到childGroup中的一个NioEventLoop中,然后在NioEventLoop中监听这个channel的读事件和写事件。
到这里,accept连接过程就结束了,其实总结起来很简单。
(1)在NioServerSocketChannel对应的NioEventLoop中监听连接事件,监听到连接事件后,返回一个NioSocketChannel对象。
(2)将这个Channel对象注册到childGroup中,由childGroup中的NioEventLoop完成这个channel上的IO事件监听。
讲完了accept连接过程,下一节就分析下客户端发送消息到服务端时整个的消息处理过程。