基于Netty源代码版本:netty-all-4.1.33.Final
ChannelPipeline的结构图
首先每一个Channel在初始化的时候就会创建一个ChannelPipeline,目前ChannelPipeline的实现只有DefaultChannelPipeline一种,所以我们也以DefaultChannelPipeline来分析。DefaultChannelPipeline内部有一个双向链表结构,这个链表的每个节点都是一个AbstractChannelHandlerContext类型的节点,DefaultChannelPipeline刚初始化时就会创建两个初始节点,分别是HeadContext和TailContext,这两个节点也并不完全是标记节点,他们都有各自实际的作用
- HeadContext,实现了bind,connect,disconnect,close,write,flush等等几个方法,基本都是通过直接调用unsafe的相关方法实现的。而对于其他的方法基本都是通过调用AbstractChannelHandlerContext的fire方法将事件传给下一个节点。
- TailContext, 主要用于处理写数据几乎没有实现任何逻辑,它的功能几乎全部继承自AbstractChannelHandlerContext,而AbstractChannelHandlerContext对于大部分事件处理的实现都是简单地将事件向下一个节点传递。注意,这里下一个节点不一定是前一个还是后一个,要根据具体事件类型或者具体的操作而定,对于ChannelOutboundInvoker接口中的方法都是从尾节点向首节点传递事件,而对于ChannelInboundInvoker接口中的方法都是从首节点往尾节点传递。我们可以形象地理解为,首节点是最靠近socket的,而尾节点是最原理socket的,所以有数据进来时,产生的读事件最先从首节点开始向后传递,当有写数据的动作时,则会从尾节点向头结点传递。
下面,我们以两个最重要的事件读事件和写事件,来分析netty的这种链式处理结构到底是怎么运转的。
Unsafe
顾名思义,unsafe是不安全的意思,就是告诉你不要在应用程序里面直接使用Unsafe以及他的衍生类对象。
netty官方的解释如下:
Unsafe operations that should never be called from user-code. These methods are only provided to implement the actual transport, and must be invoked from an I/O thread
Unsafe 在Channel定义,属于Channel的内部类,表明Unsafe和Channel密切相关
下面是unsafe接口的所有方法
interface Unsafe {
RecvByteBufAllocator.Handle recvBufAllocHandle();
SocketAddress localAddress();
SocketAddress remoteAddress();
void register(EventLoop eventLoop, ChannelPromise promise);
void bind(SocketAddress localAddress, ChannelPromise promise);
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
void disconnect(ChannelPromise promise);
void close(ChannelPromise promise);
void closeForcibly();
void deregister(ChannelPromise promise);
void beginRead();
void write(Object msg, ChannelPromise promise);
void flush();
ChannelPromise voidPromise();
ChannelOutboundBuffer outboundBuffer();
}
按功能可以分为分配内存,Socket四元组信息,注册事件循环,绑定网卡端口,Socket的连接和关闭,Socket的读写,看的出来,这些操作都是和jdk底层相关
Unsafe 继承结构
NioUnsafe 在 Unsafe基础上增加了以下几个接口:
public interface NioUnsafe extends Unsafe {
/**
* Return underlying {@link SelectableChannel}
*/
SelectableChannel ch();
/**
* Finish connect
*/
void finishConnect();
/**
* Read from underlying {@link SelectableChannel}
*/
void read();
void forceFlush();
}
从增加的接口以及类名上来看,NioUnsafe 增加了可以访问底层jdk的SelectableChannel的功能,定义了从SelectableChannel读取数据的read方法
Unsafe的分类
从以上继承结构来看,我们可以总结出两种类型的Unsafe分类,一个是与连接的字节数据读写相关的NioByteUnsafe,一个是与新连接建立操作相关的NioMessageUnsafe
NioByteUnsafe中的读:委托到外部类NioSocketChannel
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
}
最后一行已经与jdk底层以及netty中的ByteBuf相关,将jdk的 SelectableChannel的字节数据读取到netty的ByteBuf中
NioMessageUnsafe中的读:委托到外部类NioSocketChannel
public class NioServerSocketChannel extends AbstractNioMessageChannel
implements io.netty.channel.socket.ServerSocketChannel {
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
}
NioMessageUnsafe 的读操作很简单,就是调用jdk的accept()方法,新建立一条连接
*NioByteUnsafe中的写:委托到外部类NioSocketChannel
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
@Override
protected int doWriteBytes(ByteBuf buf) throws Exception {
final int expectedWrittenBytes = buf.readableBytes();
return buf.readBytes(javaChannel(), expectedWrittenBytes);
}
}
最后一行已经与jdk底层以及netty中的ByteBuf相关,将netty的ByteBuf中的字节数据写到jdk的 SelectableChannel中
读事件
首先,我们需要找到一个产生读事件并调用相关方法使得读事件开始传递的例子,很自然我们应该想到在EventLoop中会产生读事件。
如下,就是NioEventLoop中对于读事件的处理,通过调用NioUnsafe.read方法
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
......
// 处理read和accept事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
......
}
我们继续看NioByteUnsafe.read方法,这个方法我们之前在分析NioEventLoop事件处理逻辑时提到过,这个方法首先会通过缓冲分配器分配一个缓冲,然后从channel(也就是socket)中将数据读到缓冲中,每读一个缓冲,就会触发一个读事件,我们看具体的触发读事件的调用:
@Override
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
// 创建ByteBuf分配器
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// 分配一个缓冲
byteBuf = allocHandle.allocate(allocator);
// 将通道的数据读取到缓冲中
allocHandle.lastBytesRead(doReadBytes(byteBuf));
// 如果没有读取到数据,说明通道中没有待读取的数据了,
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
// 因为没读取到数据,所以应该释放缓冲
byteBuf.release();
byteBuf = null;
// 如果读取到的数据量是负数,说明通道已经关闭了
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
// 更新Handle内部的簿记量
allocHandle.incMessagesRead(1);
readPending = false;
// 向channel的处理器流水线中触发一个事件,
// 让取到的数据能够被流水线上的各个ChannelHandler处理
// 触发事件,将会引发pipeline的读事件传播
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
// 这里根据如下条件判断是否继续读:
// 上一次读取到的数据量大于0,并且读取到的数据量等于分配的缓冲的最大容量,
// 此时说明通道中还有待读取的数据
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
NioByteUnsafe 要做的事情可以简单地分为以下几个步骤
- 拿到Channel的config之后拿到ByteBuf分配器,用分配器来分配一个ByteBuf,ByteBuf是netty里面的字节数据载体,后面读取的数据都读到这个对象里面
- 将Channel中的数据读取到ByteBuf
- 数据读完之后,调用 pipeline.fireChannelRead(byteBuf); 从head节点开始传播至整个pipeline
- 最后调用fireChannelReadComplete();
为了代码逻辑的完整性,我这里把整个方法的代码都贴上来,其实我们要关注的仅仅是pipeline.fireChannelRead(byteBuf)这一句,好了,现在我们找到ChannelPipeline触发读事件的入口方法,我们顺着这个方法,顺藤摸瓜就能一步步理清事件的传递过程了。
DefaultChannelPipeline.fireChannelRead
如果我们看一下ChannelPipeline接口,这里面的方法名都是以fire开头的,实际就是想表达这些方法都是触发了一个事件,然后这个事件就会在内部的处理器链表中传递。
我们看到这里调用了一个静态方法,并且以头结点为参数,也就是说事件传递是从头结点开始的。
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
AbstractChannelHandlerContext.invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)
可以看到,这个方法中通过调用invokeChannelRead执行处理逻辑
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
// 维护引用计数,主要是为了侦测资源泄漏问题
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 调用invokeChannelRead执行处理逻辑
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
AbstractChannelHandlerContext.invokeChannelRead(Object msg)
这里可以看到,AbstractChannelHandlerContext通过自己内部的handler对象来实现读数据的逻辑。这也体现了ChannelHandlerContext在整个结构中的作用,其实它是起到了在ChannelPipeline和handler之间的一个中间人的角色,那我们要问:既然ChannelHandlerContext不起什么实质性的作用,那为什么要多这一个中间层呢,这样设计的好处是什么?我认为这样设计其实是为了尽最大可能对使用者屏蔽netty框架的细节,试想如果没有这个context的中间角色,使用者必然要详细地了解ChannelPipeline,并且还要考虑事件传递是找下一个节点,还要考虑下一个节点应该沿着链表的正序找还是沿着链表 倒叙找,所以这里ChannelHandlerContext的角色我认为最大的作用就是封装了链表的逻辑,并且封装了不同类型操作的传播方式。当然也起到了一些引用传递的作用,如channel引用可以简介地传递给用户。
好了,回到正题,从前面的方法中我们知道读事件最先是从HeadContext节点开始的,所以我们看一下HeadContext的channelRead方法(因为HeadContext也实现了handler方法,并且返回的就是自身)
private void invokeChannelRead(Object msg) {
// 如果这个handler已经准备就绪,那么就执行处理逻辑
// 否则将事件传递给下一个处理器节点
if (invokeHandler()) {
try {
// 调用内部的handler的channelRead方法
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
HeadContext.channelRead
这里的调用也是一个重要的注意点,这里调用了ChannelHandlerContext.fireChannelRead方法,这正是事件传播的方法,fire开头的方法的作用就是将当前的操作(或者叫事件)从当前处理节点传递给下一个处理节点。这样就实现了事件在链表中的传播。
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
......
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
......
}
小结:
到这里我们先暂停一下,总结一下读事件(或者是读操作)在ChannelPipeline内部的传播机制,其实很简单:
- 首先外部调用者会通过unsafe最终调用ChannelPipeline.fireChannelRead方法,并将从channel中读取到的数据作为参数传进来
- 以头结点作为参数调用静态方法AbstractChannelHandlerContext.fireChannelRead
- 然后头结点HeadContext开始调用节点的invokeChannelRead方法(即ChannelHandlerContext的invokeChannelRead方法),
- invokeChannelRead方法会调用当前节点的handler对象的channelRead方法执行处理逻辑
- handler对象的channelRead方法中可以调用AbstractChannelHandlerContext.fireChannelRead将这个事件传递到下一个节点
- 这样事件就能够沿着链条不断传递下去,当然如果业务处理需要,完全可以在某个节点将事件的传递终止,也就是在这个节点不调用ChannelHandlerContext.fireChannelRead
写事件
此外,我们分析一下写数据的操作是怎么传播的。分析写数据操作的入口并不想读事件那么好找,在netty中用户的代码中写数据最终都是被放到内部的缓冲中,当NioEventLoop中监听到底层的socket可以写数据的事件时,实际上是吧当前缓冲中的数据发送到socket中,而对于用户来讲,是接触不到socketChannel这一层的。
根据前面的分析,我们知道,用户一般都会与Channel,ChannelHandler, ChannelhandlerContext这几种类打交道,写数据的操作也是通过Channel的write和writeAndFlush触发的,这两个方法区别在于writeAndFlush在写完数据后还会触发一次刷写操作,将缓冲中的数据实际写入到socket中。
AbstractChannel.write
仍然是将操作交给内部的ChannelPipeline,触发流水线操作
@Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
return pipeline.write(msg, promise);
}
DefaultChannelPipeline.write
这里可以很清楚地看出来,写数据的操作从为节点开始,但是TailContext并未重写write方法,所以最终调用的还是AbstractChannelHandlerContext中的相应方法。
我们沿着调用链往下走,发现write系列的方法其实是将写操作传递给了下一个ChannelOutboundHandler类型的处理节点,注意这里是从尾节点向前找,遍历链表的顺序和读数据正好相反。
真正调用
@Override
public final ChannelFuture write(Object msg, ChannelPromise promise) {
return tail.write(msg, promise);
}
AbstractChannelHandlerContext.write
从这个方法可以明显地看出来,write方法将写操作交给了下一个ChannelOutboundHandler类型的处理器节点。
@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
try {
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return promise;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
write(msg, false, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
// 沿着链表向前遍历,找到下一个ChannelOutboundHandler类型的处理器节点
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
// 调用AbstractChannelHandlerContext.invokeWriteAndFlush方法执行真正的写入逻辑
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
// 如果当前是异步地写入数据,那么需要将写入的逻辑封装成一个任务添加到EventLoop的任务对队列中
final AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
if (!safeExecute(executor, task, promise, m)) {
// We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
// and put it back in the Recycler for re-use later.
//
// See https://github.com/netty/netty/issues/8343.
task.cancel();
}
}
}
AbstractChannelHandlerContext.invokeWrite
我们接着看invokeWrite方法
private void invokeWrite(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
} else {
write(msg, promise);
}
}
AbstractChannelHandlerContext.invokeWrite0
这里可以清楚地看到,最终是调用了handler的write方法执行真正的写入逻辑,这个逻辑实际上就是有用户自己实现的。
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
// 调用当前节点的handler的write方法执行真正的写入逻辑
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
到这里,我们已经知道写入的操作是怎么从尾节点开始,也知道了通过调用当前处理节点的AbstractChannelHandlerContext.write方法可以将写入操作传递给下一个节点,那么数据经过层层传递后,最终是怎么写到socket中的呢?回答这个问题,我们需要看一下HeadContext的代码!我们知道写入的操作是从尾节点向前传递的,那么头节点HeadContext就是传递的最后一个节点。
HeadContext.write
最终调用了unsafe.write方法。
在AbstractChannel.AbstractUnsafe的实现中,write方法将经过前面一系列处理器处理过的数据存放到内部的缓冲中。
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
unsafe.write(msg, promise);
}
刷写操作的传递
前面我们提到,写数据的操作除了write还有writeAndFlush,这个操作除了写数据,还会紧接着执行一次刷写操作。刷写操作也会从尾节点向前传递,最终传递到头结点HeadContext,其中的flush方法如下:
@Override
public void flush(ChannelHandlerContext ctx) {
unsafe.flush();
}
在AbstractChannel.AbstractUnsafe的实现中,flush操作会将前面存储在内部缓冲区中的数据吸入到socket中,从而完成刷写。
总结:
本节,我们主要通过io事件处理中最重要的两种事件,即读事件和写事件为切入点 详细分析了netty中对于这两种事件的处理方法。其中写数据的事件与我们之前在jdk nio中建立起的印象差别还是不大的,都是对从socket中读取的数据进行处理,但是写事件跟jdk nio中的概念就有较大差别了,因为netty对数据的写入做了很大的改变和优化,用户代码中通过channel调用相关的写数据的方法,这个方法会触发处理器链条上的所有相关的处理器对待写入的数据进行加工,最后在头结点HeadContext中被写入channel内部的缓冲区,通过flush操作将缓冲的数据写入socket中。
这里面最重要的也是最值得我们学习的一点就是责任链模式,显然,这又是一次对责任链模式的成功运用,是的框架的扩展性大大增强,而且面向用户的接口更加容易理解,简单易用,向用户屏蔽了大部分框架实现细节。