20.执行器

任务

任务通常是一些抽象且离散的逻辑工作单元。当围绕任务执行来构建并发程序时,需要找到任务的边界,使得每个任务尽可能与其他任务独立开来,这样能够独立地在单独的线程中执行,提高并发性。

线程池

线程池是指管理一组同构工作线程的资源池,线程池与工作队列是密切相关的,在工作队列中保存了所有等待执行的任务。工作者线程的任务就是从工作队列中取出一个任务执行,执行结束返回线程池等待下一个任务。

在线程池中的线程不是根据任务临时创建的,而是事先准备好一组线程,等待任务的出现,执行完任务并不随着任务结束而销毁,而是返回线程池等待下次任务的复用。这减少了线程频繁创建和销毁的开销,同时因为线程池的大小限制,也限制住系统中最大的并发量。

为什么要使用线程池

  1. 构建和销毁一个线程是要与操作系统交互的,这个成本是很高的;
  2. 活跃的线程会消耗系统资源,尤其是内存。如果可运行的线程对于处理器的数量,那么就会闲置一些线程。大量的线程会占用大量的内存空间,也会给垃圾回收带来压力,因为线程往往生命周期很短。大量的线程竞争CPU也会带来性能的问题;
  3. 可创建的线程数量在各个平台上有不同的限制,如果破坏了限制,可能会抛出OOM异常而终止程序。在一定范围内,增加线程可以提高系统的吞吐量,但是物极必反,再创建更多的线程只会降低程序的执行速度,过多的创建线程可能会使系统崩溃。

任务执行策略

任务是一组逻辑工作单元,需要依附于运行线程被执行。任务的执行策略有简单粗暴的把所有任务放入单线程中顺序执行、为每个任务创建一个线程执行、使用异步任务执行框架来执行。

单线程执行简单、但非常不高效且可能会因为一个任务的错误导致整个任务阻塞。每个任务创建线程的缺点上节已经阐述。最佳选择是选择异步执行框架也就是基于线程池的任务执行框架。

Executor框架

Executor是一个简单的函数式接口:

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

它提供一种标准的方法将任务的提交和执行过程解耦,使用Runnalbe作为任务的抽象。

Executor还提供了对周期性任务的支持,以及统计信息收集、应用程序管理机制和性能监视等机制。

Executor基于生产者-消费者模式,提交任务相当于生产者,执行任务相当于消费者。

Executor有许多静态工厂方法用来构建线程池(返回的是ExecutorService):

方法 描述
newCachedThreadPool 必要时创建新线程,空闲线程会被保留60秒
newFixedThreadPool 创建指定数量线程的线程池,且线程一直保留
newSingleTheadPool 只有一个线程的线程池,改线程顺序执行提交的每一个任务
newScheduledThreadPool 用于预定执行而构建的线程池,用于替代java.util.Timer
newSingleThreadScheduledExecutor 用于执行预定任务的单线程池。

线程池是执行框架的实现的一部分,作为消费者角色,执行器将任务分发给线程池执行。

当用完一个线程池时,需要将线程池关闭,否则JVM将无法退出。关闭线程池有两种方法:

  • shutdown

此方法是平缓的关闭方式,不再接受新的任务,等待以已经启动的任务结束,当所有的任务完成,线程池中的线程死亡。

  • shutdownNow

暴力关闭方式,取消尚未开始的任务并试图中断正在运行的线程。

ExecutorService的生命周期有三种:运行、关闭、终止。Executor初始创建时处于运行状态,执行shutdown之后进入关闭状态,等所有任务都完成后进入终止状态。可以调用awaitTermination等待ExecutorService到达终止状态,或者使用isTerminated轮询状态。

使用线程池的一般逻辑:

  1. 调用Executors类的静态方法newCachedThreadPool或者newFixedThreadPoo
  2. 调用submit提交任务(Runnable或Callable对象)
  3. 如果想要取消一个任务,或者提交Callable对象,要保存好返回的Future对象
  4. 当不再提交新任务时,调用shutdown。

ExecutorService

public interface ExecutorService extends Executor {

    void shutdown();
    
    List<Runnable> shutdownNow();
    
    boolean isShutdown();
    
    boolean isTerminated();
    
    boolean awaitTermination(long timeout, TimeUnit unit)    throws InterruptedException;
    
    <T> Future<T> submit(Callable<T> task);
    
    <T> Future<T> submit(Runnable task, T result);
 
    Future<?> submit(Runnable task);
    /*
    * 提交所有对象到一个Callable对象的集合中,并返回一个Future对象列表,代表所有任务的Future对象。
    * 可以使用ExecutorCompletionService对结果按可获得的顺序排序。
    */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)         throws InterruptedException;
     
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)  throws InterruptedException;
    /*
    *提交所有对象到一个Callable对象的集合中,并返回某个已经完成的任务的结果;
    * 这个结果无法知道是哪个任务返回的,返回值之后,这个任务组就结束了。
    */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Callable和Future

Executor以Runnable为任务抽象,但是Runnable方法不能抛出异常且没有返回值。Callable是一种更好的任务抽象,它认为主入口点将返回一个值并可能抛出一个异常。

Future表示一个任务的生命周期,并提供了方法来判断是否已经完成或取消,以及获取任务的结果或者取消任务等。Future需要和Callable合作使用才能获取返回结果。

CompletionService控制任务组

如果向Executor提交了一组任务,并且希望在任务执行完成后获得结果,那么可以保留每个任务的Future对象,然后通过调用get方法获取执行结果。CompletionService提供了更好的方法来完成此需求。

CompletionService将Executor和BlockingQueue的功能融合在一起。可以将Callable任务提交给它执行,然后使用类似队列操作的take和poll方法获取已完成的结果,而且这些结果在完成时会被封装成Future对象。

ExecutorCompletionService实现了CompletionService,并将计算部分委托给Executor。

CompletionService<ImageData> completionService = new ExecutorCompletionService<>(executor);
completionServie.submit(()->{//...});

Future<ImageInfo> f = completionService.take();

为任务设置时限

利用Future的限时get方法。

异构任务的并行问题

各个任务执行时间可能差距较大,导致并行任务的时间依赖于最久执行时间的任务。

Fork-join框架

并行任务执行框架。
要采用框架可用的一种方式完成递归计算,需要通过一个扩展RecursiveTasK<T>的类或者提供一个扩展RecursiveAction的类,再覆盖compute方法来生成和调用子任务,然后合并结果。

可完成Future

处理非阻塞调用的传统方法是使用事件监听器,为任务完成之后要出现的动作注册一个处理器,如果下一个动作也是异步的,在它之后的下一个动作会在一个不同的事件处理器中。这样虽然在功能上不会有什么问题,但是这个流程的代码可能分散到各处。
Java8的CompletableFuture类提供了一种候选方法,可完成Future可以组合。

    @Test
    public void testCompletableFuture() {

        String res = CompletableFuture.supplyAsync(() -> "hello").thenApplyAsync((a) -> a + "world").join();
        System.out.println(res);
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,122评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,070评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,491评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,636评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,676评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,541评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,292评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,211评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,655评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,846评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,965评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,684评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,295评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,894评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,012评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,126评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,914评论 2 355

推荐阅读更多精彩内容