对于多核CPU,传统的串行程序已经无法很好的发挥CPU的性能。这就需要通过多线程的方式挖掘CPU的潜能
1、并行程序设计模式
并行设计模式是对一些常用的多线程结构的总结和抽象
1.1 Future模式
核心是去除了主线程中的等待时间,在这个等待时间内可以去处理其他的业务逻辑
下面通过自实现的简单函数来模拟Future的原理
Data接口
package XingNeng.future.test;
/**
*Data
*/
public interface Data {
/**
* String
* @return String
*/
String getResult();
}
FutureData类
package XingNeng.future.test;
public class FutureData implements Data {
protected RealData realdata = null;
protected boolean isReady = false;
/**
* @param realdata
*/
public synchronized void setRealData(RealData realdata) {
if (isReady) {
return;
}
this.realdata = realdata;
isReady = true;
notifyAll();
}
/**
* @return
*/
public synchronized String getResult() {
while (!isReady) {
System.out.println("isReady " + isReady);
try {
wait();
} catch (InterruptedException e) {
}
}
return realdata.result;
}
}
RealData 类
package XingNeng.future.test;
public class RealData implements Data {
protected final String result;
/**
* @param para
*/
public RealData(String para) {
StringBuffer sb = new StringBuffer();
for (int i = 0; i < 10; i++) {
sb.append(para);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
result = sb.toString();
}
public String getResult() {
return result;
}
}
Client的实现
package XingNeng.future.test;
public class Client {
public Data request(final String queryStr) {
final FutureData future = new FutureData();
new Thread() {
public void run() {
RealData realdata = new RealData(queryStr);
future.setRealData(realdata);
}
}.start();
return future;
}
}
Main的实现
package XingNeng.future.test;
public class Main {
public static void main(String[] args) {
Client client = new Client();
Data data = client.request("a");
System.out.println("发送请求成功");
long start = System.currentTimeMillis();
System.out.println("尝试获取真实数据 " + data.getResult());
long end = System.currentTimeMillis();
System.out.println(end - start);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("真实数据 " + data.getResult());
}
}
输出结果
1.2 Master-Worker模式
Master-Worker模式是将串行任务并行化的方法,被分解的子任务在系统中并行处理。如果需要,Master可以不等到所有worker完成就可以返回结果。
Master进程为主要进程,维护了worker线程队列、任务队列及子结果集。
简单示例代码:
Worker
package XingNeng.materwrk;
import java.util.Map;
import java.util.Queue;
public class Worker implements Runnable {
protected Queue<Object> workQueue;
protected Map<String, Object> resultMap;
public void setWorkQueue(Queue<Object> workQueue) {
this.workQueue = workQueue;
}
public void setResultMap(Map<String, Object> resultMap) {
this.resultMap = resultMap;
}
public Object handle(Object input) {
return input;
}
@Override
public void run() {
while (true) {
Object input = workQueue.poll();
if (input == null) { break; }
Object re = handle(input);
resultMap.put(Integer.toString(input.hashCode()), re);
}
}
}
Master
package XingNeng.materwrk;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
public class Master {
protected Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>();
protected Map<String, Thread> threadMap = new HashMap<String, Thread>();
protected Map<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
public boolean isComplete() {
for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
if (entry.getValue().getState() != Thread.State.TERMINATED) {
return false;
}
}
return true;
}
public Master(Worker worker, int countWorker) {
worker.setWorkQueue(workQueue);
worker.setResultMap(resultMap);
for (int i = 0; i < countWorker; i++) {
threadMap.put(Integer.toString(i), new Thread(worker, Integer.toString(i)));
}
}
public void submit(Object job) {
workQueue.add(job);
}
public Map<String, Object> getResultMap() {
return resultMap;
}
public void execute() {
for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {
entry.getValue().start();
}
}
}
测试代码
package XingNeng.materwrk;
import java.util.Map;
import java.util.Set;
import org.junit.Test;
public class TestMasterWorker {
public class PlusWorker extends Worker {
public Object handle(Object input) {
Integer i = (Integer)input;
return i * i * i;
}
}
@Test
public void testMasterWorker() {
Master m = new Master(new PlusWorker(), 5);
for (int i = 0; i < 100; i++) { m.submit(i); }
m.execute();
int re = 0;
Map<String, Object> resultMap = m.getResultMap();
while (resultMap.size() > 0 || !m.isComplete()) {
Set<String> keys = resultMap.keySet();
String key = null;
for (String k : keys) {
key = k;
break;
}
Integer i = null;
if (key != null) { i = (Integer)resultMap.get(key); }
if (i != null) { re += i; }
if (key != null) { resultMap.remove(key); }
}
System.out.println("testMasterWorker:" + re);
}
@Test
public void testPlus() {
int re = 0;
for (int i = 0; i < 100; i++) {
re += i * i * i;
}
System.out.println("testPlus:" + re);
}
}
1.3 不变模式
- 使用不变对象,在多线程环境中,不需要同步操作或是加锁,来提高系统性能。
- 不变模式的使用场景需要满足下面两个条件:当对象创建后,其内部状态和数据不再发生任何变化;对象需要被共享、被多线程频繁访问。
java中如何实现不变对象:
- 去除set及所有修改自身属性的方法
- 属性为
private final
- 类声明为 final,保证没有子类
- 有一个可以完成创建对象的构造函数
- 例子
public final class Product {
private final String no;
private final String name;
private final double price;
public Product(String no, String name, double price) {
super();
this.no = no;
this.name = name;
this.price = price;
}
public String getNo() {
return no;
}
public String getName() {
return name;
}
public double getPrice() {
return price;
}
}
1.4 生产者消费者模式
生产者和消费者通过共享缓冲队列来平衡彼此的处理速度,可以缓解彼此的性能差。
1.5 Guarded Suspension模式
核心思想是:仅当服务进程准备好服务时才提供服务。假设当请求过多的时候,服务端可能来不及处理,每个请求又都不能丢弃,最佳的方案就是让客户请求进行排队处理。
2、JDK多任务执行框架
- 核心思想是线程池技术
- 优化线程池大小
《java 并发编程实践》给出一个估算线程池大小的公式
Ncpu = CPU的数量
Ucpu = 目标CPU的使用率 0<=Ucpu<=1
W/C = 等待时间与计算时间的比率
最优的线程池大小为:** Nthreads = Ncpu * Ucpu * (1 + W/C)**