其实在之前的客户端和服务端初始化的时候已经说过了,在初始化Channel的时候,同时初始化pipeline;
//AbstractChannel
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
//DefaultChannelPipeline
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
从DefaultChannelPipeline即可知道初始化pipeline的时候,head和tail是双向链表。可以看下它们的继承关系,再进行其初始化的观察。
TailContext:
从继承结构可以看出,HeadContext和TailContext都继承了AbstractChannelHandlerContext和实现了ChannelOutboundHandler/ChannelInboundHandler,说明用户双重属性,既是context同时也是handler,按我的理解意味着其即拥有上下文属性也拥有handler属性(处理业务逻辑)。在文章开始的图里已经说明:每个channel包含一个pipeline,而pipeline又维护了一个双向链表。
TailContext和HeadContext
这是TailContext和HeadContext构造函数,需要注意的是TailContext的inbound为true,outbound为false,HeadContext则相反,这两个参数和netty事件流向有关,具体情况下文说明。
重新分析ChannelInitializer和自定义handler的添加
Bootstrap.connect()-->Bootstrap.doResolveAndConnect()-->AbstractBootstrap.initAndRegister()-->Bootstrap.init()
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(config.handler());
。。。
}
config.handler()获取的就是ChannelInitializer,p.addLast(config.handler());就是把ChannelInitializer加入双向链表,看代码:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
让我们来看看其中的关键代码
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
由上可知,在加入ChannelInitializer的过程中可以知道,为了添加一个 handler 到pipeline中, 会把此handler包装成ChannelHandlerContext。同时addLast0说明ChannelInitializer是添加在tail之前。这个过程中注意下两个有意思的方法:
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
从源码中可以看到, 当一个handler实现了ChannelInboundHandler接口, 则 isInbound 返回真; 类似地, 当一个handler实现了ChannelOutboundHandler接口, 则isOutbound就返回真。ChannelInitializer是实现了ChannelInboundHandlerAdapter,所以inbound传入的是true。
自定义handler的添加
addLast方法中的另一条关键代码如下:
callHandlerAdded0(newCtx);
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
。。。
ctx.handler().handlerAdded(ctx);
。。。
}
ctx.handler()取到的自然是ChannelInitializer,而handlerAdded(ctx)都做了什么呢:
handlerAdded()-->boolean initChannel()-->void initChannel()
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
remove(ctx);
}
return true;
}
return false;
}
ChannelInitializer在加入双向链表后,调用重写initChannel()方法,在initChannel()方法中加入自定义handler,最后remove(ctx);移除ChannelInitializer。
回过头来看下channel的结构层次图,在初始化channel的时候会构建一个pipeline座位channel的属性(pipeline也有一个channel属性),每个pipeline维护了一个由ChannelHandlerContext 组成的双向链表. 这个链表的头是HeadContext,链表的尾是TailContext,并且每个ChannelHandlerContext中又关联着一个ChannelHandler。
pipeline的传输机制
从上面的分析,我们知道AbstractChannelHandlerContext中有inbound和outbound两个boolean变量, 分别用于标识Context所对应的handler的类型, 即:
- inbound为真时, 表示对应的ChannelHandler实现了ChannelInboundHandler方法.
- outbound 为真时, 表示对应的 ChannelHandler 实现了 ChannelOutboundHandler 方法.
pipieline的事件传输类型有两种:inbound事件和outbound事件两种:
I/O Request
via Channel or
ChannelHandlerContext
|
+---------------------------------------------------+---------------+
| ChannelPipeline | |
| \|/ |
| +---------------------+ +-----------+----------+ |
| | Inbound Handler N | | Outbound Handler 1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler N-1 | | Outbound Handler 2 | |
| +----------+----------+ +-----------+----------+ |
| /|\ . |
| . . |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
| [ method call] [method call] |
| . . |
| . \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 2 | | Outbound Handler M-1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 1 | | Outbound Handler M | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
+---------------+-----------------------------------+---------------+
| \|/
+---------------+-----------------------------------+---------------+
| | | |
| [ Socket.read() ] [ Socket.write() ] |
| |
| Netty Internal I/O Threads (Transport Implementation) |
+-------------------------------------------------------------------+
这个是netty官方文档,可以很明显地看出:inbound事件和outbound事件的流向相反。 inbound 的传递方式是通过调用相应的ChannelHandlerContext.fireIN_EVT() 方法, 而 outbound 方法的的传递方式是通过调用 ChannelHandlerContext.OUT_EVT() 方法。例如ChannelHandlerContext.fireChannelRegistered()调用会发送一个ChannelRegistered 的inbound给下一个ChannelHandlerContext, 而ChannelHandlerContext.bind调用会发送一个bind的outbound事件给下一个 ChannelHandlerContext。
让我们来看下inbound事件传播的方法有哪些:
ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead()
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught()
ChannelHandlerContext.fireUserEventTriggered()
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()
outbound事件传播的方法有:
ChannelHandlerContext.bind()
ChannelHandlerContext.connect()
ChannelHandlerContext.write()
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect()
ChannelHandlerContext.close()
让我们具体来看看这两类事件
outbound事件
outbound事件是请求事件,inbound事件是通知事件,这个要区分清楚。请求事件就是请求某件事即将发生,然后outbound事件进行通知。outbound事件的流向是:
tail -> customContext -> head
让我们用connect事件代码来证明:
当调用Bootstrap.connect()的时候,会触发一个outbound事件。以下是调用链
Bootstrap.connect -> Bootstrap.doConnect -> AbstractChannel.connect
让我们看看AbstractChannel.connect
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}
//pipeline.connect的实现如下:
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
当 outbound 事件(这里是 connect 事件)传递到 Pipeline 后, 它其实是以 tail 为起点开始传播的。而 tail.connect 其实调用的是AbstractChannelHandlerContext.connect 方法。继续跟进,在AbstractChannelHandlerContext中connect方法:
@Override
public ChannelFuture connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
。。。
final AbstractChannelHandlerContext next = findContextOutbound()
next.invokeConnect(remoteAddress, localAddress, promise);
。。。
}
让我们看下其中的关键代码:
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
顾名思义,findContextOutbound就是找出以this context(tail)为基本节点,找出第一个outbound为true的context,然后通过ctx调用invokeConnect方法,如果
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
connect(remoteAddress, localAddress, promise);
}
}
从tail往head方向获取handler并且调用其connect,如果用户没有从写这个方法,那么会调用ChannelOutboundHandlerAdapter实现的方法:
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}
connect又会调用AbstractChannelHandlerContext中connect方法找到下一个outbound为true的handler调用其connect,这样的循环中,直到connect事件传递到DefaultChannelPipeline的双向链表的头节点, 即 head 中(head的outbound设置为true)。
Context.connect -> Connect.findContextOutbound -> next.invokeConnect -> handler.connect -> Context.connect
outbound事件传到head后,因为head本身也是handler,handler()返回的的就它本身,让我们看看它connect方法的实现:
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
到这边outbound事件就结束了。inbound事件
inbound是通知事件,就是说某件事情已经发生了,然后利用inbound事件进行通知。inbound事件的传输方向和outbound刚好相反:
head -> customcontext -> tail
沿着connect继续走,在之后会有inbound事件,我们就以这个为例子进行inbound事件讲解。
承接上文,之前看到head的connect方法:
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
这里unsafe.connect调用的是AbstractNioChannel.connect(),关键代码如下:
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
···
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
}
···
}
在doConnect完成连接之后调用了fulfillConnectPromise,
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
return;
}
boolean active = isActive();
boolean promiseSet = promise.trySuccess();
if (!wasActive && active) {
pipeline().fireChannelActive();
}
if (!promiseSet) {
close(voidPromise());
}
}
让我们看pipeline().fireChannelActive();pipeline().fireChannelActive()将通道激活的消息(即 Socket 连接成功)发送出去。这里就是inbound事件的起点,往下走看这个过程是怎么样的:
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
很明显,以head(HeadContext)为起点,让我们看下在invokeChannelActive做了什么
static void invokeChannelInactive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelInactive();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelInactive();
}
});
}
}
//next.invokeChannelInactive()实现
private void invokeChannelInactive() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelInactive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelInactive();
}
}
这个方法和 Outbound 的对应方法(例如 invokeConnect) 如出一辙. 同Outbound一样, 如果用户没有重写channelActive方法, 那么会调用ChannelInboundHandlerAdapter 的 channelActive 方法:
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
public ChannelHandlerContext fireChannelActive() {
invokeChannelActive(findContextInbound());
return this;
}
和outbound事件一样,一样的循环,最后事件传输到tail。tail 本身既实现了ChannelInboundHandler接口, 又实现了ChannelHandlerContext接口,因此当channelActive消息传递到tail后,会将消息转递到对应的ChannelHandler中处理,tail的handler()返回的就是tail本身,最后的channelActive即是tail中的。
inbound事件到这里也就结束了。