前面讲了ChannelPipeline、ChannelHandler以及ChannelHandlerContext的结构。
下面就来看看ChannelHandler的执行过程,以及inbound、outbound对应的ChannelHandler的执行顺序。
下面先上demo
pom
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Build descriptor for the Netty handler-ordering demo (Server, Client and the A-F handlers). -->
<modelVersion>4.0.0</modelVersion>
<groupId>com.gee</groupId>
<artifactId>nio-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<!-- Netty: the only dependency the demo classes shown in this article reference. -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.16.Final</version>
</dependency>
<!-- Date/time utility library. NOTE(review): not referenced by the demo classes shown here. -->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9</version>
</dependency>
<!-- Lombok, declared with "provided" scope. NOTE(review): not referenced by the demo classes shown here. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
<version>1.18.8</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Pin the Java source/target level (1.8) and source encoding used by the compiler plugin. -->
<!-- NOTE(review): no <version> is declared for this plugin in the visible POM; consider pinning one. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- Surefire is configured with skip=true, i.e. tests are skipped during the build. -->
<!-- NOTE(review): no <version> is declared for this plugin either. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>
代码
ChannelHandler
其中A、B、C都继承自ChannelInboundHandlerAdapter,属于ChannelInboundHandler的实现。
D、E、F都继承自ChannelOutboundHandlerAdapter,属于ChannelOutboundHandler的实现。
/**
 * Demo inbound handler "A".
 *
 * <p>Prints a marker when the channel-registered event reaches it and then
 * forwards the event, so the console output reveals the order in which inbound
 * handlers are visited.
 */
public class AChannelHandler extends ChannelInboundHandlerAdapter {

    /**
     * Logs the marker for this handler and passes the registered event on.
     *
     * @param ctx the context this handler is bound to in the pipeline
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("inbound A");
        // Forward explicitly: without this call the event is not handed to the next inbound context.
        ctx.fireChannelRegistered();
    }
}
/**
 * Demo inbound handler "B".
 *
 * <p>Prints a marker when the channel-registered event reaches it and then
 * forwards the event to the next inbound context.
 */
public class BChannelHandler extends ChannelInboundHandlerAdapter {

    /**
     * Logs the marker for this handler and passes the registered event on.
     *
     * @param ctx the context this handler is bound to in the pipeline
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("inbound B");
        // Forward explicitly: without this call the event is not handed to the next inbound context.
        ctx.fireChannelRegistered();
    }
}
/**
 * Demo inbound handler "C".
 *
 * <p>Prints a marker when the channel-registered event reaches it and then
 * forwards the event to the next inbound context.
 */
public class CChannelHandler extends ChannelInboundHandlerAdapter {

    /**
     * Logs the marker for this handler and passes the registered event on.
     *
     * @param ctx the context this handler is bound to in the pipeline
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("inbound C");
        // Forward explicitly: without this call the event is not handed to the next inbound context.
        ctx.fireChannelRegistered();
    }
}
/**
 * Demo outbound handler "D".
 *
 * <p>Prints a marker when a connect request passes through it and then
 * delegates the request onward, so the console output reveals the order in
 * which outbound handlers are visited.
 */
public class DChannelHandler extends ChannelOutboundHandlerAdapter {

    /**
     * Logs the marker for this handler and passes the connect request on.
     *
     * @param ctx           the context this handler is bound to in the pipeline
     * @param remoteAddress the address to connect to
     * @param localAddress  the local address to bind, as supplied by the caller
     * @param promise       the promise associated with the connect operation
     * @throws Exception declared by the overridden method
     */
    @Override
    public void connect(
            ChannelHandlerContext ctx,
            SocketAddress remoteAddress,
            SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        System.out.println("outbound D");
        // Delegate explicitly: without this call the request is not handed to the next outbound context.
        ctx.connect(remoteAddress, localAddress, promise);
    }
}
/**
 * Demo outbound handler "E".
 *
 * <p>Prints a marker when a connect request passes through it and then
 * delegates the request to the next outbound context.
 */
public class EChannelHandler extends ChannelOutboundHandlerAdapter {

