10.并发编程之MasterWorker

Master-Worker模式:常用的并行计算模式,核心思想是系统由两类进行协作工作:Master进程 和Worker进程。
Master负责接收和分配任务,Worker负责处理子任务。当各个Worker子进程处理完成后,会将结果返回给Master,由Master做归纳与总结。
好处是将一个大任务分解成若干个小任务,并行执行,提高系统吞吐量。
实际具体的业务处理方法handle()不应该写在核心框架中,最好写在Worker子类中,且是抽象的,模板方法。在Main函数中可以new自己的子类,进行解耦。

    package demo5;

    public class Task {
        private int     id;
        private String  name;
        private int     price;

        /**
         * @return the id
         */
        public int getId() {
            return id;
        }

        /**
         * @param id
         *            the id to set
         */
        public void setId(int id) {
            this.id = id;
        }

        /**
         * @return the name
         */
        public String getName() {
            return name;
        }

        /**
         * @param name
         *            the name to set
         */
        public void setName(String name) {
            this.name = name;
        }

        /**
         * @return the price
         */
        public int getPrice() {
            return price;
        }

        /**
         * @param price
         *            the price to set
         */
        public void setPrice(int price) {
            this.price = price;
        }
    }
    package demo5;

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;

    public abstract class Worker implements Runnable {

        private ConcurrentLinkedQueue<Task>         workQueue;
        private ConcurrentHashMap<String, Object>   resultMap;

        public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) {
            this.workQueue = workQueue;
        }

        public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
            this.resultMap = resultMap;
        }

        public abstract Object handle(Task input);

        @Override
        public void run() {
            while (true) {
                Task input = this.workQueue.poll();
                if (input == null) {
                    break;
                }
                // 真正去做业务处理
                Object output = handle(input);
                this.resultMap.put(Integer.toString(input.getId()), output);
            }

        }

    }
    package demo5;

    public class MyWorker extends Worker {
        public Object handle(Task input) {

            Object output = null;
            try {
                Thread.sleep(500);
                output = input.getPrice();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return output;
        }
    }

    package demo5;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;

    public class Master {
        // 1.应该有一个盛装任务的集合
        private ConcurrentLinkedQueue<Task>         workQueue   = new ConcurrentLinkedQueue<Task>();

        // 2.使用HashMap去盛装所有worker对象
        private HashMap<String, Thread>             workers     = new HashMap<String, Thread>();

        // 3.使用一个容器盛装每一个worker并发执行任务的结果集
        private ConcurrentHashMap<String, Object>   resultMap   = new ConcurrentHashMap<String, Object>();

        // 4.构造方法
        public Master(Worker worker, int workerCount) {
            // 每一个worker对象都需要有Master的引用workQueue用于任务的领取,resultMap用于任务的提交
            worker.setWorkerQueue(this.workQueue);
            worker.setResultMap(this.resultMap);
            for (int i = 0; i < workerCount; i++) {
                // key表示每一个worker的名字,value表示线程执行对象
                workers.put("子节点" + Integer.toString(i), new Thread(worker));
            }
        }

        // 5.提交方法
        public void submit(Task task) {
            this.workQueue.add(task);
        }

        // 6.需要有一个执行的方法,启动应用程序,让所有的worker工作
        public void execute() {
            for (Map.Entry<String, Thread> me : workers.entrySet()) {
                me.getValue().start();
            }
        }

        // 7.判断线程是否执行完毕
        public boolean isComplete() {
            for (Map.Entry<String, Thread> me : workers.entrySet()) {
                if (me.getValue().getState() != Thread.State.TERMINATED) {
                    return false;
                }
            }
            return true;
        }

        // 8.返回结果集数据
        public int getResult() {
            int ret = 0;
            for (Map.Entry<String, Object> me : resultMap.entrySet()) {
                ret += (Integer) me.getValue();
            }
            return ret;
        }
    }

    package demo5;

    import java.util.Random;

    public class Main {
        public static void main(String[] args) {
            System.out.println("我的机器可用processor数量:" + Runtime.getRuntime().availableProcessors());

            Master master = new Master(new MyWorker(), Runtime.getRuntime().availableProcessors());
            Random r = new Random();
            for (int i = 0; i <= 100; i++) {
                Task t = new Task();
                t.setId(i);
                t.setName("任务" + i);
                t.setPrice(r.nextInt(1000));
                master.submit(t);
            }
            master.execute();
            long start = System.currentTimeMillis();
            while (true) {
                if (master.isComplete()) {
                    long end = System.currentTimeMillis() - start;
                    int result = master.getResult();
                    System.out.println("最终结果:" + result + ", 执行耗时: " + end);
                    break;
                }
            }
        }
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343

推荐阅读更多精彩内容

  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,455评论 25 707
  • 前文再续,就书接上一回,随着与Server、TCP、Protocol的邂逅,Swoole终于迎来了自己的故事,今天...
    蜗牛淋雨阅读 1,709评论 1 14
  • 关于黑夜 没有谁能够比白天更懂他 这是开始 也是白天辛苦孕育的胎芽 快乐挣扎 原来最美的一刻就是临盆的刹那
    候鸟飞呀阅读 177评论 0 4
  • 不知不觉已经坚持了长达36天写作了。虽然有很多篇文章无法令自己满意,可收获的是一个好习惯。21天形成习惯不是吹的。...
    Miss文小姐阅读 175评论 1 0
  • 也许大家已经看到了我和两位助教在一起的照片,曙东和现发两位助教专程为我赶来,还有我的EMO和几位学长,其原因就是因...
    卜芳阅读 179评论 2 5