10-Executor框架

Java的线程既是工作单元,也是执行机制。JDK 5开始,把工作单元与执行机制分离开来。工作单元包括Runnable和Callable,而执行机制由Executor框架提供。

1.Executor框架简介

①Executor框架的两级调度模型

java.lang.Thread被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。

在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。

②Executor框架的结构与成员

1)Executor框架的结构
  • 任务。包括被执行任务需要实现的接口Runnable和Callable接口。
  • 任务的执行。包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。
  • 一步计算的结果。包括接口Future和实现Future接口的FutureTask类。
  • Executor:是一个接口,是Executor框架的基础。
  • ThreadPoolExecutor:是线程池的核心实现类,用来执行被提交的任务。
  • ScheduledThreadPoolExecutor:是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。比Timer更灵活,功能更强大。
  • Future接口和实现接口的FutureTask类:代表异步计算的结果。
  • Runnable接口和Callable接口的实现类:都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行。
2)Executor框架的成员
  • ThreadPoolExecutor

    newFixedThreadPool:适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。

        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
        public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>(),
                                          threadFactory);
        }
    

newSingleThreadExecutor适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的应用场景。

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

newCachedThreadPool是大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }
  • ScheduleThreadPoolExecutor

    newScheduledThreadPool:包含若干个线程的ScheduledThreadPoolExecutor。适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程的数量的应用场景。

        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
        public static ScheduledExecutorService newScheduledThreadPool(
                int corePoolSize, ThreadFactory threadFactory) {
            return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
        }
    

    newSingleThreadScheduledExecutor:只包含一个线程的ScheduledThreadPoolExecutor。适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各个任务的应用场景。

        public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
            return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1));
        }
        public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
            return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1, threadFactory));
        }
    
  • Future

    <T> Future<T> submit(Callable<T> task)
    <T> Future<T> submit(Runnable task, T result)
    Future<?> submit(Runnable task)
    
  • Runnable和Callable

        public static Callable<Object> callable(Runnable task) {
            if (task == null)
                throw new NullPointerException();
            return new RunnableAdapter<Object>(task, null);
        }
        public static <T> Callable<T> callable(Runnable task, T result) {
            if (task == null)
                throw new NullPointerException();
            return new RunnableAdapter<T>(task, result);
        }
        static final class RunnableAdapter<T> implements Callable<T> {
            final Runnable task;
            final T result;
            RunnableAdapter(Runnable task, T result) {
                this.task = task;
                this.result = result;
            }
            public T call() {
                task.run();
                return result;
            }
        }
    

2.ScheduleThreadPoolExecutor详解

继承自ThreadPoolExecutor。主要用来在给定的延迟后执行运行任务,或者定期执行任务。功能与Timer类似,但功能更强大、更灵活。Timer对应的是单个后台线程,而ScheduleThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。

①ScheduleThreadPoolExecutor的运行机制

  • 当调用scheduleAtFixedRate方法或者scheduleWithFixedDelay方法时,会向ScheduleThreadPoolExecutor的DelayedWorkQueue添加了一个实现了RunnableScheduledFuture接口的ScheduledFutureTask。

  • 线程池中的线程从DelayedWorkQueue中获取ScheduledFutureTask,然后执行任务。

ScheduleThreadPoolExecutor为了实现周期性的执行任务,对ThreadPoolExecutor做了如下的修改。

  • 使用DelayedWorkQueue作为任务队列。
  • 获取任务的方式不同。
  • 执行周期任务后,增加了额外的处理。

②ScheduleThreadPoolExecutor的实现

ScheduledFutureTask主要包含3个成员变量:

  • long time;//表示这个任务将要被执行的具体时间。
  • long sequenceNumber;//表示这个任务被添加到ScheduleThreadPoolExecutor中的序号。
  • long period;//表示任务执行的间隔周期。
  • 从DelayWorkQueue中获取已到期的ScheduleFutureTask(DelayWorkQueue.take())。
  • 线程1执行这个ScheduleFutureTask。
  • 线程1修改ScheduleFutureTask的time变量为下次将要执行的时间。
  • 线程1把这个修改time之后的ScheduleFutureTask放回DelayWorkQueue中(DelayWorkQueue.add())。
        public RunnableScheduledFuture<?> take() throws InterruptedException {
            //获取锁
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    //获取周期任务
                    RunnableScheduledFuture<?> first = queue[0];
                    //任务数组为空
                    if (first == null)
                        available.await();//等待
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0) 
                            return finishPoll(first);//获取头部元素,将最后一个元素放到第一位,并自上而下添加到其堆排序点。
                        //头部元素的time时间比当前时间大
                        first = null; // don't retain ref while waiting
                        if (leader != null) 
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread; 
                            try {
                                //到condition中等待到time时间
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();//释放锁
            }
        }
        public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();//1 获取锁
            try {
                int i = size;
                if (i >= queue.length)
                    grow();//调整堆数组大小
                size = i + 1;
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    siftUp(i, e);//2.1 在底部添加到堆排序点
                }
                if (queue[0] == e) {
                    leader = null;
                    available.signal();//2.2 通知
                }
            } finally {
                lock.unlock();//3 释放锁
            }
            return true;
        }

3.FutureTask详解

①FutureTask简介

实现了Future接口和Runnable接口,可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run())。

FutureTask.get():未启动和已启动状态时调用,会导致线程阻塞。已完成状态时,调用线程立即返回结果或抛出异常。

FutureTask.cancel():未启动状态时,调用将导致此任务永远不会被执行。已启动状态时,cancel(true)将以中断执行此任务线程的方式来视图停止任务;cancel(false)将不会对正在执行此任务的线程产生任何影响(让正在执行的任务运行完成)。已完成状态时,执行cancel返回false。

②FutureTask的使用

当一个线程需要等待另一个线程把某个任务执行完成后才能继续执行,此时可以使用FutureTask。

假设有多个线程执行若干个任务,每个任务最多只能被执行一次,当多个线程视图同时执行同一个任务时,只允许一个线程执行任务,其他线程需要等待这个任务执行完成后才能继续执行。示例代码:

    private final ConcurrentHashMap<Object, Future<String>> taskCache = new ConcurrentHashMap<>();//每个任务只执行一次,多个线程可以获取到执行的结果
    private String executionTask(final String taskName) throws ExecutionException, InterruptedException {
        while (true) {
            Future<String> future = taskCache.get(taskName);    //1.1   执行完1.3后执行  2.1
            if (future == null) {
                FutureTask<String> futureTask = new FutureTask<>(() -> taskName);//1.2
                //如果存在taskName,不更改value,返回旧值,如果不存在,put
                future = taskCache.putIfAbsent(taskName, futureTask);//1.3
                if (future == null) {//put成功了
                    future = futureTask;
                    futureTask.run();//1.4执行任务
                }
            }
            try {
                return future.get();//1.5 2.2
            } catch (CancellationException e) {
                taskCache.remove(taskName, future);//当taskName和future有映射关系的时候,才移除
            }
        }
    }

当两个线程试图同时执行同一个任务时,如果Thread1执行了1.3后Thread2执行2.1,那么接下来Thread2将在2.2等待,知道Thread1执行完成1.4后Thread2才能从2.2返回。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容