    /**
     * Logs the marker for this handler and passes the connect request on.
     *
     * @param ctx           the context this handler is bound to in the pipeline
     * @param remoteAddress the address to connect to
     * @param localAddress  the local address to bind, as supplied by the caller
     * @param promise       the promise associated with the connect operation
     * @throws Exception declared by the overridden method
     */
    @Override
    public void connect(
            ChannelHandlerContext ctx,
            SocketAddress remoteAddress,
            SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        System.out.println("outbound E");
        // Delegate explicitly: without this call the request is not handed to the next outbound context.
        ctx.connect(remoteAddress, localAddress, promise);
    }
}
/**
 * Demo outbound handler "F".
 *
 * <p>Prints a marker when a connect request passes through it and then
 * delegates the request to the next outbound context.
 */
public class FChannelHandler extends ChannelOutboundHandlerAdapter {

    /**
     * Logs the marker for this handler and passes the connect request on.
     *
     * @param ctx           the context this handler is bound to in the pipeline
     * @param remoteAddress the address to connect to
     * @param localAddress  the local address to bind, as supplied by the caller
     * @param promise       the promise associated with the connect operation
     * @throws Exception declared by the overridden method
     */
    @Override
    public void connect(
            ChannelHandlerContext ctx,
            SocketAddress remoteAddress,
            SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        System.out.println("outbound F");
        // Delegate explicitly: without this call the request is not handed to the next outbound context.
        ctx.connect(remoteAddress, localAddress, promise);
    }
}
服务端
/**
 * Demo server for the handler-ordering experiment.
 *
 * <p>Every accepted connection gets the three inbound demo handlers A, B and C
 * appended to its pipeline, in that order.
 */
public class Server {

    /** TCP port the demo server listens on; the client uses the same value. */
    private static final int PORT = 9527;

    public static void main(String[] args) {
        start();
    }

    /**
     * Binds the server to {@link #PORT} and blocks until the server channel is closed.
     *
     * <p>Both event loop groups are shut down on every exit path, including
     * failures while configuring or binding the bootstrap.
     */
    public static void start() {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup work = new NioEventLoopGroup();
        try {
            // Configure inside the try so the groups are released even if setup itself throws.
            ServerBootstrap sb = new ServerBootstrap();
            sb.group(boss, work)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            // Insertion order matters: it is the order inbound events travel in.
                            ch.pipeline().addLast(new AChannelHandler());
                            ch.pipeline().addLast(new BChannelHandler());
                            ch.pipeline().addLast(new CChannelHandler());
                        }
                    });

            ChannelFuture cf = sb.bind(PORT).sync();
            // Block the calling thread until the server channel is closed.
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }
}
客户端
/**
 * Demo client for the handler-ordering experiment.
 *
 * <p>The client pipeline receives the inbound handlers A, B, C followed by the
 * outbound handlers D, E, F, in that insertion order.
 */
public class Client {

    /** TCP port of the demo server. */
    private static final int PORT = 9527;

    /** Host of the demo server. */
    private static final String HOST = "127.0.0.1";

    public static void main(String[] args) {
        connect();
    }

    /**
     * Connects to {@link #HOST}:{@link #PORT} and blocks until the channel is closed.
     *
     * <p>The event loop group is shut down on every exit path, including
     * failures while configuring or connecting the bootstrap.
     */
    public static void connect() {
        NioEventLoopGroup work = new NioEventLoopGroup();
        try {
            // Configure inside the try so the group is released even if setup itself throws.
            Bootstrap bs = new Bootstrap();
            bs.group(work)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            // Inbound handlers first, then outbound ones; the article's
                            // observed output is A, B, C followed by F, E, D.
                            ch.pipeline().addLast(new AChannelHandler());
                            ch.pipeline().addLast(new BChannelHandler());
                            ch.pipeline().addLast(new CChannelHandler());
                            ch.pipeline().addLast(new DChannelHandler());
                            ch.pipeline().addLast(new EChannelHandler());
                            ch.pipeline().addLast(new FChannelHandler());
                        }
                    });

