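The previous section looked at how outbound events propagate: in the reverse of the order in which the ChannelOutboundHandler instances were added. When an outbound propagation method is called on the pipeline or the channel, the event starts at TailContext and travels until it reaches HeadContext, where the head node finally uses unsafe to handle the actual event.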
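This section looks at how exceptions propagate. We add inbound and outbound handlers in the order iA iB iC oA oB oC, each overriding the exceptionCaught exception-handling method. The iB handler throws a custom exception, and we trace where it goes.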
class DataServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(
                new InboundHandlerA(),
                new InboundHandlerB(),
                new InboundHandlerC(),
                new OutboundHandlerA(),
                new OutboundHandlerB(),
                new OutboundHandlerC()
        );
    }
}
class OutboundHandlerA extends ChannelOutboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("OutboundHandlerA.exceptionCaught()");
        ctx.fireExceptionCaught(cause);
    }
}

class OutboundHandlerB extends ChannelOutboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("OutboundHandlerB.exceptionCaught()");
        ctx.fireExceptionCaught(cause);
    }
}

class OutboundHandlerC extends ChannelOutboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("OutboundHandlerC.exceptionCaught()");
        ctx.fireExceptionCaught(cause);
    }
}

class InboundHandlerA extends ChannelInboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("InboundHandlerA.exceptionCaught()");
        ctx.fireExceptionCaught(cause);
    }
}
class InboundHandlerB extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Throw the custom exception whose propagation we want to trace
        throw new BusinessException("from InboundHandlerB");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("InboundHandlerB.exceptionCaught()");
        ctx.fireExceptionCaught(cause);
    }
}
class InboundHandlerC extends ChannelInboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("InboundHandlerC.exceptionCaught()");
        ctx.fireExceptionCaught(cause);
    }
}
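The printed output suggests that the exception propagates in the order in which the handlers were added, starting at the handler that threw it and regardless of whether a handler is inbound or outbound.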
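To make that guess concrete, here is a minimal sketch in plain Java rather than Netty code. Node, buildPipeline and propagateFrom are hypothetical names introduced for illustration, and the model assumes every handler forwards the exception to its successor, as the handlers above do.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of a pipeline; these are not Netty's classes.
class PropagationOrderSketch {

    /** One node of the handler list; only the forward link matters here. */
    static final class Node {
        final String name;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    /** Builds head -> iA -> iB -> iC -> oA -> oB -> oC -> tail and returns head. */
    static Node buildPipeline() {
        String[] names = {"head", "iA", "iB", "iC", "oA", "oB", "oC", "tail"};
        Node head = new Node(names[0]);
        Node current = head;
        for (int i = 1; i < names.length; i++) {
            current.next = new Node(names[i]);
            current = current.next;
        }
        return head;
    }

    /** Records which nodes see the exception when the node named thrower throws. */
    static List<String> propagateFrom(Node head, String thrower) {
        List<String> visited = new ArrayList<>();
        Node node = head;
        while (node != null && !node.name.equals(thrower)) {
            node = node.next;
        }
        // The throwing node's own callback runs first; after that each node
        // forwards to its successor, whether it is inbound or outbound.
        for (; node != null; node = node.next) {
            visited.add(node.name);
        }
        return visited;
    }

    public static void main(String[] args) {
        // prints [iB, iC, oA, oB, oC, tail]
        System.out.println(propagateFrom(buildPipeline(), "iB"));
    }
}
```

In this model iA never sees the exception, because propagation only moves forward from the node that threw.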
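- Setting a breakpoint where the exception is caught and stepping through shows that the call passes through notifyHandlerException and invokeExceptionCaught, and finally handler().exceptionCaught(this, cause); invokes the user's exception callback, leaving the exception for the user to handle.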
private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            // Get the handler and call its channelRead method
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            // Hand the throwable to the exception-handling path
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

private void notifyHandlerException(Throwable cause) {
    if (inExceptionCaught(cause)) {
        if (logger.isWarnEnabled()) {
            logger.warn(
                    "An exception was thrown by a user handler " +
                            "while handling an exceptionCaught event", cause);
        }
        return;
    }
    // Handle the exception
    invokeExceptionCaught(cause);
}

private void invokeExceptionCaught(final Throwable cause) {
    if (invokeHandler()) {
        try {
            // Invoke the exceptionCaught callback of the current handler
            handler().exceptionCaught(this, cause);
        } catch (Throwable error) {
            if (logger.isDebugEnabled()) {
                logger.debug(
                        "An exception {}" +
                                "was thrown by a user handler's exceptionCaught() " +
                                "method while handling the following exception:",
                        ThrowableUtil.stackTraceToString(error), cause);
            } else if (logger.isWarnEnabled()) {
                logger.warn(
                        "An exception '{}' [enable DEBUG level for full stacktrace] " +
                                "was thrown by a user handler's exceptionCaught() " +
                                "method while handling the following exception:", error, cause);
            }
        }
    } else {
        fireExceptionCaught(cause);
    }
}
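One detail of invokeExceptionCaught is easy to miss: if the user's exceptionCaught callback itself throws, the secondary error is only logged and is not propagated further. The sketch below models just that try/catch in plain Java. ExceptionCallback, the log list and demo are hypothetical names for illustration, not Netty APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model; the names below are not Netty classes.
class SwallowedErrorSketch {

    /** Stand-in for a user handler's exceptionCaught callback. */
    interface ExceptionCallback {
        void exceptionCaught(Throwable cause) throws Exception;
    }

    /** Calls the callback; a failure inside it is logged, never rethrown. */
    static void invokeExceptionCaught(ExceptionCallback handler, Throwable cause, List<String> log) {
        try {
            handler.exceptionCaught(cause);
        } catch (Throwable error) {
            log.add("callback threw '" + error.getMessage()
                    + "' while handling '" + cause.getMessage() + "'");
        }
    }

    /** Runs a well-behaved callback and a failing one, then returns the log. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Throwable cause = new IllegalStateException("original");
        invokeExceptionCaught(c -> log.add("handled '" + c.getMessage() + "'"), cause, log);
        invokeExceptionCaught(c -> {
            throw new RuntimeException("secondary");
        }, cause, log);
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The caller of invokeExceptionCaught never sees the secondary exception, which is why a buggy exceptionCaught implementation shows up only in the logs.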
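- Next, trace the propagation method ctx.fireExceptionCaught(cause); since this is the call that passes the exception on. It simply invokes the node that follows the current one; from there the process is the same as described above.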
@Override
public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
    // Use the next node in the linked list directly
    invokeExceptionCaught(next, cause);
    return this;
}

static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
    ObjectUtil.checkNotNull(cause, "cause");
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        // Already on the event loop: run the exception callback directly
        next.invokeExceptionCaught(cause);
    } else {
        try {
            // Otherwise submit the callback to the node's executor
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeExceptionCaught(cause);
                }
            });
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to submit an exceptionCaught() event.", t);
                logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
            }
        }
    }
}
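The inEventLoop() check in the static invokeExceptionCaught follows a common pattern: run the callback inline when already on the owning thread, otherwise hand it to that thread's executor. Below is a rough plain-Java sketch of the pattern. Loop, dispatch and runOnLoop are hypothetical names, and the single-threaded executor is only a stand-in for an event loop, not how Netty implements one.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of "run inline if on the loop thread, else submit".
class EventLoopDispatchSketch {

    /** Single-threaded stand-in for an event loop that remembers its thread. */
    static final class Loop {
        private final AtomicReference<Thread> thread = new AtomicReference<>();
        private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "sketch-loop");
            thread.set(t);
            return t;
        });

        boolean inEventLoop() {
            return Thread.currentThread() == thread.get();
        }

        void execute(Runnable task) {
            executor.execute(task);
        }

        void shutdown() {
            executor.shutdown();
        }
    }

    /** Runs the callback inline on the loop thread, or submits it otherwise. */
    static void dispatch(Loop loop, Runnable callback) {
        if (loop.inEventLoop()) {
            callback.run();
        } else {
            loop.execute(callback);
        }
    }

    /** Dispatches from the calling thread and reports where the callback ran. */
    static String runOnLoop() {
        Loop loop = new Loop();
        AtomicReference<String> ranOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        dispatch(loop, () -> {
            ranOn.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println(runOnLoop()); // prints sketch-loop
    }
}
```

Either way the callback runs on the loop thread, so a handler's exceptionCaught is not called concurrently with the handler's other callbacks on that loop.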
Note: ctx.pipeline().fireExceptionCaught(cause); starts propagating from the head node instead.
ctx.pipeline().fireExceptionCaught(cause);
@Override
public final ChannelPipeline fireExceptionCaught(Throwable cause) {
    // Start propagating from the head node
    AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
    return this;
}
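- If the exception keeps being passed along, it eventually reaches the tail node, which logs a warning and calls ReferenceCountUtil.release(cause), releasing the cause if it is reference-counted.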
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    onUnhandledInboundException(cause);
}

protected void onUnhandledInboundException(Throwable cause) {
    try {
        logger.warn(
                "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                        "It usually means the last handler in the pipeline did not handle the exception.",
                cause);
    } finally {
        ReferenceCountUtil.release(cause);
    }
}
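The release call in the finally block only has an effect when the object passed in is reference-counted; for an ordinary Throwable it does nothing. The sketch below illustrates that "release if counted" behaviour in plain Java. Counted, Buffer and releaseIfCounted are hypothetical stand-ins, not Netty's ReferenceCounted or ReferenceCountUtil.

```java
// Hypothetical plain-Java model of a "release only if reference-counted" helper.
class ReleaseSketch {

    /** Stand-in for a reference-counted resource. */
    interface Counted {
        /** Decrements the count; returns true once the resource is freed. */
        boolean release();
    }

    /** Minimal counted resource that starts with a count of one. */
    static final class Buffer implements Counted {
        private int refCount = 1;

        @Override
        public boolean release() {
            if (refCount == 0) {
                throw new IllegalStateException("already released");
            }
            refCount--;
            return refCount == 0;
        }
    }

    /** Releases obj if it is counted; otherwise does nothing and returns false. */
    static boolean releaseIfCounted(Object obj) {
        if (obj instanceof Counted) {
            return ((Counted) obj).release();
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(releaseIfCounted(new RuntimeException("plain cause"))); // prints false
        System.out.println(releaseIfCounted(new Buffer()));                        // prints true
    }
}
```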
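- Exception handling advice
Given how exceptions propagate, add one exception handler at the end of the pipeline to handle all exceptions in a single place, and do not propagate the exception any further once it has been handled.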
class DataServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(
                new InboundHandlerA(),
                new InboundHandlerB(),
                new InboundHandlerC(),
                new OutboundHandlerA(),
                new OutboundHandlerB(),
                new OutboundHandlerC(),
                new ExceptionCaughtHandler()
        );
    }
}
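class ExceptionCaughtHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Handle the exception here; ctx.fireExceptionCaught(cause) is deliberately
        // not called, so the exception never reaches the tail node.
        if (cause instanceof BusinessException) {
            System.out.println("is BusinessException");
        }
    }
}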
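The code above uses BusinessException without showing its definition. A minimal sketch of what it could look like follows, together with the kind of type-based decision a catch-all handler might make. The unchecked base class, the describe helper and the message formats are all assumptions for illustration; the original post does not show them.

```java
// Hypothetical sketch; the post's real BusinessException is not shown.
class TailHandlerSketch {

    /** Assumed shape of the custom exception: unchecked, message only. */
    static class BusinessException extends RuntimeException {
        BusinessException(String message) {
            super(message);
        }
    }

    /** Decides how a catch-all handler might report a given cause. */
    static String describe(Throwable cause) {
        if (cause instanceof BusinessException) {
            return "business error: " + cause.getMessage();
        }
        return "unexpected error: " + cause.getClass().getSimpleName();
    }

    public static void main(String[] args) {
        System.out.println(describe(new BusinessException("from InboundHandlerB")));
        System.out.println(describe(new IllegalStateException("boom")));
    }
}
```

A real handler would probably also decide whether to close the channel for unexpected errors; that choice depends on the application and is left out here.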