这篇主要想分析下ServerBootstrap的启动过程,包括:
- netty怎么创建selector,什么时候注册OP_ACCEPT事件?
- netty怎么初始化各类handler?
首先我们看下经典的netty server启动代码:
DefaultThreadFactory bossThreadFactory = new DefaultThreadFactory("boss");
DefaultThreadFactory workerThreadFactory = new DefaultThreadFactory("worker");
EventLoopGroup bossGroup = new NioEventLoopGroup(1, bossThreadFactory);
EventLoopGroup workerGroup = new NioEventLoopGroup(16, workerThreadFactory);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new TimeServerHandler());
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} catch (Exception e) {
// TODO: handle exception
}finally{
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
可以看到ServerBootstrap的一些初始化参数,包括对应reactor线程模型的两个EventLoopGroup:bossGroup和workerGroup(这里为了区分处理连接的accept线程和处理io的worker线程)、channel类型、channel的参数、注入的handler。
启动过程主要是bind方法,最终会进入AbstractBootstrap.doBind方法
private ChannelFuture doBind(final SocketAddress localAddress) {
//初始化和注册channel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
继续看initAndRegister方法
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//基于反射+工场创建NioServerSocketChannel类,利用之前传的NioServerSocketChannel.class参数
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
//异常处理略。。。
}
//注册channel到bossGroup中的某个线程
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
initAndRegister方法首先创建NioServerSocketChannel类,然后init(channel),最后注册channel到NioEventLoopGroup中,看下init方法主要做什么:
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
//以上为设置channel的属性
//创建ChannelPipeline
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
//初始化ChannelInitializer(一次性handler),实现一些初始化操作
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
init方法首先给channel设置了一些属性,然后对channel绑定的ChannelPipeline加入了一个特殊的handler:ChannelInitializer,ChannelInitializer类的作用后面再讨论。
继续看ChannelFuture regFuture = config().group().register(channel)操作,首先拿到NioEventLoopGroup,register进入MultithreadEventLoopGroup.register方法:
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
next方法是从EventLoopGroup中选择一个线程来注册channel,选择方式:
executors[idx.getAndIncrement() & executors.length - 1]
这里要注意一下,因为NioServerSocketChannel只有一个,所以只会注册到一个线程中,bossGroup的线程数设置多少没有意义,最终只有一个线程注册NioServerSocketChannel,即accept线程。
进入register方法(AbstractChannel.register):
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
。。。
//校验代码略
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
//本例是main线程,最终会进入此方法
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
重点看下register0(promise)方法,该方法所在的runnable会作为一个task放入eventLoop线程中(即boss线程,也是reactor模型中的accept线程)。
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
//对channel注册0事件
doRegister();
neverRegistered = false;
registered = true;
//会触发初始化ChannelInitializer
pipeline.invokeHandlerAddedIfNeeded();
//设置成功,并通知相关Listener
safeSetSuccess(promise);
//注册完成事件
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
//第一次会进入此方法,进行激活channel,是最终注册op_accept事件的地方
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
查看注册方法doRegister:
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
可以看到这里实现了注册事件,不过只注册了0,还不是OP_ACCEPT事件。我们知道注册事件需要selector组件,那么eventLoop().unwrappedSelector()是怎么创建的呢?回到eventLoop的初始化方法中,在其父类MultithreadEventExecutorGroup的构造方法中有对每个NioEventLoop进行初始化,而NioEventLoop的构造方法如下:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
//开启selector
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
看完doRegister(),继续看pipeline.invokeHandlerAddedIfNeeded()方法,最终会调到ChannelInitializer的initChannel方法,该方法的目的是将config中的handler和ServerBootstrapAcceptor加入ChannelPipeline。initChannel方法,就会移除ChannelPipeline中的ChannelInitializer(移除代码在ChannelInitializer类的initChannel方法的finally代码中),即只做初始化功能,一次性handler。
再到safeSetSuccess(promise),这里会对promise设置成功,并通知相关listener,比如在AbstractBootstrap#doBind中的regFuture,这里后面再说。
继续到pipeline.fireChannelActive()方法,最终会到AbstractNioChannel#doBeginRead:
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
//第一次会进入此逻辑,此时readInterestOp=16,即accept事件
selectionKey.interestOps(interestOps | readInterestOp);
}
}
再回到regFuture的Listener事件,有个doBind操作,最终会执行到NioServerSocketChannel#doBind:
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
这也就让NioServerSocketChannel和端口绑定上了。
再看下开头的两个问题:
- netty怎么创建selector,什么时候注册OP_ACCEPT事件?
netty在创建eventLoopGroup时,会对每个eventLoop创建并绑定selector,在bossGroup中会选择一个eventLoop注册selector并初始化监听事件0,等到bind后才监听OP_ACCEPT事件。 - netty怎么初始化各类handler?
通过一次性的handler(ChannelInitializer)