本文基于Netty 4
在讨论Netty服务器启动之前,先回顾一下服务端使用Java nio selector的启动过程:
//1. 获取服务端通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
//2. 绑定端口
ssChannel.bind(new InetSocketAddress(9898));
//3. 设置为非阻塞模式
ssChannel.configureBlocking(false);
Selector selector = Selector.open();
//4. 向监听器注册accept事件
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
while (selector.select() > 0) {
//5. 获取监听器上所有的监听事件值
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel)key.channel();
SocketChannel socketChannel = server.accept();
socketChannel.configureBlocking(false);
//注册read事件
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
readMsg(channel);
} else if(...) {
...
}
it.remove();
}
}
Netty nio模式的启动过程:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
b.option(ChannelOption.SO_BACKLOG, 128);
b.childOption(ChannelOption.SO_REUSEADDR, true);
b.childOption(ChannelOption.TCP_NODELAY, true);
b.childOption(ChannelOption.SO_KEEPALIVE, true);
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("http-decoder", new HttpRequestDecoder());
pipeline.addLast("aggregator", new HttpObjectAggregator(1048576));
pipeline.addLast("encoder", new HttpResponseEncoder());
pipeline.addLast(new MyInboundHandler());
}
});
ChannelFuture f = b.bind(8080).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
其实,无论是Java nio,还是Netty nio,总体都包含两个方面:1. 线程模型;2. IO模型;下面先分析Netty的线程模型。
Netty线程启动
NioEventLoopGroup是Netty线程的核心,下面看一下这个类的初始化
NioEventLoopGroup # 构造方法
public NioEventLoopGroup() {
this(0);
}
//无参的构造转调到了这里
public NioEventLoopGroup(int nThreads) {
//这里的Executor传入了null
this(nThreads, (Executor) null);
}
上面的构造方法最终会调用父类的构造方法
MultithreadEventLoopGroup # 构造方法
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
//这里若threads==0,会初始化一个值;
//该初始值的算法 Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2))
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
最终走到这里 MultithreadEventExecutorGroup # 构造方法
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
//之前executor设为了null,这里会赋一个值,这个executor的作用是创建新 Thread并运行任务;后面会发现它的作用
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//这里会根据threads的值创建若干个EventExecutor
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//往里看一下children为何物
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
.........
}
}
}
//初始了chooser,后面会使用该chooser选择一个EventLoop
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
NioEventLoopGroup # newChild()
上面的children[] 最终保存的是NioEventLoop
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
NioEventLoop中最核心的方法就是execute()
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
//系统刚启动时,NioEventLoop中的Thread肯定为null,且state==ST_NOT_STARTED,那么肯定会走到else中
if (inEventLoop) {
addTask(task);
} else {
//这里会走到doStartThread()
startThread();
//startThread()方法如下
////////////////////////////////////////////
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
//改变EventLoop的状态并启动真正干活的线程,CAS操作保证了每个EventLoop只启动一个线程
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
////////////////////////////////////////////
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
private void doStartThread() {
assert thread == null;
//这个executor就是上文在初始化时创建的 ThreadPerTaskExecutor
//ThreadPerTaskExecutor 会创建新的线程
executor.execute(new Runnable() {
@Override
public void run() {
//这个thread常用于inEventLoop()方法
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
//run()为abstract方法,子类会实现
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
......
}
}
});
}
NioEventLoop # run()
该方法会进入死循环,不停地按预先分配的时间比例处理IO任务和其他的例如scheduled任务
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
//NIO多路复用选择器
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
......
}
}
通过上面的介绍,我们知道了初始化NioEventLoopGroup时会创建nThreads个NioEventLoop,当有新task到来时,会根据其所属Chanenl选择对应的EventLoop执行(execute())
线程已经先行启动,等着数据的到来;接下来看一下IO相关的初始化,Netty的IO模型是多路复用;
IO的启动
ServerBootstrap是IO的核心类,下面就从这个类入手,看一下IO的启动过程
1. 参数的设置
ServerBootstrap b = new ServerBootstrap();
//上面启动的线程(池)设置在这里
b.group(bossGroup, workerGroup);
b.channel(NioServerSocketChannel.class);
//tcp 三次握手使用的参数
b.option(ChannelOption.SO_BACKLOG, 128);
2. 绑定端口
在继续深入源码之前,需要区分一下register的含义;Java NIO的register一般指把ServerSocketChannel注册到Selector上;在Netty中,注册的含义不仅仅包含这层含义,还包括把NioServerSocketChannel与EventLoop关联在一起
bind(port)会进入AbstractBootstrap的doBind()中
AbstractBootstrap # doBind()
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
// 注册已经在EventLoop中进行,因此下列方法可能和注册过程并发进行
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
// 注册是通过线程池完成,因此这里要判断是否注册完成
// 注册过程可能很快,这里已经完成
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
// 绑定本地端口
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
// 没有完成注册的话,就注册一个监听器,注册完成后进行回调
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
AbstractBootstrap # initAndRegister()
这个方法开始使用前面已经初始化的NioEventLoopGroup
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//设置参数时,有 b.channel(NioServerSocketChannel.class),
//因此这里的channel实际就是 NioServerSocketChannel
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
......
return ......
}
//这里的group()实际返回的就是bossGroup,即上面创建的第一个NioEventLoopGroup,因此真正的注册任务是由线程池来完成;
//只有当EventLoop与Channel关联在一起,才能算注册成功
ChannelFuture regFuture = config().group().register(channel);
.......
return regFuture;
}
在真正注册之前,会先实例化NioServerSocketChannel,接下来就看一下NioServerSocketChannel初始化过程;NioServerSocketChannel的继承关系如下:
channelFactory.newChannel()会调用NioServerSocketChannel的构造方法:
public NioServerSocketChannel(ServerSocketChannel channel) {
// NioServerSocketChannel 关心的事件是Accept,即新的客户端连接
super(null, channel, SelectionKey.OP_ACCEPT);
//这里也会进行内存分配相关类的初始化
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
AbstractChannel
protected AbstractChannel(Channel parent) {
this.parent = parent;
//每个channel都有一个id
id = newId();
//真正与下层交互的东东,比如读写socket等操作
unsafe = newUnsafe();
//初始化为DefaultChannelPipeline
pipeline = newChannelPipeline();
}
NioServerSocketChannel已初始化完毕,但还没有与EventLoopGroup或EventLoop发生任何联系
ServerBootstrap # init()
//上面已完成Channel的初始化,这里的channel就是 NioServerSocketChannel
void init(Channel channel) throws Exception {
//一些attrs、options、handler以及childAttrs、childOptions、childHandler的设置
.......
//这里就是上面创建的DefaultChannelPipeline
ChannelPipeline p = channel.pipeline();
//之前创建的第二个NioEventLoopGroup(workerGroup)
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
//pipeline的链表最后加入了一个ChannelHandler
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
// In this case the initChannel(...) method will only be called after this method returns. Because
// of this we need to ensure we add our handler in a delayed fashion so all the users handler are
// placed in front of the ServerBootstrapAcceptor.
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
在addLast时,有一点要注意:如果还没有完成注册,则把Handler保存在一个临时变量中,等注册完毕后再调用相应方法;如果已完成注册,则应调用handlerAdded()等回调方法;
DefaultChannelPipeline # addLast()
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
// 加入到Handler链中
addLast0(newCtx);
// 由前面的分析可知,系统刚启动时init()会走到这一步,此时并没有完成注册,因此会进入if;
if (!registered) {
newCtx.setAddPending();
//在注册完成后,会调用pendingHandlerCallbackHead的相关方法
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
callHandlerCallbackLater()
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
// Find the tail of the linked-list.
while (pending.next != null) {
pending = pending.next;
}
// 注册完成后,按照链的顺序依次调用
pending.next = task;
}
}
addLast()完成后,Pipeline中Handler链的顺序如下:
init()完成后,开始异步注册
AbstractBootStrap # initAndRegister()
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
.......
}
// 异步注册
// 使用chooser在EventLoopGroup中选择一个EventLoop进行注册
ChannelFuture regFuture = config().group().register(channel);
......
return regFuture;
}
SingleThreadEventLoop # register()
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
AbstractChannel # AbstractUnsafe # register()
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// NioServerSocketChannel的eventLoop 赋了值
AbstractChannel.this.eventLoop = eventLoop;
// 系统刚启动时,EventLoopGroup中所有的EventLoop都是未启动状态,EventLoop的thread属性也为null
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 由上文可知,execute()方法用来提交任务,启动一个新的EventLoop
// 在此,channel已经和EventLoop产生了联系
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
........
}
}
}
AbstractChannel # AbstractUnsafe # register0()
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// JDK的注册:将ServerSocketChannel注册到selector上;有一点要注意,这里注册的时候,并没有注册感兴趣的事件;
// selectionKey = javaChannel().register(eventLoop().selector, 0, this);
doRegister();
neverRegistered = false;
registered = true;
// 到这里注册完成
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
到这里可以看出Netty中完成注册的标识必须满足下面三个条件:
- NioServerSocketChannel 的eventLoop 字段赋上了值
- 这个EventLoop要启动(新建的EventLoop是未启动状态)
- JDK级别的ServerSocketChannel 注册到selector上(不一定有事件)
在前面我们准备了一些动作必须等注册完成后才能触发,这里再回顾一下是哪些动作:
a. AbstractBootstrap # doBind() 里注册了一个监听器,注册完成后调用;
b. DefaultChannelPipeline # addLast() 里注册了一个回调链,也是等注册完成后调用,实际就是调用handlerAdded();
注册后继续做了如下动作:
pipeline.invokeHandlerAddedIfNeeded();
// 设置成功标志,这里会调用listener;前面注册的ChannelFutureListener就会在此调用,进行本地端口的绑定;
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
//
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
pipeline.invokeHandlerAddedIfNeeded()会走到下面代码
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
// This Channel itself was registered.
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
// Null out so it can be GC'ed.
this.pendingHandlerCallbackHead = null;
}
// 这里就是 上文添加的Handler链;ChannelInitializer就是其中之一
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
// 这里会调用到ChannelInitializer的initChannel()
task.execute();
task = task.next;
}
}
ChannelInitializer # initChannel()
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
.......
} finally {
remove(ctx);
}
return true;
}
return false;
}
上述操作会导致Pipeline中Handler链发生下面的变化:
safeSetSuccess(promise) 会走到如下代码
DefaultPromise # trySuccess()
public boolean trySuccess(V result) {
if (setSuccess0(result)) {
//调用listener;上文注册的ChannelFutureListener就会在此被调用
notifyListeners();
return true;
}
return false;
}
到此Netty服务器已经启动,等待客户端连接,启动大致流程如下:
触发channelActive()
服务器注册完成后,Pipeline中Handler链的结构如下,此时调用pipeline.fireChannelActive()
设置ACCEPT事件
DefaultChannelPipeline # fireChannelActive()
public final ChannelPipeline fireChannelActive() {
// 这里最终会走到HeadContext中
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
HeadContext # channelActive()
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//这个会沿着Handler链的方向调用下一个handler的方法
//在目前的Handler链中,其实什么都没做
ctx.fireChannelActive();
readIfIsAutoRead();
/////////////////////////////////////////// readIfIsAutoRead的方法体
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
//这个是可以配置关闭的,默认都是打开;这个开关与限流有关
// 这个方法最终走到AbstractNioChannel的doBeginRead()
channel.read();
}
}
///////////////////////////////////////////
}
AbstractNioChannel # doBeginRead()
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
// readInterestOp在channel初始化时设为了accept
selectionKey.interestOps(interestOps | readInterestOp);
}
}
由上文可知,注册时没有注册感兴趣的事件 (selectionKey = javaChannel().register(eventLoop().selector, 0, this),当时传入的是0)。这里才完成感兴趣事件的设置(至于为什么滞后设置,我觉得是要把所有相关的类都初始化完成后才注册,否则在这个过程中有客户端访问,可能会出错)。此时EventLoop的循环才真正可以检查IO事件。
接收客户端连接
当客户端连接到来时,EventLoop的循环方法(run())会检测到这一事件,进入processSelectedKey()方法
NioEventLoop # processSelectedKey()
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
......
try {
......
//处理 read or accept
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 调用NioMessageUnsafe的read()
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
NioMessageUnsafe # read()
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
///////////////////////////////////////////// NioServerSocketChannel.doReadMessages()
SocketChannel ch = javaChannel().accept();
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
......
}
return 0;
/////////////////////////////////////////////
......
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
.......
} finally {
.......
}
}
这里会调用accept()接受客户端连接并保存在list中;接着触发了pipeline.fireChannelRead(),channelRead事件就会从HeadContext传播到ServerBootstrapAcceptor
ServerBootstrapAcceptor # channelRead()
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//msg 是前文accept()接收的NioSocketChannel
final Channel child = (Channel) msg;
// 添加ChannelInitializer
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
//childGroup为workerGroup
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
上面的方法会将accept的SocketChannel进行注册,与ServerSocketChannel的注册唯一不同的是此处会把Handler与另一个NioEventLoopGroup关联;下面的代码是我们对这个socket的初始设置。
ServerBootstrap b = new ServerBootstrap();
b.childOption(ChannelOption.SO_REUSEADDR, true);
b.childOption(ChannelOption.TCP_NODELAY, true);
b.childOption(ChannelOption.SO_KEEPALIVE, true);
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("http-decoder", new HttpRequestDecoder());
pipeline.addLast("aggregator", new HttpObjectAggregator(1048576));
pipeline.addLast("encoder", new HttpResponseEncoder());
pipeline.addLast(new MyInboundHandler());
}
});
Tips: Netty中将与accept()返回的socket有关的参数称之为childXXX,比如childOption()就是给该socket设置参数,childHandler()是给该socket设置处理器
channelRead()后,系统中就形成了一条新的Handler链,其结构如下;之后与客户端的读写交互都由这个Handler链处理