Netty源码愫读(三)ChannelPipeline、ChannelHandlerContext相关源码学习

1、Channel、ChannelPipeline、ChannelHandler、ChannelHandlerContext关系

四者关系如下图:

关系png.png

Channel:Channel为通信载体,负责底层传输层的具体事件及消息处理,其封装底层处理的复杂性,通过统一接口将事件及消息交给ChannelPipeline处理。

ChannelPipeline:ChannelPipeline为消息的管道,一个Channel对应唯一ChannelPipeline,ChannelPipeline中包含多个ChannelHandlerContext,各个ChannelHandlerContext以链表的形式构成消息处理的责任链,而ChannelPipeline并不对消息做处理,其只是转发给ChannelHandlerContext处理,而ChannelHandlerContext又交给具体的ChannelHandler处理,并将处理后的消息沿着链表转发给下一个ChannelHandlerContext。

ChannelHandlerContext:ChannelHandlerContext为ChannelPipeline和ChannelHandler的上下文,其保存对应的ChannelPipeline及ChannelHandler,并且根据添加顺序,多个ChannelHandlerContext之间构成链表。ChannelHandlerContext提供和ChannelPipeline类似的方法,但调用ChannelHandlerContext上的方法只会从当前的ChannelHandler开始向下一个ChannelHandler传播;而调用ChannelPipeline上的方法会从链表头或尾向下传播。

ChannelHandler:ChannelHandler为具体的消息处理类,其由应用层定义。消息由某个ChannelHandler处理完后,会沿着链表将消息交由下个ChannelHandler处理。

2、ChannelPipeline源码分析

ChannelPipeline类继图:

ChannelPipeline类继图.png

说明:

Iterable:遍历器接口,具体接口为:Iterable<Entry<String, ChannelHandler>>,其提供iterator()、forEach()等方法,用于遍历管道中的ChannelHandler。

ChannelInboundInvoker:管道的入口事件处理接口,对于Channel中的入口事件都是通过此接口进行处理的。消息类型包括:register、unregister、active、inactive、exception、read等;

ChannelOutboundInvoker:管道的出口事件处理接口,对于Channel相关的出口事件都是通过此接口进行处理的。消息类型包括:bind、connect、close、write、flush等。

ChannelPipeline:管道相关操作接口,提供了对管道中的ChannelHandler进行增删改查等接口,包括:addFirst、addLast等。

DefaultChannelPipeline:ChannelPipeline的默认实现类。

2.1、遍历处理器

pipeline提供其对应的Handler的遍历处理接口。Iterable<Entry<String, ChannelHandler>及ChannelPipeline中的部分方法。

2.1.1、方法说明

方法名称 返回值 功能说明
iterator() Iterator<Map.Entry<String, ChannelHandler>> 返回Map.Entry<String, ChannelHandler>的遍历器,此map即为pipeline中所有Handler的name与Handler的map。
names() List<String> 返回所有handler的name的集合
toMap() Map<String, ChannelHandler> 返回handler的name与ChannelHandler的map

2.1.2、方法实现

iterator()及toMap()方法实现:

public final Map<String, ChannelHandler> toMap() {
    Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
    AbstractChannelHandlerContext ctx = head.next;
    for (;;) {
        if (ctx == tail) {
            return map;
        }
        map.put(ctx.name(), ctx.handler());
        ctx = ctx.next;
    }
}

@Override
public final Iterator<Map.Entry<String, ChannelHandler>> iterator() {
    return toMap().entrySet().iterator();
}

由以上toMap()实现可知,map中为pipeline中从head到tail的handler的map;

2.2、inbound事件

当发生I/O事件时,如链路建立连接、链路关闭、读取数据完成等,都会产生一个事件,事件在pipeline中进行传播和处理,它是实际处理的总入口。netty将有限的网络I/O事件进行统一抽象,ChannelInboundInvoker即为pipeline抽象的入口接口。pipeline中以fireXXX命名的方法都是从I/O线程流向用户业务Handler的inbound消息。

2.2.1、方法说明