            ChannelFuture cf = bs.connect(HOST, PORT).sync();
            // Block the calling thread until the connection is closed.
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            work.shutdownGracefully();
        }
    }
}
最后分别启动服务端以及客户端,观察运行结果。
运行结果
从运行结果来看。
inBound的channelHandler是按照addLast的添加顺序执行的:A->B->C。
而outBound的channelHandler是按照添加顺序的逆序执行的:F->E->D。
先记住,inbound是顺序,而outBound是逆序的。至于为什么后面再说。
为什么inBound是顺序的?
就还是从源码入手,一步一步看吧。
从我们的demo入手。
其实之前已经讲过channelHandler是如何被插入到ChannelHandlerContext的链中的,这里就不说了。
这里主要还是讲一下执行顺序。
这里还是从channel的connect开始。
channel在connect之前,当然是需要一系列的初始化,比如注册到对应的selector中,将这个channel对应的channelHandler都放到channelPipeline对应的链中。
代码入口。以channel的初始化为例子。
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 通过工厂返回channel实例。这里主要是会涉及到ChannelPipeline的实例化以及初始化。
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
// 省略.....
}
// channel初始化完,要做的事情当然就是注册到selector中了。
// 所以直接看这里面的代码即可。往下面看
ChannelFuture regFuture = config().group().register(channel);
// 省略......
return regFuture;
}
}
channel注册到selector中后,会通过pipeline发起注册事件,用于去完善一些后续操作。
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// register方法最后,定位到如下方法。
promise.channel().unsafe().register(this, promise);
return promise;
}
}
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
protected abstract class AbstractUnsafe implements Unsafe {
private void register0(ChannelPromise promise) {
try {
//channel注册
doRegister();
// channel注册完之后,还有一些需要后续处理的事情,所以开始fire(触发)注册事件。
// 为什么要通过pipeline触发呢?待会就知道了。因为pipeline本身管理着ChannelHandlerContext的链,即channelHandler的链。
pipeline.fireChannelRegistered();
// 省略..........
} catch (Throwable t) {
// 省略..........
}
}
}
}
为什么inBound是顺序?因为执行顺序是从head发起的。
因为channel中使用的channelPipeline默认的类就是DefaultChannelPipeline
public class DefaultChannelPipeline implements ChannelPipeline {
// 链的头部
final AbstractChannelHandlerContext head;
// 链的尾部
final AbstractChannelHandlerContext tail;
// 为什么是顺序执行,看到这里大家可能就懂了吧?
// 因为各种fire方法(ChannelInboundInvoker接口的实现)都是从head开始遍历的。
// 所以,inbound的channelHandler的执行顺序必然是顺序的。
@Override
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
}
那么如何执行呢?
从上面的代码片段,往里面走。
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
//pipeline传进来的next就是head;真正去调用handler的是下面的实例方法invokeChannelRegistered()
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
private boolean invokeHandler() {
// Store in local variable to reduce volatile reads.
int handlerState = this.handlerState;
return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}
private void invokeChannelRegistered() {
// head的状态是ADD_COMPLETE,所以invokeHandler()必然返回true
if (invokeHandler()) {
try {
//直接往代码里面走,headContext返回自身。
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
}
public class DefaultChannelPipeline implements ChannelPipeline {
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
@Override
public ChannelHandler handler() {
return this;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
invokeHandlerAddedIfNeeded();
//进行传播,head也是AbstractChannelHandlerContext的子类。方法就是在AbstractChannelHandlerContext实现的。
ctx.fireChannelRegistered();
}
}
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
@Override
public ChannelHandlerContext fireChannelRegistered() {
// 找到下一个为Inbound的ctx。
// 再回到最初的静态方法invokeChannelRegistered。这里其实是一个递归操作。
invokeChannelRegistered(findContextInbound());
return this;
}
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
// 是不是又回到一开始的入口了?
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
// 拿到context自身对应的channelHandler,如果覆写了的话会执行覆写方法。
// 如果没覆写的话,其实就是继续ctx.fireChannelRegistered(),在父类中实现。
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
}
可能看着有点绕。那就画个图吧。
其实下面部分是一个递归。
ChannelHandlerContext的抽象子类AbstractChannelHandlerContext已经实现了invokeChannelRegistered方法,说白了就是继续传播,传播到下一个ChannelHandlerContext。下一个ChannelHandlerContext对应的handler可以去实现ChannelInboundHandler里面的方法,决定要在这个过程中做什么,以及要不要继续传播。如果不传播,事件就停在这里了,待会举一个例子。如果要继续传播,就需要调用ctx.fireXxx。理论上来说,一直传播的话,最终会到达TailContext里面的方法,然后结束,因为TailContext是最后一个inbound属性的context。
下面看看TailContext中的代码。其实我们可以看到很多空方法,说白了就是执行结束,直接出栈。
// Context sitting at the tail of the pipeline; it acts as its own inbound handler.
// Most callbacks below are empty, so an inbound event that reaches them simply ends here.
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
// NOTE(review): the trailing (true, false) arguments are presumably the inbound/outbound
// flags, matching the article's description of tail as an inbound context - confirm
// against the AbstractChannelHandlerContext constructor.
super(pipeline, null, TAIL_NAME, true, false);
setAddComplete();
}
// The context is its own handler.
@Override
public ChannelHandler handler() {
return this;
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception { }
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { }
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
// A user event that nobody consumed is released here rather than forwarded.
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// This may not be a configuration error and so don't log anything.
// The event may be superfluous for the current pipeline configuration.
ReferenceCountUtil.release(evt);
}
// An exception that reached the tail is handed to onUnhandledInboundException (defined outside this excerpt).
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
onUnhandledInboundException(cause);
}
// A message that reached the tail is handed to onUnhandledInboundMessage (defined outside this excerpt).
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
onUnhandledInboundMessage(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { }
}
如何使传播中断呢?
那就改一下上面的代码,比如将BChannelHandler的代码进行调整,其他不变。
将ctx.fireChannelRegistered注释。最后运行。
public class BChannelHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("inbound B");
//ctx.fireChannelRegistered();
}
}
运行结果如下,之前是ABCFED,现在变成ABFED,所以我们可以通过决定要不要往下传播,从而去控制某个channelHandler是否要执行。
最后来说说OutBound
为什么OutBound是逆序的呢?
答案估计大家都猜到了,从tail发起,一级一级的找到每个outBound属性的channelHandlerContext即可。
但是还是看看代码吧。
channel初始化、注册完成之后,下一步操作就是连接到服务端。
public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
/**
 * Submits the actual connect call as a task on the channel's event loop.
 *
 * @param remoteAddress  the address to connect to
 * @param localAddress   the local address to use; when {@code null} the two-argument connect is used
 * @param connectPromise the promise for the connect operation; its channel is the one being connected
 */
