开篇
在此前的章节https://www.jianshu.com/p/e38844b145fb,我们介绍了Netty的引导启动类,但是并没有详细解析系统启动后的建链过程,本文将会就这一过程进行深入解析。
服务端
1、监听初始化
服务端从doBind()
开始
private ChannelFuture doBind(final SocketAddress localAddress) {
// 完成初始化以及注册操作
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// 忽略
}
}
话不多说,直接看initAndRegister()
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 创建NioServerSocketChannel
channel = channelFactory.newChannel();
// 初始化channel
init(channel);
} catch (Throwable t) {
// 忽略
}
// 注册channel到EventLoop的selector
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
采用反射方式创建NioServerSocketChannel,会调到其无参构造函数,最终会调用
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
可以看到会将channel的readInterestOp初始化为SelectionKey.OP_ACCEPT,用于接受建链请求。channel创建好后,开始进行初始化
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
ChannelPipeline p = channel.pipeline();
// 这个childGroup就是在引导类中配置的workerGroup
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
}
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
// 将在启动类中配置的channelHandler加入到NioServerSocketChannel对应的pipeline中
pipeline.addLast(handler);
}
// 将serverBootstrapAcceptor这个channelHandler加入到nioServerSocketChannel对应的pipeline中
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
ServerBootstrapAcceptor就是用来接收建链请求的。
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
// 将启动类中配置的childHandler添加到NioSocketChannel对应的pipeline中
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
// 将NioSocketChannel注册到EventLoop的selector上
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
好,我们返回到initAndRegister,至此NioServerSocketChannel已经创建并且初始化完毕,开始进行注册操作
ChannelFuture regFuture = config().group().register(channel);
会调到MultithreadEventLoopGroup的register方法
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
// 从引导类中配置的EventLoop线程池中轮询一个EventLoop
public EventLoop next() {
return (EventLoop) super.next();
}
接着往下,最终会调到AbstractChannel的register方法
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// ...省略
// 由于当前eventLoop的thread还是null,所以这里返回false
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
进入到eventLoop的execute方法
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
将task(也即register0方法)放入队列,判断当前线程非EventLoop自己的线程,开始创建线程
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
// 将EventLoop的线程绑定为当前线程
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// EventLoop线程开始死循环,处理IO事件和自定义任务以及定时任务
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
}
调用executor的execute,会新创建一个线程执行task,这个线程就是EventLoop绑定的线程,在整个系统的生命周期范围内,这种绑定关系都是固定的,不会发生变更。下面来分解NioEventLoop的run方法
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
// 最终调用Java NIO的Selector的select方法处理IO事件
strategy = select(curDeadlineNanos);
}
} finally {
nextWakeupNanos.lazySet(AWAKE);
}
final long ioStartTime = System.nanoTime();
try {
// 处理准备好的IO事件
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
// 从任务队列取任务执行(还记得之前已经加入到队列里的task,也即register0方法吗?)
ranTasks = runAllTasks(0);
从任务队列中取出任务,开始执行之前加入到任务队列中的register0方法
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 调用Java NIO方法,将channel注册到selector
doRegister();
neverRegistered = false;
// 设置已经注册的标志位
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// isActive是通过channel是open状态并且已经绑定本地地址和端口来判断的
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// 忽略
}
}
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 调用Java NIO方法,将channel注册到selector,注意这里的监听事件设置的为0,目的是要让
// channel执行完其他操作后,再设置OP_ACCEPT的监听
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
}
// 忽略
}
}
注意这里的register将channel注册到selector时,传递的监听事件为0,意思是什么都不监听,等channel完成其他操作后,再设置OP_ACCEPT监听,那么是在哪里设置的呢?
- 如果isActive方法返回true,并且autoRead是true(默认是true),则会进入beginRead,最终执行doBeginRead,所以注册完成后,就会设置OP_ACCEPT事件监听。
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
// 还记得之前创建NioServerSocketChannel时,设置readInterestOp为OP_ACCEPT吗?将该事件加入
// 到NioServerSocketChannel对应的Selector的监听序列中
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
-
如果isActive方法返回false,又是在哪里将OP_ACCEPT事件加到监听序列的呢?因为Netty的很多任务都是封装成task,放入队列中,再由EventLoop线程从队列中取出执行,所以直接从代码中跳跃着找不是很方便,我们可以在doBeginRead方法打个断点,看下调用栈
没错!我们在这里看到了熟悉的doBind0,这就是在本文的一开始的doBind方法中设置的监听回调
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
也就是在上面的NioServerSocketChannel初始化并注册成功后,会调到doBind0方法,在这个方法中将Channel的bind操作封装成一个任务,加入到EventLoop的任务队列
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
接下来,这个bind操作会在NioServerSocketChannel对应的pipeline上的outBound方向的channelHandler之间传递,也就是tail handler--> 其他的handler -> head headler,最终在head handler的bind方法中调用AbstractChannel的bind真正执行绑定操作
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
// 省略...
boolean wasActive = isActive();
try {
// 调用Java NIO接口真正执行绑定本地地址和端口的操作
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
// 触发channelActive事件
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
可以看到,在绑定操作执行完后,触发了channelActive事件,这个事件首先会被head handler的channelActive方法处理
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
这个read操作同样的跟上面的bind操作一样,会在NioServerSocketChannel对应的pipeline上的outBound方向的channelHandler之间传递,也就是tail handler--> 其他的handler -> head headler,最终调到head handler的read方法
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
经过百转千回终于走到doBeginRead方法了!有没有被绕晕,哈哈。不要着急,现在只是完成了本地地址和端口的绑定,NioServerSocketChannel的selector开始监听OP_ACCEPT事件,做好了接收建链请求的准备。
那么,当有客户端建链请求进来时,又是怎样处理的?接着往下看。
2、接收建链请求
回到EventLoop的run方法,当有IO事件被监听到时,会调用processSelectedKeys方法
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
在processSelectedKey方法中,判断是OP_ACCEPT事件,则会调用unsafe的read方法
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// 调用NioServerSocketChannel的doReadMessages方法
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
在NioServerSocketChannel的doReadMessages方法中调用Java NIO的accept方法真正接受建链请求,并new一个NioSocketChannel用于后续的读写IO操作。
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// 新建一个NioSocketChannel用于后续的读写请求
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
再回到read方法,这时候的readBuf中已经加入了新创建的NioSocketChannel,所以会触发channelRead事件在pipeline中传递。
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
还记得我们在channel初始化的时候,把ServerBootstrapAcceptor这个handler加入到pipeline了吗,来看下它的channelRead回调处理
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 这个msg,其实就是NioSocketChannel
final Channel child = (Channel) msg;
// 将启动类中配置的channelInitializer加入到NioSocketChannel对应的pipeline
child.pipeline().addLast(childHandler);
// 设置NioSocketChannel的属性
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
// 将NioSocketChannel注册到childGroup的EventLoop的selector,准备接收和发送数据
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
就是在这里,初始化由NioServerSocketChannel创建的NioSocketChannel,并将其注册到在启动类中配置的childGroup线程池中的某个EventLoop上,准备接收发送数据。
至此,服务端的启动初始化以及建链流程分析完毕,总结如下:
- 利用反射,创建NioServerSocketChannel,并进行初始化,同时将ServerBootstrapAcceptor加入到pipeline;
- 将NioServerSocketChannel注册到EventLoop的selector上,在这个过程中会创建新线程,开始执行NioEventLoop的run方法,run方法内部会监听IO事件,同时从任务队列中取任务执行,这个注册操作其实也是队列中的任务;
- 在注册成功的回调里面执行绑定本地地址和端口的操作;
- 绑定成功后,会触发channelActive事件,这个事件首先会被每个pipeline都有的head handler拦截处理;
- 在head handler触发read操作,最终会执行到doBeginRead,将之前创建NioServerSocketChannel时,设置的OP_ACCEPT事件加入到NioServerSocketChannel对应的Selector的监听序列中,至此,监听初始化完毕。
- NioEventLoop的run方法监听到OP_ACCEPT事件;
- 在processSelectedKey中调用NioServerSocketChannel的doReadMessages,新建NioSocketChannel,用于处理后续的读写请求,同时触发channelRead事件;
- ServerBootstrapAcceptor拦截channelRead事件,初始化NioSocketChannel,并将NioSocketChannel注册到childGroup的EventLoop上,这个注册流程和NioServerSocketChannel相似,只是设置的readInterestOp为OP_READ,用于读取消息,不再赘述。
- 至此,NioSocketChannel真正接管NioServerSocketChannel accept的底层socket链路,开始收发消息。
客户端
有了以上服务端的启动、初始化、建链流程的分析后,客户端的流程就相对来说简单很多了。
客户端从doResolveAndConnect开始
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
和服务端一样,也是调用initAndRegister进行channel的初始化,并将channel注册到EventLoop上,不同的是,初始化的监听事件是OP_READ,而不是OP_ACCEPT,具体过程不再重复。
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
接下来调用doResolveAndConnect0和doConnect方法,将connect操作封装成task加入到任务队列中
private static void doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
});
}
EventLoop的线程从队列中取出任务执行。connect操作在pipeline上流转,connect是outBound方向的操作,所以先从tail handler开始,最终到head handler,调用NioSocketChannel的doConnect方法
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress);
}
boolean success = false;
try {
// 调用Java NIO的接口发送建链请求
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) {
// 将监听事件改为OP_CONNECT
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
当监听到OP_CONNECT事件时,调用Java NIO的finishConnect结束建链流程,标识已经建链成功,为收发消息做好准备。
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
至此,客户端的建链流程分析完毕,整个流程,相对服务端简单很多。
总结
本文详细分析了Netty作为客户端和服务端的整个初始化和建链过程,其中会涉及到许多Netty的核心概念,后面会针对这些概念逐个进行分析。Netty的整体框架是基于异步编程的概念构建的,跟我们平时的编码习惯可能有些差别,不过,没关系,平时多看看源码,看多了也就习惯了。Netty的源码也有很多值得我们借鉴的编码习惯或者说技巧,在我们平时的编码中可以加以应用,针对这点,后面我也会专门写一篇文章来总结。
需要说明一下,本文是基于目前(2020.8.18)Netty最新的4.1.52版本的源码进行解析的。