方法名称 返回值 功能说明
fireChannelRegistered() ChannelInboundInvoker 当Channel 已经注册到它的EventLoop 并且能够处理I/O 时被调用
fireChannelUnregistered() ChannelInboundInvoker 当Channel 从它的EventLoop 注销并且无法处理任何I/O 时被调用
fireChannelActive() ChannelInboundInvoker 当Channel 处于活动状态时被调用;Channel 已经连接/绑定并且已经就绪
fireChannelInactive() ChannelInboundInvoker 当Channel 离开活动状态并且不再连接它的远程节点时被调用
fireExceptionCaught(Throwable cause) ChannelInboundInvoker Channel异常事件
fireUserEventTriggered(Object event) ChannelInboundInvoker 当ChannelnboundHandler.fireUserEventTriggered()方法被调
fireChannelRead(Object msg) ChannelInboundInvoker 当从Channel 读取数据时被调用
fireChannelReadComplete() ChannelInboundInvoker 当Channel上的一个读操作完成时被调用
fireChannelWritabilityChanged() ChannelInboundInvoker 当Channel 的可写状态发生改变时被调用。用户可以确保写操作不会完成得太快(以避免发生OutOfMemoryError)或者可以在Channel 变为再次可写时恢复写入。可以通过调用Channel 的isWritable()方法来检测Channel 的可写性。与可写性相关的阈值可以通过Channel.config().setWriteHighWatesetWriteHighWaterMark()和Channel.config().setWriteLowWaterMark()方法来设置

2.2.2、方法实现

pipeline中inbound事件的处理都非常简单,其主要交由AbstractChannelHandlerContext中对应的静态方法进行处理。

部分处理源码如下:

@Override
public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}

@Override
public final ChannelPipeline fireChannelInactive() {
    AbstractChannelHandlerContext.invokeChannelInactive(head);
    return this;
}

@Override
public final ChannelPipeline fireExceptionCaught(Throwable cause) {
    AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
    return this;
}

@Override
public final ChannelPipeline fireUserEventTriggered(Object event) {
    AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
    return this;
}

@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

@Override
public final ChannelPipeline fireChannelReadComplete() {
    AbstractChannelHandlerContext.invokeChannelReadComplete(head);
    return this;
}

@Override
public final ChannelPipeline fireChannelWritabilityChanged() {
    AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
    return this;
}

2.3、outbound事件

ChannelOutboundInvoker是outbound消息的接口,由用户或者代码发起的I/O操作被称为outbound消息,即为从pipeline中流程的消息的统称。

2.3.1、方法说明

方法名称 返回值 功能说明
bind(SocketAddress localAddress) ChannelFuture 当请求将Channel 绑定到本地地址时被调用,绑定成功或失败都通过ChannelFuture进行通知
connect(SocketAddress remoteAddress) ChannelFuture 当请求将Channel 连接到远程节点时被调用,当连接超时时抛出ConnectTimeoutException,当连接被拒绝时,将抛出ConnectException
connect(SocketAddress remoteAddress, SocketAddress localAddress) ChannelFuture
disconnect() ChannelFuture 当请求将Channel 从远程节点断开时被调用,不论处理成功或失败,都会进行通知
close() ChannelFuture 当请求关闭Channel 时被调用
deregister() ChannelFuture 当请求将Channel 从它的EventLoop 注销时被调用
bind(SocketAddress localAddress, ChannelPromise promise) ChannelFuture 当请求将Channel 绑定到本地地址时被调用
connect(SocketAddress remoteAddress, ChannelPromise promise) ChannelFuture 当请求将Channel 连接到远程节点时被调用
connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) ChannelFuture 当请求将Channel 连接到远程节点时被调用
disconnect(ChannelPromise promise) ChannelFuture 当请求将Channel 从远程节点断开时被调用
close(ChannelPromise promise) ChannelFuture 请求关闭Channel 时被调用
deregister(ChannelPromise promise) ChannelFuture 当请求将Channel 从它的EventLoop 注销时被调用
read() ChannelFuture 当请求从Channel 读取更多的数据时被调用
write(Object msg) ChannelFuture 当请求通过Channel 将数据写到远程节点时被调用
write(Object msg, ChannelPromise promise) ChannelFuture 当请求通过Channel 将数据写到远程节点时被调用
flush() ChannelOutboundInvoker 当请求通过Channel 将入队数据冲刷到远程节点时被调用
writeAndFlush(Object msg, ChannelPromise promise) ChannelFuture 当请求通过Channel 将入队数据冲刷到远程节点时被调用
writeAndFlush(Object msg) ChannelFuture 当请求通过Channel 将入队数据冲刷到远程节点时被调用
newPromise() ChannelPromise 返回一个新的ChannelPromise
newProgressivePromise() ChannelProgressivePromise 返回一个新的ChannelProgressivePromise
newSucceededFuture() ChannelFuture 返回一个已被标记为成功的ChannelFuture,所有与此ChannelFuture绑定的监听器都将被通知,所有阻塞调用也将直接返回
newFailedFuture(Throwable cause) ChannelFuture 返回一个已被标记为失败的ChannelFuture,所有与此ChannelFuture绑定的监听器都将被通知,所有阻塞调用也将直接返回
voidPromise() ChannelPromise 返回一个不同操作也重用的ChannelPromise,但使用有一定限制,需要小心使用

