在Java并行程序基础知识一文中我们提到使用syschronized关键字做同步控制来决定一个线程是否可以访问临界区资源,同时使用线程等待Object.wait(),线程通知Object.notify()做多线程间的协作。这一节我们主要学习syschronized,Object.wait(),Object.notify()的增强版——重入锁。
1.1 syschronized的功能扩展:重入锁
- 重入锁加锁和释放锁
重入锁上使用java.util.concurrent.locks.ReentrantLock
类来实现
public class Demo implements Runnable{
public static ReentrantLock lock = new ReentrantLock();
public static int i = 0;
static Demo demo = new Demo();
public static void main(String[] arg) throws InterruptedException {
Thread t1 = new Thread(demo);
Thread t2 = new Thread(demo);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.print(i);
}
@Override
public void run() {
for(int j = 0;j < 10000;j++){
//加锁
lock.lock();
//lock.lock();
try{
i++;
}finally {
//释放锁
lock.unlock();
//lock.unlock();
}
}
}
}
注意:
- 重入锁相对syschronized对逻辑的控制要好很多,但必须注意,加了锁之后,在退出临界区的时候必须释放锁,否则其它线程就没有机会再访问临界区了;
- 之所以叫重入锁是因为一个线程可以连续两次获得同一把锁,但是在退出临界区的时候要释放同样次数的锁
- 重入锁中断响应
对于syschronized,如果一个线程在等待锁,那么有两种结果:获得锁继续执行;保持等待。而使用重入锁,提供了另外的一种可能,那就是线程可以被中断。
public class Demo implements Runnable{
public static ReentrantLock lock1 = new ReentrantLock();
public static ReentrantLock lock2 = new ReentrantLock();
int lock;
/**
* 方便构建死锁
* @param lock
*/
public Demo(int lock) {
this.lock = lock;
}
public static void main(String[] arg) throws InterruptedException {
Demo demo1 = new Demo(1);
Demo demo2 = new Demo(2);
Thread t1 = new Thread(demo1);
Thread t2 = new Thread(demo2);
t1.start();
t2.start();
Thread.sleep(1000);
//中断线程2
t2.interrupt();
}
@Override
public void run() {
try{
if(lock == 1){
//可以对中断进行响应的锁申请操作
lock1.lockInterruptibly();
Thread.sleep(300);
lock2.lockInterruptibly();
}else{
lock2.lockInterruptibly();
Thread.sleep(300);
lock1.lockInterruptibly();
}
}catch (InterruptedException e){
e.printStackTrace();
}finally {
if(lock1.isHeldByCurrentThread()){
lock1.unlock();
}
if (lock2.isHeldByCurrentThread()){
lock2.unlock();
}
}
System.out.println("线程退出!")
}
}
上面的例子构建了一个死锁,t1和t2互相等待,当t2中断,线程1和2全部退出,1完成任务,2放弃任务。需要特别注意的使用lockInterruptibly()申请锁。
- 重入锁申请等待限时和公平锁
- 申请等待限时ReentrantLock.tryLock()方法接受两个参数,第一个等待时长,第二个计时单位tryLock(5,TimeUnit.SECONDS),表示等待5秒,如果超过5秒返回false;这个方法也支持不传入参数,表示不进行等待,立即返回申请结果;
- 在大数情况下,锁的申请是不公平的,不公平锁经常会出现饥饿现象,重入锁ReentrantLock支持公平锁,当构造方法
ReentrantLock(boolean fair)
传入true表示锁是公平的,但是维护公平锁必然要维护一个有序的队列,所以公平锁成本很高。
1.2 重入锁的好搭档:Condition条件
Condition条件其实和Object.wait(),Object.notify()的作用大致相同,它是配合重入锁使用的。Condition接口提供如下方法:
// 造成当前线程在接到信号或被中断之前一直处于等待状态。
void await()
// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
boolean await(long time, TimeUnit unit)
// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
long awaitNanos(long nanosTimeout)
// 造成当前线程在接到信号之前一直处于等待状态。
void awaitUninterruptibly()
// 造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。
boolean awaitUntil(Date deadline)
// 唤醒一个等待线程。
void signal()
// 唤醒所有等待线程。
void signalAll()
示例如下:
public class ReenterLockCondition implements Runnable {
public static ReentrantLock lock = new ReentrantLock();
public static Condition condition = lock.newCondition();
@Override
public void run() {
try {
lock.lock();
condition.await();
System.out.println("Thread is going on");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] arg) throws InterruptedException {
ReenterLockCondition t1 = new ReenterLockCondition();
Thread t1 = new Thread(t1);
t1.start();
Thread.sleep(1000);
//通知线程1继续执行
lock.lock();
condition.signal();
lock.unlock();
}
}
1.3 信号量(Semaphore)
信号量可以容许多个线程同时访问一个资源,其主要的构造方法如下:
//信号量准入数
public Senaphore(int permits)
//第二个参数表示是否公平
public Senaphore(int permits,boolean fair)
主要逻辑方法如下:
//获取一个许可证(响应中断),在没有可用的许可证时当前线程被阻塞。
public void acquire() throws InterruptedException
//获取一个许可证(不响应中断)
public void acquireUninterruptibly()
//尝试获取许可证(非公平获取),立即返回结果(非阻塞)。
public boolean tryAcquire()
//尝试获取许可证(定时获取)
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException
// 释放许可证(获得许可之后一定要释放,防止信号量泄露)
public void release()
1.4 读写锁
使用重入锁或者内部锁,理论上所有的读写之间都是串行操作,但是读操作不会对数据造成整体的破坏,所以这种等待不合理,这种情况可以使用读写锁ReadWriteLock,器约束情况如下:
- 读-读不互斥:读读之间不阻塞
- 读-写互斥:读阻塞写,写阻塞读
- 写-写互斥:写写互斥
Java并发包中ReadWriteLock是一个接口,主要有两个方法,如下:
public interface ReadWriteLock {
/**
* 返回读锁
*/
Lock readLock();
/**
* 返回写锁
*/
Lock writeLock();
}
示例如下:
public class ReentratReadWriteLockDemo {
public static void main(String[] args) {
News news = new News();
//read
for(int n = 0; n < 3; n++){
new Thread(new Runnable() {
@Override
public void run() {
String pre = "";
while(true){
String s = news.getLast();
if(s == null)
continue;
if(!s.equals(pre)) {
pre = s;
System.out.println(Thread.currentThread().getName() + " get the last news : " + s);
if(Integer.parseInt(s) == 9)
break;
}
}
}
}, "read thread" + n).start();
}
//write
new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0; i < 10; i++){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
news.add(i + "");
}
}
}).start();
}
static class News {
private final List<String> newsList = new ArrayList<>();
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private Lock readLock = lock.readLock();
private Lock writeLock = lock.writeLock();
public String getLast(){
readLock.lock();
try{
if(newsList.size() == 0)
return null;
return newsList.get(newsList.size() - 1);
}
finally {
readLock.unlock();
}
}
public void add(String news) {
writeLock.lock();
try{
newsList.add(news);
System.out.println("add a news:" + news);
}
finally {
writeLock.unlock();
}
}
}
}
1.5 倒计时器CountDownLatch
CountDownLatch是一个多线程控制工具类。通常用来控制线程等待,它可以让一个线程一直等待知道计时结束才开始执行,其构造参数如下:
//count 计数器个数
public CountDownLatch(int count)
示例如下:
public class CountDownLatchDemo implements Runnable{
private static final CountDownLatch end = new CountDownLatch(10);
@Override
public void run() {
System.out.println("线程名称:" + Thread.currentThread().getName());
//数据业务处理
end.countDown();//倒计时器计数减1
}
public static void main(String[] args) {
CountDownLatchDemo countDownLatchDemo = new CountDownLatchDemo();
System.out.println("计数开始");
for (int i = 0; i < 10; i++) {
new Thread(countDownLatchDemo).start();
}
try {
end.await();
System.out.println("计数结束");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
1.6 循环栅栏CyclicBarrier
CyclicBarrier也是一种多线程并发控制的工具。相比CountDownLatch,CyclicBarrier功能更加强大。其构造参数如下:
public CyclicBarrier(int parties, Runnable barrierAction)
注意:
- CyclicBarrier的构造方法可以传入一个Runnable的barrierAction,可用于线程集结完毕后做一件特定的事情
- CyclicBarrier可以重复使用,当一批线程凑满parties个是,计数器会归零,重新开始计数
示例如下:
public class CyclicBarrierDemo {
public static class Soldier implements Runnable{
private final CyclicBarrier cyclicBarrier;
Soldier(CyclicBarrier cyclicBarrier){
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("士兵 " + Thread.currentThread().getId() + " 报道");
try {
//等待所有士兵到齐
cyclicBarrier.await();
//执行任务
doWork();
//等待所有士兵完成工作
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
void doWork() throws InterruptedException {
Thread.sleep(1000);
System.out.println("士兵" + Thread.currentThread().getId()+ " 任务完成");
}
}
public static class Commond implements Runnable{
@Override
public void run() {
System.out.println("任务结束");
}
}
public static void main(String[] args) {
CyclicBarrierDemo cyclicBarrierDemo = new CyclicBarrierDemo();
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, cyclicBarrierDemo.new Commond());
for (int i = 0; i < 20; i++) {
new Thread(cyclicBarrierDemo.new Soldier(cyclicBarrier)).start();
}
}
}
1.7 线程阻塞工具类:LockSupport
LockSupprot是以个方便实用的线程阻塞工具,它可以在线程内任意位置让线程阻塞。和Thread.suspend()相比,它弥补了由于resume()在前发生,导致线程无法继续执行的情况。和Object.wait()对比,它不需要先获得某个对象的锁,也不会抛出InterruptException异常。LockSupport的静态方法park()可以阻塞当前线程,类似的parkNanos()和parkUntil()等方法,它们实现了一个限时的等待。
public classTestLockSupport {
class TestRunnable implements Runnable{
Object u = new Object();
@Override
public void run() {
synchronized(u){
System.out.println(Thread.currentThread().getName()+" 阻塞了!");
LockSupport.park();
System.out.println(Thread.currentThread().getName()+" 动次打次!");
}
}
}
public void test() throws InterruptedException{
TestRunnable r = new TestRunnable();
Thread thread1= newThread(r,"线程1");
Thread thread2 = newThread(r,"线程2");
thread1.start();
Thread.sleep(1000);
thread2.start();
LockSupport.unpark(thread1);
LockSupport.unpark(thread2);
}
}
上代码依然无法保证unpark()发生在park()之后,但测试多次可以发现,上代码自始至终都可以正常结束,不会因为park()方法而导致线程永久的挂起。这是因为LockSupport采用了类似信号量的机制,它为每个线程分配了一个许可,如果许可可用park()会立即返回,并且消费掉这个许可(也就是将许可变为不可用),如果许可不可用,则会阻塞,而unpark()则会使得许可变为可用。LockSupport.park()还支持中断影响,但park()不会抛出InterruptedException异常,它只会默默的返回,但我们可可以从Thread.interrupted()等方法中获得中断标记。
待续