ChannelPipeline是什么
ChannelPipeline
是ChannelHandler
的列表,用于拦截或处理Channel
的入站事件和出站事件操作。ChannelPipeline
实现了拦截过滤器模式,让用户完全控制如何处理事件以及管道中的ChannelHandler
如何相互交互。
每个Channel
都有各自的ChannelPipeline
,并且一个新的Channel
被创建时该ChannelPipeline
同时也会被创建。
事件是如何在Pipeline中流动的
下图描述了ChannelHandler
在ChannelPipeline
中如何处理I/O
事件。I/O
事件由ChannelInboundHandler
或ChannelOutboundHandler
处理,并通过调用ChannelHandlerContext
中定义的事件传播方法(例如ChannelHandlerContext#fireChannelRead(Object)
和ChannelHandlerContext#write(Object)
中定义的事件传播方法转发给最近的处理程序。
Channel与ChannelPipeline的关系
我们已经知道当在创建Channel
时并会自动创建对应的ChannelPipeline
,所以它们就是一对一的关系,如下图所示:
在创建Channel
时会创建对应的ChannelPipeline
,而在创建ChannelPipeline
时又会创建对应的TailContext
和HeadContext
,然后将其构造成双向链表的节点。
接下来我们来具体分析下ChannelPipeline
的实现过程。
ChannelPipeline的实现
在前面分析Bootstrap
和ServerBootstrap
的章节中我们知道在AbstractChannel
构造方法中会实例化ChannelPipeline
,所以下面我们一步一步来进行分析。
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
//实例化ChannelPipeline的操作
pipeline = newChannelPipeline();
}
这里我们只需关注newChannelPipeline
方法即可,如下:
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
该方法直接将当前Channel
对象传递给构造函数并实例化了DefaultChannelPipeline
对象,我们先来看看该类的一个类结构图:
我们来分析下DefaultChannelPipeline
对应的父类接口ChannelInboundInvoker
,该接口主要定义了入站事件方法,如下所示:
/**
* 入站事件传播方法:
* ChannelHandlerContext#fireChannelRegistered()
* ChannelHandlerContext#fireChannelActive()
* ChannelHandlerContext#fireChannelRead(Object)
* ChannelHandlerContext#fireChannelReadComplete()
* ChannelHandlerContext#fireExceptionCaught(Throwable)
* ChannelHandlerContext#fireUserEventTriggered(Object)
* ChannelHandlerContext#fireChannelWritabilityChanged()
* ChannelHandlerContext#fireChannelInactive()
* ChannelHandlerContext#fireChannelUnregistered()
*/
/**
* Channel已注册到EventLoop
*/
ChannelInboundInvoker fireChannelRegistered();
/**
* Channel已从其EventLoop中取消注册
*/
ChannelInboundInvoker fireChannelUnregistered();
/**
* Channel当前处于活动状态,表示已连接
*/
ChannelInboundInvoker fireChannelActive();
/**
* Channel当前处于非活动状态,表示连接已关闭
*/
ChannelInboundInvoker fireChannelInactive();
/**
* Channel在入站操作中收到了异常
*/
ChannelInboundInvoker fireExceptionCaught(Throwable cause);
/**
* Channel接收到用户自定义事件
*/
ChannelInboundInvoker fireUserEventTriggered(Object event);
/**
* Channel接收到一条消息
*/
ChannelInboundInvoker fireChannelRead(Object msg);
/**
* 触发ChannelInboundHandler#channelReadComplete(ChannelHandlerContext)事件到ChannelPipeline中的下一个ChannelInboundHandler事件
*/
ChannelInboundInvoker fireChannelReadComplete();
/**
* 触发ChannelInboundHandler#channelWritabilityChanged(ChannelHandlerContext)事件到ChannelPipeline中的下一个ChannelInboundHandler
*/
ChannelInboundInvoker fireChannelWritabilityChanged();
父类接口ChannelOutboundInvoker
主要定义了出站事件方法,如下所示:
/**
* 出站事件传播方法:
* ChannelHandlerContext#bind(SocketAddress, ChannelPromise)
* ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)
* ChannelHandlerContext#write(Object, ChannelPromise)
* ChannelHandlerContext#flush()
* ChannelHandlerContext#read()
* ChannelHandlerContext#disconnect(ChannelPromise)
* ChannelHandlerContext#close(ChannelPromise)
* ChannelHandlerContext#deregister(ChannelPromise)
*/
/**
* 请求绑定给定的SocketAddress一旦操作完成并通知ChannelFuture,要么操作成功或者错误
*/
ChannelFuture bind(SocketAddress localAddress);
/**
* 请求连接给定的SocketAddress并在操作完成后通知ChannelFuture,要么操作成功或者错误
*/
ChannelFuture connect(SocketAddress remoteAddress);
/**
* 请求连接给定的SocketAddress,同时绑定到localAddress并在操作完成后通知ChannelFuture,要么操作成功或者错误
*/
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
/**
* 请求与远程节点断开连接并在操作完成后通知ChannelFuture(要么操作成功要么因为错误)
*/
ChannelFuture disconnect();
/**
* 请求关闭Channel并在操作完成后通知ChannelFuture,关闭后不能再重复使用
*/
ChannelFuture close();
/**
* 请求从先前分配的EventExecutor中注销,并在操作完成后通知ChannelFuture
*/
ChannelFuture deregister();
/**
* 请求绑定给定的SocketAddress并在操作完成后通知给定的ChannelPromise
*/
ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
/**
* 请求连接给定的SocketAddress并在操作完成后通知给定的ChannelPromise
*/
ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);
/**
* 请求连接给定的SocketAddress同时绑定到localAddress,并在操作完成后通知ChannelPromise
*/
ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
/**
* 请求断开远程节点的连接并在操作完成后通知给定的ChannelPromise
*/
ChannelFuture disconnect(ChannelPromise promise);
/**
* 请求关闭Channel并在操作完成后通知给定的ChannelPromise,该Channel关闭后不可复用
*/
ChannelFuture close(ChannelPromise promise);
/**
* 请求从先前分配的EventExecutor中注销,并在操作完成后通知给定的ChannelPromise
*/
ChannelFuture deregister(ChannelPromise promise);
/**
* 请求从Channel读取数据到第一个入站缓冲区,如果读取了数据,则触发ChannelInboundHandler#channelRead(ChannelHandlerContext,Object)事件,并触发ChannelInboundHandler#channelReadComplete(ChannelHandlerContext)事件,以便处理程序可以决定是否继续读取。如果已经有一个待处理的读取操作,则此方法不执行任何操作。
*/
ChannelOutboundInvoker read();
/**
* 通过ChannelPipeline请求通过ChannelHandler写入消息。该方法不会执行实际写出消息,所以要确保执行#flush()方法将所有未写出的消息刷新到实际传输中。
*/
ChannelFuture write(Object msg);
/**
* 通过ChannelPipeline请求通过ChannelHandlerContext写出消息。该方法不会执行实际写出消息,所以要确保执行#flush()方法将所有未写出的消息刷新到实际传输中。
*/
ChannelFuture write(Object msg, ChannelPromise promise);
/**
* 请求通过ChannelOutboundInvoker刷新所有等待的消息
*/
ChannelOutboundInvoker flush();
/**
* 简化调用##write(Object, ChannelPromise)和#flush()
*/
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
/**
* 简化调用#write(Object)和#flush()
*/
ChannelFuture writeAndFlush(Object msg);
/**
* 返回一个新的ChannelPromise
*/
ChannelPromise newPromise();
/**
* 返回一个新的ChannelProgressPromise
*/
ChannelProgressivePromise newProgressivePromise();
/**
* 创建一个新的ChannelFuture并将其标记为已成功。因此ChannelFuture#isSuccess()返回为true。添加到其中的所有FutureListener将直接受到通知。通用每次调用阻塞方法都将返回而不会阻塞
*/
ChannelFuture newSucceededFuture();
/**
* 创建一个新的ChannelFuture并将其标记为已失败。因此ChannelFuture#isSuccess()返回为false。添加到其中的所有FutureListener将直接受到通知。通用每次调用阻塞方法都将返回而不会阻塞
*/
ChannelFuture newFailedFuture(Throwable cause);
/**
* 返回一个特殊的ChannelPromise,可以将其重复用于不同的操作
*/
ChannelPromise voidPromise();
ChannelPipeline接口主要定义了一些ChannelHandler的一些操作,如下所示:
public interface ChannelPipeline extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {
ChannelPipeline addFirst(String name, ChannelHandler handler);
ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addLast(String name, ChannelHandler handler);
ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addFirst(ChannelHandler... handlers);
ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);
ChannelPipeline addLast(ChannelHandler... handlers);
ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
ChannelPipeline remove(ChannelHandler handler);
ChannelHandler remove(String name);
<T extends ChannelHandler> T remove(Class<T> handlerType);
ChannelHandler removeFirst();
ChannelHandler removeLast();
ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);
<T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName,
ChannelHandler newHandler);
ChannelHandler first();
ChannelHandlerContext firstContext();
ChannelHandler last();
ChannelHandlerContext lastContext();
ChannelHandler get(String name);
<T extends ChannelHandler> T get(Class<T> handlerType);
ChannelHandlerContext context(ChannelHandler handler);
ChannelHandlerContext context(String name);
ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType);
Channel channel();
List<String> names();
Map<String, ChannelHandler> toMap();
}
ChannelPipeline
接口主要定义了一些常用添加ChannelHandler
的操作,这里就不过讲解了。
讲完了ChannelPipeline
的继承接口之后,我们来看看具体实现DefaultChannelPipeline
的实现。
DefaultChannelPipeline
的构造方法如下:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
这里先将Channel
进行保存,然后分别实例化了TailContext
和HeadContext
,因为这2个比较重要,所以来研究下其具体实现,这里先来看看TailContext
的类图。
由上图可知,TailContext
继承了AbstractChannelHandlerContext
并且实现了ChannelInboundHandlerHandler
接口,所以本质上TailContext
既是一个标准的Handler
也是一个HandlerContext
, 接下来看下其源码:
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, TailContext.class);
setAddComplete();
}
}
这里构造方法中调用了父类AbstractChannelHandlerContext
的构造方法,进来看看:
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.executionMask = mask(handlerClass);
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
因为这里都是赋值操作,我们来看看#mask()方法是如何计算得出executionMask的。
这里说明一下executionMask是干什么的,早在之前Netty
的版本中是用instanceof
来判断是inbound
事件还是outbound
事件,这样判断的结果是比较暴力的,因为我们如果定义了一个inbound
处理程序,但是我这个程序只希望处理我想处理的事件,因为使用instanceof
判断的方式使我们不得不处理我们不想处理的事件,所以在一定的程度上加大了耦合程度,在新版Netty
中采用了位运算来判断,并且粒度更细(方法级别),使得我们不必关系我们不关系的事件,使得Handler
更加灵活。
接下来我们看下ChannelHandlerMask类定义的粒度:
static final int MASK_EXCEPTION_CAUGHT = 1;
static final int MASK_CHANNEL_REGISTERED = 1 << 1;
static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
static final int MASK_CHANNEL_ACTIVE = 1 << 3;
static final int MASK_CHANNEL_INACTIVE = 1 << 4;
static final int MASK_CHANNEL_READ = 1 << 5;
static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
static final int MASK_BIND = 1 << 9;
static final int MASK_CONNECT = 1 << 10;
static final int MASK_DISCONNECT = 1 << 11;
static final int MASK_CLOSE = 1 << 12;
static final int MASK_DEREGISTER = 1 << 13;
static final int MASK_READ = 1 << 14;
static final int MASK_WRITE = 1 << 15;
static final int MASK_FLUSH = 1 << 16;
static final int MASK_ONLY_INBOUND = MASK_CHANNEL_REGISTERED |
MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;
static final int MASK_ONLY_OUTBOUND = MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_OUTBOUND;
因为executionMask是通过mask方法计算得出,这里跟进mask方法:
static int mask(Class<? extends ChannelHandler> clazz) {
// Try to obtain the mask from the cache first. If this fails calculate it and put it in the cache for fast
// lookup in the future.
Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
Integer mask = cache.get(clazz);
if (mask == null) {
mask = mask0(clazz);
cache.put(clazz, mask);
}
return mask;
}
mask方法中做了一层缓存处理,这里我们直接跟进mask0()方法是如何计算得出的:
private static int mask0(Class<? extends ChannelHandler> handlerType) {
int mask = MASK_EXCEPTION_CAUGHT;
try {
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_INBOUND;
if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_REGISTERED;
}
if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_UNREGISTERED;
}
if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_ACTIVE;
}
if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_INACTIVE;
}
if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_CHANNEL_READ;
}
if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_READ_COMPLETE;
}
if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
}
if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_USER_EVENT_TRIGGERED;
}
}
if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_OUTBOUND;
if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_BIND;
}
if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_CONNECT;
}
if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DISCONNECT;
}
if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_CLOSE;
}
if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DEREGISTER;
}
if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
mask &= ~MASK_READ;
}
if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
Object.class, ChannelPromise.class)) {
mask &= ~MASK_WRITE;
}
if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
mask &= ~MASK_FLUSH;
}
}
if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
mask &= ~MASK_EXCEPTION_CAUGHT;
}
} catch (Exception e) {
// Should never reach here.
PlatformDependent.throwException(e);
}
return mask;
}
我们先来看下isSkippable方法的实现,最后再来说明该方法:
private static boolean isSkippable(final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws Exception {
Method m;
try {
m = handlerType.getMethod(methodName, paramTypes);
} catch (NoSuchMethodException e) {
return false;
}
return m != null && m.isAnnotationPresent(Skip.class);
}
});
}
该方法用于判断给定的Class
对象是否有给定的方法或者是否有对应Skip
注解,如果有则返回true
,否则返回false
。
再回到mask0()方法,该方法中,首先mask=MASK_EXCEPTION_CAUGHT,然后判断给定的Handler
是属于ChannelInboundHandler
还是ChannelOutboundHandler
,进而再判断对应的事件方法,如下整理了其大致结构:
InboundHandler事件:
- MASK_EXCEPTION_CAUGHT
- MASK_CHANNEL_REGISTERED
- MASK_CHANNEL_UNREGISTERED
- MASK_CHANNEL_ACTIVE
- MASK_CHANNEL_INACTIVE
- MASK_CHANNEL_READ
- MASK_CHANNEL_READ_COMPLETE
- MASK_CHANNEL_WRITABILITY_CHANGED
- MASK_USER_EVENT_TRIGGERED
OutboundHandler事件:
- MASK_EXCEPTION_CAUGHT
- MASK_BIND
- MASK_CONNECT
- MASK_DISCONNECT
- MASK_CLOSE
- MASK_DEREGISTER
- MASK_READ
- MASK_WRITE
- MASK_FLUSH
也就是如果给定的Handler
类型为inbound
则该mask默认处理所有对应的inbound
事件,然后通过isSkippable方法来判断该handler
是否有处理该事件的方法或者该方法是否有@Skip注解,如果存在该条件则从该mask中移除该事件,表示后面ChannelPipeline
通过位运算查找对应事件处理的Handler
时,该Handler
默认会将其过滤掉,自然也不会触发该事件了。
在TailContext
中,因为实现了ChannelInboundHandler
,执行结果如下,所以最终计算得出的executionMask为511。
int mask = MASK_EXCEPTION_CAUGHT; // mask = 1
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_INBOUND; // mask=511
}
//bit
// 1 |= 511
// 0000 0000 0001
//|0001 1111 1111
//---------------
// 0001 1111 1111
//mask = 511
我们再回到TailContext
的构造函数中,接下来执行了一个setAddComplete()方法,如下所示:
final boolean setAddComplete() {
for (;;) {
int oldState = handlerState;
if (oldState == REMOVE_COMPLETE) {
return false;
}
// Ensure we never update when the handlerState is REMOVE_COMPLETE already.
// oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
// exposing ordering guarantees.
if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return true;
}
}
}
该方法采用CAS
的方式改变Handler
状态为已添加完成。
到此为止,TailContext
的工作到这里就完成了,接下里我们看看它的兄弟HeadContext
。
HeadContext
HeadContext
和TailContext
的不同之处在于该对象同时实现了ChannelInboundHandler
和ChannelOutboundHandler
,同时也继承了AbstractHandlerContext
,一起来看下类结构图。
接下來看看其代码实现:
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, HeadContext.class);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
}
该对象与TailContext
大致相同,所以就不具体讲解了,不同之处该对象引用了一个Unsafe
的引用。因为HeadContext
同时实现了ChannelInboundHandler
和ChannelOutboundHandler
,所以其executionMask=131071。
我们再回到DefaultChannelPipeline
构造函数中,如下:
protected DefaultChannelPipeline(Channel channel) {
//...
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
在完成了TailContext
和HeadContext
实例化后,此时tail和head就分别指向了不同的AbstractHandlerContext
,最终通过head.next=tail和tail.prev = head进行链表关联,如下所示。
ChannelHandler是如何添加的
当我们要添加一个Handler
的时候,内部是如何实现的呢,接下来使用addLast方法来分析其实现。
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
当我们调用ChannelPipeline
添加一个Handler
时,我们假设调用的是addLast方法,首先会调用checkMultiplicity方法来检测是否重复添加,如下所示:
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
该方法判断Handler
是否重复添加,判断条件为Handler
的added字段是否为true,或者该Handler
是否被标记为@Sharable重复使用,否则直接抛出错误,重复添加Handler
,否则该Handler
为首次添加,将added字段设置为true,表示已经添加,防止重复添加的判断条件。
判断重复添加工作后,接下来调用了newContext方法实例化了一个AbstractChannelHandlerContext对象,代码如下:
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
该方法直接实例化了一个DefaultChannelHandlerContext
对象,一起来看看该类的实现:
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
private final ChannelHandler handler;
DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, handler.getClass());
this.handler = handler;
}
@Override
public ChannelHandler handler() {
return handler;
}
}
该对象和TailContext
、HeadContext
的实现基本上差不多,前文有提,这里就不过多深入讲解了。
执行完newContext()方法后,接下来就执行了addLast0()方法了,如下所示:
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
可以看到的是,这是将节点进行链接到对应的链表节点上,执行完的链表结果如下所示:
到此为止,一个完整的Handler
添加过程就完成了,当然还有一些其它操作过程这里就不细讲了,有兴趣可以自行了解。
总结
通过源码分析,我们可以知道ChannelPipeline
就像是一个大管家,管理着多个Handler
和HandlerContext
的关联,任何入站和出站的事件通过位运算找到对应需要处理的Handler
,然后根据规则流经不同的Handler
事件处理方法。当一个入站事件触发的时候会从head节点,依次找到合适节点进行处理,出站事件则会从tail节点开始依次找到合适的节点进行处理。