线程池-Executor

目录:
Executor 框架简介
1.Executor
2.ExecutorService
3.Executors
4.Executor、ExecutorService、Executors对比
5.ThreadPoolExecutor参数详解

Executor 框架简介

在 Java 5 之后,并发编程引入了一堆新的启动、调度和管理线程的API。Executor 框架便是 Java 5 中引入的,其内部使用了线程池机制,它在 java.util.cocurrent 包下,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。因此,在 Java 5之后,通过 Executor 来启动线程比使用 Thread 的 start 方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免 this 逃逸问题——如果我们在构造器中启动一个线程,因为另一个任务可能会在构造器结束之前开始执行,此时可能会访问到初始化了一半的对象用 Executor 在构造器中。

Executor 框架包括:线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable 等。

因为创建和管理线程非常心累,并且操作系统通常对线程数有限制,所以建议使用线程池来并发执行任务,而不是每次请求进来时创建一个线程。使用线程池不仅可以提高应用的响应时间,还可以避免"java.lang.OutOfMemoryError: unable to create new native thread" 之类的错误。


1.Executor

Executor 是一个抽象层面的核心接口(大致代码如下)。

public interface Executor {
    void execute(Runnable command);
}

2.ExecutorService

ExecutorService 接口 对 Executor 接口进行了扩展,提供了返回 Future 对象,终止,关闭线程池等方法。当调用 shutDown 方法时,线程池会停止接受新的任务,但会完成正在 pending 中的任务。

Future 对象提供了异步执行,这意味着无需等待任务执行的完成,只要提交需要执行的任务,然后在需要时检查 Future 是否已经有了结果,如果任务已经执行完成,就可以通过 Future.get() 方法获得执行结果。需要注意的是,Future.get() 方法是一个阻塞式的方法,如果调用时任务还没有完成,会等待直到任务执行结束。

通过 ExecutorService.submit() 方法返回的 Future 对象,还可以取消任务的执行。Future 提供了 cancel() 方法用来取消执行 pending 中的任务。

部分代码如下:

public interface ExecutorService extends Executor {
    void shutdown();
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
}

3.Executors

Executors 是一个工具类,类似于 Collections。提供工厂方法来创建不同类型的线程池,,分别为:

    import java.util.Date;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    /**
     * Created by lbc19 on 2018/6/4.
     */
    public class ExecutorsPractice {

        public static void main(String[] args) {
            /**
             Java通过Executors提供四种线程池,分别为:
             newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
             newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
             newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
             newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
             */

            /**
             1)newCachedThreadPool
             创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
             线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。
             */
            ExecutorService executorService = Executors.newCachedThreadPool();

            for (int i = 0; i < 100; i++) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("a");
                    }
                });
            }

            /**
             2)newFixedThreadPool
             创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
             因为线程池大小为3,每个任务输出index后sleep 10秒,所以每10秒打印3个数字。
             定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()
             */
            ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
            for (int i = 0; i < 100; i++) {
                fixedThreadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("a");
                        try {
                            Thread.sleep(10000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });

            }

            /**
             3)newScheduledThreadPool
             创建一个定长线程池,支持定时及周期性任务执行。
             */

            //表示延迟3秒执行,只执行一次
            ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
            scheduledThreadPool.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println("a" + new Date());
                }
            }, 3, TimeUnit.SECONDS);

            //表示延迟1秒后每3秒执行一次(一直执行)
            scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    System.out.println("a" + new Date());
                }
            }, 1, 3, TimeUnit.SECONDS);

            //第一次延迟1s,之后都延迟3s(一直执行)
            scheduledThreadPool.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    System.out.println("a" + new Date());
                }
            }, 1, 3, TimeUnit.SECONDS);


            /**
             4)newSingleThreadExecutor
             创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
             */
            ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
            for (int i = 0; i < 100; i++) {
                singleThreadExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("a" + new Date());
                    }
                });
            }
        }
    }

部分源代码如下:发现他们用的都是ThreadPoolExecutor

public class Executors {

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
      public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
}

4.Executor、ExecutorService、Executors对比

Executor ExecutorService Executors
Executor 是 Java 线程池的核心接口,用来并发执行提交的任务; ExecutorService 接口继承了 Executor 接口,是 Executor 的子接口,提供了异步执行和关闭线程池的方法; Executors 是一个工具类,类似于 Collections。提供工厂方法来创建不同类型的线程池,比如 FixedThreadPool 或 CachedThreadPool。
Executor 接口定义了 execute()方法用来接收一个Runnable接口的对象 ExecutorService 接口中的 submit()方法可以接受Runnable和Callable接口的对象
Executor 中的 execute() 方法不返回任何结果 ExecutorService 中的 submit()方法可以通过一个 Future 对象返回运算结果。1.通过 **Future.get() **方法获得执行结果,Future.get() 方法是一个阻塞式的方法;
不能取消任务 Future 提供了 cancel() 方法用来取消执行 pending 中的任务;
没有提供和关闭线程池有关的方法 ExecutorService 还提供用来控制线程池的方法。比如:调用 shutDown()方法终止线程池。

5.ThreadPoolExecutor参数详解

ThreadPoolExecutor.png
  • corePoolSize:线程池核心线程数大小
  • maximumPoolSize:线程池线程数最大值,达到最大值后线程池不会再增加线程执行任务
  • keepAliveTime:线程池中超过corePoolSize数目的空闲线程最大存活时间,allowCoreThreadTimeOut为
    true的核心线程有效时间
  • unit:时间单位
  • workQueue:阻塞任务队列,用于保存任务以及为工作线程提供待执行的任务
  • threadFactory:线程工厂,线程生成器
  • handler:当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理;

当一个新的任务提交到线程池之后,线程池是如何处理的:
1、线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则执行第二步。
2、线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里进行等待。如果工作队列满了,则执行第三步
3、线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务

ThreadPoolExecutor内部有实现4个拒绝策略,默认为AbortPolicy策略:

  • CallerRunsPolicy:由调用execute方法提交任务的线程来执行这个任务
  • AbortPolicy:抛出异常RejectedExecutionException拒绝提交任务
  • DiscardPolicy:直接抛弃任务,不做任何处理
  • DiscardOldestPolicy:去除任务队列中的第一个任务,重新提交
    创建完线程池后,可通过submit或execute方法提交任务。
    如果使用 Executors 的工厂方法创建的线程池,那么饱和策略都是采用默认的 AbortPolicy。所以如果我们想当线程池已满的情况,使用调用者的线程来运行任务,就要自己创建线程池,指定想要的饱和策略,而不是使用 Executors 了。
/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

感谢网友的分享:
http://wiki.jikexueyuan.com/project/java-concurrency/executor.html
https://blog.csdn.net/solrChina/article/details/78296253
https://www.jianshu.com/p/d621da64fae0?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,651评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,468评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,931评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,218评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,234评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,198评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,084评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,926评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,341评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,563评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,731评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,430评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,036评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,676评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,829评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,743评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,629评论 2 354

推荐阅读更多精彩内容