前言
上一章节《Netty 源码解析系列-服务端启动流程解析》
我们完成了服务端启动,那么服务端启动完成后,客户端接入以及读I/O 事件是怎么哪里开始的?以及 netty 的 boss 线程接收到客户端 TCP 连接请求后如何将链路注册到 worker 线程池?带着这些疑问,我们开始客户端连接接入及读写 I/O 解析。
1.NioEventLoop run()开始
processSelectedKeys();
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
根据 selectedKeys 是否为空,判断是否采用优化后的 selectedKeys ,进到 processSelectedKeysOptimized。
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
selectedKeys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
...
}
}
k.attachment() 获取附加的对象,那我们是在哪里附加上去的呢?上一篇《Netty 源码解析-服务端启动流程解析》注册时 attach 上去的对象,其实就是 NioServerSocketChannel 自身。
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
...
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
...
}
}
我们再回到 k.attachment() ,在取出附加对象后,判断类型是否为 AbstractNioChannel ,从这里我们可以看到,不是附加 AbstractNioChannel 类型,那么就是附加的 NioTask 对象,在这里我们只看关于 AbstractNioChannel 的,进到 processSelectedKey() 方法。
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final NioUnsafe unsafe = ch.unsafe();
...
int readyOps = k.readyOps();
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
return;
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
...
}
当操作类型是读操作或者连接操作,进入 unsafe.read() ,有两个类实现了这个方法,一个是 AbstractNioByteChannel 的内部类 NioByteUnsafe ,一个是 AbstractNioMessageChannel 的内部类 NioMessageUnsafe ,这两个类都是 NioUnsafe 实现类 AbstractNioChannel 的子类,那到底是哪一个子类?我们看看 NioServerSocketChannel 创建时是创建的 NioByteUnsafe 还是 NioMessageUnsafe。
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
}
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
}
public abstract class AbstractNioChannel extends AbstractChannel {
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
}
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this);
}
}
NioServerSocketChannel 是 AbstractNioMessageChannel 的子类,AbstractNioMessageChannel 是 AbstractNioChannel 的子类,newUnsafe() 是 AbstractChannel 的抽象方法,那么我们从这里就知道,AbstractNioMessageChannel 实现了 AbstractChannel的newUnsafe() 抽象方法,由此判断,我们选择 AbstractNioMessageChannel 的内部类 NioMessageUnsafe 的 read()。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
...
for (;;) {
int localRead = doReadMessages(readBuf);
...
}
setReadPending(false);
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
pipeline.fireChannelReadComplete();
...
}
这里分两部分,一个是处理消息,一个是处理事件。
1.处理消息
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
...
buf.add(new NioSocketChannel(this, ch));
return 1;
...
}
接受了一个客户端 SocketChannel,封装到NioSocketChannel,添加到list集合中,我们看看new NioSocketChannel()。
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
}
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioByteUnsafe();
}
protected class NioByteUnsafe extends AbstractNioUnsafe {
@Override
public final void read() {
...
}
}
}
AbstractNioByteChannel 也继承了 AbstractNioChannel ,并实现了 newUnsafe() 方法,由此我们可以推断出当客户端第一次连接时,走的是 AbstractNioMessageChannel 的子类 NioMessageUnsafe的read() ,当客户端发送数据时,走的是 AbstractNioByteChannel 的内部类 AbstractNioUnsafe 的 read() 方法。
2.处理事件
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
@Override
public ChannelPipeline fireChannelRead(Object msg) {
head.fireChannelRead(msg);
return this;
}
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(msg);
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelRead(msg);
}
});
}
return this;
}
从next的debug可以看出,当前handler是ServerBootstrapAcceptor这个处理器来处理ChannelRead() 方法,如果看了上一篇《Netty 源码解析-服务端启动流程解析》就会知道,这是在init() 方法中pipeline.addLast(new ServerBootstrapAcceptor())。为什么不是p.addLast(new ChannelInitializer<Channel>())? 因为在ChannelInitializer.channelRegistered() 会删除当前initChannel 处理器。
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
initChannel((C) ctx.channel());
ctx.pipeline().remove(this);
ctx.fireChannelRegistered();
}
我们继续看ServerBootstrapAcceptor的ChannelRead() 方法。
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
这里分三个步骤
(1) 将childHandler添加到处理器上,这个从哪里来?就是从最开始设置serverBootstrap.childHandler(new IOChannelInitialize())。
(2) 设置一些参数。
(3) work线程池register客户端的channel。
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
@Override
public EventExecutor next() {
return chooser.next();
}
private final class GenericEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}
}
从work线程池选一个线程来执行register。
@Override
public ChannelFuture register(Channel channel) {
return register(channel, new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
...
channel.unsafe().register(this, promise);
return promise;
}
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}
@Override
protected void doRegister() throws Exception {
...
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
...
}
后面的流程和上一篇《Netty 源码解析-服务端启动流程解析》
的注册流程是一样的,区别在于服务启动时注册是在boss线程池任务队列中执行注册,客户端新接入注册是在work线程池任务队列中执行register0() 方法,并将work线程池的selector注册到Java NIO
到这里,我们就可以回答开篇的的几个问题:客户端是如何接入?netty的boss线程接收到客户端TCP连接请求后如何将链路注册到worker线程池?
现在我们还剩下一个问题:读写I/O事件是怎么哪里开始的?
我们回到文章开头
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
selectedKeys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
...
}
}
前面boss线程池在这里完成了客户端连接接入,并将链路注册到worker线程池任务队列,添加了read事件的监听,那么现在work线程不停循环selectedKeys中有没有待处理的事件,当有待处理事件,那么会执行processSelectedKey() 方法。
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
...
int readyOps = k.readyOps();
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
...
}
...
}
在这里unsafe.read() 选择AbstractNioByteChannel的read()。
@Override
public final void read() {
final ChannelConfig config = config();
if (!config.isAutoRead() && !isReadPending()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
removeReadOp();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
ByteBuf byteBuf = null;
int messages = 0;
boolean close = false;
try {
int totalReadAmount = 0;
boolean readPendingReset = false;
do {
byteBuf = allocHandle.allocate(allocator);
int writable = byteBuf.writableBytes();
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount <= 0) {
// not was read release the buffer
byteBuf.release();
byteBuf = null;
close = localReadAmount < 0;
break;
}
if (!readPendingReset) {
readPendingReset = true;
setReadPending(false);
}
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
totalReadAmount = Integer.MAX_VALUE;
break;
}
totalReadAmount += localReadAmount;
if (!config.isAutoRead()) {
break;
}
if (localReadAmount < writable) {
break;
}
} while (++ messages < maxMessagesPerRead);
pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);
if (close) {
closeOnRead(pipeline);
close = false;
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close);
} finally {
if (!config.isAutoRead() && !isReadPending()) {
removeReadOp();
}
}
}
}
把这一大段代码分解成几部分
1.设置循环读,16次,未读完则会等到下一轮select 继续读取,maxMessagesPerRead默认等于16。
2.获取缓存操作handler,config.getRecvByteBufAllocator().newHandle()。
3.申请缓存空间,allocHandle.allocate(allocator)。
4.从socket中读取数据到byteBuf中。
5.传递读事件到下一个handler处理器。
6.读完之后发送读完时间到下一个handler处理器
我们只看读事件,其他细节后面的文章再详细解析。
@Override
public ChannelPipeline fireChannelRead(Object msg) {
head.fireChannelRead(msg);
return this;
}
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
if (msg == null) {
throw new NullPointerException("msg");
}
final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(msg);
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelRead(msg);
}
});
}
return this;
}
Handler事件顺序是HeadContextHandler --> IdleStateHandler -->IOHandler --> TailContext
private void invokeChannelRead(Object msg) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
进到IdleStateHandler
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
reading = true;
firstReaderIdleEvent = firstAllIdleEvent = true;
}
ctx.fireChannelRead(msg);
}
设置读事件为true,为后面状态检测做准备,继续向下传递读事件,这次是IOHandler的读事件。
public class IOHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
System.out.println(msg.toString());
}
...
}
交给用户自定义handler处理读事件,自此读I/O事件是怎么哪里开始,如何交给用户handler处理已解析完毕。
总结:
1.boss线程处理NioServerSocketChannel的accept事件,并将客户端添加到work任务队列,任务队列执行redister0()方法, 将read事件注册到work线程的selector。
2.work线程轮询selectkeys,当有事件上来时,将缓存数据发送到用户handler 。