前言
Netty源码(一)Netty架构解析分析了netty的基本原理和工作流程,其中EventLoop是netty中最核心的组件,本文将会分析EventLoop的主要设计思想,至于具体的代码细节下一篇会具体分析。本文将主要分析NIOEventLoop基于4.1.49.Final版本。
核心思想
- 单线程
之前介绍过EventLoop在它的生命周期内只和一个 Thread 绑定,一个EventLoop管理一个或者多个Channel。为什么是单线程?首先多线程编程是很难很麻烦的事情,而且为了保证线程安全加锁或者CAS都会影响执行效率,某些场景下线程数量越多导致的效率还会更低。采用单线程模型第一不用考虑线程安全,第二没有线程上下文切换带来的损耗。 - 执行流程
与EventLoop所绑定的Channel的事件都由EventLoop所拥有的Thread处理,实现的核心流程如下:
//当前调用线程是否EventLoop所属的Thread
if (executor.inEventLoop()) {
//do something
} else {
executor.execute(new Runnable() {
@Override
public void run() {
//do something
}
});
如果当前调用线程正是支撑 EventLoop 的线程,那么所提交的任务会直接执行。否则,EventLoop 将调度该任务以便稍后执行,并将它放入到内部队列中。EventLoop下次处理它的事件时,它会执行队列中的那些任务。这样的任务执行流程保证,Channel上的所有任务都是由它所注册的EventLoop所拥有的Thread执行,不会出现并发问题。
构造参数
NioEventLoop构造参数:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
- parent:当前EventLoop所属的NioEventLoopGroup;
- executor:为EventLoop创建线程,并且执行管理该线程生命周期的任务,默认为ThreadPerTaskExecutor;
- selectorProvider:NIO中selector的帮助类;
- selectStrategy:对selector.select()提供选择策略;
- queueFactory:创建存放任务队列的工厂。
从构造参数可以大概知道NioEventLoop的职责,前面提过一个NioEventLoopGroup管理一个或者多个NioEventLoop,parent指向NioEventLoop所属的NioEventLoopGroup。
NioEventLoop的Thread由executor去创建管理,这个thread只做三件事情:
- select: 返回EventLoop所持有的selector上已经准备就绪的Channel;
- processSelectedKeys: 处理准备就绪的IO操作;
- ranTasks: 执行队列里的任务;
由selectorProvider提供IO操作,queueFactory创建队列存放待执行的任务。
Executor
可见EventLoop继承自Executor,Executor的核心思想是将任务调用,和任务自身的执行逻辑给分离开,使用Executor去执行任务,调用方只需要关注自身任务的逻辑,把任务执行交给Executor去处理,将职责分离开。
JDK中自带的线程池采用池化的思想,基本思路是从池中取一个空闲的线程去执行任务,执行完之后将线程返回空闲列表,让其可以重复使用。因为线程是很珍贵的资源,采用池化的技术可以减少线程创建、销毁的损耗,但不能减少线程上下文切换所带来的消耗,随着线程数量的增加这种消耗会更多。所以EventLoop采用单线程的模型去实现Executor避免多线程的上下文切换,和多线程的并发编程问题。
Feature
Netty中所有操作都是异步进行的,实现异步编程很关键的对象就是Feature,因为异步操作都是立即返回的不会等待任务完成,所以需要有一种机制去表示异步执行的结果 ,这种机制就是Feature。
JDK Feature
public interface Future<V> {
//取消任务
boolean cancel(boolean mayInterruptIfRunning);
//任务是否取消
boolean isCancelled();
//任务是否执行完成
boolean isDone();
//阻塞获取执行结果
V get() throws InterruptedException, ExecutionException;
//指定时间内获取执行结果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
可以看出jdk的Feature接口提供了对异步执行结果的几个操作,取消、获取结果、判断是否完成等。但是这有个缺点就是调用方其实不知道任务什么时候结束,没有一种机制去通知,只有自己不断的去检测或者阻塞等待执行完成,这都不是很好的方法。所以netty自己实现了一套Feature,相比jdk的Feature对重要的就是提供了一种通知机制。
Netty Feature
public interface Future<V> extends java.util.concurrent.Future<V> {
//省略其他方法
......
//添加监听器
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
//添加多个监听器
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
//移除监听器
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
//移除多个监听器
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
}
Netty的Feature与jdk自带的Feature,最大的不同就是Listener。使用是一种观察者模式,将执行结果通知出去。
GenericFutureListener
public interface GenericFutureListener<F extends Future<?>> extends EventListener {
/**
* Invoked when the operation associated with the {@link Future} has been completed.
*
* @param future the source {@link Future} which called this callback
*/
void operationComplete(F future) throws Exception;
}
GenericFutureListener只有一个operationComplete方法,通过实现GenericFutureListener接口的operationComplete调用方就能知道异步执行的结果。
MpscUnboundedArrayQueue
前面提到,NioEventLoop中有一个用于存放任务的队列,如果没有指定特定的queueFactory,默认为MpscUnboundedArrayQueue。
MpscUnboundedArrayQueue是JCTools所提供的的一种队列,JCTools提供了几种目前jdk没有实现的队列MpscUnboundedArrayQueue就是其中一种。从名字可知(MPSC - Multi Producer Single Consumer )多生产者单消费者,是一种使用于多生产者单消费者场景的队列。而EventLoop正符合这种多生产者单消费者的场景,所以采用MpscUnboundedArrayQueue队列来存放任务。
下面说一下MpscUnboundedArrayQueue的几个好处,具体细节就不深入研究了
1.缓存行填充
为什么要有缓存行填充?首先要知道“伪共享”的概念,伪共享和 CPU 内部的 Cache 有关,Cache 内部是按照缓存行(Cache Line)管理的,缓存行的大小通常是 64 个字节。CPU加载数据是按照一个缓存行为单位加载的。
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
如上图如果出队索引和入队索引用32位的int类型,CPU会把takeIndex、putIndex加载到一个缓存行。当一个入队操作的时候会修改putIndex,导致整个缓存行失效,需要重新加载到缓存中,但是入队操作并不会修改takeIndex,由于putIndex与takeIndex共享同一个缓存行,所以也会导致takeIndex失效,这种情况就是伪共享。MpscUnboundedArrayQueue采用让变量独占缓存行的形式解决伪共享的问题。
abstract class BaseMpscLinkedArrayQueuePad1<E> extends AbstractQueue<E> implements IndexedQueue
{
long p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16, p17;
}
-
无锁
首先offer方法,这里只分析关键部分
while (true)
{
//省略代码
......
//cas设置生产者索引
if (casProducerIndex(pIndex, pIndex + 2))
{
break;
}
}
// INDEX visible before ELEMENT
final long offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
//放置元素
soRefElement(buffer, offset, e); // release element e
return true;
}
offer方法采用cas的方式去放入元素,而poll由于是单线程消费,无需加锁和cas操作。
EventLoopGroup
EventLoopGroup是对EventLoop进行管理,那么EventLoopGroup是如何对EventLoop进行管理的呢?先看一个简单的execute方法
@Override
public void execute(Runnable command) {
next().execute(command);
}
是调用next返回一个对象后执行的execute方法,跟进去看一下next方法
/**
* Returns one of the {@link EventExecutor}s managed by this {@link EventExecutorGroup}.
*/
EventExecutor next();
返回一个EventExecutorGroup所管理的EventExecutor对象,对于EventLoopGroup来说next返回的就是所管理的EventLoop对象,next方法就是EventLoopGroup管理的EventLoop关键方法。
最后找一下next的实现,在MultithreadEventExecutorGroup类中
@Override
public EventExecutor next() {
return chooser.next();
}
chooser变量
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
最后再看一下EventExecutorChooserFactory的实现,只有一个实现类EventExecutorChooserFactory
/**
* Default implementation which uses simple round-robin to choose next {@link EventExecutor}.
*/
@UnstableApi
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
@SuppressWarnings("unchecked")
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
}
很简单的实现,按照管理executors数量的奇偶数返回不同的策略,核心逻辑就是循环依次返回executors中的executor。
总结
本文主要分析了Netty的线程模型EventLoop,最重要的要关注的点就是EventLoop是单线程模型,所有提交给EventLoop都需要判断调用线程是否为EventLoop所持有的thread,如果是直接执行,否则添加到EventLoop的任务队列里面。这一部分个人感觉是netty中设计很精妙的一点,如果要使用多线程就用EventLoopGroup去管理EventLoop,EventLoopGroup中的EventLoop又互相隔离,互不影响也不会出现线程安全的问题。
本文主要分析了EventLoop的继承关系和一些设计思想,并没有具体去分析一些代码细节,下篇文章会具体分析EventLoop的thread所负责的三件事情:
- select: 返回EventLoop所持有的selector上已经准备就绪的Channel;
- processSelectedKeys: 处理准备就绪的IO操作;
- ranTasks: 执行队列里的任务。