10.并发编程之MasterWorker

Master-Worker模式:常用的并行计算模式,核心思想是系统由两类进行协作工作:Master进程 和Worker进程。
Master负责接收和分配任务,Worker负责处理子任务。当各个Worker子进程处理完成后,会将结果返回给Master,由Master做归纳与总结。
好处是将一个大任务分解成若干个小任务,并行执行,提高系统吞吐量。
实际具体的业务处理方法handle()不应该写在核心框架中,最好写在Worker子类中,且是抽象的,模板方法。在Main函数中可以new自己的子类,进行解耦。

    package demo5;

    public class Task {
        private int     id;
        private String  name;
        private int     price;

        /**
         * @return the id
         */
        public int getId() {
            return id;
        }

        /**
         * @param id
         *            the id to set
         */
        public void setId(int id) {
            this.id = id;
        }

        /**
         * @return the name
         */
        public String getName() {
            return name;
        }

        /**
         * @param name
         *            the name to set
         */
        public void setName(String name) {
            this.name = name;
        }

        /**
         * @return the price
         */
        public int getPrice() {
            return price;
        }

        /**
         * @param price
         *            the price to set
         */
        public void setPrice(int price) {
            this.price = price;
        }
    }
    package demo5;

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;

    public abstract class Worker implements Runnable {

        private ConcurrentLinkedQueue<Task>         workQueue;
        private ConcurrentHashMap<String, Object>   resultMap;

        public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) {
            this.workQueue = workQueue;
        }

        public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
            this.resultMap = resultMap;
        }

        public abstract Object handle(Task input);

        @Override
        public void run() {
            while (true) {
                Task input = this.workQueue.poll();
                if (input == null) {
                    break;
                }
                // 真正去做业务处理
                Object output = handle(input);
                this.resultMap.put(Integer.toString(input.getId()), output);
            }

        }

    }
    package demo5;

    public class MyWorker extends Worker {
        public Object handle(Task input) {

            Object output = null;
            try {
                Thread.sleep(500);
                output = input.getPrice();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return output;
        }
    }

    package demo5;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;

    public class Master {
        // 1.应该有一个盛装任务的集合
        private ConcurrentLinkedQueue<Task>         workQueue   = new ConcurrentLinkedQueue<Task>();

        // 2.使用HashMap去盛装所有worker对象
        private HashMap<String, Thread>             workers     = new HashMap<String, Thread>();

        // 3.使用一个容器盛装每一个worker并发执行任务的结果集
        private ConcurrentHashMap<String, Object>   resultMap   = new ConcurrentHashMap<String, Object>();

        // 4.构造方法
        public Master(Worker worker, int workerCount) {
            // 每一个worker对象都需要有Master的引用workQueue用于任务的领取,resultMap用于任务的提交
            worker.setWorkerQueue(this.workQueue);
            worker.setResultMap(this.resultMap);
            for (int i = 0; i < workerCount; i++) {
                // key表示每一个worker的名字,value表示线程执行对象
                workers.put("子节点" + Integer.toString(i), new Thread(worker));
            }
        }

        // 5.提交方法
        public void submit(Task task) {
            this.workQueue.add(task);
        }

        // 6.需要有一个执行的方法,启动应用程序,让所有的worker工作
        public void execute() {
            for (Map.Entry<String, Thread> me : workers.entrySet()) {
                me.getValue().start();
            }
        }

        // 7.判断线程是否执行完毕
        public boolean isComplete() {
            for (Map.Entry<String, Thread> me : workers.entrySet()) {
                if (me.getValue().getState() != Thread.State.TERMINATED) {
                    return false;
                }
            }
            return true;
        }

        // 8.返回结果集数据
        public int getResult() {
            int ret = 0;
            for (Map.Entry<String, Object> me : resultMap.entrySet()) {
                ret += (Integer) me.getValue();
            }
            return ret;
        }
    }

    package demo5;

    import java.util.Random;

    public class Main {
        public static void main(String[] args) {
            System.out.println("我的机器可用processor数量:" + Runtime.getRuntime().availableProcessors());

            Master master = new Master(new MyWorker(), Runtime.getRuntime().availableProcessors());
            Random r = new Random();
            for (int i = 0; i <= 100; i++) {
                Task t = new Task();
                t.setId(i);
                t.setName("任务" + i);
                t.setPrice(r.nextInt(1000));
                master.submit(t);
            }
            master.execute();
            long start = System.currentTimeMillis();
            while (true) {
                if (master.isComplete()) {
                    long end = System.currentTimeMillis() - start;
                    int result = master.getResult();
                    System.out.println("最终结果:" + result + ", 执行耗时: " + end);
                    break;
                }
            }
        }
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 173,536评论 25 708
  • 前文再续,就书接上一回,随着与Server、TCP、Protocol的邂逅,Swoole终于迎来了自己的故事,今天...
    蜗牛淋雨阅读 1,785评论 1 14
  • 关于黑夜 没有谁能够比白天更懂他 这是开始 也是白天辛苦孕育的胎芽 快乐挣扎 原来最美的一刻就是临盆的刹那
    候鸟飞呀阅读 188评论 0 4
  • 不知不觉已经坚持了长达36天写作了。虽然有很多篇文章无法令自己满意,可收获的是一个好习惯。21天形成习惯不是吹的。...
    Miss文小姐阅读 180评论 1 0
  • 也许大家已经看到了我和两位助教在一起的照片,曙东和现发两位助教专程为我赶来,还有我的EMO和几位学长,其原因就是因...
    卜芳阅读 200评论 2 5