2.3.2、实现源码

实现源码如下:

@Override
public final ChannelFuture bind(SocketAddress localAddress) {
    return tail.bind(localAddress);
}

@Override
public final ChannelFuture connect(SocketAddress remoteAddress) {
    return tail.connect(remoteAddress);
}

@Override
public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
    return tail.connect(remoteAddress, localAddress);
}

@Override
public final ChannelFuture disconnect() {
    return tail.disconnect();
}

@Override
public final ChannelFuture close() {
    return tail.close();
}

@Override
public final ChannelFuture deregister() {
    return tail.deregister();
}

@Override
public final ChannelPipeline flush() {
    tail.flush();
    return this;
}

@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}

@Override
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, promise);
}

@Override
public final ChannelFuture connect(
        SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, localAddress, promise);
}

@Override
public final ChannelFuture disconnect(ChannelPromise promise) {
    return tail.disconnect(promise);
}

@Override
public final ChannelFuture close(ChannelPromise promise) {
    return tail.close(promise);
}

@Override
public final ChannelFuture deregister(final ChannelPromise promise) {
    return tail.deregister(promise);
}

@Override
public final ChannelPipeline read() {
    tail.read();
    return this;
}

@Override
public final ChannelFuture write(Object msg) {
    return tail.write(msg);
}

@Override
public final ChannelFuture write(Object msg, ChannelPromise promise) {
    return tail.write(msg, promise);
}

@Override
public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
    return tail.writeAndFlush(msg, promise);
}

@Override
public final ChannelFuture writeAndFlush(Object msg) {
    return tail.writeAndFlush(msg);
}

@Override
public final ChannelPromise newPromise() {
    return new DefaultChannelPromise(channel);
}

@Override
public final ChannelProgressivePromise newProgressivePromise() {
    return new DefaultChannelProgressivePromise(channel);
}

@Override
public final ChannelFuture newSucceededFuture() {
    return succeededFuture;
}

@Override
public final ChannelFuture newFailedFuture(Throwable cause) {
    return new FailedChannelFuture(channel, null, cause);
}

@Override
public final ChannelPromise voidPromise() {
    return voidPromise;
}

由以上源码可知,outbound的具体实现都是交由tail(ChannelHandlerContext)来实现的。

2.4、ChannelPipeline链表维护

ChannelPipeline中维护了一个ChannelHandlerContext的链表,I/O事件通过链表在用户的Handler中传播。

2.4.1、链表维护接口

方法名称 返回值 功能说明
addFirst(String name, ChannelHandler handler) ChannelPipeline 将handler添加到pipeline队列的头部
addLast(String name, ChannelHandler handler) ChannelPipeline 将handler添加到pipeline队列的尾部
addBefore(String baseName, String name, ChannelHandler handler) ChannelPipeline 将handler添加到baseName对应的handler之前
addAfter(String baseName, String name, ChannelHandler handler) ChannelPipeline 将handler添加到baseName对应的handler之后
addFirst(ChannelHandler... handlers) ChannelPipeline 按顺序批量添加Handler到队列头部
addLast(ChannelHandler... handlers) ChannelPipeline 按顺序批量添加Handler到队列尾部
remove(ChannelHandler handler) ChannelPipeline 移除handler
remove(String name) ChannelHandler 移除名字为name的handler
remove(Class<T> handlerType) ChannelPipeline 移除类型为handlerType的handler
removeFirst() ChannelPipeline 移除第一个handler
removeLast() ChannelPipeline 移除最后一个handler
replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) ChannelPipeline 用newHandler替换oldHandler
first() ChannelHandler 获取第一个Handler
firstContext() ChannelHandlerContext 获取第一个Context
last() ChannelHandler 获取最后一个Handler
lastContext() ChannelHandlerContext 获取最后一个Context
get(String name) ChannelHandler 通过名字获取Handler
context(ChannelHandler handler) ChannelHandlerContext 通过Handler获取其对应的Context
context(String name) ChannelHandlerContext 通过Handler的名字获取其对应的Context

