1、Channel、ChannelPipeline、ChannelHandler、ChannelHandlerContext关系
四者关系如下图:
Channel:Channel为通信载体,负责底层传输层的具体事件及消息处理,其封装底层处理的复杂性,通过统一接口将事件及消息交给ChannelPipeline处理。
ChannelPipeline:ChannelPipeline为消息的管道,一个Channel对应唯一ChannelPipeline,ChannelPipeline中包含多个ChannelHandlerContext,各个ChannelHandlerContext以链表的形式构成消息处理的责任链,而ChannelPipeline并不对消息做处理,其只是转发给ChannelHandlerContext处理,而ChannelHandlerContext又交给具体的ChannelHandler处理,并将处理后的消息沿着链表转发给下一个ChannelHandlerContext。
ChannelHandlerContext:ChannelHandlerContext为ChannelPipeline和ChannelHandler的上下文,其保存对应的ChannelPipeline及ChannelHandler,并且根据添加顺序,多个ChannelHandlerContext之间构成链表。ChannelHandlerContext提供和ChannelPipeline类似的方法,但调用ChannelHandlerContext上的方法只会从当前的ChannelHandler开始向下一个ChannelHandler传播;而调用ChannelPipeline上的方法会从链表头或尾向下传播。
ChannelHandler:ChannelHandler为具体的消息处理类,其由应用层定义。消息由某个ChannelHandler处理完后,会沿着链表将消息交由下个ChannelHandler处理。
2、ChannelPipeline源码分析
ChannelPipeline类继图:
说明:
Iterable:遍历器接口,具体接口为:Iterable<Entry<String, ChannelHandler>>,其提供iterator()、forEach()等方法,用于遍历管道中的ChannelHandler。
ChannelInboundInvoker:管道的入口事件处理接口,对于Channel中的入口事件都是通过此接口进行处理的。消息类型包括:register、unregister、active、inactive、exception、read等;
ChannelOutboundInvoker:管道的出口事件处理接口,对于Channel相关的出口事件都是通过此接口进行处理的。消息类型包括:bind、connect、close、write、flush等。
ChannelPipeline:管道相关操作接口,提供了对管道中的ChannelHandler进行增删改查等接口,包括:addFirst、addLast等。
DefaultChannelPipeline:ChannelPipeline的默认实现类。
2.1、遍历处理器
pipeline提供其对应的Handler的遍历处理接口。Iterable<Entry<String, ChannelHandler>及ChannelPipeline中的部分方法。
2.1.1、方法说明
方法名称 | 返回值 | 功能说明 |
---|---|---|
iterator() | Iterator<Map.Entry<String, ChannelHandler>> | 返回Map.Entry<String, ChannelHandler>的遍历器,此map即为pipeline中所有Handler的name与Handler的map。 |
names() | List<String> | 返回所有handler的name的集合 |
toMap() | Map<String, ChannelHandler> | 返回handler的name与ChannelHandler的map |
2.1.2、方法实现
iterator()及toMap()方法实现:
public final Map<String, ChannelHandler> toMap() {
Map<String, ChannelHandler> map = new LinkedHashMap<String, ChannelHandler>();
AbstractChannelHandlerContext ctx = head.next;
for (;;) {
if (ctx == tail) {
return map;
}
map.put(ctx.name(), ctx.handler());
ctx = ctx.next;
}
}
@Override
public final Iterator<Map.Entry<String, ChannelHandler>> iterator() {
return toMap().entrySet().iterator();
}
由以上toMap()实现可知,map中为pipeline中从head到tail的handler的map;
2.2、inbound事件
当发生I/O事件时,如链路建立连接、链路关闭、读取数据完成等,都会产生一个事件,事件在pipeline中进行传播和处理,它是实际处理的总入口。netty将有限的网络I/O事件进行统一抽象,ChannelInboundInvoker即为pipeline抽象的入口接口。pipeline中以fireXXX命名的方法都是从I/O线程流向用户业务Handler的inbound消息。
2.2.1、方法说明
方法名称 | 返回值 | 功能说明 |
---|---|---|
fireChannelRegistered() | ChannelInboundInvoker | 当Channel 已经注册到它的EventLoop 并且能够处理I/O 时被调用 |
fireChannelUnregistered() | ChannelInboundInvoker | 当Channel 从它的EventLoop 注销并且无法处理任何I/O 时被调用 |
fireChannelActive() | ChannelInboundInvoker | 当Channel 处于活动状态时被调用;Channel 已经连接/绑定并且已经就绪 |
fireChannelInactive() | ChannelInboundInvoker | 当Channel 离开活动状态并且不再连接它的远程节点时被调用 |
fireExceptionCaught(Throwable cause) | ChannelInboundInvoker | Channel异常事件 |
fireUserEventTriggered(Object event) | ChannelInboundInvoker | 当ChannelnboundHandler.fireUserEventTriggered()方法被调 |
fireChannelRead(Object msg) | ChannelInboundInvoker | 当从Channel 读取数据时被调用 |
fireChannelReadComplete() | ChannelInboundInvoker | 当Channel上的一个读操作完成时被调用 |
fireChannelWritabilityChanged() | ChannelInboundInvoker | 当Channel 的可写状态发生改变时被调用。用户可以确保写操作不会完成得太快(以避免发生OutOfMemoryError)或者可以在Channel 变为再次可写时恢复写入。可以通过调用Channel 的isWritable()方法来检测Channel 的可写性。与可写性相关的阈值可以通过Channel.config().setWriteHighWatesetWriteHighWaterMark()和Channel.config().setWriteLowWaterMark()方法来设置 |
2.2.2、方法实现
pipeline中inbound事件的处理都非常简单,其主要交由AbstractChannelHandlerContext中对应的静态方法进行处理。
部分处理源码如下:
@Override
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
@Override
public final ChannelPipeline fireChannelInactive() {
AbstractChannelHandlerContext.invokeChannelInactive(head);
return this;
}
@Override
public final ChannelPipeline fireExceptionCaught(Throwable cause) {
AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
return this;
}
@Override
public final ChannelPipeline fireUserEventTriggered(Object event) {
AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
return this;
}
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
@Override
public final ChannelPipeline fireChannelReadComplete() {
AbstractChannelHandlerContext.invokeChannelReadComplete(head);
return this;
}
@Override
public final ChannelPipeline fireChannelWritabilityChanged() {
AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
return this;
}
2.3、outbound事件
ChannelOutboundInvoker是outbound消息的接口,由用户或者代码发起的I/O操作被称为outbound消息,即为从pipeline中流程的消息的统称。
2.3.1、方法说明
方法名称 | 返回值 | 功能说明 |
---|---|---|
bind(SocketAddress localAddress) | ChannelFuture | 当请求将Channel 绑定到本地地址时被调用,绑定成功或失败都通过ChannelFuture进行通知 |
connect(SocketAddress remoteAddress) | ChannelFuture | 当请求将Channel 连接到远程节点时被调用,当连接超时时抛出ConnectTimeoutException,当连接被拒绝时,将抛出ConnectException |
connect(SocketAddress remoteAddress, SocketAddress localAddress) | ChannelFuture | |
disconnect() | ChannelFuture | 当请求将Channel 从远程节点断开时被调用,不论处理成功或失败,都会进行通知 |
close() | ChannelFuture | 当请求关闭Channel 时被调用 |
deregister() | ChannelFuture | 当请求将Channel 从它的EventLoop 注销时被调用 |
bind(SocketAddress localAddress, ChannelPromise promise) | ChannelFuture | 当请求将Channel 绑定到本地地址时被调用 |
connect(SocketAddress remoteAddress, ChannelPromise promise) | ChannelFuture | 当请求将Channel 连接到远程节点时被调用 |
connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) | ChannelFuture | 当请求将Channel 连接到远程节点时被调用 |
disconnect(ChannelPromise promise) | ChannelFuture | 当请求将Channel 从远程节点断开时被调用 |
close(ChannelPromise promise) | ChannelFuture | 请求关闭Channel 时被调用 |
deregister(ChannelPromise promise) | ChannelFuture | 当请求将Channel 从它的EventLoop 注销时被调用 |
read() | ChannelFuture | 当请求从Channel 读取更多的数据时被调用 |
write(Object msg) | ChannelFuture | 当请求通过Channel 将数据写到远程节点时被调用 |
write(Object msg, ChannelPromise promise) | ChannelFuture | 当请求通过Channel 将数据写到远程节点时被调用 |
flush() | ChannelOutboundInvoker | 当请求通过Channel 将入队数据冲刷到远程节点时被调用 |
writeAndFlush(Object msg, ChannelPromise promise) | ChannelFuture | 当请求通过Channel 将入队数据冲刷到远程节点时被调用 |
writeAndFlush(Object msg) | ChannelFuture | 当请求通过Channel 将入队数据冲刷到远程节点时被调用 |
newPromise() | ChannelPromise | 返回一个新的ChannelPromise |
newProgressivePromise() | ChannelProgressivePromise | 返回一个新的ChannelProgressivePromise |
newSucceededFuture() | ChannelFuture | 返回一个已被标记为成功的ChannelFuture,所有与此ChannelFuture绑定的监听器都将被通知,所有阻塞调用也将直接返回 |
newFailedFuture(Throwable cause) | ChannelFuture | 返回一个已被标记为失败的ChannelFuture,所有与此ChannelFuture绑定的监听器都将被通知,所有阻塞调用也将直接返回 |
voidPromise() | ChannelPromise | 返回一个不同操作也重用的ChannelPromise,但使用有一定限制,需要小心使用 |
2.3.2、实现源码
实现源码如下:
@Override
public final ChannelFuture bind(SocketAddress localAddress) {
return tail.bind(localAddress);
}
@Override
public final ChannelFuture connect(SocketAddress remoteAddress) {
return tail.connect(remoteAddress);
}
@Override
public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return tail.connect(remoteAddress, localAddress);
}
@Override
public final ChannelFuture disconnect() {
return tail.disconnect();
}
@Override
public final ChannelFuture close() {
return tail.close();
}
@Override
public final ChannelFuture deregister() {
return tail.deregister();
}
@Override
public final ChannelPipeline flush() {
tail.flush();
return this;
}
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
@Override
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
@Override
public final ChannelFuture connect(
SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, localAddress, promise);
}
@Override
public final ChannelFuture disconnect(ChannelPromise promise) {
return tail.disconnect(promise);
}
@Override
public final ChannelFuture close(ChannelPromise promise) {
return tail.close(promise);
}
@Override
public final ChannelFuture deregister(final ChannelPromise promise) {
return tail.deregister(promise);
}
@Override
public final ChannelPipeline read() {
tail.read();
return this;
}
@Override
public final ChannelFuture write(Object msg) {
return tail.write(msg);
}
@Override
public final ChannelFuture write(Object msg, ChannelPromise promise) {
return tail.write(msg, promise);
}
@Override
public final ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
return tail.writeAndFlush(msg, promise);
}
@Override
public final ChannelFuture writeAndFlush(Object msg) {
return tail.writeAndFlush(msg);
}
@Override
public final ChannelPromise newPromise() {
return new DefaultChannelPromise(channel);
}
@Override
public final ChannelProgressivePromise newProgressivePromise() {
return new DefaultChannelProgressivePromise(channel);
}
@Override
public final ChannelFuture newSucceededFuture() {
return succeededFuture;
}
@Override
public final ChannelFuture newFailedFuture(Throwable cause) {
return new FailedChannelFuture(channel, null, cause);
}
@Override
public final ChannelPromise voidPromise() {
return voidPromise;
}
由以上源码可知,outbound的具体实现都是交由tail(ChannelHandlerContext)来实现的。
2.4、ChannelPipeline链表维护
ChannelPipeline中维护了一个ChannelHandlerContext的链表,I/O事件通过链表在用户的Handler中传播。
2.4.1、链表维护接口
方法名称 | 返回值 | 功能说明 |
---|---|---|
addFirst(String name, ChannelHandler handler) | ChannelPipeline | 将handler添加到pipeline队列的头部 |
addLast(String name, ChannelHandler handler) | ChannelPipeline | 将handler添加到pipeline队列的尾部 |
addBefore(String baseName, String name, ChannelHandler handler) | ChannelPipeline | 将handler添加到baseName对应的handler之前 |
addAfter(String baseName, String name, ChannelHandler handler) | ChannelPipeline | 将handler添加到baseName对应的handler之后 |
addFirst(ChannelHandler... handlers) | ChannelPipeline | 按顺序批量添加Handler到队列头部 |
addLast(ChannelHandler... handlers) | ChannelPipeline | 按顺序批量添加Handler到队列尾部 |
remove(ChannelHandler handler) | ChannelPipeline | 移除handler |
remove(String name) | ChannelHandler | 移除名字为name的handler |
remove(Class<T> handlerType) | ChannelPipeline | 移除类型为handlerType的handler |
removeFirst() | ChannelPipeline | 移除第一个handler |
removeLast() | ChannelPipeline | 移除最后一个handler |
replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) | ChannelPipeline | 用newHandler替换oldHandler |
first() | ChannelHandler | 获取第一个Handler |
firstContext() | ChannelHandlerContext | 获取第一个Context |
last() | ChannelHandler | 获取最后一个Handler |
lastContext() | ChannelHandlerContext | 获取最后一个Context |
get(String name) | ChannelHandler | 通过名字获取Handler |
context(ChannelHandler handler) | ChannelHandlerContext | 通过Handler获取其对应的Context |
context(String name) | ChannelHandlerContext | 通过Handler的名字获取其对应的Context |
注:以上接口中添加的头尾不包括head节点和tail节点,这两节点为netty框架的节点,不允许用户修改。
2.4.2、接口实现
以下对主要接口的源码进行分析。
addFirst():
public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
name = filterName(name, handler);
newCtx = newContext(group, name, handler);
addFirst0(newCtx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
private void addFirst0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext nextCtx = head.next;
newCtx.prev = head;
newCtx.next = nextCtx;
head.next = newCtx;
nextCtx.prev = newCtx;
}
主要流程为:
- 检查Handler是否在多个pipeline中重复添加。被注解为@Sharable的Handler是可以在多个pipeline中重复添加的,否则为保证线程安全,不允许在多个pipeline中添加。
- 检查handler名字是否重复。如果添加时的name为空,则由框架自动生成name,生成规则为:[SimpleName] + "#" + [数字],数字从0累加,知道名字不重复为止。如果添加时的name不空,则检查name是否重复,重复则抛出IllegalArgumentException异常,否则验证通过;
- 根据pipeline、EventExecutorGroup、name、handler新建一个ChannelHandlerContext;
- 挑用addFirst0()将新建的context添加到pipeline中head的下一个节点;
- 若Channel还未在EventLoop中注册,则注册PendingHandlerAddedTask任务,当Channel注册成功时,调用ChannelHandler.handlerAdded()方法;若Channel已经注册成功则直接调用callHandlerAdded0()方法来通过管道调用所有Handler的ChannelHandler.handlerAdded()方法。
addLast():
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
addLast()的实现源码与addFirst基本一样,唯一区别是将handler添加的pipeline的tail节点的前一个节点。
2.5、DefaultChannelPipeline源码分析
DefaultChannelPipeline为ChannelPipeline接口的实现。也定义了Pipeline中的head和tail节点及实现等。
2.5.1、基本属性
private static final String HEAD_NAME = generateName0(HeadContext.class);
private static final String TAIL_NAME = generateName0(TailContext.class);
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
new FastThreadLocal<Map<Class<?>, String>>() {
@Override
protected Map<Class<?>, String> initialValue() throws Exception {
return new WeakHashMap<Class<?>, String>();
}
};
private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
AtomicReferenceFieldUpdater.newUpdater(
DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
private final Channel channel;
private final ChannelFuture succeededFuture;
private final VoidChannelPromise voidPromise;
private final boolean touch = ResourceLeakDetector.isEnabled();
private Map<EventExecutorGroup, EventExecutor> childExecutors;
private volatile MessageSizeEstimator.Handle estimatorHandle;
private boolean firstRegistration = true;
/**
* This is the head of a linked list that is processed by {@link #callHandlerAddedForAllHandlers()} and so process
* all the pending {@link #callHandlerAdded0(AbstractChannelHandlerContext)}.
*
* We only keep the head because it is expected that the list is used infrequently and its size is small.
* Thus full iterations to do insertions is assumed to be a good compromised to saving memory and tail management
* complexity.
*/
private PendingHandlerCallback pendingHandlerCallbackHead;
/**
* Set to {@code true} once the {@link AbstractChannel} is registered.Once set to {@code true} the value will never
* change.
*/
private boolean registered;
HEAD_NAME:head对应的Handler的名字;
TAIL_NAME:tail对应的handler的名字;
nameCaches:Handler与其name的map的缓存;
ESTIMATOR:消息中字节大小统计器;
head:pipeline队列的头节点,其是ChannelHandlerContext与ChannelHandler的实现。
tail:pipeline队列的尾节点,其是ChannelHandlerContext与ChannelHandler的实现。
channel:pipeline对应的Channel;
succeededFuture:处理成功的异步结果;
voidPromise:通用的异步处理结果;
childExecutors:子执行器;
estimatorHandle:消息字节大小统计器的处理器;
firstRegistration:是否第一次注册
pendingHandlerCallbackHead:头节点一些事件的异步回调任务;
2.5.2、构造函数
构造函数源码:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
构造函数比较简单,主要新建的succeededFuture和voidPromise异步通知,以及链表的头结点(head)和尾节点(tail)。
2.5.3、HeadContext源码解析
HeadContext为Pipeline的头节点实现,其即时ChannelHandlerContext的实现,也是ChannelHandler的实现。
2.5.3.1、HeadContext类继承关系
HeadContext类继承图:
HeadContext实现ChannelHandler的inbound接口和outbound接口,也实现了ChannelHandlerContext的inbound及outbound接口。
2.5.4、TailContext源码解析
TailContext为pipeline的尾节点实现,其即时ChannelHandlerContext的实现,也是ChannelHandler的实现。
TailContext类继承图:
TailContext在实现ChannelHandlerContext接口,同时实现ChannelHandler的inbound接口。
3、ChannelHandlerContext源码解析
ChannelHandlerContext 代表了ChannelHandler 和ChannelPipeline 之间的关联,每当有ChannelHandler 添加到ChannelPipeline 中时,都会创建ChannelHandlerContext。ChannelHandlerContext 的主要功能是管理它所关联的ChannelHandler 和在同一个ChannelPipeline 中的其他ChannelHandler 之间的交互。
ChannelHandlerContext中的一些接口在ChannelPipeline中也有实现,但传播方向有一点重要的不同。如果调用Channel 或者ChannelPipeline 上的这些方法,它们将沿着整个ChannelPipeline 进行传播。而调用位于ChannelHandlerContext上的相同方法,则将从当前所关联的ChannelHandler 开始,并且只会传播给位于该ChannelPipeline 中的下一个能够处理该事件的ChannelHandler。
ChannelHandlerContext类继承图:
ChannelInboundInvoker:是网络I/O的事件的统一抽象,即为inbound事件,方法都以fireXXX开头,pipeline也实现此接口。
ChannelOutboundInvoker:是用户线程或代码发起的I/O操作,被称为outbound事件。
AttributeMap:存储属性键值对;
AbstractChannelHandlerContext:ChannelHandlerContext的抽象实现类,对通用处理进行了处理;
DefaultChannelHandlerContext:ChannelHandlerContext的默认实现,netty框架即使用此实现;
HeadContext/TailContext:pipeline的头结点和尾节点实现;
3.1、AbstractChannelHandlerContext源码分析
AbstractChannelHandlerContext为ChannelHandlerContext的抽象实现。
3.1.1、基本属性
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");
/**
* {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} is about to be called.
*/
private static final int ADD_PENDING = 1;
/**
* {@link ChannelHandler#handlerAdded(ChannelHandlerContext)} was called.
*/
private static final int ADD_COMPLETE = 2;
/**
* {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
*/
private static final int REMOVE_COMPLETE = 3;
/**
* Neither {@link ChannelHandler#handlerAdded(ChannelHandlerContext)}
* nor {@link ChannelHandler#handlerRemoved(ChannelHandlerContext)} was called.
*/
private static final int INIT = 0;
private final boolean inbound;
private final boolean outbound;
private final DefaultChannelPipeline pipeline;
private final String name;
private final boolean ordered;
// Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor.
final EventExecutor executor;
private ChannelFuture succeededFuture;
// Lazily instantiated tasks used to trigger events to a handler with different executor.
// There is no need to make this volatile as at worse it will just create a few more instances then needed.
private Runnable invokeChannelReadCompleteTask;
private Runnable invokeReadTask;
private Runnable invokeChannelWritableStateChangedTask;
private Runnable invokeFlushTask;
private volatile int handlerState = INIT;
next:pipeline中的下一个ChannelHandlerContext节点;
prev:pipeline中的上一个ChannelHandlerContext节点;
inbound:标识此Context对应的Handler是否为ChannelInboundHandler类型;
outbound:标识此Context对应的Handler是否为ChannelOutboundHandler类型;
pipeline:此Context对应的Pipeline;
name:此Context的名字;
ordered:事件顺序标志;
executor:事件执行线程;
succeededFuture:成功的异步处理结果;
invokeChannelReadCompleteTask:读完成处理任务;
invokeReadTask:读数据任务;
invokeChannelWritableStateChangedTask:Channel写状态变更任务;
invokeFlushTask:冲刷数据任务;
handlerState:当前Handler的状态
handlerState有以下四种状态:
// 初始状态
private static final int INIT = 0;
// 对应Handler的handlerAdded方法将要被调用但还未调用
private static final int ADD_PENDING = 1;
// 对应Handler的handlerAdded方法被调用
private static final int ADD_COMPLETE = 2;
// 对应Handler的handlerRemoved方法被调用
private static final int REMOVE_COMPLETE = 3;
3.1.1、构造函数
构造函数源码:
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
3.1.2、inbound事件
AbstractChannelHandlerContext中对inbound事件的处理大同小异,本处只对fireChannelRegistered进行分析,其他事件处理流程基本相同;
源码:
@Override
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(findContextInbound());
return this;
}
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
fireChannelRegistered():此方法主要是找到下个inbound类型的Context,并交由invokeChannelRegistered(final AbstractChannelHandlerContext next):静态方法进行处理;
invokeChannelRegistered(final AbstractChannelHandlerContext next):此静态方法主要判断事件处理是否在执行线程中,是则直接处理;否则异步处理。同时,pipeline中也会调用此方法对注册事件进行传播,pipeline中fireChannelRegistered事件的处理就是调用此静态方法,而参数为HeadContext,即从head节点开始传播注册事件;
invokeChannelRegistered():此方法首先判断Context的Handler是否已经在pipeline中添加完成,完成则直接调用对应Handler的channelRegistered()方法对注册事件进行处理;否则直接调用fireChannelRegistered()将事件交由下个inbound类型的Context处理。
3.1.3、outbound事件
与inbound事件相同,Context的outbound事件的传播流程也大体相同,本处以bind()事件为例进行传播流程的分析,其他事件传播流程类似。
源码:
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}
从bind()方法可知,其主要查找下一个Context并调用invokeBind()进行处理,而invokeBind()又调用Handler的bind();Handler的bind()通用处理是沿着outbound的Context向head节点传播,其最终调用的是pipeline中head节点的Handler的bind()方法,而head节点的bind的方法会调用底层Channel的Unsafe的bind()方法进行最终的bind()操作。
3.2、DefaultChannelHandlerContext源码分析
DefaultChannelHandlerContext为netty的默认ChannelHandlerContext实现,其实现非常简单。
源码:
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
private final ChannelHandler handler;
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
@Override
public ChannelHandler handler() {
return handler;
}
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
}
3.2.1、基本属性
handler:context对应的ChannelHandler;
3.2.1、构造函数
构造函数主要通过isInbound()方法和isOutbound()方法判断此ChannelHandler为inbound或outbound处理器。其他处理都交由AbstractChannelHandlerContext。
相关阅读:
Netty源码愫读(一)ByteBuf相关源码学习 【https://www.jianshu.com/p/016daa404957】
Netty源码愫读(二)Channel相关源码学习【https://www.jianshu.com/p/02eac974258e】
Netty源码愫读(四)ChannelHandler相关源码学习【https://www.jianshu.com/p/6ee0a3b9d73a】
Netty源码愫读(五)EventLoop与EventLoopGroup相关源码学习【https://www.jianshu.com/p/05096995d296】
Netty源码愫读(六)ServerBootstrap相关源码学习【https://www.jianshu.com/p/a71a9a0291f3】
参考书籍:
《Netty实战》
《Netty权威指南》
参考博客:
https://www.jianshu.com/p/4c35541eec10
https://www.jianshu.com/p/0b79872eb515
https://www.jianshu.com/p/a0a51fd79f62
http://www.wolfbe.com/detail/201609/379.html