上一节学习了pipeline
初始化的过程。初始化了HeadContext
和TailContext
,并构建了pipeline
双向链表,每个节点存储ChannelHandlerContext
。
本节研究添加ChannelHandler
的过程。在学习之前先整理一些之前学到的内容。
- 在
服务端channel初始化channle
的过程中,bossGroup
为服务端channel
的pipeline
添加了一个特殊的ChannelHandler
:ServerBootstrapAcceptor
,如下这幅图。
可
客户端channel
所在的workGroup
处理所有childHandler
,这里的Handler
属于客户端channel
因此在
bossGroup
在初始化channel
的过程中,会调用addLast()
增加handler
,促发HandlerAdded
事件,而引导配置的时候触发的是ChannelInitializer#handlerAdded()
。
入口
在引导的过程当中,重写了ChannelInitializer#initChannel()
,其中获取了pipeline
并调用了addLast()
方法,同事将ChannelHandler
作为参数传入addLast
方法中。
class DataServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new DefaultEventExecutorGroup(2),
new DataServerHandler(),
new DataServerOutHandler2(),
new DataServerOutHandler(),
new DataServerHandler2());
}
}
DefaultChannelPipeline#addLast()
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
//循环添加
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
具体的addLast
方法做了以下几件事
- 检查
Handler
是否重复添加 - 创建数节点
ChannelHandlerContext
- 添加数据节点
- 触发
handlerAdded
事件,执行对应Handler
的回调函数
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//检查是否重复添加
checkMultiplicity(handler);
//group:异步线程
//创建数据节点
//filterName:从头节点开始遍历检查名字是否重复,如果没有传入name则生成一个name
newCtx = newContext(group, filterName(name, handler), handler);
//添加数据节点
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
//触发事件回调函数:handlerAdded
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
//触发事件回调函数:handlerAdded
callHandlerAdded0(newCtx);
return this;
}
-
checkMultiplicity(handler);
检查Handler
是否重复添加
逻辑主要时判断是否有@Sharable
注解,从而判断是否允许从夫,其次通过成员变量added
属性判断是否被添加过。如果重复的话则排除异常。
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
//判断是否是共享 同时 是否时被添加过的
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
public boolean isSharable() {
/**
* Cache the result of {@link Sharable} annotation detection to workaround a condition. We use a
* {@link ThreadLocal} and {@link WeakHashMap} to eliminate the volatile write/reads. Using different
* {@link WeakHashMap} instances per {@link Thread} is good enough for us and the number of
* {@link Thread}s are quite limited anyway.
*
* See <a href="https://github.com/netty/netty/issues/2289">#2289</a>.
*/
Class<?> clazz = getClass();
Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable = cache.get(clazz);
if (sharable == null) {
sharable = clazz.isAnnotationPresent(Sharable.class);
cache.put(clazz, sharable);
}
return sharable;
}
-
newCtx = newContext(group, filterName(name, handler), handler);
,创建数据节点
实例化DefaultChannelHandlerContext
的过程中将handler
作为成员变量持有,并且标记了handler
是inbound
还是outbound
,并将一些重要的组件pipeline
,executor
保存起来。
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
//实例化DefaultChannelHandlerContext
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
private final ChannelHandler handler;
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
//标记类型
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
//持有handler
this.handler = handler;
}
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
指的注意的是childExecutor
,该方法处理添加handler
时,传入的EventExecutorGroup
。该Executor
可用于用户在回调方法中处理异步计算。
private EventExecutor childExecutor(EventExecutorGroup group) {
if (group == null) {
return null;
}
Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
if (pinEventExecutor != null && !pinEventExecutor) {
return group.next();
}
//存储异步线程的容器:管理异步线程
Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
if (childExecutors == null) {
// Use size of 4 as most people only use one extra EventExecutor.
//新建一个Map存储异步线程
childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
}
// Pin one of the child executors once and remember it so that the same child executor
// is used to fire events for the same channel.
//存储异步线程
EventExecutor childExecutor = childExecutors.get(group);
if (childExecutor == null) {
childExecutor = group.next();
childExecutors.put(group, childExecutor);
}
//返回异步线程
return childExecutor;
}
用户配置异步线程,处理异步计算。
class DataServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
//配置异步线程处理异步计算
.addLast(new DefaultEventExecutorGroup(2),
new DataServerHandler(),
new DataServerOutHandler2(),
new DataServerOutHandler(),
new DataServerHandler2());
}
}
class DataServerHandler extends SimpleChannelInboundHandler<Object> {
public DataServerHandler() {
super(false);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(MessageFormat.format("---> receive client msg = \"{0}\"", byteBuf.toString(CharsetUtil.UTF_8)));
System.out.println("<--- send to client msg = server msg");
ByteBuf serverMsg = Unpooled.copiedBuffer("server msg", CharsetUtil.UTF_8);
ctx.write(serverMsg);
ctx.executor().execute(new Runnable() {
public void run() {
try {
TimeUnit.SECONDS.sleep(20);
System.out.println("io async end ");
} catch (InterruptedException e) {
//ignore
}
}
});
System.out.println("<--- send to client channel msg = server channel msg");
ByteBuf serverChannelMsg = Unpooled.copiedBuffer("server channel msg", CharsetUtil.UTF_8);
//tail.write
ctx.channel().write(serverChannelMsg);
System.out.println("<--- send to client msg = server msg2");
ByteBuf serverMsg2 = Unpooled.copiedBuffer("server msg2", CharsetUtil.UTF_8);
ctx.write(serverMsg2);
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
}
结合初始化NioEventLoopGroup
的过程实际上这里保存的executor
就是ThreadPerTaskExecutor
,因此同样时新增了一个EventLoop
数组并初始化一个chooser
,当group.next()
的时候调用chooser
选择一个EventLoop
执行操作。
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}
- 添加数据节点
addLast0(newCtx);
其逻辑主要是链表的操作:将新的数据节点添加到尾节点(TailContext
)之前
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
-
callHandlerAdded0(newCtx);
:触发handlerAdded事件
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
//调用回调函数
ctx.callHandlerAdded();
} catch (Throwable t) {
//省略代码
}
final void callHandlerAdded() throws Exception {
// We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
// any pipeline events ctx.handler() will miss them because the state will not allow it.
if (setAddComplete()) {
//获取handler调用handlerAdded触发事件
handler().handlerAdded(this);
}
}
整理以下整个添加handler的过程
bossGroup
初始化的时候将对服务端channel
的pipeline
进行了添加ChannelInitializer
的操作,促发用户initChannel
回调方法,添加具体的handler
,之后删除ChannelInitializer
。其中
bossGroup
添加具体的handler
,包括两个:引导调用handler()
配置的,另一个是ServerBootstrapAcceptor
。当新连接接入时候,会创建
客户端channel
并初始化,当处理读取事件的时候触发ServerBootstrap.ServerBootstrapAcceptor#channelRead()
,调用如下代码
//添加childHandler
child.pipeline().addLast(childHandler);
该childHandler
也是引导配置的时候调用childHandler()
方法,传入ChannelInitializer
实例作为参数。因此同样的逻辑。不一样的是,childHandler
的处理逻辑已经交给了childGroup
进行处理了,脱离了bossGroup
,查看如下代码。
//选择NioEventLoop并注册Selector
childGroup.register(child)
.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});