netty作为一个优秀的网络通信框架,应该大部分人都接触过,之前看了mina,话说是同一个作者,里面的设计和思想还是比较接近的。不过要涉及到里面原理,可能就不是那么清楚。下面我将自己在工作中接触过,和自己看源码的一些理解记录下来。注:只是个人目前的一个理解程度,这篇文章会比较长,基本都是文字和贴的源码,但如果你读完,对netty多少会有些认识,可能有些地方写的不对,忘指出。
主从模型的启动过程
如果一开始看源码,应该是一脸蒙蔽,毕竟这么庞大的东西。首先我是通过几个重要的组件,看里面具体的实现,然后看netty.bind()怎么启动的,看到bind肯定说的是服务端了,客户端应该差不多了,还没这么负载。
学习的时候,如果带着问题去看,应该就比较容易一些。
为什么说Netty是NIO
1:NIO有同步阻塞和同步非阻塞两种,这里可以理解下概念。参考
2:那netty实现NIO主要有哪些组件呢。
//1:这个我觉得是最核心的一个了,看到eventLoop 很容易会想到,这是一个线程池。确实,没错,netty的线程池,是在java的ExecutorService上进行了封装
NioEventLoopGroup();
//2:服务器引导类,这个没什么说的,就是用来引导启动的
ServerBootstrap
//3:channel,这个最终是一个channel接口,这里会设计到netty的线程模型
NioServerSocketChannel
//4:对channel中进行注册
ChannelInitializer
3:上面这几个是我自己对netty印象最深的几个组件,当然也是最重要的,下面下看一段简单的代码,然后开始看源码,了解整个的启动过程。
public void bind(int port) {
final EventLoopGroup bossGroup = new NioEventLoopGroup();
final EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
try {
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1023)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_RCVBUF,1024 * 32)
.childOption(ChannelOption.SO_SNDBUF, 1024 * 32)
.childOption(ChannelOption.SO_KEEPALIVE,true)
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,new WriteBufferWaterMark(0,128 * 1023))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new IdleStateHandler(0, 0, 300));
}
});
ChannelFuture future = serverBootstrap.bind(port).sync();
logger.debug("websocket port: {}",3322);
future.channel().closeFuture().sync(); // TODO
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
这是主从模型,boss是用来对客户端的请求进行分发,work是真正用来工作的线程池。每个线程池数量,看里面源码就能知道,如果自己不定义的话,默认是CPU合数的两倍。
在创建服务端的时候实例化了 2 个 EventLoopGroup,1 个 EventLoopGroup 实际就是一个 EventLoop 线程组,负责管理 EventLoop 的申请和释放。
bossGroup 线程组实际就是 Acceptor 线程池,负责处理客户端的 TCP 连接请求,如果系统只有一个服务端端口需要监听,则建议 bossGroup 线程组线程数设置为 1。
workerGroup 是真正负责 I/O 读写操作的线程组,通过 ServerBootstrap 的 group 方法进行设置,用于后续的 Channel 绑定。
NIOEventLoopGroup后面会继承MultithreadEventExecutorGroup,这个的构造方法有对里面线程进行初始化。
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//这个构造,主要看这里,线程的初始化就好。最后反复的实例SingleThreadEventExecutor,
//这个我之前看过一次源码,里面有个takeTask()方法,用来获取当前队列中的任务,并实现了阻 塞等待,周期等功能。
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
} finally {
//这里面应该就是一些监听触发,netty很多逻辑,会通过监听和回调去触发。
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
SelectorProvider.provider(),这个provider要了解下,就是java的spi,netty中的channel其实也是基于java NIO实现的,只是对java的SocketChannel进行了封装。
-
启动类的group()方法
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup"); return this; } 这里主要用来初始化,bossGroup和workGoup
-
channel(Class<? extends C> channelClass)方法
AbstractBootstrap
public B channel(Class<? extends C> channelClass) { return channelFactory(new ReflectiveChannelFactory<C>( ObjectUtil.checkNotNull(channelClass, "channelClass") )); } public ReflectiveChannelFactory(Class<? extends T> clazz) { ObjectUtil.checkNotNull(clazz, "clazz"); try { this.constructor = clazz.getConstructor(); } catch (NoSuchMethodException e) { } }
看我上面的代码,这个channeClass 我用的是 NioServerSocketChannel.class;这里面初始化好的是一个channelFactoy工厂,并且在工厂中初始化了这个class的构造,很明显,这个是后面启动的时候,会经过这个工厂调用这个构造,反射获取这个channel,有没有道理呢,可以仔细看下这个NioSererSocketChannle,实例化的时候,会初始化配置NioServerSocketChannelConfig
option()方法,是对通信进行一些参数初始。比如,阻塞,数据块大小等,或说我也不懂
-
childhandler()
这个怎么时候呢,就是你自定义的handler需要注册到channel中去,然后绑定到对应的pipeline,并且数据流会在这些handler中进行流动,最后根据转换为自己定义的数据,到业务中去。
-
bind()
上面基本是一些准备了,真正的启动是在bind()方法中。
AbstractBootstrap
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); // (1) final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); // (2) return promise; } else { } }
这么这段代码主要看两个方法就好了,我在上面标记了(1) (2),先看(2)这个方法吧。
private static void doBind0( channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } });
看到这个EventLoop()了吗,其实每个channel是对应一条线程,记得在注册在初始化NioEventLoopGroup的children[]数组的时候,里面就是NioEventLoop(这里忘记了的可以会去看下最上面)每个NioEventLoop最终会与多个channel进行绑定。这个doBind0()方法,就是将channel中的pipeline进行address绑定,channel和pipleline是一一对应的。
可能对这个地方会有些疑问,就是开始初始化children[]这个数组中的每个NioEventLoop,是如何和channel进行绑定的,可以看下我下面放出来的这段代码,next(),就是那个数组了。
但这里写着写着我也有些疑问了
就是假如我现在数组大小是16,那么就只有16个channel吗。
后来想了下,也断点看了下代码,其实channel是对应一个NIOEventLoop,也就是每个channel中只会在一个NioEventLoop中,这样是不是保证了线程安全。然后,每次有客户端进行连接的时候,我们就会在这个children[]数组中进行轮询,这里很简单了,就是维护一个自增的childIndex,然后每次去%数组长度就好了。
这样就保证了一个channel对应一个线程了。(这个说的是应该是workeGroup中的线程数组,这里其实没看懂没关系,只是我自己突然想到,就写了一下)
MultithreadEventLoopGroup
@Override
public ChannelFuture register(ChannelPromise promise) {
return next().register(promise);
}
-
接下来看(1) initAndRegister()这个方法。
AbstractBootstrap,同样这个方法里面,我们看两个地方就好了(1) (2)
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); //(1) init(channel);//(2) } catch (Throwable t) { } return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); //(3) if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
(1) 这里还是比较简单的,记得上面写的关于channel(Class<? extends C> channelClass)这个方法吗
开始是在这里初始化这个ChannleFactory工厂,真正获取实例就是在这里了,经过反射调用。
这里等服务端 Channel 创建完成之后,将其注册到多路复用器 Selector 上,用于接收客户端的 TCP 连接
(2) init(channel)
@Override
void init(Channel channel) {
setChannelOptions(channel, options0().entrySet().toArray(EMPTY_OPTION_ARRAY), logger);
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions =
childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
这个方法还是比较容易理解的,就是对bossGroup的一些参数设置,然后在添加结构默认的handler
这里想到一个问题 :就是work中的handler是什么时候加进去的。 记得我们有这段代码吗
childHandler(new ChannelInitializer<SocketChannel>(){
public void initChannel(SocketChannel ch)
});
/**
*其实我们开始给workGroup添加的是一个ChannelInitialize handler,看名字就很清楚,是有拿过来初始化的
* 这个方法是在channel进行register完后会进行调用的,看下具体的调用过程
*Abstract.register0() --> pipeline.fireChannleRegistred() --> head[Context] ---*>fireChannelRegistered() ---> handler.channelRegistered(this)[这个handler就是*ChannelInitialize] ---> initChannel(ctx.channel)
**/
(3) 这里这个config()标示没看懂,但group()这个方法(是从bossGroup线程组获取一个线程),获取的是bossGroup实例(这里要分清楚boss和work)
我断点看了下,其实这个地方还是没怎么明白,服务器启动的时候,这里肯定是bossGroup(但这里只有一次呀,那其他的是什么时候会进行绑定的呢。如果是客户端进行连接的时候,与channel绑定的NioEventLoop是workGroup这里没什么疑问,客户端连接进入后面再分析吧
主从 Reactor 线程模型的特点是:服务端用于接收客户端连接的不再是个 1 个单独的 NIO 线程,而是一个独立的 NIO 线程池。Acceptor 接收到客户端 TCP 连接请求处理完成后(可能包含接入认证等),将新创建的 SocketChannel 注册到 IO 线程池(sub reactor 线程池)的某个 IO 线程上,由它负责 SocketChannel 的读写和编解码工作。Acceptor 线程池仅仅只用于客户端的登陆、握手和安全认证,一旦链路建立成功,就将链路注册到后端 subReactor 线程池的 IO 线程上,由 IO 线程负责后续的 IO 操作。(这里先暂时这样理解吧)
- 从主线程池中随机选择一个 Reactor 线程作为 Acceptor 线程,用于绑定监听端口,接收客户端连接;
- Acceptor 线程接收客户端连接请求之后创建新的 SocketChannel,将其注册到主线程池的其它 Reactor 线程上,由其负责接入认证、IP 黑白名单过滤、握手等操作;
- 步骤 2 完成之后,业务层的链路正式建立,将 SocketChannel 从主线程池的 Reactor 线程的多路复用器上摘除,重新注册到 Sub 线程池的线程上,用于处理 I/O 的读写操作。
这里的register就会对这个channel如bossGroup中的一个NioEventLoop进行绑定。然后会进入下面这个方法。
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
注意下这里面的Unsafe,这里会单独来分析这个东西,因为我目前也还不懂。哈哈,不过还是要继续接着看喽
下记住这个unsafe 是AbstractNioMessageChannel。
再继续看register()方法。
AbstractChannel
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
}
}
}
在netty中,以·0结尾的方法,一般都是真正具体干活的方法。其实上面这个方法没什么说的,就是调用register0方法
private void register0(ChannelPromise promise) {
try {
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
}
}
看到这里,应该快要到最后面了,加油哈。这里面应该也没什么,主要的就是doRegister()这个方法,然后下面如果激活,就对整个pipeline进行激活。接下来应该是看doRegister()这个方法了
AbstractNioChannel
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); //这里应该是主要做的事情了,这里是jdk的register方法,不是很懂了。
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
}
}
}
}
netty启动过程其实就差不多上面这些,不过还是留下很多疑问
-
服务端是如何监听客户端的连接
这里还是要捋清楚的,首先到服务器启动后,在bossGroup中会选择一个NIO-Acceptor,这个其实就是一个NioEventLoop,可以看下里的run方法。
NioEventLoop
@Override protected void run() { int selectCnt = 0; for (;;) { try { selectCnt++; cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; boolean ranTasks; if (ioRatio == 100) { try { if (strategy > 0) { processSelectedKeys(); } } finally { } } else if (strategy > 0) { try { processSelectedKeys(); // 先记住这个方法 } finally { } } else { } }
上面我把很多代码都删掉了,其实先记住这个processSelectedKeys()方法。下面分析里面的东西。
这个NioEventLoop是SingleThreadEventExecutor的子类,这个run方法就是在父类中调用的。
private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { try { SingleThreadEventExecutor.this.run(); //这里就启动了这个线程 success = true; } catch (Throwable t) { } finally { }); }
看到这个doStartThread()方法了吗,其实这里就是真正的启动了这个线程,然后会在NioEventLoop中的run方法中进行循环监听任务。里面的executor,其实是在构造中初始化的,看下里面应该就明白了。(有空可以专门分析一下这个SingleThreadEventExecutor),这里可能和Netty版本有关,不一样的的话启动方式可能不一样。
这里终于搞懂了服务器启动的时候,bossGroup线程组中NioEventLoop是如何启动的。
接下来接续回到processSelectedKeys()这个方法了,当有客户端请求TCP连接的时候。接着会进入这个方法:NioEventLoop
~~~java
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
if (eventLoop == this) {
unsafe.close(unsafe.voidPromise());
}
return;
}
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
//这里是关键了
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
~~~
unsafe是AbstractNioMessageChannel实例,上面应该已经说过了,其实这个unsafe就代替了javaNio的accept()。虽然很多地方不怎么懂,但还是可以接着往下看的,接下来就是read()方法了,继续看源码
~~~java
@Override
public void read() {
try {
try {
do {
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));//看这里
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
}
}
~~~
接着就会进入这个pipeline.fireChannelRead()方法,先把源码贴出来吧。
DefaultChannlePipeline
~~~java
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
}
}
//最后经过上面两个方法的轮询后,会进入下面这个方法,这里可以稍微解释下,为什么这里需要经过轮询
//因为pipeline,是有head 和 tail的,当接受客户端请求的这种被动,会总tail -> head,也就是我们所说的入,当是一些主动业务的时候,会从 head -> tail,也就是出。这里我只是做个笔记哦,不一定会对
ServerBootStarp
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
//这里需要注意的是,这里是childGroup,还记得前面吧,我们初始化group的时候
//workgroup最后赋值给了childGroup
//还有这个child,就是客户端和服务端连接的channel,这个channel会后一个 //NioEventLoop进行绑定,保证了一个channel对应一个线程
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
//然后会调用MultithreadEventLoopGroup.register()方法,这里上面已经分析过了。只不过这的group是workGroup
~~~
**其实这个建立连接过程就差不多这些了,我们可以总结一下**:
1:服务端启动后,初始化NioServerSocketChannel,会激活NIO-acceptor线程,这里就是bossGroup中的线程组,其实:workGroup线程组也激活了。
2:bossGroup中的NIO-acceptor负责监听客户端连接。
3:当有客户端进行连接时,acceptor线程负责把这个channel分发到workGroup中去,与workGroup线程组中的一个NioEventLoop进行绑定。这就是bossGroup的一个主要工作。
4:之后建立完的客户端与服务端进行通信,都是通过这个channel,也是在一条线程当中,这就保证了IO的线程安全问题。
5:在平时工作中,上面的事情都是由netty已经做好了,我们只需要自己定义handler,对消息进行解码和编码,然后发任务分发到自己系统的业务线程中去就行。
以上就是我对netty的启动、线程模型、客户端和服务端建立连接的个人理解。当然还有很多疑问还没有写清楚,和一知识点,比如比较ByteBuf,Usafe,ChanelPromise和Future。后面继续慢慢完善。
[参考-InfoQ](https://www.infoq.cn/article/netty-threading-model)
[书籍] 《Netty Action》