多线程常见的四种同步工具类有:Semaphore信号量、CountDownLatch 闭锁、CyclicBarrier 栅栏、Exchanger 交换。
1. Semaphore 信号量
Semaphore 信号量,通过维护自身线程个数,并提供同步机制。使semaphore可以控制同时访问资源的线程个数。可以实现互斥锁的功能
与互斥锁的区别,互斥锁别的线程在拿到资源需要自己释放才能让其他线程获取资源,而semaphore对象是可以让另一个对象来释放锁,可以用于对死锁的恢复。
例子如下:
package ThreadPractice;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final Semaphore sp = new Semaphore(3); // 3栈信号灯
for(int i=0;i<10;i++){
Runnable runnable = new Runnable(){
public void run(){
try {
sp.acquire(); // 要获取灯
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("线程" + Thread.currentThread().getName() +
"进入,当前已有" + (3-sp.availablePermits()) + "个并发"); // 获得信号
try {
Thread.sleep((long)(Math.random()*10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程" + Thread.currentThread().getName() +
"即将离开");
sp.release(); // 释放灯
System.out.println("线程" + Thread.currentThread().getName() +
"已离开,当前已有" + (3-sp.availablePermits()) + "个并发");
}
};
service.execute(runnable);
}
}
}
=======console=======
线程pool-1-thread-1进入,当前已有1个并发
线程pool-1-thread-2进入,当前已有2个并发
线程pool-1-thread-3进入,当前已有3个并发
线程pool-1-thread-2即将离开
线程pool-1-thread-2已离开,当前已有2个并发
线程pool-1-thread-4进入,当前已有3个并发
线程pool-1-thread-3即将离开
线程pool-1-thread-3已离开,当前已有2个并发
线程pool-1-thread-5进入,当前已有3个并发
线程pool-1-thread-4即将离开
每一个acquire()方法表示获取一个信号,当型号量等于规定值是则等待直到有信号被release()释放。也就是说semaphore是可以获取与释放值的。
semaphore的感觉有点像线程池,不过线程池是用来管理线程提高效率且实际工作线程是由线程池来创建的,而信号灯主要用来限制管理资源且需要自己手动创建线程,当信号灯不够时则被刮起,当有一个释放后就重新唤醒等待队列中的线程。这个等待队列内部又是基于AQS共享模式建立。
semaphore比如操场上有5个跑道,一个跑道一次只能有一个学生在上面跑步,一旦所有跑道在使用,那么后面的学生就需要等待,直到有一个学生不跑了
Semaphore与ReentrantLock一样,既可以实现公平锁也可以实现非公平锁。公平锁就是调用acquire获取信号的顺序遵循FIFO队列;而非公平锁则是抢占式的,通过抢占CPU资源实现。
2. CountDownLatch 闭锁
CountDownLatch 闭锁,内置计数器,每个线程计数,当线程都归零时,主线程才继续运行。即通过调用CountDownLatch对象的countDown方法就将计数器减1,当计数到达0时,则所有等待者或单个等待者开始执行。
package ThreadPractice;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CountdownLatchTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final CountDownLatch cdOrder = new CountDownLatch(1);
final CountDownLatch cdAnswer = new CountDownLatch(3);
for(int i=0;i<3;i++){
Runnable runnable = new Runnable(){
public void run(){
try {
System.out.println("线程" + Thread.currentThread().getName() +
"正准备接受命令");
cdOrder.await(); // 等待归零后就继续运行
System.out.println("线程" + Thread.currentThread().getName() +
"已接受命令");
Thread.sleep((long)(Math.random()*10000));
System.out.println("线程" + Thread.currentThread().getName() +
"回应命令处理结果");
cdAnswer.countDown(); // 将计数减1
} catch (Exception e) {
e.printStackTrace();
}
}
};
service.execute(runnable);
}
try {
Thread.sleep((long)(Math.random()*10000));
System.out.println("线程" + Thread.currentThread().getName() +
"即将发布命令");
cdOrder.countDown();
System.out.println("线程" + Thread.currentThread().getName() +
"已发送命令,正在等待结果");
cdAnswer.await();
System.out.println("线程" + Thread.currentThread().getName() +
"已收到所有响应结果");
} catch (Exception e) {
e.printStackTrace();
}
service.shutdown();
}
}
======console======
线程pool-1-thread-1正准备接受命令
线程pool-1-thread-3正准备接受命令
线程pool-1-thread-2正准备接受命令
线程main即将发布命令
线程main已发送命令,正在等待结果
线程pool-1-thread-1已接受命令
线程pool-1-thread-2已接受命令
线程pool-1-thread-3已接受命令
线程pool-1-thread-2回应命令处理结果
线程pool-1-thread-1回应命令处理结果
线程pool-1-thread-3回应命令处理结果
线程main已收到所有响应结果
3. CyclicBarrier 栅栏
CyclicBarrier 栅栏,cyclic会等待其他线程,此时当前线程会阻塞,计算阻塞的个数,当阻塞线程到了规定值之后,再继续放行。
package ThreadPractice;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CyclicBarrierTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final CyclicBarrier cb = new CyclicBarrier(3); // 约定3个等待
for(int i=0;i<3;i++){
Runnable runnable = new Runnable(){
public void run(){
try {
Thread.sleep((long)(Math.random()*10000));
System.out.println("线程" + Thread.currentThread().getName() +
"即将到达集合地点1,当前已有" + (cb.getNumberWaiting()+1) + "¸个已经到达" + (cb.getNumberWaiting()==2?"都到齐了,继续走":"正在等候"));
cb.await(); // 开始等待
Thread.sleep((long)(Math.random()*10000));
System.out.println("线程" + Thread.currentThread().getName() +
"即将到达集合地点2,当前已有" + (cb.getNumberWaiting()+1) + "¸个已经到达" + (cb.getNumberWaiting()==2?"都到齐了,继续走":"正在等候"));
cb.await();
Thread.sleep((long)(Math.random()*10000));
System.out.println("线程" + Thread.currentThread().getName() +
"即将到达集合地点3,当前已有" + (cb.getNumberWaiting() + 1) + "¸个已经到达" + (cb.getNumberWaiting()==2?"都到齐了,继续走":"正在等候"));
cb.await();
} catch (Exception e) {
e.printStackTrace();
}
}
};
service.execute(runnable);
}
service.shutdown();
}
}
=====console======
线程pool-1-thread-2即将到达集合地点1,当前已有1¸个已经到达正在等候
线程pool-1-thread-3即将到达集合地点1,当前已有2¸个已经到达正在等候
线程pool-1-thread-1即将到达集合地点1,当前已有3¸个已经到达都到齐了,继续走
线程pool-1-thread-1即将到达集合地点2,当前已有1¸个已经到达正在等候
线程pool-1-thread-2即将到达集合地点2,当前已有2¸个已经到达正在等候
问:上面讲的Semaphore与CyclicBarrier比较?
答:感觉semaphore与cyclicbarrier刚好相反,前者是一开始运行并获取信号,直到信号到达规定值后就阻塞,通过release()后再继续,而cyclicbarrier则是一开始await()就开始阻塞,直到到达规定值后栅栏才放行运行,两个刚好相反。
问:和之前讲的CyclicBarrier栅栏,也是等到大家到了一起出发,两者都是让线程等待其他线程,这与CountDownLatch有什么区别呢?
答:总体来说的区别:
1.CountDownLatch采用计数器减1,而CyclicBarrier栅栏则是加1到规定值后一起放行。
2.CountDownLatch通过调用await()阻塞通过countdown()减1,当归零的时候就恢复,这两个方法是分开的,也就是说调用countdown()减1之后线程并不会阻塞,而是接着执行任务,而CyclicBarrier只能通过await()开始阻塞直到阻塞到一定程度栅栏才开始放行。
3.CountDownLatch计数归零后就归零了,无法重置,故也就无法重复使用。而CyclicBarrier通过栅栏机制,只计算当前阻塞的个数是否到达目标值,未达到阻塞,满了则放行,是对值的比较,故是可以重复使用的。
4. Exchanger 交换
Exchanger 交换,实现两个线程之间的数据交换。一个线程等待另一个都到达了交换点就进行数据的交换。
package ThreadPractice;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExchangerTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final Exchanger exchanger = new Exchanger();
service.execute(new Runnable(){
public void run() {
try {
String data1 = "苹果";
System.out.println("线程" + Thread.currentThread().getName() +
"正在把数据" + data1 +"换出去");
Thread.sleep((long)(Math.random()*10000)); // 休息时间不一样
String data2 = (String)exchanger.exchange(data1);
System.out.println("线程" + Thread.currentThread().getName() +
"换回的数据为" + data2);
}catch(Exception e){
}
}
});
service.execute(new Runnable(){
public void run() {
try {
String data1 = "钱";
System.out.println("线程" + Thread.currentThread().getName() +
"正在把数据" + data1 +"换出去");
Thread.sleep((long)(Math.random()*10000));
String data2 = (String)exchanger.exchange(data1);
System.out.println("线程" + Thread.currentThread().getName() +
"换回的数据为" + data2);
}catch(Exception e){
}
}
});
}
}
======console======
线程pool-1-thread-1正在把数据苹果换出去
线程pool-1-thread-2正在把数据钱换出去
线程pool-1-thread-1换回的数据为钱
线程pool-1-thread-2换回的数据为苹果