Netty源码分析:NioEventLoop
本文参考自慕课网《Java读源码之netty》
面试有关问题:
默认情况下netty启动线程的情况?
netty如何解决epoll的时候jdk可能会产生的空轮询bug?
如何保证异步串行无锁化?
本文分三部分:
1.问题一NioEventLoop创建
2.问题二NioEventLoop启动
3.问题三NioEventLoop执行逻辑
1.NioEventLoop创建
在我们最外层代码编写的时候往往需要初始化两个NioEventLoopGroup(int threadnumber)
,其构造函数如下:
NioEventLoopGroup.java
//step1:当没有传入线程数的时候,调用含0的构造函数
public NioEventLoopGroup() {
this(0);
}
//step2:如果传入线程数,那么就是用户定义的线程数,否则就是0,
//这里,为eventloop创建线程的excutor还是null
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor)null);
}
//step3:继续调用构造函数,传入一个provider
//provider负责为eventloop创建绑定的selector
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
经过层层调用,到了父类构造函数,这里就是传默认的0还是用户指定线程数的区别了:
MultithreadEventLoopGroup.java
//step4:如果没传线程数,那么就是默认的2*CPU核数,否者就是用户传进来的数量,问题一解决
protected MultithreadEventLoopGroup
(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
继续向上,到了核心逻辑:
MultithreadEventExecutorGroup.java
//step5:DefaultEventExecutorChooserFactory.INSTANCE负责创建excutor
protected MultithreadEventExecutorGroup
(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
protected MultithreadEventExecutorGroup
(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory,
Object... args) {
//...
else {
//step6(核心):创建线程创建器ThreadPerTaskExecutor
if (executor == null) {
executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory());
}
this.children = new EventExecutor[nThreads];
int j;
//step7(核心):根据线程数创建对应数量的eventloop和对应线程
for(int i = 0; i < nThreads; ++i) {
//...
this.children[i] = this.newChild((Executor)executor, args);
}
//...
}
//...
//step8(核心):线程选择器创建
this.chooser = chooserFactory.newChooser(this.children);
//...
}
1.1 核心1——step6:ThreadPerTaskExecutor
- 1.每次执行任务都会创建一个线程实体
- 2.为每个loop命名
1.1.1 每次执行任务都会创建一个线程实体
executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory());
//step6.1 同factory创建excutor
这段代码通过工厂返回一个excutor,这个excutor其实很简单,其全部代码如下:
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
} else {
this.threadFactory = threadFactory;
}
}
public void execute(Runnable command) {
//本质上就是新建个线程跑任务
this.threadFactory.newThread(command).start();
}
}
1.1.2.为每个EvnetLoop命名
excutor产生前,先制造了一个工厂this.newDefaultThreadFactory()
,其构造函数如下:
//step6.2:这个pooltype就是传入的NioEventLoopGroup.getClass()
public DefaultThreadFactory(Class<?> poolType, int priority) {
this(poolType, false, priority);
}
//step6.3:调用到了这,调用了toPoolName
public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
this(toPoolName(poolType), daemon, priority);
}
public static String toPoolName(Class<?> poolType) {
if (poolType == null) {
throw new NullPointerException("poolType");
} else {
String poolName = StringUtil.simpleClassName(poolType);//就是NioEventLoopGroup
//...
}
}
//step6.4:获取到Poolname并做处理后(变成了“nioEventLoopGroup”),继续调用自己的构造函数
//至此工厂创建基本完毕,将prefix命名以及处理了一些绑定(例如把threadfactory绑定到当前group)
public DefaultThreadFactory(
String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
this.nextId = new AtomicInteger();
if (poolName == null) {
throw new NullPointerException("poolName");
} else if (priority >= 1 && priority <= 10) {
this.prefix = poolName + '-' + poolId.incrementAndGet() + '-';
this.daemon = daemon;
this.priority = priority;
this.threadGroup = threadGroup;
} else {
throw new IllegalArgumentException("priority: " +
priority +
" (expected: Thread.MIN_PRIORITY
<= priority <= Thread.MAX_PRIORITY)
");
}
}
此时反观1.1.1excutor中的线程执行
this.threadFactory.newThread(command).start();//excutor创建1
调用的就是上面这个工厂的newThread方法:
public Thread newThread(Runnable r) {
//excutor创建2:这里调用的是另一个thread函数,
//本函数只是命名(用到了factory的prefix),以及别的配置(优先级等)
Thread t = this.newThread(
new DefaultThreadFactory.DefaultRunnableDecorator(r),
this.prefix + this.nextId.incrementAndGet());
//...
}
//excutor创建3:调用的这个,返回了一个thread
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(this.threadGroup, r, name);
}
FastThreadLocalThread是一个继承原生thread的线程,把threadlocal做了一些优化
1.2 核心2——step7:newchild()创建NioEventLoop
- 1.在创建的eventloop中保存step6创建的线程执行器
- 2.在创建的eventloop中创建一个MpscQueue;
- 3.在创建的eventloop中创建一个selecor轮循用户的事件
在父类MutiThreadEventExcutorGroup中,调用的子类NioEventLoopGroup重写的newChild()方法
NioEventLoopGroup.java
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this,
executor,
(SelectorProvider)args[0],
((SelectStrategyFactory)args[1]).newSelectStrategy(),
(RejectedExecutionHandler)args[2]);
}
这里直接去看NioEventLoop的构造函数
NioEventLoop.java
NioEventLoop(NioEventLoopGroup parent,
Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy,
RejectedExecutionHandler rejectedExecutionHandler) {
super(...);//step 7.1调用父类的构造函数,绑定了excutor
//...
else {
this.provider = selectorProvider;//step7.2绑定一个产生selector的provider
this.selector = this.openSelector();//step7.3调用provider产生一个selecotr绑定
this.selectStrategy = strategy;
}
}
继续深入step7.1,调用父类构造函数:
SingleThreadEventExecutor.java
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
//...
//step7.1.1绑定excutor:用于创建nioeventloop底层线程
this.executor = (Executor)ObjectUtil.checkNotNull(executor, "executor");
//step7.1.2当主线程有任务的时候,发现自己不是nioeventloop线程,
//就塞到一个nioeventloop这样的队列里面让nioeventloop消费
//就是一个mpscQueue(M:multi多个 P:producer生产者 S:single单个 C:cosumer消费者 )
this.taskQueue = this.newTaskQueue(this.maxPendingTasks);
this.rejectedExecutionHandler = (RejectedExecutionHandler)ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
在上面的步骤中,一个eventLoop完成了自己消费的mpsc任务队列的绑定,线程产生器excutor的绑定以及多路复用的selector的绑定;
1.3 核心3——step8:newChooser()创建线程选择器
chooser会对新的连接绑定处理事务的NioEventLoop,通过调用chooser.next()返回一个excutor,将channel和loop绑定
MultithreadEventExecutorGroup.java
public EventExecutor next() {
return this.chooser.next();
}
回到step8中factory对chooser的创建
public EventExecutorChooser newChooser(EventExecutor[] executors) {
return (EventExecutorChooser)(
isPowerOfTwo(executors.length)
?
new DefaultEventExecutorChooserFactory
.PowerOfTowEventExecutorChooser(executors)
:
new DefaultEventExecutorChooserFactory
.GenericEventExecutorChooser(executors));
//三目运算
}
-
isPowerofTwo()
判断Loop的个数length
是否是2的幂,如果是就会产生一个PowerOfTowEventExecutorChooser,它在next的时候就会通过index++ & (length-1)
来取代GenericEventExecutorChooser,它通过取模index++%length
找到合适的 Loop
GenericEventExecutorChooser.java
public EventExecutor next() {
return this.
executors[Math.abs(this.idx.getAndIncrement() % this.executors.length)];
}
可以看到chooser就是提供了一个简单的取模的next函数,返回下一个excutor;chooser封装了这个函数,此外另一件事就是idx是原子类确保了线程安全;chooser类没做别的了。
至此,一个NioEventLoop创建流程完毕
2.NioEventLoop启动
启动分两种类型的触发启动:
- 服务端绑定端口触发启动
- 新连接接入通过chooser绑定一个eventLoop
2.1 服务端绑定端口的过程中EventLoop干了啥
简单流程:bind()方法绑定端口并没有直接绑定,而是封装成了一个task,然后excute();
excute的时候发现不是NioEventLoop的线程,扔到了EventLoop的任务队列
发现线程还没启动,调用startThread创建线程(用之前创建的线程执行器创建fastthread)
将创建的线程保存在eventloop,别的线程把任务扔到任务队列让其执行
调用run()正式启动
dobind()绑定
在之前netty启动流程中,我们用了dobind()方法将端口绑定
private static void doBind0(final ChannelFuture regFuture,
final Channel channel, final SocketAddress localAddress,
final ChannelPromise promise) {
//key:实际的channel.bind是放在excute里面的一个runnable内
channel.eventLoop().execute(new Runnable() {
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
这个时候我们把焦点放在excute函数上
SingleThreadEventExecutor.java
public void execute(Runnable task) {
//...
boolean inEventLoop = this.inEventLoop();
if (inEventLoop) {
this.addTask(task);
} else {
this.startThread();
this.addTask(task);
//...
首先,netty现用this.inEventLoop()
判断currentThread是否是EventLoop的thread;这里bind方法是主线程的初始化绑定,所以肯定不是;
所以这里就要启动一个线程
SingleThreadEventExecutor.java
private void startThread() {
if (STATE_UPDATER.get(this) == 1 && STATE_UPDATER.compareAndSet(this, 1, 2)) {
this.doStartThread();
}
}
这里我们看到了用CAS尝试,确保别的线程不会重复启动,再内部的的dostart()就是一个excutor的线程启动了,同时将创建的thread绑定在实例的thread上:
SingleThreadEventExecutor.java
private void doStartThread() {
assert this.thread == null;
this.executor.execute(new Runnable() {
public void run() {
SingleThreadEventExecutor.this.thread = Thread.currentThread(); //绑定
可以简单看出基本流程:主线程将bind()作为一个runnable任务,发现不是eventloop线程,startthread()启动线程并绑定到eventloop,addtask()放入一个任务队列,由创建的线程处理
3.NioEventLoop执行
- key step:NioEventLoop.run()
这里分析的是NioEventLoop的run()方法,由于使用的是maven加载的.class文件调试,代码和源码有些出入,但是逻辑完全一样:
protected void run() {
while(true) {
while(true) {
try {
switch(this.selectStrategy
.calculateStrategy(this.selectNowSupplier, this.hasTasks())) {
case -2:
continue;
case -1:
//step1 select方法轮循IO事件
this.select(this.wakenUp.getAndSet(false));
//...
default:
this.cancelledKeys = 0;
this.needsToSelectAgain = false;
int ioRatio = this.ioRatio;
//ioRatio用来决定处理IO事件和队列任务的时间比;默认是50
if (ioRatio == 100) {
try {
//step2.1 处理select出来的发生IO事件
this.processSelectedKeys();
} finally {
//step2.2 处理外部线程扔到mpsc队列的任务
this.runAllTasks();
}
} else {
long ioStartTime = System.nanoTime();
boolean var13 = false;
try {
var13 = true;
//step2.1 处理select出来的发生IO事件
this.processSelectedKeys();
var13 = false;
}finally {
if (var13) {
long ioTime = System.nanoTime() - ioStartTime;
//这里可以看到ioRate对于队列任务的时间控制
//step2.2 处理外部线程扔到mpsc队列的任务
this.runAllTasks
(ioTime * (long)(100 - ioRatio) / (long)ioRatio);
}
}
//...
}
}
3.1 select()方法获取到发生的IO事件
- deadline计算本次select截止时间(通过判断是否有定时任务或者队列中有任务来决定)
- 阻塞式调用(根据deadline来,没什么特殊任务就阻塞一秒,且可以被外部线程awake())
- jdk的空轮询bug解决方案
select()函数源码如下:NioEventLoop.java
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
//获取到当前时间
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos =
currentTimeNanos + this.delayNanos(currentTimeNanos);
//delayNanos传入一个当前时间,比对最近需要发生的定时任务的时间,返回一个可以运行的时间
while(true) {
//每次循环都更新下还可以截止的时间
long timeoutMillis =
(selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
//如果时间<0那么就溜溜球
if (timeoutMillis <= 0L) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
//如果有task,溜溜球
if (this.hasTasks() && this.wakenUp.compareAndSet(false, true)) {
//溜溜球之前先来个非阻塞的select调用捞一把selectkey
selector.selectNow();
selectCnt = 1;
break;
}
//这里进行阻塞调用select;可以看到timeoutMillis并不是一口气阻塞完所有时间,
int selectedKeys = selector.select(timeoutMillis);
++selectCnt;
//如果满足下面条件,停止select:
if (selectedKeys != 0 || oldWakenUp //轮循到事件 || select操作需要唤醒
|| this.wakenUp.get() || this.hasTasks()//被别的线程唤醒||任务队列有任务
|| this.hasScheduledTasks()) {//定时任务队列有任务
break;//那么结束select
}
jdk空轮询bug: here
简单来说,空轮询bug就是阻塞的select被唤醒但啥都不做,结果一直循环
netty的解决方案是,当检测到空轮询,重新创建一个selector;也是一个比较完善的解决方案
private void select(boolean oldWakenUp) throws IOException {
selectCnt++; //selecor异常次数统计
//...继续之前的select函数
long time = System.nanoTime();//当前时间
//这个意思是 目前的时间time - 阻塞调用前的时间currentTimeNano >= 设置的阻塞时间timeout
//那么是正常的,异常没有发生
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;//这里证明正常,计数清零
}else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0
&& selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
//否则就证明selector并没有阻塞调用那么长的时间,
//并且这种事情发生的次数selectCnt>一个阈值,出了大问题,重新开一个selector
this.rebuildSelector();//把老的selector的key注册到新的一个selector上
selector = this.selector;
selector.selectNow();
selectCnt = 1;
break;
}
}
3.2 processSelected()处理IO事件
3.2.1 Selector中SelectedKeySet的优化与绑定
处理前必须找到是哪些IO事件,放入selectedKeySet里面;netty优化了selectedKeySet,使对事件插入等时间复杂度降低;
在创建selector的时候,调用了openSelecor逻辑:
private Selector openSelector() {
final AbstractSelector selector;
try {
selector = this.provider.openSelector();
}
//...
//select是否 不优化,默认 不优化=false
if (DISABLE_KEYSET_OPTIMIZATION) {
return selector;
} else {
//也就是说要优化 优化什么呢:selectedKeySet
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
//...
}
默认情况下Selector中的SeletedKeySet底层用HashSet实现的,这个时间成本其实会比较大,涉及到hashCode() hash()
啥的,根本没必要;
So,Netty将底层的Set弄成了一个数组;这个数组index指向当前空余位置,每次添加了后自增;默认大小1024;
同时,将Set的一些方法移除:
SelectedSelectionKeySet.java
public int size() {
return this.isA ? this.keysASize : this.keysBSize;
}
public boolean remove(Object o) {
return false;
}
public boolean contains(Object o) {
return false;
}
这个时候,netty优化了selector的keyset的实现,此时还只是一个普通的数组实现的Object;那么接着就该做一个绑定在selector里面了
private Selector openSelector() {
//...
else{
//反射机制创建一个selecttor的selector的class对象
Object maybeSelectorImplClass = AccessController
.doPrivileged(new PrivilegedAction<Object>() {
public Object run() {
try {
return Class.forName("sun.nio.ch.SelectorImpl",
false, PlatformDependent.getSystemClassLoader()
);
}
//...
}
});
//创建完成后判断是否是class类,并且是否是selector的implement;
if (maybeSelectorImplClass instanceof Class && ((Class)maybeSelectorImplClass).isAssignableFrom(selector.getClass())) {
//如果是,那么:
final Class<?> selectorImplClass = (Class)maybeSelectorImplClass;
Object maybeException = AccessController
.doPrivileged(new PrivilegedAction<Object>() {
public Object run() {
try {
//反射拿到selector的selecedKeys,赋值为之前创建的优化的keySet
Field selectedKeysField = selectorImplClass
.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass
.getDeclaredField("publicSelectedKeys");
selectedKeysField.setAccessible(true);
publicSelectedKeysField.setAccessible(true);
selectedKeysField.set(selector, selectedKeySet);
publicSelectedKeysField.set(selector, selectedKeySet);
//...
}
}
}
也就是说ProcessSeletedKey()函数调用的时候,处理的目标对象keyset在EventLoop初试化的时候已经经过优化,通过数组的实现取代HashSet(),并绑定在Selecotr上;
3.2.2 ProcessSelectedKeysOptimized()实际处理优化过的SelctedKeysSet
NioEventLoop中run()方法里面的processSeletedKeys()实际逻辑如下:
private void processSelectedKeys() {
if (this.selectedKeys != null) {
//通常情况下selectedKeys由于优化,所以会调用这
this.processSelectedKeysOptimized(this.selectedKeys.flip());
//flip()会返回key数组,返回类型为:SelectionKey
} else {
this.processSelectedKeysPlain(this.selector.selectedKeys());
}
}
这个优化后的processSelectedKeysOptimized(SelectionKey [])
函数逻辑如下:
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
int i = 0;
while(true) {
SelectionKey k = selectedKeys[i];
if (k == null) {
return;
}
selectedKeys[i] = null;
Object a = k.attachment();
//这个attachment就是netty封装的channel!
if (a instanceof AbstractNioChannel) {
//确认这个attachment是一个channel,那么就开始处理这个channel
this.processSelectedKey(k, (AbstractNioChannel)a);
}
上面已经通过selectionKeys拿到了数组中的key,通过attachment获取到nettey封装的channel;然后就到了正式处理逻辑:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
//...
} else {
try {
int readyOps = k.readyOps();
//...
//channel的read事件和accept事件,调用的是unsafe的read事件
if ((readyOps & 17) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
return;
}
}
} catch (CancelledKeyException var7) {
unsafe.close(unsafe.voidPromise());
}
}
}
那么可以看到这里用到了一个channel的unsafe对象,如果合法,那么通过selectionKey的ops看出是什么事件,然后通过unsafe调用;
-
在processselectedKeys()之后,会有一个runAlltasks()执行任务;一般来说任务有两类:
- 其他线程和当前线程产生的普通task,放在mpsc队列中
- 定时任务:基于时间的优先队列,在runAlltasks()时把定时任务队列里面所有在任务执行时间内的任务拿出来放在普通队列中(由调用fetchFromScheduledTaskQueue()函数实现),这个过程是任务聚合
当两个类型的任务都在普通队列中的时候,safeExcute()执行任务,对于exception,不会终止,只会log打印日志,当任务每执行64个(nanotime获取时间有开销),判断任务执行总时间是否超过规定的时间;超出了就溜了,下次再执行