关于ChannelRead事件的传播
在自定义handler的时候,通常要重写channelRead
函数,如果想要将该事件向后传播(注意,传播顺序与handler添加顺序相同),需要调用fireChannelRead
函数,ChannelRead事件便在这里中断
通常在重写的channelRead函数里,有两种传播ChannelRead事件的方式
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//第一种
ctx.fireChannelRead(msg);
//第二种
ctx.channel().pipeline().fireChannelRead(msg);
}
这两种方式的主要区别在于接下来传播的起始位置,非常重要
- 使用第一种方式,事件会从该节点开始继续向后传播
- 使用第二种方式,事件会从head节点开始传播
下面分析源码来做说明
第一种传播方式
跟进到AbstractChannelHandlerContext#fireChannelRead
方法
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
findContextInbound
方法就是在寻找下一个节点,看看这个方法的代码
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
ctx = ctx.next;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
return ctx;
}
继续看看skipContext
方法
private static boolean skipContext(
AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
return (ctx.executionMask & (onlyMask | mask)) == 0 ||
(ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
}
可以看到,这里判断的关键就在executionMask
这个成员变量,而这个成员变量就在AbstractChannelHandlerContext
里被赋值
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline,
EventExecutor executor,
String name,
Class<? extends ChannelHandler> handlerClass) {
//省略其他代码
this.executionMask = mask(handlerClass);
}
mask
方法最终调用到mask0方法
private static int mask0(Class<? extends ChannelHandler> handlerType) {
int mask = MASK_EXCEPTION_CAUGHT;
try {
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_INBOUND;
if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_REGISTERED;
}
if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_CHANNEL_READ;
}
//省略部分代码
}
if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_OUTBOUND;
if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_BIND;
}
//省略部分代码
}
//省略部分代码
} catch (Exception e) {
//省略部分代码
}
return mask;
}
isSkippable
函数只有在找不到函数或者函数被@Skip注解时才返回false
可以看到,实际上executionMask
就是用来记录handler的类型信息和方法注解信息
skipContext
方法实际上就是在寻找下一个没有用@Skip注解了ChannelRead方法的inbound节点
private static boolean isSkippable(
final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws Exception {
Method m;
try {
m = handlerType.getMethod(methodName, paramTypes);
} catch (NoSuchMethodException e) {
if (logger.isDebugEnabled()) {
logger.debug(
"Class {} missing method {}, assume we can not skip execution", handlerType, methodName, e);
}
return false;
}
return m != null && m.isAnnotationPresent(Skip.class);
}
});
}
继续看fireChannelRead
函数里调用到的invokeChannelRead
函数
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
//不在当前eventloop,放到异步任务队列里
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
//省略
}
} else {
fireChannelRead(msg);
}
}
可以看到,这里就是在调用下一个节点的channelRead
方法
第二种传播方式
跟进到DefaultChannelPipeline#fireChannelRead
方法
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
继续跟进invokeChannelRead
方法
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
可以看到,这里就是传入head节点,从head节点开始向后传播channelRead事件
资源释放相关问题
若自定义的handler继承自ChannelInboundHandlerAdapter,并且在ChannelRead函数里没有将事件向后传播,那么需要自行调用函数处理资源释放,如下
ReferenceCountUtil.release(msg);
Write事件传播
write事件的传播顺序与handler的添加顺序相反(即最后添加的outboundHandler最先处理write事件)
类似的,在用户代码里传播write事件也有两种方式
第一种方式,从当前节点,往前寻找outbound,继续传播
ctx.write(msg);
第二种方式,从tail节点开始,往前寻找outbound传播
ctx.channel().write(msg);
如果没有中断,最终write事件会传播到head节点,然后head节点会调用unsafe的write方法
异常的传播
异常的产生
首先,异常是在ChannelRead
、ChannelRegister
等这些函数中抛出的,然后在形如invokeChannelXXX
(例如invokeChannelRead
)中捕获,例如
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
//捕获异常
invokeExceptionCaught(t);
}
} else {
fireChannelRead(msg);
}
}
看看invokeExceptionCaught
方法
private void invokeExceptionCaught(final Throwable cause) {
if (invokeHandler()) {
try {
handler().exceptionCaught(this, cause);
} catch (Throwable error) {
//省略
}
} else {
fireExceptionCaught(cause);
}
}
可以看到,这里调用exceptionCaught
方法处理异常
传播异常
异常的传播方向与handler的添加方向一致,并且不区分是inboundHandler还是outboundHandler(即异常可以从inboundHandler传播到outboundHandler,反之亦可)
默认情况下,如果不重写exceptionCaught
方法,那么会把该异常继续向后传播,最终会传播到tail节点,tail节点会打印一条日志表明该异常未被处理
如果重写了exceptionCaught
方法,并且想将该异常继续向后传播,那么需要调用fireExceptionCaught
方法
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
//其他处理代码
ctx.fireExceptionCaught(cause);
}
关联:Netty4中Handler的执行顺序以及ctx.close() 与 ctx.channel().close()的区别