Event propagation in Netty mainly involves two kinds of events: inbound events and outbound events.
ChannelInboundHandler extends ChannelHandler
ChannelOutboundHandler extends ChannelHandler
Let's first look at the ChannelInboundHandler interface. It mainly contains the following methods, which are essentially all callbacks for connection-related events:
/**
 * Callback invoked when the channel is registered with its NioEventLoop
 */
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
/**
 * Callback invoked when the channel is deregistered from its NioEventLoop
 */
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
/**
 * Callback invoked after the channel becomes active
 */
void channelActive(ChannelHandlerContext ctx) throws Exception;
/**
 * Callback invoked after the channel becomes inactive
 */
void channelInactive(ChannelHandlerContext ctx) throws Exception;
/**
 * Callback invoked when the channel reads data, or accepts a connection.
 * For a server channel, msg here is a new connection;
 * for a client channel, msg here is a ByteBuf.
 */
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
/**
 * Callback invoked once the current read operation has completed
 */
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
/**
 * Gets called if a user event was triggered;
 * users can fire their own custom events here.
 */
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
/**
 * Propagation of exception events
 */
@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
ChannelInboundHandler has a corresponding adapter implementation:
class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler
Next, using ChannelInboundHandlerAdapter, let's walk through how a read event propagates through the pipeline. Here is the server-side code for our example:
public static void main(String[] args) throws Exception {
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
/**
 * Add 3 inbound handlers
 */
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new InBoundHandlerA());
ch.pipeline().addLast(new InBoundHandlerB());
ch.pipeline().addLast(new InBoundHandlerC());
}
});
// Start the server.
ChannelFuture f = b.bind(8007).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
The implementation of InBoundHandlerA:
public class InBoundHandlerA extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(this.getClass().getName() + " read msg:" + msg);
super.channelRead(ctx, msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().pipeline().fireChannelRead("hello");
}
}
InBoundHandlerB and InBoundHandlerC are implemented as follows:
public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(this.getClass().getName() + " read msg:" + msg);
super.channelRead(ctx, msg);
}
}
public class InBoundHandlerC extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(this.getClass().getName() + " read msg:" + msg);
super.channelRead(ctx, msg);
}
}
Start the server and trigger it locally with telnet localhost 8007; the console prints the following:
com.tyust.netty.inbound.InBoundHandlerA read msg:hello
com.tyust.netty.inbound.InBoundHandlerB read msg:hello
com.tyust.netty.inbound.InBoundHandlerC read msg:hello
17:21:48.214 [nioEventLoopGroup-3-1] DEBUG io.netty.channel.DefaultChannelPipeline - Discarded inbound message hello that reached at the tail of the pipeline. Please check your pipeline configuration.
From the console output we can see that the read event propagates through the pipeline in the order the inbound handlers were added. Next, starting from the line ctx.channel().pipeline().fireChannelRead("hello") in InBoundHandlerA, let's analyze the execution flow in the source code.
Code location: io.netty.channel.DefaultChannelPipeline#fireChannelRead

From this we can see the event starts propagating from the Head node.
From the earlier articles, we know the pipeline built by this example is structured as: Head <-> InBoundHandlerA <-> InBoundHandlerB <-> InBoundHandlerC <-> Tail.

Now we have triggered a read event from the InBoundHandlerA node (via ctx.channel().pipeline()), so the flow runs through the handlers in that same order.

As the code shows, propagation starts from the head node. Following the code from there, we see it enters HeadContext for processing.


The code arrives at: io.netty.channel.DefaultChannelPipeline.HeadContext#channelRead


At this point it begins looking for the next inbound handler. The lookup logic traverses the handler contexts in the pipeline and returns the first inbound one it encounters; whether a context is inbound or outbound is fixed when the context is constructed.
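To make this traversal concrete, here is a minimal, self-contained sketch. MiniPipeline, Ctx, and the inbound flag are hypothetical stand-ins, not Netty's real classes: the contexts form a linked list from head to tail, the inbound flag is decided at construction time, and a read event walks forward from head, invoking only inbound contexts.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of Netty's context list: head <-> handlers <-> tail.
public class MiniPipeline {
    static final class Ctx {
        final String name;
        final boolean inbound; // decided once, at construction time
        Ctx next;
        Ctx(String name, boolean inbound) { this.name = name; this.inbound = inbound; }
    }

    // Walks forward from head and records every inbound context it invokes.
    static List<String> fireChannelRead(Ctx head) {
        List<String> invoked = new ArrayList<>();
        for (Ctx c = head; c != null; c = c.next) {
            if (c.inbound) invoked.add(c.name);
        }
        return invoked;
    }

    // Builds head -> A -> B -> C -> tail and fires a read event at head.
    static List<String> demo() {
        Ctx head = new Ctx("head", true);
        Ctx a = new Ctx("InBoundHandlerA", true);
        Ctx b = new Ctx("InBoundHandlerB", true);
        Ctx c = new Ctx("InBoundHandlerC", true);
        Ctx tail = new Ctx("tail", true);
        head.next = a; a.next = b; b.next = c; c.next = tail;
        return fireChannelRead(head);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // handlers fire in add order, head first, tail last
    }
}
```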

Following the code further, we obtain InBoundHandlerA; calling its invokeChannelRead brings us into our InBoundHandlerA#channelRead method.


InBoundHandlerB and InBoundHandlerC execute in the same way.
Finally, the event propagates to our TailContext node.

Looking at the read logic in TailContext: it prints the log line we saw earlier on the console,
Discarded inbound message hello that reached at the tail of the pipeline. Please check your pipeline configuration., and finally releases the msg to avoid a memory leak.
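The release step can be sketched as follows. RefCounted here is a hypothetical stand-in for a reference-counted message such as a ByteBuf: when an unhandled inbound message reaches the tail, it is logged and then released so the buffer's memory can be reclaimed (which is what ReferenceCountUtil.release does for real messages).

```java
// Hypothetical sketch of the tail's cleanup for unhandled inbound messages.
public class TailDiscardSketch {
    static final class RefCounted {
        int refCnt = 1;
        void release() { refCnt--; }
    }

    static void onUnhandledInboundMessage(RefCounted msg) {
        try {
            System.out.println("Discarded inbound message that reached the tail of the pipeline.");
        } finally {
            msg.release(); // always release, even if logging fails
        }
    }

    public static void main(String[] args) {
        RefCounted msg = new RefCounted();
        onUnhandledInboundMessage(msg);
        System.out.println("refCnt=" + msg.refCnt); // refCnt=0
    }
}
```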


OK, that concludes our analysis of inbound event propagation; next, let's look at outbound events.
Looking at the definition of the ChannelOutboundHandler interface, its methods are essentially all related to I/O read/write operations:
public interface ChannelOutboundHandler extends ChannelHandler {
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception;
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void read(ChannelHandlerContext ctx) throws Exception;
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
void flush(ChannelHandlerContext ctx) throws Exception;
}
ChannelOutboundHandler likewise has a corresponding adapter implementation:
class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler
Next, using ChannelOutboundHandlerAdapter, let's walk through how a write event propagates through the pipeline. Building on the earlier code, we change the handler-registration part:
...
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new OutBoundHandlerA());
ch.pipeline().addLast(new OutBoundHandlerB());
ch.pipeline().addLast(new OutBoundHandlerC());
}
});
The implementation of OutBoundHandlerA:
public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println(this.getClass().getName() + " write msg: " + msg);
super.write(ctx, msg, promise);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ctx.executor().schedule(() -> ctx.channel().write("hello,world"), 3, TimeUnit.SECONDS);
}
}
OutBoundHandlerB and OutBoundHandlerC are implemented as follows:
public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println(this.getClass().getName() + " write msg: " + msg);
super.write(ctx, msg, promise);
}
}
public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println(this.getClass().getName() + " write msg: " + msg);
super.write(ctx, msg, promise);
}
}
Start the server and trigger it locally with telnet localhost 8007; the console prints the following:
com.tyust.netty.outbound.OutBoundHandlerC write msg: hello,world
com.tyust.netty.outbound.OutBoundHandlerB write msg: hello,world
com.tyust.netty.outbound.OutBoundHandlerA write msg: hello,world
From the console output we can see that the write event propagates through the pipeline in the reverse of the order the outbound handlers were added. Next, starting from the line ctx.channel().write("hello,world") in OutBoundHandlerA, let's analyze the execution flow in the source code.
From the code flow, we can see the event starts propagating from TailContext.


Next, it searches backwards through the pipeline for the next outbound node, OutBoundHandlerC.



The code then enters OutBoundHandlerC#write; from there the event keeps propagating along the pipeline toward the head, eventually reaching HeadContext.
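The reverse walk can be sketched the same way as the inbound case (again with hypothetical stand-in types, not Netty's real classes): a write enters at tail and follows prev pointers, invoking only outbound contexts, which is why handlers fire in reverse add order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a write event enters at tail and walks backwards
// over prev pointers, invoking only outbound contexts.
public class MiniOutboundPipeline {
    static final class Ctx {
        final String name;
        final boolean outbound;
        Ctx prev;
        Ctx(String name, boolean outbound) { this.name = name; this.outbound = outbound; }
    }

    // Walks backwards from tail and records every outbound context it invokes.
    static List<String> write(Ctx tail) {
        List<String> invoked = new ArrayList<>();
        for (Ctx c = tail; c != null; c = c.prev) {
            if (c.outbound) invoked.add(c.name);
        }
        return invoked;
    }

    // Builds head <- A <- B <- C <- tail and fires a write from tail.
    static List<String> demo() {
        Ctx head = new Ctx("head", true); // HeadContext handles both directions
        Ctx a = new Ctx("OutBoundHandlerA", true);
        Ctx b = new Ctx("OutBoundHandlerB", true);
        Ctx c = new Ctx("OutBoundHandlerC", true);
        Ctx tail = new Ctx("tail", false); // TailContext is inbound-only
        tail.prev = c; c.prev = b; b.prev = a; a.prev = head;
        return write(tail);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // C, B, A, then head
    }
}
```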

The flow is: Tail -> OutBoundHandlerC -> OutBoundHandlerB -> OutBoundHandlerA -> Head.

Finally, let's look at how HeadContext handles the write event: it calls unsafe's write method, and unsafe#write is what actually writes the data back to the client. We won't dig into unsafe here; it will be covered in detail later.

OK, that's it for outbound events; next let's analyze exception propagation.
Change our server-side code to the following:
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new InBoundHandlerA());
ch.pipeline().addLast(new InBoundHandlerB());
ch.pipeline().addLast(new InBoundHandlerC());
ch.pipeline().addLast(new OutBoundHandlerA());
ch.pipeline().addLast(new OutBoundHandlerB());
ch.pipeline().addLast(new OutBoundHandlerC());
}
});
InBoundHandlerB is now implemented as follows, throwing an exception when its channelRead method is invoked:
public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
throw new RuntimeException(this.getClass().getName() + " happen error");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(this.getClass().getName() + " exceptionCaught exec ");
ctx.fireExceptionCaught(cause);
}
}
InBoundHandlerA, InBoundHandlerC, OutBoundHandlerA, OutBoundHandlerB, and OutBoundHandlerC all override the exceptionCaught method as follows:
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(this.getClass().getName() + " exceptionCaught ");
super.exceptionCaught(ctx, cause);
}
Start the server, connect locally with telnet localhost 8007, and type any characters; the console prints the following:
com.tyust.netty.exception.InBoundHandlerB exceptionCaught exec
com.tyust.netty.exception.InBoundHandlerC exceptionCaught
com.tyust.netty.exception.OutBoundHandlerA exceptionCaught
com.tyust.netty.exception.OutBoundHandlerB exceptionCaught
com.tyust.netty.exception.OutBoundHandlerC exceptionCaught
21:09:05.814 [nioEventLoopGroup-3-1] WARN io.netty.channel.DefaultChannelPipeline - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.lang.RuntimeException: com.tyust.netty.exception.InBoundHandlerB happen error
The log shows that the exception propagates in the order the handlers were added. Let's confirm this with breakpoints. After InBoundHandlerB#channelRead is invoked, the event would normally move on to the next node, but because an exception was thrown, the code lands in io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object); it then looks for the next handler in the pipeline that overrides exceptionCaught.
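The key point, that exceptionCaught travels toward the tail and is delivered to inbound and outbound handlers alike, can be sketched like this (hypothetical stand-in types again, not Netty's real classes):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: exceptionCaught is forwarded via next (toward the
// tail) and is delivered to every handler regardless of direction,
// starting with the handler whose callback threw.
public class MiniExceptionPipeline {
    static final class Ctx {
        final String name;
        Ctx next;
        Ctx(String name) { this.name = name; }
    }

    // Notifies every context from the thrower onwards, in list order.
    static List<String> fireExceptionCaught(Ctx thrower) {
        List<String> notified = new ArrayList<>();
        for (Ctx c = thrower; c != null; c = c.next) {
            notified.add(c.name); // inbound/outbound makes no difference here
        }
        return notified;
    }

    // InBoundHandlerB throws, so notification starts at B and runs to tail.
    static List<String> demo() {
        Ctx b = new Ctx("InBoundHandlerB");
        Ctx c = new Ctx("InBoundHandlerC");
        Ctx oa = new Ctx("OutBoundHandlerA");
        Ctx ob = new Ctx("OutBoundHandlerB");
        Ctx oc = new Ctx("OutBoundHandlerC");
        Ctx tail = new Ctx("tail");
        b.next = c; c.next = oa; oa.next = ob; ob.next = oc; oc.next = tail;
        return fireExceptionCaught(b);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```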
The first one found is InBoundHandlerB itself.



This is why we see the com.tyust.netty.exception.InBoundHandlerB exceptionCaught exec line on the console.
It then continues to the next handler that overrides exceptionCaught, namely InBoundHandlerC, and so on; eventually execution reaches TailContext's exceptionCaught method.


Finally, let's look at TailContext's exceptionCaught method: it does nothing except log the warning shown above and then discard the exception.


This is really not ideal: exceptions are one of the most important signals that something is wrong in our system, and we need to capture and handle them. The common practice is therefore to adjust our code and add a dedicated exception handler:
public class ExceptionHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(this.getClass().getName() + " exception handled, e:" + cause);
}
}
...
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new InBoundHandlerA());
ch.pipeline().addLast(new InBoundHandlerB());
ch.pipeline().addLast(new InBoundHandlerC());
ch.pipeline().addLast(new OutBoundHandlerA());
ch.pipeline().addLast(new OutBoundHandlerB());
ch.pipeline().addLast(new OutBoundHandlerC());
ch.pipeline().addLast(new ExceptionHandler());
}
});
That wraps up event and exception propagation. To finish, here are two questions you can work through along the lines of the analysis above:
- During outbound event propagation, if OutBoundHandlerA#handlerAdded uses the code in case 2, how does the event propagate through the pipeline?
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//case 1
ctx.executor().schedule(() -> ctx.channel().write("hello,world"), 3, TimeUnit.SECONDS);
//case 2
ctx.executor().schedule(() -> ctx.write("hello,world"), 3, TimeUnit.SECONDS);
}
- Similarly, during inbound event propagation, if InBoundHandlerA#channelActive calls the code in case 2, how does the event propagate?
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//case 1
ctx.channel().pipeline().fireChannelRead("hello");
//case 2
ctx.fireChannelRead("hello");
}