Bootstrap适用于创建客户端连接的一个引导类,我们可以通过它很方便的创建出Netty客户端的连接,接下来我以官方源码里面的example echo项目为例来具体分析其实现:
例子来自官方的Example示例下面的echo项目,example\src\main\java\io\netty\example\echo
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
上面的代码向我们展示了Netty客户端初始化所需的内容:
1、EventLoopGroup:事件线程循环调度组,这里使用的是NioEventLoopGroup。
2、Bootstrap:客户端引导类
3、NioSocketChannel:Nio客户端通道
4、ChannelInitializer:客户端Handler初始器
5、ChannelPipeline:Channel管道
在研究Bootstrap实现之前,我们先来看下Bootstrap的类图结构:
可以看到,Bootstrap的类图结构还是比较简单的,上层有个抽象类AbstractBootstrap,以及两个顶层接口分别是Cloneable、Channel。
继续研究,我们看到上面的代码中是直接new
出来了一个Bootstrap,这个是个空的构造函数,所以这一步骤没啥可说的,接下来把EventLoopGroup设置进了b.group(group),我们进入这里看下其实现:
public B group(EventLoopGroup group) {
// 忽略参数校验
this.group = group;
return self();
}
其实这一步骤是由AbstractBootstrap来完成的,group的定义如下:
volatile EventLoopGroup group;
继续往下走,来到了 channel(NioSocketChannel.class),这里设置了一个Chanenl类型,此处使用的是NioSocketChannel,这里的具体实现也是由AbstractBootstrap来完成的,代码如下:
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
我们来看下ReflectiveChannelFactory里面的实现是怎么样的:
private final Constructor<? extends T> constructor;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
this.constructor = clazz.getConstructor();
}
@Override
public T newChannel() {
return constructor.newInstance();
}
上面的代码去除了参数校验以及异常判断,这样看代码更加清晰明了一点,可以看到的是这不就是我们经常使用的Java反射实例对象的方式吗,首先先将传递进来的NioSocketChanenl.class,通过获取其默认的构造函数对象,并将其默认构造函数保存起来,后期实现的使用,直接通过newChannel()获取一个新的实例即可。ReflectiveChannelFactory实例对象后面通过channelFactory()方法设置给了AbstractBootstrap的channelFactory字段。
接下来,代码使用了option(ChannelOption.TCP_NODELAY, true),这里也是由AbstractBootstrap父类的进行处理的:
public <T> B option(ChannelOption<T> option, T value) {
synchronized (options) {
if (value == null) {
options.remove(option);
} else {
options.put(option, value);
}
}
return self();
}
这里可以看到如果ChannelOption里面的常量值如果设置的是null则会进行移除该选项,否则就将其设置到options中,下面我们来看下options在AbstractBootstrap中是如何定义的:
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
这里就是一个简单的LinkedHashMap用于保存ChannelOption的键值集合,比较简单。
下面设置了handler(),也是由AbstractBootstrap来设置,代码实现如下:
public B handler(ChannelHandler handler) {
this.handler = ObjectUtil.checkNotNull(handler, "handler");
return self();
}
这里不过多展开,后面会有单独的章节讨论。
继续往下走,b.connect(HOST, PORT).sync();看下其实现先:
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
}
}
代码虽长,但是最重要的还是initAndRegister()方法,继续跟进看看:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
走到这里,我们可以看到我们之前通过传递NioSocketChannel.class构建的ReflectiveChannelFactory工厂排上用场了,这里也是通过newChannel()方法直接通过构造函数直接new出来了一个NioSocketChannel对象出来,因为这里使用的是默认构造函数构造的对象,具体的代码实现我们还是需要手动定位到NioSocketChannel中一看究竟,如下:
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static SocketChannel newSocket(SelectorProvider provider) {
return provider.openSocketChannel();
}
public NioSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}
public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
public NioSocketChannel(SocketChannel socket) {
this(null, socket);
}
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
可以看到这里使用了JDK的SelectorProvider提供器打开了一个新的SocketChannel通道,然后调用了父类AbstractNioByteChannel的构造函数:
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
AbstractNioByteChannel又调用了它的父类AbstractNioChannel构造函数:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
ch.configureBlocking(false);
}
这里我们调用了父类的构造函数将parent(这里是null)传递给了AbstractChannel,然后设置了感兴趣的事件为SelectionKey.OP_READ,并且设置该通道为非阻塞,这里没有太多的东西,看下AbstractChannel的构造函数实现:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
可以看到这里的东西还是有点儿多的,首先将子类传递过来的Channel parent设置到this.parent,这里是个null,接下来再实例化id,这里的id实现为DefaultChannelId,接下来实例化了一个Unsafe的实现,这个Unsafe是Netty的自定义实现,看下其接口声明:
interface Unsafe {
//返回指定的RecvByteBufAllocator.Handle,它将用于在接收数据时分配ByteBuf。
RecvByteBufAllocator.Handle recvBufAllocHandle();
//返回本地绑定的SocketAddress
SocketAddress localAddress();
//返回远程绑定的SocketAddress
SocketAddress remoteAddress();
// 注册ChannelPromise到Channel,并在注册完成后通知ChannelFuture
void register(EventLoop eventLoop, ChannelPromise promise);
//将SocketAddress绑定到ChannelPromise的Channel上,当完成后将通知ChannelPromise
void bind(SocketAddress localAddress, ChannelPromise promise);
// 将给定ChannelFuture的Channel与给定的远程SocketAddress连接,如果要使用本地localAddress只需要给定参数,其它情况只需要传递null即可,连接操作完成后,ChannelPromise将收到通知
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
//断开ChannelFuture的Channel连接并且当操作完成后通知ChannelPromise
void disconnect(ChannelPromise promise);
//关闭ChannelFuture的Channel连接并且当操作完成后通知ChannelPromise
void close(ChannelPromise promise);
//立即关闭Channel而不触发任何事件,可能只有在注册尝试失败时才有用。
void closeForcibly();
//从EventLoop注销ChannelPromise的Channel,并在操作完成后通知ChannelPromise
void deregister(ChannelPromise promise);
//调度一个读取操作,该操作将填充ChannelPipeline中第一个ChannelInboundHandler的入站缓冲区。如果已经存在挂起的读取操作,则此方法不执行任何操作。
void beginRead();
// 调度一个写操作
void write(Object msg, ChannelPromise promise);
// 调度write(Object, ChannelPromise)方法,刷新所有的写操作
void flush();
//返回一个特殊的ChannelPromise,它可以被重用并传递给{@link Unsafe}中的操作。它永远不会收到成功或错误的通知,因此只是操作的占位符以ChannelPromise作为参数,但不希望得到通知。
ChannelPromise voidPromise();
//返回存储挂起写入请求的Channel的ChannelOutboundBuffer
ChannelOutboundBuffer outboundBuffer();
}
这么看来所有的网络底层操作都封装到了Netty实现的Unsafe接口中,这里我们看些newUnsafe代码的实现,在该父类中定义的是一个抽象方法,具体的实现在NioSocketChannel子类中:
protected AbstractNioUnsafe newUnsafe() {
return new NioSocketChannelUnsafe();
}
可以看到Unsafe的具体实现是由NioSocketChannelUnsafe()类来实现的,其类层级图如下所示:
这里就不展开讲了。
继续往下走,接下来就到了pipeline = newChannelPipeline()了,这也对应了每个Channel都有一个对应的ChannelPipeline,这也是Netty实现无锁化的关键,后面的事件都是在该Pipeline中流转的,看下其实现代码吧:
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
这里的实现比较简单,直接实例化了一个DefaultChannelPipeline,并且将当前对象以构造参数的形式传递过去了,继续深入进去看看:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
这里可以看到这里有两个节点分别是tail、head,并且它们进行了关联,组成了一个双向链表的形式,接下来我们看下TailContext的类结构图:
接下来看下构造函数代码实现:
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, TailContext.class);
setAddComplete();
}
可以看到调用了父类AbstractChannelHandlerContext父类的构造函数:
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.executionMask = mask(handlerClass);
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
接下来,我们继续跟进HeadContext的实现,首先看下HeadContext的类结构图:
代码实现和TailContext一致:
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, HeadContext.class);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
和TailContext一样,HeadContext的父类也是AbstractChannelHandlerContext抽象类,到此为止,我们就完成了整个Channel的实例化工作了。我们继续回到AbstractBootstrap.initAndRegister()方法,下一步操作并是执行init(channel)方法:
void init(Channel channel) {
ChannelPipeline p = channel.pipeline();
p.addLast(config.handler());
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
}
这里的ChannelPipeline并是我们前面实例化的DefaultChannelPipeline,然后将我们EchoClient中设置的Handler添加到ChannelPipeline中,并设置ChannelOption以及属性等操作。
继续回到AbstractBootstrap.initAndRegister()方法,我们可以看到下面的代码块:
ChannelFuture regFuture = config().group().register(channel);
这里通过config()方法获取到BootstrapConfig实例,然后根据该实例的group()方法,里面的实现是直接bootstrap的group属性,所以这里的group就是我们程序里面设置的NioEventLoopGroup对象,我们定位到NioEventLoopGroup对象的register(channel)方法,该方法由父类MultithreadEventLoopGroup实现:
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
可以看到通过next()方法从线程组里面选取一个NioEventLoop线程来执行操作的,我们来跟进next()方法的实现,该方法由父类MultithreadEventExecutorGroup来实现:
public EventExecutor next() {
return chooser.next();
}
可以看到的是,这里是由我们之前设置的EventExecutorChooserFactory.EventExecutorChooser来实现的,如果传递线程的数量是2的幂则使用的是PowerOfTwoEventExecutorChooser,否则使用的是GenericEventExecutorChooser,这里不过是使用获取线程的方式不同,下面来分别看下两者的实现:
PowerOfTwoEventExecutorChooser的实现:
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
可以看到PowerOfTwoEventExecutorChooser的实现是通过一个原子整型的自增器不断增加然后与线程池里面的数量-1与之进行逻辑与获取该线程池里面的一个线程,其效率是非常高效的。
接下来看下GenericEventExecutorChooser的实现:
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
与PowerOfTwoEventExecutorChooser不同的是,GenericEventExecutorChooser的实现方式则是通过原子整型的自增器不断增加然后与线程池里面的数量进行模除操作来定义一个线程,其效率是没有按位与高效。
回到之前的代码,next()方法与设置的线程数是有很大关系的,推荐做法是使用2的幂来设置线程的数量,这样使用按位与能获得更好的性能。next()方法返回的是NioEventLoop线程对象,我们定位到该方法是由其父类SingleThreadEventLoop来实现的:
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
可以看到具体的实现是由channel的Unsafe类的实现类NioSocketChannelUnsafe来实现的,不过其register()方法是由其父类AbstractUnsafe来实现的:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
}
}
继续跟进register0:
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
这里我们跟进doRegister()方法,这里的实现是由AbstractNioChannel来完成的,源码如下:
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
该方法用于将SocketChannel注册到到Selector中,并且监听0事件(表示不监听任何事件),attachment为当前channel。
在一系列操作完成之后,最终调用的是Bootstrap#doConnect:
private static void doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
});
}
这里会通过调用对应的NioEventLoop线程来发起NioSocketChannel#connect方法的调用,而这个方法对应调用的是DefaultChannelPipeline#connect方法,如下:
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
在此之前我们分析过DefaultChannelPipeline对象中有一个双向关联的链表,这里我们可以看到调用的是尾部链表,继续跟进去,其对应位置在AbstractChannelHandlerContext#connect:
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return connect(remoteAddress, null, promise);
}
public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise, null, false);
}
return promise;
}
这里我们看到主要是通过findContextOutbound方法传递一个MASK_CONNECT
参数获取一个对应的AbstractChannelHandlerContext对象来执行下面的操作逻辑,所以我们跟进该方法看下其实现:
private AbstractChannelHandlerContext findContextOutbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
ctx = ctx.prev;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
return ctx;
}
因为在DefaultChannelPipeline中使用的是双向链表保存着各个ChannelHandler,所以这里采用的是do-while循环的方式查找对应的AbstractChannelContext,而do-while结束的条件是通过skipContext方法计算得出,来看看里面的实现:
private static boolean skipContext(AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
return (ctx.executionMask & (onlyMask | mask)) == 0 ||
(ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
}
这里采用的是位运算操作,可以大概的理解为利用位运算符计算得出合适的HandlerContext即可,感兴趣的小伙伴可以自行参考ChannelHandlerMask#mask方法,也就是我们没实例化一个Handler对象,在该对象的属性中都会存放一个executionMask,而在我们的DefaultChannelPipeline中的TailContext和HeadContext中都有着对应关于mask的特殊处理,也就是处理所有的事件,下面是TailContext的mask源码:
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, TailContext.class);
setAddComplete();
}
}
这里调用了父类的构造方法,也就是AbstractChannelHandlerContext的构造方法:
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.executionMask = mask(handlerClass);
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
ChannelHandlerMask对应的常量字段:
// Using to mask which methods must be called for a ChannelHandler.
static final int MASK_EXCEPTION_CAUGHT = 1;
static final int MASK_CHANNEL_REGISTERED = 1 << 1;
static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
static final int MASK_CHANNEL_ACTIVE = 1 << 3;
static final int MASK_CHANNEL_INACTIVE = 1 << 4;
static final int MASK_CHANNEL_READ = 1 << 5;
static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
static final int MASK_BIND = 1 << 9;
static final int MASK_CONNECT = 1 << 10;
static final int MASK_DISCONNECT = 1 << 11;
static final int MASK_CLOSE = 1 << 12;
static final int MASK_DEREGISTER = 1 << 13;
static final int MASK_READ = 1 << 14;
static final int MASK_WRITE = 1 << 15;
static final int MASK_FLUSH = 1 << 16;
static final int MASK_ONLY_INBOUND = MASK_CHANNEL_REGISTERED |
MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;
static final int MASK_ONLY_OUTBOUND = MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_OUTBOUND;
这里调用了ChannelHandlerMask#mask方法,如下:
static int mask(Class<? extends ChannelHandler> clazz) {
// MASKS为一个Netty自定义的ThreadLocal实现
Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
Integer mask = cache.get(clazz);
if (mask == null) {
mask = mask0(clazz);
cache.put(clazz, mask);
}
return mask;
}
这里主要的逻辑是mask0方法,如下:
private static int mask0(Class<? extends ChannelHandler> handlerType) {
int mask = MASK_EXCEPTION_CAUGHT;
try {
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_INBOUND;
if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_REGISTERED;
}
if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_UNREGISTERED;
}
if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_ACTIVE;
}
if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_INACTIVE;
}
if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_CHANNEL_READ;
}
if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_READ_COMPLETE;
}
if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
}
if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_USER_EVENT_TRIGGERED;
}
}
if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_OUTBOUND;
if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_BIND;
}
if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_CONNECT;
}
if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DISCONNECT;
}
if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_CLOSE;
}
if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DEREGISTER;
}
if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
mask &= ~MASK_READ;
}
if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
Object.class, ChannelPromise.class)) {
mask &= ~MASK_WRITE;
}
if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
mask &= ~MASK_FLUSH;
}
}
if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
mask &= ~MASK_EXCEPTION_CAUGHT;
}
} catch (Exception e) {
// Should never reach here.
PlatformDependent.throwException(e);
}
return mask;
}
该方法用于根据传递的Handler类型计算得出对应的mask,TailContext因为实现了ChannelInboundHandler接口,所以走的是下面的逻辑:
int mask = MASK_EXCEPTION_CAUGHT;
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_INBOUND;
}
最终TailContext的executionMask也就为511了。
接下来,我们看下HeadContext的executionMask是如何计算得出的,看下HeadContext的构造方法:
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, HeadContext.class);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
}
可以看到的是,HeadContext和TailContext的不同之处是该对象既实现了ChannelOutboundHandler
接口也实现了ChannelInboundHandler
接口,正所谓能力越大责任越大啊,不过它们相同之处便是都继承了AbstractChannelHandlerContext
类,因为这里处理的方式相同,所以省略看下上文即可,不同的地方是,因为HeadContext实现了2个接口,所以对应的ChannelHandlerMask#mask0对应的处理有所不同,如下所示:
int mask = MASK_EXCEPTION_CAUGHT;
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_INBOUND;
}
if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_OUTBOUND;
}
所以HeadContext对应处理了所有关于in
和out
的事件,所以HeadContext的executionMask最终得到了131071。
所以回到我们之前的findContextOutbound方法,最终遍历链表找到的便是HeadContext对象了。找到了该对象之后回到AbstractChannelHandlerContext#connect方法,如下:
public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
//...
//这里返回的是HeadContext
final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
EventExecutor executor = next.executor();
//如果是在本线程之内直接发起调用
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
//否则使用异步任务发起处理
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise, null, false);
}
return promise;
}
这里通过找到HeadContext调用了invokeConnect方法,如下:
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
//当前ChannelHandler的handlerAdded方法是否已经被调用或者不保证顺序并且ChannelHandler的handlerAdded方法即将被调用
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
connect(remoteAddress, localAddress, promise);
}
}
继续根据connect方法,HeadContext#connect:
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) {
unsafe.connect(remoteAddress, localAddress, promise);
}
这里的unsafe对象对应了NioSocketChannelUnsafe对象,这里的connect调用了AbstractNioChannel#connect方法:
public final void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
// Already a connect in process.
throw new ConnectionPendingException();
}
boolean wasActive = isActive();
//doConnect方法会返回false
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
//保存起来用于检测是否发起重复的连接操作
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
//处理连接超时的情况
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
这里跟进doConnect方法,位置在NioSocketChannel#doConnect:
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress);
}
boolean success = false;
try {
//利用Netty的工具类连接到远程主机,这里返回的是false
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) {
//注册连接事件
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
到此为止,关于Netty的Bootstrap的整个流程就分析完成了。
整个流程图如下所示。