前言
之前的文章介绍了Netty的线程池NioEventLoopGroup的初始化过程,这次将分析Netty中同样非常重要的一个东西ServerBootstrap。
ServerBootstrap使用片段
ThreadFactory bossThreadFactory = new ThreadFactoryBuilder().setNameFormat(getServerName() + " Server Acceptor NIO Thread#%d").build();
ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat(getServerName() + " Server Reactor NIO Thread#%d").build();
this.bossGroup = new NioEventLoopGroup(numberOfThreads, bossThreadFactory);
this.workerGroup = new NioEventLoopGroup(numberOfThreads, workerThreadFactory);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("readTimeoutHandler", new ReadTimeoutHandler(DeviceServerListener.this.timeoutSeconds));
pipeline.addLast("lineBasedFrameDecoder-" + maxLength, new LineBasedFrameDecoder(Integer.parseInt(maxLength)));// 按行('\n')解析成命令ByteBuf
pipeline.addLast("stringPluginMessageDecoder", new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast("stringToByteEncoder", new StringToByteEncoder());// 将JSON字符串类型消息转换成ByteBuf
pipeline.addLast("deviceMessageDecoder", new DeviceMessageDecoder());// 将JSON字符串消息转成deviceMessage对象
pipeline.addLast("deviceMessageEncoder", new DeviceMessageEncoder());// 将deviceMessage对象转成JSON字符串
pipeline.addLast("deviceHeartBeatResponseHandler", new DeviceHeartBeatResponseHandler(heartTime));
pipeline.addLast("deviceAuthResponseHandler",
new DeviceAuthResponseHandler(DeviceServerListener.this.timeoutSeconds, DeviceServerListener.serverInstanceName));
pipeline.addLast("deviceMessageHandler", new DeviceMessageHandler());
// log.debug("Added Handler to Pipeline: {}", pipeline.names());
}
}).option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT);
// Start the server. Bind and start to accept incoming connections.
this.channelFuture = bootstrap.bind(serverPort).sync();
这是Netty服务端比较标准的初始化片段,可以看到这其中ServerBootstrap
有着非常重要的戏份,它就像是Netty的启动器一样,其中的NioEventLoopGroup
之前已经分析过了,那么直接来看ServerBootstrap
的初始化。
源码解析
从以上片段可以看到初始化时首先通过ServerBootstrap
的无参构造函数创建一个对象,该构造函数没有任何的操作因此不做分析。接着是该对象的一串链式调用bootstrap.group().channel().childHandler().option()
,我们来逐一看一下。
首先是group()
方法
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
这里将传入的childGroup
对象赋值给ServerBootstrap
的childGroup
属性,然后调用了父类AbstractBootstrap
的group()
方法
public B group(EventLoopGroup group) {
if (group == null) {
throw new NullPointerException("group");
}
if (this.group != null) {
throw new IllegalStateException("group set already");
}
this.group = group;
return (B) this;
}
由这里看出该方法将传入的parentGroup
赋值给了group
属性。
接着来看channel()
方法
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new BootstrapChannelFactory<C>(channelClass));
}
这里调用了channelFactory(new BootstrapChannelFactory<C>(channelClass))
,来跟一下
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return (B) this;
}
这里完成了对AbstractBootstrap
类的channelFactory
属性的赋值,赋值对象为new BootstrapChannelFactory<C>(channelClass)
。
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable
这里的C
根据AbstractBootstrap
的类定义为Channel
的子类,因此实际传入的channelClass
值为NioServerSocketChannel.class
。
接着来看childHandler()
方法
public ServerBootstrap childHandler(ChannelHandler childHandler) {
if (childHandler == null) {
throw new NullPointerException("childHandler");
}
this.childHandler = childHandler;
return this;
}
没有过多操作,也仅是对ServerBootstrap
的childHandler
属性赋值,但这里传入的childHandler
稍微有点复杂。传入的对象为new ChannelInitializer<SocketChannel>()
同时覆写了其initChannel(SocketChannel ch)
方法,该方法初始化了一个处理链用于处理接收和发送双向的消息,这块比较重要,后续将会单独进行分析,在此暂不深入。
最后来看一下option()
方法
public <T> B option(ChannelOption<T> option, T value) {
if (option == null) {
throw new NullPointerException("option");
}
if (value == null) {
synchronized (options) {
options.remove(option);
}
} else {
synchronized (options) {
options.put(option, value);
}
}
return (B) this;
}
这里维护了一个options
数组将要设置的项都放进了该数组中,其定义如下
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
这里可以看出放入对象的key值限定为ChannelOption<?>
对象,大致都有下面这些
public static final ChannelOption<ByteBufAllocator> ALLOCATOR = valueOf("ALLOCATOR");//根据PooledByteBufAllocator或UnpooledByteBufAllocator设置对象池开启与否(4.1版本开始默认开)
public static final ChannelOption<RecvByteBufAllocator> RCVBUF_ALLOCATOR = valueOf("RCVBUF_ALLOCATOR");//设置接收池
public static final ChannelOption<MessageSizeEstimator> MESSAGE_SIZE_ESTIMATOR = valueOf("MESSAGE_SIZE_ESTIMATOR");
public static final ChannelOption<Integer> CONNECT_TIMEOUT_MILLIS = valueOf("CONNECT_TIMEOUT_MILLIS");
public static final ChannelOption<Integer> MAX_MESSAGES_PER_READ = valueOf("MAX_MESSAGES_PER_READ");
public static final ChannelOption<Integer> WRITE_SPIN_COUNT = valueOf("WRITE_SPIN_COUNT");
public static final ChannelOption<Integer> WRITE_BUFFER_HIGH_WATER_MARK = valueOf("WRITE_BUFFER_HIGH_WATER_MARK");
public static final ChannelOption<Integer> WRITE_BUFFER_LOW_WATER_MARK = valueOf("WRITE_BUFFER_LOW_WATER_MARK");
public static final ChannelOption<Boolean> ALLOW_HALF_CLOSURE = valueOf("ALLOW_HALF_CLOSURE");
public static final ChannelOption<Boolean> AUTO_READ = valueOf("AUTO_READ");
public static final ChannelOption<Boolean> SO_BROADCAST = valueOf("SO_BROADCAST");//允许发送广播数据报
public static final ChannelOption<Boolean> SO_KEEPALIVE = valueOf("SO_KEEPALIVE");//是否启用心跳保活机制
public static final ChannelOption<Integer> SO_SNDBUF = valueOf("SO_SNDBUF");//发送缓冲区大小
public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF");//接收缓冲区大小
public static final ChannelOption<Boolean> SO_REUSEADDR = valueOf("SO_REUSEADDR");//是否允许重复使用本地地址和端口
public static final ChannelOption<Integer> SO_LINGER = valueOf("SO_LINGER");//保证阻塞close()的调用,直到数据完全发送
public static final ChannelOption<Integer> SO_BACKLOG = valueOf("SO_BACKLOG");//临时存放已完成三次握手的请求队列的最大长度
public static final ChannelOption<Integer> SO_TIMEOUT = valueOf("SO_TIMEOUT");
public static final ChannelOption<Integer> IP_TOS = valueOf("IP_TOS");
public static final ChannelOption<InetAddress> IP_MULTICAST_ADDR = valueOf("IP_MULTICAST_ADDR");
public static final ChannelOption<NetworkInterface> IP_MULTICAST_IF = valueOf("IP_MULTICAST_IF");
public static final ChannelOption<Integer> IP_MULTICAST_TTL = valueOf("IP_MULTICAST_TTL");
public static final ChannelOption<Boolean> IP_MULTICAST_LOOP_DISABLED = valueOf("IP_MULTICAST_LOOP_DISABLED");
public static final ChannelOption<Boolean> TCP_NODELAY = valueOf("TCP_NODELAY");//是否开启Nagle算法,即是否等数据累积到一定程度再发送数据
这里很大一部分是tcp中的一些参数设置,比较常用的几个参数都在源码上做了注释,其中对性能影响很大的一项是ALLOCATOR
,根据一些资料显示开启对象池性能远高于不开启,而Netty从4.1版本开始也将默认选项设为了开启对象池。
这里我们顺便来看一下valueOf
方法的实现
public static <T> ChannelOption<T> valueOf(String name) {
checkNotNull(name, "name");
ChannelOption<T> option = names.get(name);
if (option == null) {
option = new ChannelOption<T>(name);
ChannelOption<T> old = names.putIfAbsent(name, option);
if (old != null) {
option = old;
}
}
return option;
}
可以看到这个方法主要是维护了names
这个数组,其定义如下
private static final ConcurrentMap<String, ChannelOption> names = PlatformDependent.newConcurrentHashMap();
来看一下PlatformDependent.newConcurrentHashMap()
的具体实现
public static <K, V> ConcurrentMap<K, V> newConcurrentHashMap() {
if (CAN_USE_CHM_V8) {
return new ConcurrentHashMapV8<K, V>();
} else {
return new ConcurrentHashMap<K, V>();
}
}
看到这里我又一次被震惊了,Netty自己实现了一个ConcurrentHashMapV8
用于应对java8以下的版本,要知道这可是一个6000多行代码的类。。。不得不说Netty的开发者真的不愧优化狂魔的称号。
分析完了bootstrap.group().channel().childHandler().option()
这一串链式调用,接着来看下一串bootstrap.bind(serverPort).sync()
。这里首先来看bind()
方法
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
这里根据端口号创建了一个InetSocketAddress
对象,并调用了bind(SocketAddress localAddress)
方法
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
这里的validate()
方法校验了AbstractBootstrap
类的group
和channelFactory
属性是否为空,然后执行doBind()
方法
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();// 创建Channel并注册到线程池
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// 注册完成且注册成功
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// 由于注册是异步事件,可能此时没有注册完成,那么使用异步操作
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// 注册过程中有异常则失败
promise.setFailure(cause);
} else {
// 注册完成且成功
promise.executor = channel.eventLoop();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
首先看一下regFuture
对象如何通过initAndRegister()
方法生成的
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 创建一个Channel
channel = channelFactory().newChannel();
// 初始化处理器Handler
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
}
// Channel还没有注册到线程池,使用默认线程GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 将channel注册到Reactor线程池
ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
channelFactory().newChannel()
这里调用的是AbstractBootstrap
的channelFactory
属性的newChannel()
方法,根据之前赋值的channelFactory
对象的情况来看,这里最终得到的是NioServerSocketChannel
的一个实例对象。得到channel
对象后,接着调用init(channel)
进行了初始化,该方法由子类ServerBootstrap
实现。
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);//将options中逐一设置到DefaultChannelConfig对象上
}
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {//为了把ServerBootstrapAcceptor放在处理链的最末端,该类主要功能是将mainReactor接受的Channel传递给subReactor
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
这个方法看似很长其实主要做了两件事:一、将父类和子类的options和attrs进行赋值;二、构建channel
的pipeline
属性的处理链。这里主要来看一下addLast()
方法,由DefaultChannelPipeline
进行具体实现,来看一下
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);//判断handler是否已添加并且是否是sharable的
newCtx = newContext(group, filterName(name, handler), handler);//filterName(name, handler)自动生成name,newContext方法对newCtx对象的handler、inbound、outbound、name、pipeline属性进行了赋值
addLast0(newCtx);//将newCtx加到tail节点之前
if (!registered) {//一旦channel注册到eventloop上就将registered置为true,并且不可再改变
newCtx.setAddPending();//CAS地修改newCtx的handlerState的值
callHandlerCallbackLater(newCtx, true);//添加newCtx到pendingHandlerCallbackHead的next节点
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
最终执行的是三参的addLast()
方法并且传入的group
和name
均为空。该方法中主要根据channel
是否注册到eventloop
上做不同的处理,初始化handler链。
回到initAndRegister()
方法中,分析完了init(channel)
再来看另一个很重要的将channel
注册到Reactor线程池的register()
方法。该方法在NioEventLoopGroup
的父类MultithreadEventLoopGroup
中实现
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
@Override
public EventExecutor next() {
return chooser.next();
}
这里的chooser
上一篇文章中有分析过,其根据线程池设置的线程数是否2的幂次方有不同的实现,而next()
方法返回的是数组中的一个NioEventLoop
对象,该用类的register()
方法,最终会返回一个注册了该channel
的DefaultChannelPromise
对象,具体这里就不再深入了。
到此initAndRegister()
方法就分析完毕,回到doBind()
方法来继续分析doBind0()
方法
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
这里能看到,只有regFuture.isSuccess()
也就是channel
注册成功时时才会执行绑定操作channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE)
,否则直接向promise
写注册失败,这里的promise
是一个DefaultChannelPromise
类型的对象,该类继承自Future
,可以认为是一种特殊的Future
对象。bind(localAddress, promise)
方法最终会绑定在tail
节点上,最终交由DefaultChannelPipeline
的内部类的unsafe
去进行绑定,调用链非常深在此就不做展开了。
最后跳回来看一下bootstrap.bind(serverPort).sync()
中的sync()
,之前分析过bootstrap.bind(serverPort)
返回的是promise
,因此sync()
方法由DefaultChannelPromise
实现
@Override
public ChannelPromise sync() throws InterruptedException {
super.sync();
return this;
}
接着跟到DefaultPromise
类
@Override
public Promise<V> sync() throws InterruptedException {
await();
rethrowIfFailed();
return this;
}
@Override
public Promise<V> await() throws InterruptedException {
if (isDone()) {
return this;
}
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
checkDeadLock();
synchronized (this) {
while (!isDone()) {
incWaiters();
try {
wait();
} finally {
decWaiters();
}
}
}
return this;
}
可以看出正常情况下该方法会始终在while循环中,导致执行到sync()
方法的线程阻塞,因此bootstrap.bind(serverPort).sync()
以后的代码都是不可达的,这点值得注意。
总结
ServerBootstrap
的使用方式非常固定,大部分常规使用都会应用该初始化代码模板。主要做的事情就是赋值ServerBootstrap
的各个属性,并且创建Channel
、绑定用户定义的Handler
、以及将该Channel
注册到一个eventloop
中(这里特别要强调的是Channel
只能被绑定到一个eventloop
中),最后绑定本地端口监听IO事件。