之前在bootStrap去注册通道的时候,顺便粗略的过了一下NioEventLoop的工作过程。只能大概看清楚工作流程,但是其原理,还是没有很细致的说到,所以这次还是详细看看,NioEventLoop是如何工作的,其实本质上,只是看Netty如何去维护这些channel的读写以及注册操作。
入口还是之前的demo,代码如下
public class TimeClient {
public static void main(String args[]) {
connect();
}
private static void connect() {
//用于客户端处通道的读写
EventLoopGroup work = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(work).option(ChannelOption.TCP_NODELAY, true).channel(NioSocketChannel.class)
.handler(new TimeClientHandler());
ChannelFuture cf = null;
try {
//一直阻塞,直到连接上服务端
cf = b.connect(ConnectConfig.getHost(), ConnectConfig.getPort()).sync();
//一直阻塞,直到该通道关闭
cf.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//避免线程没有杀死
work.shutdownGracefully();
}
}
}
NioEventLoopGroup
为什么说这个类呢?可以先解释一下,为什么说这个类,因为在实际的代码运行中,netty是通过这个类,去维护channel的读写以及注册功能的。然后在此之前,要先讲一下NioEventLoopGroup这个类,NioEventLoopGroup.这个链接中有基本的结构,就不过多说了。
基本的接口
1.EventExecutorGroup
其实从接口结构上看是比较简单的
具备的功能(管理EventExecutor)
1.说明具备了线程池的基本功能,继承了ScheduledExecutorService接口
2.新加了几个方法,主要是用于管理EventExecutor的。
boolean isShuttingDown();
Future<?> shutdownGracefully();
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
Future<?> terminationFuture();
EventExecutor next();
2.EventExecutor
其实从接口结构上看是比较简单的, 为EventExecutorGroup的子类
具备的功能,因为有一些地方还木有理解,就说比较关键的点。
1.获取工作组,具体体现在以下方法中
EventExecutorGroup parent();
2.判断某个线程是否与EventExecutor处于同一个线程。
boolean inEventLoop();
3.EventLoopGroup
为EventExecutor的子类,另外新增了几个方法。
新增的功能:
1.将获取的类型改为EventLoop(为EventLoopGroup的子类)
@Override
EventLoop next();
2.注册通道
ChannelFuture register(Channel channel);
4.EventLoop
为EventLoopGroup的子类,没有太特殊的地方
有以下的方法:
@Override
EventLoopGroup parent();
结构上来看:EventExecutorGroup -》EventExecutor -》EventLoopGroup -》EventLoop
最后可以看出EventLoop大概是这么一个东西,具备判断线程池的功能,注册通道,还有获取自身对应的parent(Group),以及当前线程与EventLoop所绑定的线程是否一致(这里其实是为了减少资源竞争,一个channel的相关维护,只会被一个线程所处理)。
EventLoop是如何维护channel的读写以及注册的?
NioEventLoop的结构,以及生成
首先要从NioEventLoopGroup的构造方法入手
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
传入线程数,SelectorProvider,还有一个选择策略工厂,以及拒绝策略(当任务队列过长的时候)
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
}
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
}
默认的eventLoop线程数,为核心线程数*2
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
}
//到最后定位到此处。。。将无关代码省略了。为了方便阅读
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
//该group管理的EventExecutor
private final EventExecutor[] children;
//EventExecutor的选择器
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
//暴露给子类,用于实例化EventExecutor
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
//executor的类型为ThreadPerTaskExecutor,其实就是该executor每执行一个任务,都会创建一个线程去执行
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//初始化数组
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
关键点在这里,这里涉及到EventExecutor的实例化。
newChild是一个抽象方法,在子类中被实现。
那么看看是如何实例化的,具体实现位于子类NioEventLoop中..........................................................................................................................................
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
}
设置EventLoop的选择器,用于获取EventLoop。
chooser = chooserFactory.newChooser(children);
}
}
EventExecutor的实例化,newChild方法的实现, 在EventLoopGroup中被覆写
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
}
public final class NioEventLoop extends SingleThreadEventLoop {
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
设置拒绝策略,以及任务队列的长度,以及线程池的类型,还是直接往里面看..............
父类SingleThreadEventExecutor的构造方法
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
//这个provider是单例,所以可以确保这个工作组的selector是公用同一个
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
}
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
//执行器,其实说白就是通过executor来创建线程。netty没那么傻,肯定是有变量去判断这个eventLoop是否已经持有线程了。
//因为executor的类型是ThreadPerTaskExecutor。感兴趣看看内部实现
this.executor = ObjectUtil.checkNotNull(executor, "executor");
//设置任务队列
taskQueue = newTaskQueue(this.maxPendingTasks);
//拒绝策略
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
}
ThreadPerTaskExecutor的内部实现,每执行一个任务创建一个线程。
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
从上面的代码来看。可以得知,NioEventLoop具体有的功能如下:
1.创建线程(说白了就是每个NioEventLoop初始化的时候都是没对应的线程的,所以需要创建)
2.有任务队列(NioEventLoop去执行任务的时候,可以用于临时存储任务)
3.有拒绝策略(任务队列满了就拒绝)
NioEventLoop是如何去执行任务的
接下来,举个例子,看看是如何执行任务的。刚刚我们说到NioEventLoop的基本功能。
那么就从channel的注册,到连接,看看如何保证一个channel,只会被一个NioEventLoop处理。
下面还是先从注册开始吧
注册
注册通道的入口
public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
注册是在这里执行的======================注册是使用NioEventLoop去注册的,所以重点看此方法。
最后实现其实是在SingleThreadEventLoop,单线程的eventLoop,看看下面的代码片段。
拿到注册的回调,因为注册是异步运行的。
所以注册的回调有几种情况,注册完成了,注册没完成。
针对注册没完成,还有一些操作,所以加了监听器,在注册完成的时候会去调用监听器的方法。
而注册成功的,直接去连接就好了。
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
若注册成功
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
连接服务端
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
若注册没成功,先添加监听器,等到注册完成的时候,再去进行连接服务端
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
}
}
}
如何注册通道
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
通过unsafe来注册,Unsafe是一个内部类,这里不关心它是啥,只看是怎么注册的。
通过debug,看到是在内部类中做的,直接看下面的代码片段。
promise.channel().unsafe().register(this, promise);
return promise;
}
}
注册的实现。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//eventLoop为空,则抛异常
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
//通道注册了没?不得二次注册,这里netty的channel有一个标记用于判断channel是否已经注册
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
//这里没细看 估计是eventLoop的类型与通道是不是匹配,不是那么重要,跳过。。。。
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
//给这个channel设置对应的eventLoop,说明这个channel被这个eventLoop
AbstractChannel.this.eventLoop = eventLoop;
重点,判断当前线程是否与eventLoop中的线程是一个线程,是的话就同步执行好了。一开始当然不是
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
通过eventLoop的执行方法去执行注册。
同步的就没必要看了,因为我们不关心它怎么注册,而是线程如何去执行这个注册的task
======================================直接debug,定位到位置
SingleThreadEventExecutor的execute方法。直接看下一个代码片段。
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
//给回调设置异常信息
safeSetFailure(promise, t);
}
}
}
}
SingleThreadEventExecutor的execute方法
其实下面这个方法,看上去已经简单明了了。
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements
OrderedEventExecutor {
默认eventLoop没分配线程,所以是not_started状态,避免分配多个线程。
private volatile int state = ST_NOT_STARTED;
eventLoop对应的所分配的线程变量
private volatile Thread thread;
12345 分别对应几个状态
private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;
任务队列
private final Queue<Runnable> taskQueue;
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
判断当前线程,与eventLoop分配的线程是不是同一个。
boolean inEventLoop = inEventLoop();
如果是同一个线程,则直接加任务即可。一开始eventLoop没分配线程,所以必然需要开启线程。
if (inEventLoop) {
addTask(task);
} else {
开启线程=============================这是很重要的方法
下面的方法会提到
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
private void startThread() {
//比较简单的写法,双重锁
if (state == ST_NOT_STARTED) {
//cas 无锁
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
这是重点了===============================================
doStartThread();
}
}
}
private void doStartThread() {
executor.execute(new Runnable() {
@Override
public void run() {
boolean success = false;
updateLastExecutionTime();
try {
这里我们可以猜一下,由于线程起来了,要一直运行,所以里面必然是一个死循环。
只有EventLoop状态为关闭的时候,才会跳出这个循环。
所以接下来直接看看这个run方法。
============================================================
这个run方法子类已经实现了,在NioEventLoop中,所以直接定位到这个位置。看下面的代码片段。
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
下面省略了一部分,其实是EventLoop关闭的时候,怎么去修改状态的内容。相对来说比较简单,所以直接忽略了。
}
}
});
}
}
线程是如何去维护channel的读写以及各种任务的
NioEventLoop中的run方法就实现了。
在注册的时候,channel会注册到eventLoop中的selector中。所以eventloop只需要通过分配到的线程,去做一个死循环,轮询selector中就绪的事件即可。这样子就可以维护多个channel了。一个channel只能被一个eventLoop维护,而一个eventLoop可以维护多个channel。
public final class NioEventLoop extends SingleThreadEventLoop {
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
处理就绪的事件,被该eventLoop所维护的channel。因为注册的时候会将相关的selectkey都存到对应的selectkey集合中,位于父类的成员变量中。
processSelectedKeys();
} finally {
执行队列里面的任务
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
处理就绪的事件
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
执行队列里面的任务,其实就是成员变量里面的taskQueue,在父类中SingleThreadEventExecutor
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
对EventLoop关闭的处理,如果正则关闭,则该死循环结束。
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}
注册完成后,如何连接呢?
先回到注册通道的入口
public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
注册是在这里执行的======================注册是使用NioEventLoop去注册的,所以重点看此方法。
最后实现其实是在SingleThreadEventLoop,单线程的eventLoop,看看下面的代码片段。
拿到注册的回调,因为注册是异步运行的。
所以注册的回调有几种情况,注册完成了,注册没完成。
针对注册没完成,还有一些操作,所以加了监听器,在注册完成的时候会去调用监听器的方法。
而注册成功的,直接去连接就好了。
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
若注册成功
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
连接服务端,看看服务端的代码
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
若注册没成功,先添加监听器,等到注册完成的时候,再去进行连接服务端
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
}
}
如何连接?最后注册完,一路debug,到这里。
从这里可以看到,最后还是通过通道对应的eventLoop来。所以从通道的注册,到连接,以及处理通道中就绪的事件,一切都是在某个eventLoop中来维护的。
这种做法就避免了一个channel的事件被多个线程执行,不存在资源竞争的关系。
private static void doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
});
}
}
概括一下:
注册操作:
channel的注册,会被某一个eventLoop所执行,同时注册到该eventLoop的selector中。同时channel会设置自己的eventLoop,即该channel只会被该eventLoop所维护。
连接操作:
channel获取自己的eventLoop,然后将连接任务丢到任务队列中,由eventLoop分配的线程去处理。
读写操作:
eventLoop分配的线程,轮询selector中的key,进行处理。