当selector监听到有数据可读的时候,必然是通过channel去读取数据的。
所以入口还是channel的read方法。
由于我们使用的是NioSocketChannel.所以就关注一下NioSocketChannel的read方法。
最后定位到,read方法的实现,是在NioSocketChannel的父类AbstractNioByteChannel中。
代码如下
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
@Override
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
.............通过pipeline去传播read事件。
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
................读取完成后,通过pipeline去传播read完成事件。
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
}
从上面代码可以大概猜出,先传播读的事件,后续传播读完成的事件。
从代码上可以看出是通过channel的pipeline发起的。因此需要关注Pipeline中对于read事件的处理。
由于pipeline的类是DefaultChannelPipeline。所以看看DefaultChannelPipeline中的实现。
public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
head实现了channelRead方法。
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
}
从上面代码看到,read是headContext发起的。最后通过headContext中实现的channelRead方法去读取数据。所以接下来,我们看看head是如何实现channelRead方法的。
public class DefaultChannelPipeline implements ChannelPipeline {
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx的类型是AbstractChannelHandlerContext,父类中的实现都是往下传递。所以接下来往下看
ctx.fireChannelRead(msg);
}
}
}
接下来看AbstractChannelHandlerContext中是如何进行传播的。
从以下代码可以看出。
1.先找到下一个Inbound=true的ctx
2.执行ctx所实现的channelRead方法。
3.ctx的channelRead方法如果是继续传播,则会1,2,3步骤一直递归。当然也可以考虑中断传播,不fire即可。
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
找到下一个inbound属性为true的ctx
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while ((ctx.executionMask & mask) == 0);
return ctx;
}
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
执行channelRead方法,一般都是进行传播,不进行相关操作。
当然也可以有其他操作,比如像反序列化,反序列化之后再进行传播,这点跟序列化其实是一样的。具体可以看看ByteToMessageDecoder的channelRead方法。
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
}