1 Fork-Join
1.1 Fork/Join 体现了“分而治之”
什么是分而治之?
规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解
1.2 工作秘取(WorkStealing)
有A、B两个线程,A线程任务已经做完,而B线程任务还没有做完,这时A线程可以从B线程所属的任务中偷取任务来做,这种工作方式可提升资源利用率,提高性能。
1.3 Fork/Join实战
1.3.1 使用的范式
其中RecursiveTask、RecursiveAction、ForkJoinTask都为抽象类,我们自己的任务类要根据业务需要继承并实现这些类,并必须实现compute()方法
那么这些类有什么不同呢?
- ForkJoinTask:实现接口 public abstract class ForkJoinTask<V> implements Future<V>, Serializable
- RecursiveTask : 任务有返回值
继承public abstract class RecursiveTask<V> extends ForkJoinTask<V> - RecursiveAction:任务没有返回值
继承 public abstract class RecursiveAction extends ForkJoinTask<Void>
compute()方法逻辑
1.3.2 实战案例
1.3.2.1 Fork/Join的同步用法同时演示返回结果值:统计整形数组中所有元素的和
任务类
private static class SumTask extends RecursiveTask<Integer>{
private final static int THRESHOLD = MakeArray.ARRAY_LENGTH/10;
private int[] src; //表示我们要实际统计的数组
private int fromIndex;//开始统计的下标
private int toIndex;//统计到哪里结束的下标
public SumTask(int[] src, int fromIndex, int toIndex) {
this.src = src;
this.fromIndex = fromIndex;
this.toIndex = toIndex;
}
@Override
protected Integer compute() {
if(toIndex-fromIndex < THRESHOLD) {//若计算的范围小于指定阈值,则直接进行业务累加
int count = 0;
for(int i=fromIndex;i<=toIndex;i++) {
//SleepTools.ms(1);
count = count + src[i];
}
return count;
}else {//若计算的范围大于指定阈值,则将该范围拆分,并分别分配任务,采用递归的方式继续判断求和
//fromIndex....mid....toIndex
//1...................70....100
int mid = (fromIndex+toIndex)/2;
SumTask left = new SumTask(src,fromIndex,mid);
SumTask right = new SumTask(src,mid+1,toIndex);
invokeAll(left,right);
return left.join()+right.join();
}
}
}
调用类
ForkJoinPool pool = new ForkJoinPool();
int[] src = MakeArray.makeArray();
SumTask innerFind = new SumTask(src,0,src.length-1);
long start = System.currentTimeMillis();
pool.invoke(innerFind);//同步调用
System.out.println("Task is Running.....");
System.out.println("The count is "+innerFind.join()
+" spend time:"+(System.currentTimeMillis()-start)+"ms");
1.3.2.2 Fork/Join的异步用法同时演示不要求返回值:遍历指定目录(含子目录)寻找指定类型文件
任务类
public class FindDirsFiles extends RecursiveAction{
private File path;//当前任务需要搜寻的目录
public FindDirsFiles(File path) {
this.path = path;
}
@Override
protected void compute() {
List<FindDirsFiles> subTasks = new ArrayList<>();
File[] files = path.listFiles();
if(files!=null) {
for(File file:files) {
if(file.isDirectory()) {//如果是文件夹,则继续调用任务,递归处理
subTasks.add(new FindDirsFiles(file));
}else {//如果是文件,则直接进行业务逻辑
//遇到文件,检查
if(file.getAbsolutePath().endsWith("txt")) {
System.out.println("文件:"+file.getAbsolutePath());
}
}
}
if(!subTasks.isEmpty()) {//如果当前任务中包含子任务,则调用子任务join()方法,等待子任务结束当前任务方可继续执行
for(FindDirsFiles subTask:invokeAll(subTasks)) {
subTask.join();//等待子任务执行完成
}
}
}
}
}
调用类
public static void main(String [] args){
try {
// 用一个 ForkJoinPool 实例调度总任务
ForkJoinPool pool = new ForkJoinPool();
FindDirsFiles task = new FindDirsFiles(new File("F:/"));
pool.execute(task);//异步调用
System.out.println("Task is Running......");
Thread.sleep(1);
int otherWork = 0;
for(int i=0;i<100;i++){
otherWork = otherWork+i;
}
System.out.println("Main Thread done sth......,otherWork="+otherWork);
task.join();//阻塞的方法,保证主线程等待我们的异步任务线程,防止主线程提前结束
System.out.println("Task end");
} catch (Exception e) {
e.printStackTrace();
}
}
2 常用的并发工具类
2.1 CountDownLatch
CountDownLatch是一个非常实用的多线程控制工具类。常用的就下面几个方法:
- CountDownLatch(int count) //实例化一个倒计数器,count指定计数个数
- countDown() // 计数减一
- await() //等待,当计数减到0时,所有线程并行执行
2.1.1 作用
是一组线程等待其他的线程完成工作以后在执行,加强版join
2.1.2 应用场景
2.1.3 实战
public class CountDownLatchDemo implements Runnable{
static final CountDownLatch latch = new CountDownLatch(10);//构造方法传入计数器的值
static final CountDownLatchDemo demo = new CountDownLatchDemo();
@Override
public void run() {
// 模拟业务逻辑,业务执行完调用latch.countDown();
try {
Thread.sleep(new Random().nextInt(10) * 1000);
System.out.println("check complete");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//计数减一
//放在finally避免任务执行过程出现异常,导致countDown()不能被执行
latch.countDown();
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newFixedThreadPool(10);//创建固定大小的线程池
for (int i=0; i<10; i++){
exec.submit(demo);
}
// 等待10个业务扣减完毕
latch.await();
// 主线程继续执行
System.out.println("扣减完毕,主线程可继续执行");
// 关闭线程池
exec.shutdown();
}
}
2.2 CyclicBarrier
2.2.1 作用
让一组线程达到某个屏障,被阻塞,一直到组内最后一个线程达到屏障时,屏障开放,所有被阻塞的线程会继续运行
CyclicBarrier(可重用屏障/栅栏) 类似于 [CountDownLatch](倒计数闭锁),它能阻塞一组线程直到某个事件的发生。
* 与闭锁的关键区别在于,所有的线程必须同时到达屏障位置,才能继续执行。
* 闭锁用于等待事件,而屏障用于等待其他线程。
* CyclicBarrier 可以使一定数量的线程反复地在屏障位置处汇集。当线程到达屏障位置时将调用 await()
方法,这个方法将阻塞直到所有线程都到达屏障位置。如果所有线程都到达屏障位置,那么屏障将打开,此时所有的线程都将被释放,而屏障将被重置以便下次使用。
CyclicBarrier 是 JDK 1.5 的 java.util.concurrent 并发包中提供的一个并发工具类。
* 所谓 Cyclic 即循环的意思,所谓 Barrier 即屏障的意思。
* CyclicBarrier 是一个同步辅助类,它允许一组线程相互等待直到所有线程都到达一个公共的屏障点。
* 在程序中有固定数量的线程,这些线程有时候必须等待彼此,这种情况下,使用 CyclicBarrier 很有帮助。
* 这个屏障之所以用循环修饰,是因为在所有的线程释放彼此之后,这个屏障是可以 重新使用 的。
注:CyclicBarrier在本文只做简单理解,有一些借鉴自这位博主的文章,CyclicBarrier他的这篇文章讲解比较全面https://www.jianshu.com/p/9262361a1200
2.2.2 应用场景
CyclicBarrier 常用于多线程分组计算。
比如一个大型的任务,常常需要分配好多子任务去执行,只有当所有子任务都执行完成时候,才能执行主任务,这时候,就可以选择 CyclicBarrier。
2.2.3 CyclicBarrier 方法说明
CyclicBarrier(parties) 方法
- 初始化相互等待的线程数量的构造方法。
CyclicBarrier(parties,Runnable barrierAction) 方法
- 初始化相互等待的线程数量以及屏障线程的构造方法。
- 屏障线程的运行时机:等待的线程数量 =parties 之后,CyclicBarrier 打开屏障之前。
例如在分组计算中,每个线程负责一部分计算,最终这些线程计算结束之后,交由屏障线程进行汇总计算。
getParties 方法
- 获取 CyclicBarrier 打开屏障的线程数量。
getNumberWaiting 方法
- 获取正在 CyclicBarrier 上等待的线程数量。
await 方法
在 CyclicBarrier 上进行阻塞等待,直到发生以下情形之一。
- 在 CyclicBarrier 上等待的线程数量达到 parties,则所有线程被释放,继续执行。
- 当前线程被中断,则抛出 InterruptedException 异常,并停止等待,继续执行。
- 其他等待的线程被中断,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
- 其他等待的线程超时,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
- 其他线程调用 CyclicBarrier.reset() 方法,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
- 线程调用 await() 表示自己已经到达栅栏。
- BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程
- await() 时被中断或者超时。
await(timeout,TimeUnit) 方法
在 CyclicBarrier 上进行限时的阻塞等待,直到发生以下情形之一。
- 在 CyclicBarrier 上等待的线程数量达到 parties,则所有线程被释放,继续执行。
- 当前线程被中断,则抛出 InterruptedException 异常,并停止等待,继续执行。
- 当前线程等待超时,则抛出 TimeoutException 异常,并停止等待,继续执行。
- 其他等待的线程被中断,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
- 其他等待的线程超时,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
- 其他线程调用 CyclicBarrier.reset() 方法,则当前线程抛出 BrokenBarrierException 异常,并停止等待,继续执行。
isBroken 方法
获取是否破损标志位 broken 的值,此值有以下几种情况。
- CyclicBarrier 初始化时,broken=false,表示屏障未破损。
- 如果正在等待的线程被中断,则 broken=true,表示屏障破损。
- 如果正在等待的线程超时,则 broken=true,表示屏障破损。
- 如果有线程调用 CyclicBarrier.reset() 方法,则 broken=false,表示屏障回到未破损状态。
reset 方法
使 CyclicBarrier 回归初始状态,它做了两件事。
- 如果有正在等待的线程,则会抛出 BrokenBarrierException 异常,且这些线程停止等待,继续执行。
将是否破损标志位 broken 置为 false。
2.2.3 实战
public class CyclicBarrierDemo {
// 自定义工作线程
private static class Cyclic extends Thread {
private CyclicBarrier cyclicBarrier;
public Cyclic(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
super.run();
try {
cyclicBarrier.await();//调用等待方法,等待其他线程到达屏障
System.out.println(Thread.currentThread().getName() + "等待完毕,开始执行");
// 模拟业务
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + "执行完毕");
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
int threadCount = 3;
CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
for (int i = 0; i < threadCount; i++) {
System.out.println("创建工作线程" + i);
Cyclic cyclic = new Cyclic(cyclicBarrier);
cyclic.start();
}
}
}
2.3 CountDownLatch和CyclicBarrier辨析
- countdownlatch放行由第三者控制,CyclicBarrier放行由一组线程本身控制
- countdownlatch放行条件》=线程数,CyclicBarrier放行条件=线程数
- CountDownLatch 是一个线程(或者多个),等待另外 N 个线程完成某个事情之后才能执行;CyclicBarrier 是 N 个线程相互等待,任何一个线程完成之前,所有的线程都必须等待。
- CountDownLatch 的计数器只能使用一次。而 CyclicBarrier 的计数器可以使用 reset() 方法重置;
- CyclicBarrier 能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
- CountDownLatch 采用减计数方式;CyclicBarrier 采用加计数方式。
2.4 Semaphore
以一个停车场是运作为例。为了简单起见,假设停车场只有三个车位,一开始三个车位都是空的。这时如果同时来了五辆车,看门人允许其中三辆不受阻碍的进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。这时,有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开两辆,则又可以放入两辆,如此往复。这个停车系统中,每辆车就好比一个线程,看门人就好比一个信号量,看门人限制了可以活动的线程。假如里面依然是三个车位,但是看门人改变了规则,要求每次只能停两辆车,那么一开始进入两辆车,后面得等到有车离开才能有车进入,但是得保证最多停两辆车。对于Semaphore类而言,就如同一个看门人,限制了可活动的线程数。
Semaphore主要用于控制当前活动线程数目,就如同停车场系统一般,而Semaphore则相当于看守的人,用于控制总共允许停车的停车位的个数,而对于每辆车来说就如同一个线程,线程需要通过acquire()方法获取许可,而release()释放许可。如果许可数达到最大活动数,那么调用acquire()之后,便进入等待队列,等待已获得许可的线程释放许可,从而使得多线程能够合理的运行。
2.4.1 作用
Semaphore主要方法:
Semaphore(int permits):
构造方法,创建具有给定许可数的计数信号量并设置为非公平信号量。
Semaphore(int permits,boolean fair):
构造方法,当fair等于true时,创建具有给定许可数的计数信号量并设置为公平信号量。
void acquire():
从此信号量获取一个许可前线程将一直阻塞。相当于一辆车占了一个车位。
void acquire(int n):
从此信号量获取给定数目许可,在提供这些许可前一直将线程阻塞。比如n=2,就相当于一辆车占了两个车位。
void release():
释放一个许可,将其返回给信号量。就如同车开走返回一个车位。
void release(int n):
释放n个许可。
int availablePermits():
当前可用的许可数。
getQueueLength():
等待线程队列长度
2.4.2 应用场景
2.4.3 实战
public class SemaphoreDemo {
private static final Semaphore semaphore=new Semaphore(3);
private static final ThreadPoolExecutor threadPool=new ThreadPoolExecutor(5,10,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
private static class InformationThread extends Thread{
private final String name;
private final int age;
public InformationThread(String name,int age)
{
this.name=name;
this.age=age;
}
public void run()
{
try
{
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+":大家好,我是"+name+"我今年"+age+"岁当前时间为:"+System.currentTimeMillis());
Thread.sleep(1000);
System.out.println(name+"要准备释放许可证了,当前时间为:"+System.currentTimeMillis());
System.out.println("当前可使用的许可数为:"+semaphore.availablePermits());
semaphore.release();
}
catch(InterruptedException e)
{
e.printStackTrace();
}
}
}
public static void main(String[] args)
{
String[] name= {"李明","王五","张杰","王强","赵二","李四","张三"};
int[] age= {26,27,33,45,19,23,41};
for(int i=0;i<7;i++)
{
Thread t1=new InformationThread(name[i],age[i]);
threadPool.execute(t1);
}
}
2.5 Exchange
2.5.1 作用
Exchanger 是 JDK 1.5 开始提供的一个用于两个工作线程之间交换数据的封装工具类,简单说就是一个线程在完成一定的事务后想与另一个线程交换数据,则第一个先拿出数据的线程会一直等待第二个线程,直到第二个线程拿着数据到来时才能彼此交换对应数据。其定义为 Exchanger<V> 泛型类型,其中 V 表示可交换的数据类型,对外提供的接口很简单,具体如下:
Exchanger():无参构造方法。
V exchange(V v):等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象。
V exchange(V v, long timeout, TimeUnit unit):等待另一个线程到达此交换点(除非当前线程被中断或超出了指定的等待时间),然后将给定的对象传送给该线程,并接收该线程的对象。
可以看出,当一个线程到达 exchange 调用点时,如果其他线程此前已经调用了此方法,则其他线程会被调度唤醒并与之进行对象交换,然后各自返回;如果其他线程还没到达交换点,则当前线程会被挂起,直至其他线程到达才会完成交换并正常返回,或者当前线程被中断或超时返回。
2.5.2 应用场景
2.5.3 实战
public class UseExchange {
private static final Exchanger<String> exchange
= new Exchanger<String>();
public static void main(String[] args) {
System.out.println("ssssssssssss");
//第一个线程
new Thread(new Runnable() {
@Override
public void run() {
Set<String> setA = new HashSet<String>();//存放数据的容器
String dataA = "111";
try {
/*添加数据
* set.add(.....)
* */
System.out.println("数据交换前:"+dataA);
dataA = exchange.exchange(dataA);//交换data
System.out.println("数据交换前:"+dataA);
/*处理交换后的数据*/
} catch (InterruptedException e) {
}
}
}).start();
//第二个线程
new Thread(new Runnable() {
@Override
public void run() {
Set<String> setB = new HashSet<String>();//存放数据的容器
String dataB = "222";
try {
/*添加数据
* set.add(.....)
* set.add(.....)
* */
System.out.println("数据交换前:"+dataB);
dataB = exchange.exchange(dataB);//交换data
System.out.println("数据交换前:"+dataB);
/*处理交换后的数据*/
} catch (InterruptedException e) {
}
}
}).start();
}
}
2.6 Callable、Future和FutureTask
2.6.1 继承结构图
2.6.2 Future接口常用方法
V get();阻塞方法,获取Callable的返回值
V get(long,Timeunit);设定超时时间和超时操作
isDone(),结束,正常还是异常结束,或者自己取消,返回true;
isCancelled() 任务完成前被取消,返回true;
cancel(boolean):
- 任务还没开始,返回false
- 任务已经启动,cancel(true),中断正在运行的任务,中断成功,返回true,cancel(false),不会去中断已经运行的任务
- 任务已经结束,返回false
2.6.3 实战案例
public class UseFuture {
/*实现Callable接口,允许有返回值*/
private static class UseCallable implements Callable<Integer>{
private int sum;
@Override
public Integer call() throws Exception {
System.out.println("Callable子线程开始计算");
Thread.sleep(2000);
for(int i=0;i<5000;i++) {
sum = sum+i;
}
System.out.println("Callable子线程计算完成,结果="+sum);
return sum;
}
}
public static void main(String[] args)
throws InterruptedException, ExecutionException {
UseCallable useCallable = new UseCallable();//创建自定义的Callable
FutureTask<Integer> futureTask = new FutureTask<Integer>(useCallable);//新建FutureTask
new Thread(futureTask).start();//将FutureTask通过线程启动
Random r = new Random();
SleepTools.second(1);
if(r.nextBoolean()) {//随机决定是获得结果还是终止任务
System.out.println("Get UseCallable result = "+futureTask.get());//阻塞获取Callable中call()方法的返回值
}else {
System.out.println("中断计算");
futureTask.cancel(true);
}
}
}
2.6.4 应用场景
包含图片和文字的文档的处理:图片(云上),可以用future去取图片,主线程继续解析文字。