Java并发编程高级篇(十):分离任务的执行和结果的处理

在之前的例子中,我们使用执行器框架都是在主类中提交任务,等待任务执行完毕后再去处理任务执行的结果。接下来我们打算将任务的提交和结果的处理都放置到线程中去执行。在每个任务内部提交自己到执行器,然后通过一个统一的结果处理线程来处理所有任务执行的结果。

为了解决这个问题,执行器框架为我们提供了一个CompletionService类,任务执行线程和结果处理线程能够共享这个类,结果处理线程便可以在这里渠道已经执行完毕的任务的结果。CompletionService类的内部也是通过一个ExecutorService来提交任务的。

首先,创建任务线程,实现Callable接口。模拟报表生成过程。

/**
 * 模拟生成报告
 *
 * Created by hadoop on 2016/11/3.
 */
public class ReportGenerator implements Callable<String> {
    private String sender;
    private String title;

    public ReportGenerator(String sender, String title) {
        this.sender = sender;
        this.title = title;
    }

    @Override
    public String call() throws Exception {
        long duration = (long)(Math.random() * 10);

        System.out.printf("ReportGenerator: Generator report %s_%s duration %d seconds.\n", sender, title, duration);

        TimeUnit.SECONDS.sleep(duration);

        return sender + "_" + title;
    }
}

然后我们创建任务提交线程,这个线程的构造方法接受两个参数,分别是报表名称和CompletionService对象。将报表生成任务提交到CompletionService去执行。

import java.util.concurrent.CompletionService;

/**
 * 模拟请求获取报告
 *
 * Created by hadoop on 2016/11/3.
 */
public class ReportRequest implements Runnable {
    private String name;
    private CompletionService<String> service;

    public ReportRequest(String name, CompletionService<String> service) {
        this.name = name;
        this.service = service;
    }

    @Override
    public void run() {
        ReportGenerator generator = new ReportGenerator(name, "Report");
        service.submit(generator);
    }
}

下面我们创建任务结果处理类,来打印生成的报表。这个类同样会拿到CompletionService的引用,然后循环调用CompletionService.poll()方法来从任务结果队列中获取执行的结果,这个方法接受一个时间参数,如果当前结果队列为空,那么则等待这个时间,超时返回null。不带参数的poll()方法,如果对别为空则直接返回null。

/**
 * 处理报表结果
 *
 * Created by hadoop on 2016/11/3.
 */
public class ReportProcessor implements Runnable {
    private boolean end;
    private CompletionService<String> service;

    public ReportProcessor(boolean end, CompletionService<String> service) {
        this.end = end;
        this.service = service;
    }

    @Override
    public void run() {
        while (!end) {
            try {
                Future<String> future = service.poll(20, TimeUnit.SECONDS);

                if (future != null) {
                    System.out.printf("ReportReceiver: received %s\n", future.get());
                }

            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

    public void setEnd(boolean end) {
        this.end = end;
    }
}

最后我们创建主方法类。在这里我们创建ExecutorServer并把它赋值给ExecutorCompletionService。之后创建两个报表请求任务和一个报表处理任务,同时持有ExecutorCompletionService的引用。

import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 在执行器中分离任务执行和结果处理
 *
 * 我们如何处理在一个对象里发送任务给执行器,在另一个对象里处理任务执行结果。
 * 对于这种情况Java API提供了CompletionService类
 *
 * CompletionService使用Executor对象类执行任务。
 *   优势在于:可以共享CompletionService。
 *   缺点在于:CompletionService获取的Future对象只能是已经执行完毕的任务,他没有办法控制任务状态,只能处理任务结果。
 *
 * 我们创建了一个ExecutorService,然后使用这个ExecutorService来初始化一个ExecutorCompletionService<String>(executor)。
 *
 * 首先创建了两个ReportRequest任务,然后在任务内部使用service.submit(generator)方法调用报表生成任务。
 *
 * 然后再ReportProcessor中不断调用service.poll(20, TimeUnit.SECONDS);方法获取已经执行完的结果,如果当前没有结果那么等待20秒。
 *
 * CompletionService还提供了两个人方法:
 *   poll():如果没有任何Future直接返回null。
 *   take():如若任务队列中没有Future那么阻塞知道有可用的Future。
 *
 * Created by hadoop on 2016/11/3.
 */
public class Main {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();

        CompletionService<String> service = new ExecutorCompletionService<String>(executor);

        ReportRequest request1 = new ReportRequest("Face", service);
        ReportRequest request2 = new ReportRequest("Online", service);

        ReportProcessor processor = new ReportProcessor(false, service);

        Thread thread1 = new Thread(request1);
        Thread thread2 = new Thread(request2);
        Thread thread3 = new Thread(processor);

        thread1.start();
        thread2.start();
        thread3.start();

        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        executor.shutdown();

        try {
            executor.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        processor.setEnd(true);
    }
}

控制台中,我们可以看到两个任务的提交信息和结果处理信息。

ReportGenerator: Generator report Online_Report duration 4 seconds.
ReportGenerator: Generator report Face_Report duration 1 seconds.
ReportReceiver: received Face_Report
ReportReceiver: received Online_Report
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容

  • layout: posttitle: 《Java并发编程的艺术》笔记categories: Javaexcerpt...
    xiaogmail阅读 5,766评论 1 19
  • 一.线程安全性 线程安全是建立在对于对象状态访问操作进行管理,特别是对共享的与可变的状态的访问 解释下上面的话: ...
    黄大大吃不胖阅读 811评论 0 3
  • 作者: 一字马胡 转载标志 【2017-11-01】 更新日志 日期更新内容备注2017-11-01新建文章V1...
    一字马胡阅读 7,252评论 9 133
  • 一、并发 进程:每个进程都拥有自己的一套变量 线程:线程之间共享数据 1.线程 Java中为多线程任务提供了很多的...
    SeanMa阅读 2,373评论 0 11
  • 图&文|珞寂朵 成都有个很文艺的名字,如标题那般。 成都有很多美丽的花朵,金秋十月,满城五颜六色。 但最令我印象深...
    珞寂朵阅读 1,785评论 91 65