Master-Worker模式的实例代码:
【Master.class】
package com.jxb.thread11;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* Master-Worker模式——并行计算模式
* Master 负责接收和分配任务
*/
public class Master {
//1、应该有一个 承装所有任务的容器
private ConcurrentLinkedQueue<Task> workQueue=new ConcurrentLinkedQueue<Task>();
//2、使用HashMap去承装所有的worker对象
private HashMap<String, Thread> workers=new HashMap<String, Thread>();
//3、使用一个容器承装每一个worker并行执行的任务的结果集
private ConcurrentHashMap<String, Object> resultMap=new ConcurrentHashMap<String, Object>();
/**
* 4、构造函数
* @param worker worker对象
* @param worlerCount 多少个worker对象(子节点)
*/
public Master(Worker worker,int worlerCount){
// 每一个worker对象都需要有master的引用,workQueue由于任务的领取
worker.setWorkerQueue(this.workQueue);
// resultMap 用于任务的提交
worker.setResultMap(this.resultMap);
//循环将worker装入workers容器
for(int i=0;i<worlerCount;i++){
//key表示每一个worker的名字(方便跟踪),value表示线程执行对象(worker对象,实现线程)
workers.put("子节点"+Integer.toString(i), new Thread(worker));
}
}
//5、提交方法(往任务队列里面装任务)
public void submit(Task task){
this.workQueue.add(task);
}
//6、执行方法(启动应用程序,让所有的worker工作)
public void execute(){
//循环Map的方式(启动workers容器 的 所有线程),让worker开始工作
for(Map.Entry<String, Thread> me:workers.entrySet()){
me.getValue().start();
}
}
//7、判断任务是否执行完毕(线程是否都停止)
public boolean isComplete() {
for(Map.Entry<String, Thread> me:workers.entrySet()){
//判断线程的状态,是否是停止(Thread.State.TERMINATED)
if(me.getValue().getState()!=Thread.State.TERMINATED){
return false; //只要有一个线程的状态不是停止,就返回false
}
}
return true;
}
//8、返回结果集
public int getResult() {
int ret=0;
for (Map.Entry<String, Object> result : resultMap.entrySet()) {
//汇总结果集的逻辑(不一定是++)
ret+=(Integer)result.getValue();
}
return ret;
}
}
【Worker.class】
package com.jxb.thread11;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
*Master-Worker模式——并行计算模式
* Worker 负责处理子任务
*/
public class Worker implements Runnable {
//接收引用
private ConcurrentLinkedQueue<Task> workQueue;
private ConcurrentHashMap<String, Object> resultMap;
/**
* 引用 Master里面的workerQueue,方便领取任务(领取一个少一个)
* @param workQueue
*/
public void setWorkerQueue(ConcurrentLinkedQueue<Task> workQueue) {
this.workQueue=workQueue;
}
/**
* 引用 Master里面的resultMap,方便将每个worker里卖弄处理的结果,返回给Master的结果集容器里面
* @param resultMap
*/
public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
this.resultMap=resultMap;
}
/**
* 处理一个一个的任务
*/
@Override
public void run() {
while(true){
Task input=this.workQueue.poll(); //从队列中取出任务并移除
if(input==null){ //当没有任务时,退出循环
break;
}
/**
* 真正的去做业务处理
*/
//Object output=handler(input);
Object output=MyWorker.handler(input); //优化后的代码01
//将任务的返回结果 放入结果集容器里面(任务编号/任务名称,任务处理的结果)
this.resultMap.put(Integer.toString(input.getId()), output);
}
}
//------------------------------------代码优化01(将具体的任务实现封装出去)MyWorker,去重写-------------------------------------
/**
* 具体的任务
* @param input 任务
* @return 返回处理的结果
*/
/*private Object handler(Task input) {
Object output=null;
try {
//表示处理task任务的耗时,可能是数据的加工也可能是操作数据库
Thread.sleep(500);
output=input.getPrivce();
} catch (InterruptedException e) {
e.printStackTrace();
}
return output;
}*/
public static Object handler(Task input){
return null;
}
}
【Main.class】
package com.jxb.thread11;
import java.util.Random;
/**
* Master-Worker 模式的测试类
*/
public class Main {
public static void main(String[] args) {
/**
* (实例化一个worker,需要几个子任务(进程))
*/
//Master master=new Master(new Worker(), 10);
//Master master=new Master(new MyWorker(), 10); //代码优化01
System.out.println("当前机器可用的线程数:"+Runtime.getRuntime().availableProcessors());
//线程数不能随便加,应该根据自己的电脑性能来
Master master=new Master(new MyWorker(), Runtime.getRuntime().availableProcessors()); //代码优化02
// 100个任务
Random random=new Random();
for (int i = 1; i <= 100; i++) {
Task task=new Task();
task.setId(i);
task.setName("任务"+i);
task.setPrivce(random.nextInt(1000));
master.submit(task); //提交任务
}
master.execute(); //启动任务
long start=System.currentTimeMillis();
//判断所有任务是否执行完成(所有的线程都执行完成了)
while(true){
if(master.isComplete()){
long end=System.currentTimeMillis()-start;
int ret= master.getResult();
System.out.println("汇总结果集最后的值:"+ret+" ,执行耗时"+end);
break;
}
}
}
}
【Task.class】
package com.jxb.thread11;
/**
* 任务
*/
public class Task {
private int id; //编号
private String name; //任务名称
private int privce; //价格
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getPrivce() {
return privce;
}
public void setPrivce(int privce) {
this.privce = privce;
}
}
【MyWorker.class】
package com.jxb.thread11;
/**
* 具体需要完成的任务类,可以多个
*/
public class MyWorker extends Worker{
/**
* 重写handler()——具体的任务
* @param input 任务
* @return 返回处理的结果
*/
public static Object handler(Task input) {
Object output=null;
try {
//表示处理task任务的耗时,可能是数据的加工也可能是操作数据库
Thread.sleep(500);
output=input.getPrivce();
} catch (InterruptedException e) {
e.printStackTrace();
}
return output;
}
}
【效果截图】