Java线程池

线程池的作用

暂且不表

线程池

java提供的线程池类是ThreadPoolExecutor
下图是类ThreadPoolExecutor的继承关系

ThreadPoolExcecutor继承关系

使用线程池,我们一般关心以下几个场景:

  • 为了控制线程数量,需要固定线程池大小
  • 希望压力小时线程少点,压力大时多创建点线程。压力再次减小时,多余的线程过一段时间自动销毁,以节省资源
  • 当请求过多时,多余的请求放入什么样的等待队列中。
  • 当请求太多,连队列都放不下的时候,应采取什么样的抛弃策略。

鉴于以上场景,ThreadPoolExecutor有4个构造函数,能够满足所有的需求:

public class ThreadPoolExecutor extends AbstractExecutorService {
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);

我们需要先看看构造函数的参数都是什么意思:

int corePoolSize: 线程池中alive线程的最少数量。

默认情况下,创建线程池后,线程池中是没有任何线程的。线程的创建延迟到了请求到达的时候。也就是说,一开始线程池中线程数量是0,来一个请求就创建一个线程。当线程数量达到corePoolSize时,线程池中线程的数量最少为corePoolSize

int maximumPoolSize: 线程池中最大线程数量。

在maximumPoolSize>corePoolSize的情况下,线程池中线程数量可超过corePoolSize,以应对过大的压力。当压力降下来的时候,一部分线程过一段时间会自动销毁,直至数量减少到corePoolSize。如果maximumPoolSize=corePoolSize,那么线程池的大小就是固定的了。
Q: 线程数什么时候才会超过corePoolSize呢?
A: 多余的请求会先进入等待队列,当等待队列满了的时候,会创建新的线程来处理请求(前提是maximumPoolSize>corePoolSize),线程数最大为maximumPoolSize。当还有更多请求时,就要采取抛弃策略了。
Q: 那岂不是新请求会被先执行?

long keepAliveTimeTimeUnit unit: 线程存活时间

maximumPoolSize中提到线程数量可以超过corePoolSize。这些额外的线程过多场时间销毁呢?就是由keepAliveTime和unit决定的。keepAliveTime是个数字,unit表示时间。TimeUnit的可选值有:

  • TimeUnit.DAYS; //天
  • TimeUnit.HOURS; //小时
  • TimeUnit.MINUTES; //分钟
  • TimeUnit.SECONDS; //秒
  • TimeUnit.MILLISECONDS; //毫秒
  • TimeUnit.MICROSECONDS; //微妙
  • TimeUnit.NANOSECONDS; //纳秒

如keepAliveTime=1,unit为TimeUnit.MINUTES,表示多余的线程过1分钟后销毁。

BlockingQueue<Runnable> workQueue:等待队列,存放等待的请求。当线程数达到corePoolSize时,再来的请求会放入等待队列

  • ArrayBlockingQueue: 基于数组的先进先出队列,此队列创建时必须指定大小;
  • LinkedBlockingQueue: 基于链表的先进先出队列,可以指定大小。如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
  • SynchronousQueue: 这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。

RejectedExecutionHandler handler: 线程池中线程不够用,队列也放不下了,采取什么样的策略处理新请求。

  • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
  • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃线程队列里最近的一个任务,执行新提交的任务
  • ThreadPoolExecutor.CallerRunsPolicy:用调用者的线程来运行任务

根据实际需求,设置好以上这些参数,就能创建出一个可用的线程池。

除了上面的两个构造函数,ThreadPoolExecutor还提供了支持ThreadFactory创建线程的构造函数。如下:

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
        
     public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
         BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
}

ThreadFactory的作用......

有了ThreadPoolExecutor,只需设置一些参数就可以拥有一个线程池,是不是很简单!是!但是还可以更简单。

程序员一直都有懒的天性。因为懒,所以才创造出了各种工具、各种语言。秉承着懒懒更健康的原则,对于常见线程池,JDK提供了创建工具Executors

Executors

Executors是一个创建线程池的工具类。它提供了最常见的线程池的创建方法。不需要我们再苦思冥想设置参数了,只需选择合适的函数,一个最通用的线程池就创建了。

Executors主要提供了以下四种工具:

  • newFixedThreadPool(int nThreads): 创建线程数量固定的线程池。
  • newSingleThreadExecutor(): 创建只有一个线程的线程池。
  • newCachedThreadPool(): 创建不限制线程数量的线程池。
  • newScheduledThreadPool(int corePoolSize): 创建一个定时调度的线程池。

源码如下:

public class Executors {
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    public static ExecutorService newSingleThreadExecutor() {
     return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize){
    new ScheduledThreadPoolExecutor(corePoolSize);
    }
}

