线程池是一个在多线程场景中运用很广泛的并发框架,需要异步执行或并发执行任务的程序都可以使用线程池。有任务到来时,如果不使用线程池,我们需要不断的创建/销毁线程,还需要对线程进行管理;而使用线程池,直接将任务提交到线程池即可。使用线程池有几个好处:无需重复创建/销毁线程,降低资源消耗;提高程序响应速度;提高线程的可管理性。
3.1 实现原理
线程池内部一般包含一个核心线程池,其内部的线程在创建之后一般不会销毁,执行完任务后线程会阻塞等待新任务到来。
当向线程池提交任务时,线程池会做如下判断:
- 核心线程池未满,创建线程执行任务
- 核心线程池已满,若等待队列未满,则加入到等待队列;若等待队列已满但线程池未满,创建新线程执行任务;若等待队列和线程池均已满,则按照指定策略退出/拒绝任务/丢弃任务等。
了解了实现原理,我们先来自己实现一个线程池,首先定义线程池的接口
ThreadPool
线程池的接口里面最重要的方法是execute执行任务
public interface ThreadPool<Job extends Runnable> {
//提交一个Job,这个Job需要实现Runnable接口
void execute(Job job);
//关闭线程池
void shutdown();
//增加工作者线程
void addWorkers(int num);
//减少工作者线程
void removeWorker(int num);
//得到正在等待执行的任务数量
int getJobSize();
}
CommonThreadPool
在实现线程池时,我们需要定义线程池的大小,以及保存任务的列表jobs,下面是变量定义:
// 线程池最大限制数
private static final int MAX_WORKER_NUMBERS = 100;
// 线程池默认的数量
private static final int DEFAULT_WORKER_NUMBERS = 1;
// 线程池最小数量
private static final int MIN_WORKER_NUMBERS = 1;
// 工作列表
private final LinkedList<Job> jobs = new LinkedList<Job>();
在线程池初始化时,我们要将核心线程池进行初始化,创建多个Worker线程,然后启动Worker线程。
// num 为DEFAULT_WORKER_NUMBERS 默认线程池大小
private void initializeWokers(int num) {
// 创建多个线程,加入workers中,并启动
for (int i = 0; i < num; i++) {
Worker worker = new Worker();
workers.add(worker);
Thread thread = new Thread(worker, "ThreadPool-Worker-"
+ threadNum.getAndIncrement());
thread.start();
}
}
Worker启动后,一直没有任务,需要阻塞在jobs上(jobs是上面定义的任务列表),Worker等待任务到来后唤醒获取队列中的任务并执行。下面的代码中,如果jobs为空,则线程等待;
// worker的代码,首先要获取jobs的锁,
synchronized (jobs) {
while (jobs.isEmpty()) {// 如果jobs是空的,则执行jobs.wait,使用while而不是if,因为wait后可能已经为空了,需要继续等待
try {
jobs.wait();
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();// 中断
return;// 结束
}
}
job = jobs.removeFirst();// 第一个job
if (job != null) {
try {
job.run();//注意,这里是run而不是start,传入的Job
} catch (Exception e) {
// 忽略Job执行中的Exception
e.printStackTrace();
}
}
}
提交任务时,只需要将任务加入jobs中,然后通知worker线程即可。worker线程获得锁后会取第一个任务执行。执行完毕,若jobs为空,worker线程继续进行休眠等待任务到来。
@Override
public void execute(Job job) {
if (job == null)
return;
synchronized (jobs) {
jobs.addLast(job);
jobs.notify();
}
}
完整的代码可以查看https://github.com/ssj234/JavaStudy_IO/tree/master/IOResearch/src/net/ssj/pool
3.2 Java的Executor框架
Java平台本身提供了Executor框架用来帮助我们使用线程池。
Executor框架最核心的类是ThreadPoolExecutor,这是各个线程池的实现类,有如下几个属性:
- corePool:核心线程池的大小 m
- maximumPool:最大线程池的大小
- keepAliveTime: 休眠等待时间
- TimeUnit unit : 休眠等待时间单位,如微秒/纳秒等
- BlockingQueue workQueue:用来保存任务的工作队列
- ThreadFactory: 创建线程的工厂
- RejectedExecutionHandler:当线程池已经关闭或线程池Executor已经饱和,execute()方法将要调用的Handler
通过Executor框架的根据类Executors,可以创建三种基本的线程池:
- FixedThreadPool
- SingleThreadExecutor
- CachedThreadPool
FixedThreadPool
FixedThreadPool被称为可重用固定线程数的线程池。
// 获取fixedThreadPool
ExecutorService fixedThreadPool=Executors.newFixedThreadPool(paramInt);
//内部会调用下面的方法,参数 corePoolSize、maximumPoolSize、keepAliveTime、workQueue
return new ThreadPoolExecutor(paramInt, paramInt, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
FixedTheadPool设置的线程池大小和最大数量一样;keepAliveTime为0,代表多余的空闲线程会立刻终止;保存任务的队列使用LinkedBlockingQueue,当线程池中的线程执行完任务后,会循环反复从队列中获取任务来执行。
FixedThreadPool适用于限制当前线程数量的应用场景,适用于负载比较重
的服务器。
SingleThreadExecutor
SingleThreadExecutor的核心线程池数量corePoolSize和最大数量maximumPoolSize都设置为1,适用于需要保证顺序执行
的场景
ExecutorService singleThreadExecutor=Executors.newSingleThreadExecutor();
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
CachedThreadPool
CachedThreadPool是一个会根据需要创建新线程的线程池,适用于短期异步的小任务,或负载教轻
的服务器。
ExecutorService cachedThreadPool=Executors.newCachedThreadPool();
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());
SynchronousQueue是一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。corePoolSize是0,maximumPoolSize都最大,无界的。keepAliveTime为60秒,空闲线程超过60S会被终止。
ScheduleThreadPoolExecutor
ScheduleThreadPoolExecutor和Timer类似,可以设置延时执行或周期执行,但比Timer有更多的功能。Timer和TimerTask只创建一个线程,任务执行时间超过周期会产生一些问题。Timer创建的线程没有处理异常,因此一旦抛出非受检异常,会立刻终止。
ScheduledThreadPoolExecutor executor=new ScheduledThreadPoolExecutor(5);
//可以直接执行
executor.execute(new JobTaskR("executor", 0));
executor.execute(new JobTaskR("executor", 1));
System.out.println("5S后执行executor3");
//隔5秒后执行一次,但只会执行一次。
executor.schedule(new JobTaskR("executor", 3), 5, TimeUnit.SECONDS);
System.out.println("开始周期调度");
//设置周期执行,初始时6S后执行,之后每2s执行一次
executor.scheduleAtFixedRate(new JobTaskR("executor", 4), 6, 2, TimeUnit.SECONDS);
scheduleAtFixedRate或者scheduleWithFixedDelay方法,它们不同的是前者以固定频率执行,后者以相对固定延迟之后执行。
3.3 Netty的EventLoop与线程池
Netty的事件循环和事件循环组的实现中,类的层级关系比较复杂,其底层是Java线程池的实现,不过在netty的实际使用中还是比较简单的,我们只需要使用如下的代码即可,
EventLoopGroup bossGroup=new NioEventLoopGroup();
EventLoopGroup workGroup=new NioEventLoopGroup();
ServerBootstrap b=new ServerBootstrap();
b.group(bossGroup,workGroup)//设置事件循环组
Netty的事件循环机制有两个基本接口:EventLoop和EventLoopGroup。前者是事件循环,后者是由多个事件循环组成的组。
EventLoop自身是一个不断循环执行的线程,以NioEventLoop为例,其继承了SingleThreadEventExecutor
,内部的executor
是创建NioEventLoop时传入的线程池,用来将run方法放入线程池中执行;此外还包含为一个TaskQueue,netty在处理io过程中的task可以提交到这个队列中,事件循环会不断获取task并执行,因此但其本身也可以看做一个线程池。
NioEventLoop的run方法中,Nio的事件循环会不断select后获取任务并执行,然后根据ioRatio的设置执行TaskQueue的任务。NioEventLoop的execute方法中,其会将task加入到taskQueue等待事件循环执行。因此,我们可以将NioEventLoop当做一个不断执行的线程池,EventLoopGroup作为线程池组,线程池组的意义是采用给的的策略选取一个EventLoop并提交任务。
EventLoop的定义如下,其继承了一个顺序执行的线程池接口和EventLoopGroup,也就是说EventLoop之间有父子关系,通过parent();返回任务循环组,通过next()选取一个事件循环。线程池组的register用于将Netty的Channel注册到事件循环中。
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
@Override
EventLoopGroup parent();
}
public interface EventLoopGroup extends EventExecutorGroup {
EventLoop next();
ChannelFuture register(Channel channel);
}
NioEventLoopGroup
NioEventLoopGroup除了处理网络的异步I/O任务,还用于完成异步提交的系统任务。NioEventLoopGroup初始化时,有如下几个参数可以配置,主要用于设置线程池的相关配置。
- nThreads 子线程池数量
- Executor executor 用来执行事件循环的线程池
- chooserFactory :next()时选择线程池的策略
- selectorProvider 用于打开selector
- selectStrategyFactory 用来控制select循环行为的策略
- RejectedExecutionHandlers 线程池执行的异常处理策略
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
RejectedExecutionHandlers.reject());
}
NioEventLoopGroup初始化过程为:
- 如果传入的executor 为空,会默认使用
ThreadPerTaskExecutor
,该线程池针对每个任务会创建一个线程,创建线程方式使用DefaultThreadFactory
提供的newThread方法。 - 初始化开始,首先会根据创建nThread个子线程池,保存在childrens变量中,创建逻辑比较简单,将初始化NioEventLoopGroup时设置的参数传递给NioEventLoop对象。在创建子线程池NioEventLoop的过程中,如果一旦有失败的,就需要关闭已经创建的所有子线程池并等待这些线程池结束。
- 之后,使用chooserFactory创建
chooser
,用来在next()选择事件循环时从childrens变量选择一个返回。默认使用2的倍数的策略,也可以设置为顺序依次选择。 - 向组中所有的事件循环的
terminationFuture
注册事件,目的是等待所有事件循环结束后将事件循环组的terminatedChildren
设置为成功完成。 - 最后,将children复制保存为一个只读的集合,保存在变量
readonlyChildren
中。
至此,NioEventLoopGroup的初始化过程就结束了。我们可以看到,NioEventLoopGroup主要的用来聚合多个EventLoop,对其进行调度。
NioEventLoop
在NioEventLoopGroup的初始化过程中,会创建多个NioEventLoop,NioEventLoop用来执行实际的事件循环,初始化时有如下几个属性:
NioEventLoopGroup parent 线程池所在的Group
Executor executor 执行任务的线程池,默认是ThreadPerTaskExecutor
SelectorProvider selectorProvider 用来打开selector
SelectStrategy strategy 用来控制select循环行为的策略
RejectedExecutionHandlers 线程池执行的异常处理策略
addTaskWakesUp addTask(Runnable)添加任务时是否唤醒线程池,默认是false
maxPendingTasks 线程池中等待任务的最大数量
scheduledTaskQueue 保存定时任务的QUeue
tailTasks :保存任务的Queue,netty选择使用jctools的MpscChunkedArrayQueue,原因是为了提高效率,因为Nio线程池的线程消费者只有一个,就是一直进行的select循环,而生产者可能有多个。具体实现参见 http://blog.csdn.net/youaremoon/article/details/50351929
事件循环
- NioEventLoop初始化时,会根据配置参数
sun.nio.ch.bugLevel
和io.netty.selectorAutoRebuildThreshold
设置重建selector的阈值,这是为了解决jvm空轮询导致cpu利用率100%的问题。 - openSelector的目的是打开选择描述符Selector,并对
sun.nio.ch.SelectorImpl
的实现进行优化,将selectedKeys和publicSelectedKeys属性都修改为SelectedSelectionKeySet类,这个类使用了两个数组,使用空间换时间的方法,设置了两个数组,每次使用其中的一个。 - 打开Selector之后,在服务器启动后会调用register将选择描述符注册到EventLoopGroup,NioEventLoopGroup中会调用NioEventLoop的register,这样,事件循环中的Selector就注册到了channel上。
- 在run方法中,会根据selectStrategy调用select方法,收到io事件后使用processSelectedKeys处理,处理完成后执行TaskQueue中的方法。
提交任务
NioEventLoop初始化时,会创建/设置其包含的属性,最重要的是打开selector和创建tailTasks两个步骤;这时,由于没有任何任务,NioEventLoop不会启动线程。在netty中,向线程池提交任务可以使用下面的方法:
EventLoopGroup loop = new NioEventLoopGroup();
loop.next().submit(Callable<T> task)
loop.next().submit(Runnable task)
loop.next().execute(Runnable command);
也可以直接通过EventLoopGroup提交任务,只是EventLoopGroup内部会调用next()后再执行相关的方法。
EventLoopGroup loop = new NioEventLoopGroup();
loop.submit(Callable<T> task)
loop.submit(Runnable task)
loop.execute(Runnable command);
submit方法的内部会将Callable或Runnable包装后交给execute方法执行。
// AbstractExecutorService.java
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task); // 包装task为 ftask
execute(ftask);
return ftask;
}
execute方法被NioEventLoop的父类SingleThreadEventExecutor覆盖,程序如下:
public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task); // 添加到任务队列
} else {
startThread(); // 启动线程,向EventLoop内部的线程池提交任务,会执行NioEventLoop run
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
- 判断当前线程(提交任务的线程)与当前线程池是同一个线程,也就是说是如果是当前线程池提交的任务,则直接将任务加入线程池队列即可;
- 如果不是,则需要启动线程后添加任务。启动线程的过程是,如果内部线程没有启动则启动,向NioEventLoop内部包含的executor提交一个任务,任务内部执行NioEventLoop的run方法也就是事件循环(executor是实际使用的线程池,初始化是传入,默认是ThreadPerTaskExecutor)。
- 最后根据addTaskWakesUp标志和任务是否实现了NonWakeupRunnable判断是否需要唤醒,唤醒的方法是提交一个默认的空任务WAKEUP_TASK。
3.4 事件循环解析
Nio事件循环在NioEventLoop中,主要功能:
- 处理网络I/O读写事件
- 执行系统任务和定时任务
在主循环中我们可以看到netty对I/O任务和提交到事件循环中的系统任务的调度。
3.4.1 I/O事件
- 由于NIO的I/O读写需要使用选择符,因此,netty在NioEventLoop初始化时,会使用SelectorProvider打开selector。在类加载时,netty会从系统设置中读取相关配置参数:
- sun.nio.ch.bugLevel 用来修复JDK的NIO在Selector.open()的一个BUG
- io.netty.selectorAutoRebuildThreshold select()多少次数后重建selector
static {
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
selectorAutoRebuildThreshold = 0;
}
SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
}
- NioEventLoop的构造方法中,会调用provider.openSelector()打开Selector;如果设置
io.netty.noKeySetOptimization
为true,则会启动优化,优化内容是将Selector的selectedKeys和publicSelectedKeys属性设置为可写并替换为Netty实现的集合以提供效率。
private Selector openSelector() {
final Selector selector;
try {
selector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
if (DISABLE_KEYSET_OPTIMIZATION) {
return selector;
}
// 下面是优化程序,此处省略
...
return selector;
}
- NioEventLoop最核心的地方在于事件循环,具体代码在NioEventLoop.java在run方法中
- 首先根据默认的选择策略DefaultSelectStrategy判断本次循环是否select,具体逻辑为:如果当前有任务则使用selectNow立刻查询是否有准备就绪的I/O;如果当前没有任务则返回SelectStrategy.SELECT,并将wakenUp设置为false,并调用select()进行查询。
protected void run() {
for (;;) { // 事件循环
try {
// select策略
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false)); // select()
if (wakenUp.get()) {
selector.wakeup(); // 唤醒select()的线程
}
default:
// fallthrough
}
.... 后续处理
select()时需要判断当前是否有scheduledTask(定时任务),如果有则需要计算任务delay的时间,如果定时任务需要立刻执行了,那么必须马上selectNow()并返回,之后执行任务。如果没有scheduledTask,会判断当前是否有任务在等待列表,如果有任务时将wakenUp设置为true并selectNow();如果没有任务,那么会 selector.select(1000); 阻塞等待1s,直到有I/O就绪,或者有任务等待,或需要唤醒时退出,否则,会继续循环,直到前面的几种情况发生后退出。
之后,事件循环开始处理IO和任务。如果查询到有IO事件,会调用processSelectedKeysOptimized(优化的情况下),对SelectionKey进行处理。
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime; // io花费的时间
runAllTasks(ioTime * (100 - ioRatio) / ioRatio); // 按照iorate计算task的时间
}
}
- processSelectedKeysOptimized处理I/O,主要是NIO的select操作,处理相关的事件。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
......
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
3.4.2 任务处理
- runAllTasks执行提交到EventLoop的任务,首先从scheduledTaskQueue获取需要执行的任务,加入到taskQueue,然后依次执行taskQueue的任务。
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
fetchedAll = fetchFromScheduledTaskQueue(); // 获取定时任务
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
afterRunningAllTasks();
return ranAtLeastOne;
}
- ioRatio不为100时,会调用runAllTasks(ioTime * (100 - ioRatio) / ioRatio),首先计算出I/O处理的事件,然后按照比例为执行task分配事件,内部主要逻辑与runAllTasks()主要逻辑相同。