注:以上接口中添加的头尾不包括head节点和tail节点,这两节点为netty框架的节点,不允许用户修改。

2.4.2、接口实现

以下对主要接口的源码进行分析。

addFirst():

public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);
        name = filterName(name, handler);

        newCtx = newContext(group, name, handler);

        addFirst0(newCtx);

        // If the registered is false it means that the channel was not registered on an eventloop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}
private void addFirst0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext nextCtx = head.next;
    newCtx.prev = head;
    newCtx.next = nextCtx;
    head.next = newCtx;
    nextCtx.prev = newCtx;
}

主要流程为:

  • 检查Handler是否在多个pipeline中重复添加。被注解为@Sharable的Handler是可以在多个pipeline中重复添加的,否则为保证线程安全,不允许在多个pipeline中添加。
  • 检查handler名字是否重复。如果添加时的name为空,则由框架自动生成name,生成规则为:[SimpleName] + "#" + [数字],数字从0累加,知道名字不重复为止。如果添加时的name不空,则检查name是否重复,重复则抛出IllegalArgumentException异常,否则验证通过;
  • 根据pipeline、EventExecutorGroup、name、handler新建一个ChannelHandlerContext;
  • 挑用addFirst0()将新建的context添加到pipeline中head的下一个节点;
  • 若Channel还未在EventLoop中注册,则注册PendingHandlerAddedTask任务,当Channel注册成功时,调用ChannelHandler.handlerAdded()方法;若Channel已经注册成功则直接调用callHandlerAdded0()方法来通过管道调用所有Handler的ChannelHandler.handlerAdded()方法。

addLast():

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);

        newCtx = newContext(group, filterName(name, handler), handler);

        addLast0(newCtx);

        // If the registered is false it means that the channel was not registered on an eventloop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

addLast()的实现源码与addFirst基本一样,唯一区别是将handler添加的pipeline的tail节点的前一个节点。

2.5、DefaultChannelPipeline源码分析

DefaultChannelPipeline为ChannelPipeline接口的实现。也定义了Pipeline中的head和tail节点及实现等。

2.5.1、基本属性

private static final String HEAD_NAME = generateName0(HeadContext.class);
private static final String TAIL_NAME = generateName0(TailContext.class);

private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
        new FastThreadLocal<Map<Class<?>, String>>() {
    @Override
    protected Map<Class<?>, String> initialValue() throws Exception {
        return new WeakHashMap<Class<?>, String>();
    }
};