通过源码,我们可以知道:

  • newFixedThreadPool(int nThreads)创建的线程池其实就是corePoolSize=maximumPoolSize,使用无界队列的线程池
  • newCachedThreadPool()创建的线程池其实是corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,线程闲置60s后自动销毁,同样使用无界队列的线程池。

抛弃策略是啥?
有人可能发现工具类中没有提供抛弃策略的参数。其实是使用了默认的抛弃策略:

private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

Executors中还有其他创建线程池的方法,可自行查阅,选取适合自己需求的使用。

线程池的常用操作

Future<?> submit(Runnable task);

向线程池中提交一个Runnable类,返回一个Future对象。当调用get()方法时,会阻塞。线程结束时,返回null。

<T> Future<T> submit(Runnable task, T result);

向线程池中提交一个Runnable类,返回一个Future对象。当调用get()方法时,会阻塞。线程结束时,返回传入的result。

<T> Future<T> submit(Callable<T> task);

向线程池中提交一个Callable类,返回一个Future对象。当调用get()方法时,会阻塞。线程结束时,返回执行结果。

void shutdown();

关闭线程。已经submit的会继续执行直至结束,不会再接收新的任务

List<Runnable> shutdownNow();

如果shutdown()时,有线程在一直执行,不结束,总不能一直等吧。shutdownNow()会结束所有的线程,返回结果是所有等待任务。

Executor & ExecutorService & Executors

public interface Executor {
    void execute(Runnable command);
}
public interface ExecutorService extends Executor {
    void shutdown();
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
}
public interface ExecutorService extends Executor {
    void shutdown();
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
}
  • Executor 和 ExecutorService 这两个接口主要的区别是:ExecutorService 接口继承 Executor 接口,是 Executor 的子接口
  • Executor 和 ExecutorService 第二个区别是:Executor 接口定义了 execute()方法用来接收一个Runnable接口的对象,而 ExecutorService 接口中的 submit()方法可以接受Runnable和Callable接口的对象。
  • Executor 和 ExecutorService 接口第三个区别是 Executor 中的 execute() 方法不返回任何结果,而 ExecutorService 中的 submit()方法可以通过一个 Future 对象返回运算结果。
  • Executor 和 ExecutorService 接口第四个区别是除了允许客户端提交一个任务,ExecutorService 还提供用来控制线程池的方法。比如:调用 shutDown() 方法终止线程池。可以通过 《Java Concurrency in Practice》 一书了解更多关于关闭线程池和如何处理 pending 的任务的知识。
  • Executors 类提供工厂方法用来创建不同类型的线程池。比如: newSingleThreadExecutor() 创建一个只有一个线程的线程池,newFixedThreadPool(int numOfThreads)来创建固定线程数的线程池,newCachedThreadPool()可以根据需要创建新的线程,但如果已有线程是空闲的会重用已有线程。

应用实例

理论再多,不如看实际应用中的源码:
metrics-core定时report源码:

public abstract class ScheduledReporter{
    private final ScheduledExecutorService executor;
    protected ScheduledReporter(MetricRegistry registry,
                                String name,
                                MetricFilter filter,
                                TimeUnit rateUnit,
                                TimeUnit durationUnit) {
        this(registry, name, filter, rateUnit, durationUnit,
                Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(name + '-' + FACTORY_ID.incrementAndGet())));
    }
public void start(long period, TimeUnit unit) {
        executor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    report();
                } catch (RuntimeException ex) {
                    LOG.error("RuntimeException thrown from {}#report. Exception was suppressed.", ScheduledReporter.this.getClass().getSimpleName(), ex);
                }
            }
        }, period, period, unit);
    }
    
    public void stop() {
        executor.shutdown(); // Disable new tasks from being submitted
        try {
            // Wait a while for existing tasks to terminate
            if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // Cancel currently executing tasks
                // Wait a while for tasks to respond to being cancelled
                if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                    System.err.println(getClass().getSimpleName() + ": ScheduledExecutorService did not terminate");
                }
            }
        } catch (InterruptedException ie) {
            // (Re-)Cancel if current thread also interrupted
            executor.shutdownNow();
            // Preserve interrupt status
            Thread.currentThread().interrupt();
        }
    }
}

参考

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 222,252评论 6 516
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,886评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,814评论 0 361
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,869评论 1 299
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,888评论 6 398
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,475评论 1 312
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 41,010评论 3 422
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,924评论 0 277
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,469评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,552评论 3 342
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,680评论 1 353
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,362评论 5 351
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,037评论 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,519评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,621评论 1 274
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 49,099评论 3 378
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,691评论 2 361

推荐阅读更多精彩内容