上一节学习了删除ChannelHandler
的过程,至此我们已经了解了pipeline
和ChannelHandlerContext
,ChannelHandler
着三者之间的关系。pipeline
通过维持一个链表结构,链表节点是ChannelHandlerContext
,该节点持有ChannelHandler
。部分对ChannelHandler
的操作直接暴露给ChannelHandlerContext
,因此我们可以直接操作ChannelHandlerContext
来间接操作ChannelHandler
。
本节以ChannelRead
事件为例,学习inBound事件的传播过程。
class DataServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new InboundHandlerA(),
new InboundHandlerB(),
new InboundHandlerC()
);
}
}
class InboundHandlerA extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InboundHandlerA = " + msg);
ctx.fireChannelRead(msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//调用通道的fireChannelRead方法是从头节点HeadContext开始传播
ctx.channel().pipeline().fireChannelRead("hello world");
}
}
class InboundHandlerB extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InboundHandlerB = " + msg);
//调用数据节点的传播方法是从当前节点往下传播事件
ctx.fireChannelRead(msg);
}
}
class InboundHandlerC extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InboundHandlerC = " + msg);
ctx.fireChannelRead(msg);
}
}
启动服务并添加一个连接,百变Handler的添加顺序。发现InboundHandler
是顺序执行的
class DataServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new InboundHandlerB(),
new InboundHandlerA(),
new InboundHandlerC()
);
}
}
-
handler
之间的传播信息通过fireXXX
方法,以channelRead
事件为例,则时通过fireChannelRead()
方法,可以用ChannelContextHandler
调用,也可以用pipeline
调用。其区别是从哪个节点开始传播。
//调用通道的fireChannelRead方法是从头节点HeadContext开始传播
ctx.channel().pipeline().fireChannelRead("hello world");
//从本节点往下传播
ctx.fireChannelRead(msg);
-
ctx.channel().pipeline().fireChannelRead("hello world")
传播过程
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
//从头节点开始执行channelRead方法
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
//执行channelRead()
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
//获取对应的handler并调用channelRead方法
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
最终传播到HeadContext
,调用channelRead
方法再往下进行传播
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//从当前节点往下传播channelRead事件
ctx.fireChannelRead(msg);
}
fireChannelRead()
调用了findContextInbound()
通过inbound
属性轮询出下一个ChannelInboundHandler
。
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
//先找到下一个节点,再执行channelRead方法
//findContextInbound : 找到下一个节点
invokeChannelRead(findContextInbound(), msg);
return this;
}
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
//通过inbound属性轮询出下一个inboundHandlerContext
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
而之前的章节也有提到过,inbound和outbound属性是在添加ChannelHandler
的时候,创建ChannelHandlerContext
时被添加。而判断是否时inbound则用的是instanceof
关键字。
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
- 最终inbound事件的传播过程,是从头节点开始,逐个往下传递并触发用户回调函数,在这过程当中,可以手动调用
pipeline
的传播事件的方法,从任何一个节点开始从头开始触发传播事件,也可以直接通过ChannelHandlerContext
的传播事件方法,一次从本节点开始往下传播事件。最后传到尾节点TailContext
以channelRead
为例,当走到这个方法则表明,通道内未对传播的内容进行处理,并且占用的内存未释放,在尾节点打印了日志并最终释放了内存。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
onUnhandledInboundMessage(msg);
}
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
//释放内存
ReferenceCountUtil.release(msg);
}
}