private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
        AtomicReferenceFieldUpdater.newUpdater(
                DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;

private final Channel channel;
private final ChannelFuture succeededFuture;
private final VoidChannelPromise voidPromise;
private final boolean touch = ResourceLeakDetector.isEnabled();

private Map<EventExecutorGroup, EventExecutor> childExecutors;
private volatile MessageSizeEstimator.Handle estimatorHandle;
private boolean firstRegistration = true;

/**
 * This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process
 * all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)}.
 *
 * We only keep the head because it is expected that the list is used infrequently and its size is small.
 * Thus full iterations to do insertions is assumed to be a good compromised to saving memory and tail management
 * complexity.
 */
private PendingHandlerCallback pendingHandlerCallbackHead;

/**
 * Set to {@code true} once the {@link AbstractChannel} is registered.Once set to {@code true} the value will never
 * change.
 */
private boolean registered;

HEAD_NAME:head对应的Handler的名字;

TAIL_NAME:tail对应的handler的名字;

nameCaches:Handler与其name的map的缓存;

ESTIMATOR:消息中字节大小统计器;

head:pipeline队列的头节点,其是ChannelHandlerContext与ChannelHandler的实现。

tail:pipeline队列的尾节点,其是ChannelHandlerContext与ChannelHandler的实现。

channel:pipeline对应的Channel;

succeededFuture:处理成功的异步结果;

voidPromise:通用的异步处理结果;

childExecutors:子执行器;

estimatorHandle:消息字节大小统计器的处理器;

firstRegistration:是否第一次注册

pendingHandlerCallbackHead:头节点一些事件的异步回调任务;

2.5.2、构造函数

构造函数源码:

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

构造函数比较简单,主要新建的succeededFuture和voidPromise异步通知,以及链表的头结点(head)和尾节点(tail)。

2.5.3、HeadContext源码解析

HeadContext为Pipeline的头节点实现,其即时ChannelHandlerContext的实现,也是ChannelHandler的实现。

2.5.3.1、HeadContext类继承关系

HeadContext类继承图:

HeadContext类继承图.png

HeadContext实现ChannelHandler的inbound接口和outbound接口,也实现了ChannelHandlerContext的inbound及outbound接口。

2.5.4、TailContext源码解析

TailContext为pipeline的尾节点实现,其即时ChannelHandlerContext的实现,也是ChannelHandler的实现。

TailContext类继承图:

TailContext类继承图.png

TailContext在实现ChannelHandlerContext接口,同时实现ChannelHandler的inbound接口。

3、ChannelHandlerContext源码解析

ChannelHandlerContext 代表了ChannelHandler 和ChannelPipeline 之间的关联,每当有ChannelHandler 添加到ChannelPipeline 中时,都会创建ChannelHandlerContext。ChannelHandlerContext 的主要功能是管理它所关联的ChannelHandler 和在同一个ChannelPipeline 中的其他ChannelHandler 之间的交互。

ChannelHandlerContext中的一些接口在ChannelPipeline中也有实现,但传播方向有一点重要的不同。如果调用Channel 或者ChannelPipeline 上的这些方法,它们将沿着整个ChannelPipeline 进行传播。而调用位于ChannelHandlerContext上的相同方法,则将从当前所关联的ChannelHandler 开始,并且只会传播给位于该ChannelPipeline 中的下一个能够处理该事件的ChannelHandler。

ChannelHandlerContext类继承图:

ChannelHandlerContext类继承图.png

ChannelInboundInvoker:是网络I/O的事件的统一抽象,即为inbound事件,方法都以fireXXX开头,pipeline也实现此接口。

ChannelOutboundInvoker:是用户线程或代码发起的I/O操作,被称为outbound事件。

AttributeMap:存储属性键值对;

AbstractChannelHandlerContext:ChannelHandlerContext的抽象实现类,对通用处理进行了处理;

DefaultChannelHandlerContext:ChannelHandlerContext的默认实现,netty框架即使用此实现;

HeadContext/TailContext:pipeline的头结点和尾节点实现;

3.1、AbstractChannelHandlerContext源码分析

AbstractChannelHandlerContext为ChannelHandlerContext的抽象实现。

3.1.1、基本属性

volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;

private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");

/**
 * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
 */
private static final int ADD_PENDING = 1;
/**
 * {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
 */
private static final int ADD_COMPLETE = 2;
/**
 * {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
 */
private static final int REMOVE_COMPLETE = 3;
/**
 * Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
 * nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
 */
private static final int INIT = 0;

private final boolean inbound;
private final boolean outbound;
private final DefaultChannelPipeline pipeline;
private final String name;
private final boolean ordered;

// Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor.
final EventExecutor executor;
private ChannelFuture succeededFuture;

// Lazily instantiated tasks used to trigger events to a handler with different executor.
// There is no need to make this volatile as at worse it will just create a few more instances then needed.
private Runnable invokeChannelReadCompleteTask;
private Runnable invokeReadTask;
private Runnable invokeChannelWritableStateChangedTask;
private Runnable invokeFlushTask;

private volatile int handlerState = INIT;

next:pipeline中的下一个ChannelHandlerContext节点;

prev:pipeline中的上一个ChannelHandlerContext节点;

inbound:标识此Context对应的Handler是否为ChannelInboundHandler类型;

outbound:标识此Context对应的Handler是否为ChannelOutboundHandler类型;

pipeline:此Context对应的Pipeline;

name:此Context的名字;

ordered:事件顺序标志;

executor:事件执行线程;

succeededFuture:成功的异步处理结果;

invokeChannelReadCompleteTask:读完成处理任务;

invokeReadTask:读数据任务;

invokeChannelWritableStateChangedTask:Channel写状态变更任务;

invokeFlushTask:冲刷数据任务;

handlerState:当前Handler的状态

handlerState有以下四种状态:

// 初始状态 
private static final int INIT = 0; 
// 对应Handler的handlerAdded方法将要被调用但还未调用 
private static final int ADD_PENDING = 1; 
// 对应Handler的handlerAdded方法被调用 
private static final int ADD_COMPLETE = 2; 
// 对应Handler的handlerRemoved方法被调用 
private static final int REMOVE_COMPLETE = 3;

3.1.1、构造函数

构造函数源码:

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                              boolean inbound, boolean outbound) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
    // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
    ordered = executor == null || executor instanceof OrderedEventExecutor;
}