private static void doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
// channel.connect(...) is the entry point of the outbound connect traversal discussed in the article.
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
// NOTE(review): per its name, this listener presumably closes the channel when the connect fails - confirm.
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
});
}
}
public class DefaultChannelPipeline implements ChannelPipeline {
// Pipeline-level connect: the outbound operation is started from the tail context.
// Starting at tail is what makes outbound handlers run in reverse insertion order.
@Override
public final ChannelFuture connect(
SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, localAddress, promise);
}
}
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
// 逐级往前(prev方向)查,把一个一个属性为outBound的ctx找出来。
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
@Override
public ChannelFuture connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// 找出下一个属性为outBound的ctx
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 执行next对应handler的connect;如果handler没有覆写connect,则由父类的默认实现继续往下传播。
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise, null);
}
return promise;
}
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
// 最后还是走到这步了。继续往下走。
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
connect(remoteAddress, localAddress, promise);
}
}
}
ChannelOutboundHandlerAdapter的默认实现,其实就是继续传播。
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
// 继续往下走
ctx.connect(remoteAddress, localAddress, promise);
}
}
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {
@Override
public ChannelFuture connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 继续往下走
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise, null);
}
return promise;
}
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
// 这个调用一路传播到最后,执行的其实是HeadContext实现的connect。
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
connect(remoteAddress, localAddress, promise);
}
}
}
public class DefaultChannelPipeline implements ChannelPipeline {
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
}
如何终止传播呢?在自己实现的outbound方法里,不再调用ctx对应的outbound方法(例如ctx.connect)即可。
大概就是这样子了。
总结一下,inbound的channelHandler为什么是顺序执行的,是因为从head发起的,然后逐级找到inbound为true的ctx。如果需要停止传播,在实现的ChannelInBoundHandler的方法里面去停止fire即可。
而outBound的channelHandler是逆序的,是因为从tail发起的,逐级往前找到outBound为true的ctx。如果需要停止传播,也是在实现的ChannelOutBoundHandler的方法里面,不再用ctx去执行outBound的方法即可。