3.1.2、inbound事件

AbstractChannelHandlerContext中对inbound事件的处理大同小异,本处只对fireChannelRegistered进行分析,其他事件处理流程基本相同;

源码:

@Override
public ChannelHandlerContext fireChannelRegistered() {
    invokeChannelRegistered(findContextInbound());
    return this;
}

static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
}

private void invokeChannelRegistered() {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRegistered();
    }
}

fireChannelRegistered():此方法主要是找到下个inbound类型的Context,并交由invokeChannelRegistered(final AbstractChannelHandlerContext next):静态方法进行处理;

invokeChannelRegistered(final AbstractChannelHandlerContext next):此静态方法主要判断事件处理是否在执行线程中,是则直接处理;否则异步处理。同时,pipeline中也会调用此方法对注册事件进行传播,pipeline中fireChannelRegistered事件的处理就是调用此静态方法,而参数为HeadContext,即从head节点开始传播注册事件;

invokeChannelRegistered():此方法首先判断Context的Handler是否已经在pipeline中添加完成,完成则直接调用对应Handler的channelRegistered()方法对注册事件进行处理;否则直接调用fireChannelRegistered()将事件交由下个inbound类型的Context处理。

3.1.3、outbound事件

与inbound事件相同,Context的outbound事件的传播流程也大体相同,本处以bind()事件为例进行传播流程的分析,其他事件传播流程类似。

源码:

public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        bind(localAddress, promise);
    }
}

从bind()方法可知,其主要查找下一个Context并调用invokeBind()进行处理,而invokeBind()又调用Handler的bind();Handler的bind()通用处理是沿着outbound的Context向head节点传播,其最终调用的是pipeline中head节点的Handler的bind()方法,而head节点的bind的方法会调用底层Channel的Unsafe的bind()方法进行最终的bind()操作。

3.2、DefaultChannelHandlerContext源码分析

DefaultChannelHandlerContext为netty的默认ChannelHandlerContext实现,其实现非常简单。

源码:

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }

    private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }

    private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }
}

3.2.1、基本属性

handler:context对应的ChannelHandler;

3.2.1、构造函数

构造函数主要通过isInbound()方法和isOutbound()方法判断此ChannelHandler为inbound或outbound处理器。其他处理都交由AbstractChannelHandlerContext。

相关阅读:
Netty源码愫读(一)ByteBuf相关源码学习 【https://www.jianshu.com/p/016daa404957
Netty源码愫读(二)Channel相关源码学习【https://www.jianshu.com/p/02eac974258e
Netty源码愫读(四)ChannelHandler相关源码学习【https://www.jianshu.com/p/6ee0a3b9d73a
Netty源码愫读(五)EventLoop与EventLoopGroup相关源码学习【https://www.jianshu.com/p/05096995d296
Netty源码愫读(六)ServerBootstrap相关源码学习【https://www.jianshu.com/p/a71a9a0291f3

参考书籍:
《Netty实战》
《Netty权威指南》

参考博客:

https://www.jianshu.com/p/4c35541eec10
https://www.jianshu.com/p/0b79872eb515
https://www.jianshu.com/p/a0a51fd79f62
http://www.wolfbe.com/detail/201609/379.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容