JAVA JDK并发包里面提供了 synchronized关键字和Lock接口,synchronized关键字从语言层面为开发者提供了锁,是隐式锁,锁的获取和释放全部被封装起来了,很笨重,不能提供可中断的获取锁,也不能实现等待超时模式获取锁,而且是独占锁,如果我们有可中断的获取、等待超时模式获取、获取共享锁的需求,可以自己实现Lock接口来改写。Lock锁是显示锁,使用需要遵循一些范式;
lock.lock();
try{
}finally{
lock.unlock;
}
这里有个要点,解锁操作必须要放在finally代码块里面,加锁操作不能放在try代码块里面;前者是防止出错之后锁不能正常释放,后者是防止出错不能正常加锁,反而解锁了;
JDK显示锁 提供了 ReentrantLock(可重入锁),ReentrantReadWriteLock(读写锁)两个常用的实现类;
锁的可重入:当锁作用的代码块发生递归的时候,已经加锁的线程还是可以获取原来的锁,不会因为是递归就要继续等待,JDK提供的所有锁都是支持可重入的;
ReentrantReadWriteLock:读写锁,内部维护了readLock,writeLock两个锁,读锁是共享锁,和其他读锁共享,和写锁互斥;写锁是独占锁,和其他线程的写锁、所有读锁互斥;当线程拿到读锁时,其他线程可以继续获取读锁;当读锁存在时,写锁阻塞,等到所有读锁释放之后才可以拿到写锁,而这时其他线程获取锁的时候阻塞,拿不到任何锁;当写锁释放时,读锁唤醒,循环这一操作;读写锁是针对读多写少的时候,可以大大提高服务器响应的时间;因为读状态是共享锁,只有写的时候需要等待写线程完成,然后其他读线程可以读到最新状态;
什么是AQS?
AQS是一种队列同步器,是用来构建锁或者其他同步组件的基础框架,它使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
简单来说:AQS就是一种实现同步状态的基础框架,我们通过它来实现锁或者其他同步组件(CountDownLatch、CyclicBarrier)。
仔细看AQS源码我们发现其内部维护了一个双向链表,Node,在Node里面存了前置后置节点,各种状态----CANCEL(取消)、CONDITION、EXCLUSIVE(独占)、PROPAGATE(传播),总体上都是对队列的操作,如下图:
在同步器里面维护了一个head(头部节点),tail(尾部节点),每次一个新的线程进来,都会将当前线程封装成一个Node节点加入到尾部节点,节点加入同步器的变化如下:
因为不止一个线程要进行这个操作,所以添加到尾部节点需要使用循环的CAS操作来实现;当线程释放锁之后,AQS会将当前节点的后置节点设置成首节点,然后尝试去获取状态,并将前置节点设置为空,如果后置节点为取消状态,则会从队列的尾部开始循环,找到一个最近的没有被取消的节点唤醒;
在看看Condition的队列实现
在Condition内部,是一个单向链表,在同步器里面,维护了首节点和尾节点,而其他节点都指向了后面的节点;再看看Condition同步器和同步队列之间的关系
可以看到,我们可以维护多个Condition对象挂载到同步队列里面,所以每次唤醒的时候,我们只需要唤醒一个就可以了,使用signal()方法,当然,如果是有特殊要求,还是需要唤醒当前Condition下面的所有节点signalAll(),不能跨Condition唤醒。
了解了AQS的流程之后,我们需要自己来通过AQS来实现一个自己的锁。
public class SelfLockDemo implements Lock {
private static class Sync extends AbstractQueuedSynchronizer {
//3
@Override
protected boolean isHeldExclusively() {
return getState() > 0;
}
//4
public Condition newCondition() {
return new ConditionObject();
}
//1
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
} else if (getExclusiveOwnerThread() == Thread.currentThread()) {
setState(getState() + 1);
return true;
}
return false;
}
//2
@Override
protected boolean tryRelease(int arg) {
if (getExclusiveOwnerThread() != Thread.currentThread()) {
throw new IllegalMonitorStateException();
}
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setState(getState() - 1);
if (getState() == 0) {
setExclusiveOwnerThread(null);
}
return true;
}
}
private Sync sync = new Sync();
//5
@Override
public void lock() {
System.out.println(Thread.currentThread().getName() + " ready get lock");
sync.acquire(1);
System.out.println(Thread.currentThread().getName() + " already got lock");
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
//6
@Override
public void unlock() {
System.out.println(Thread.currentThread().getName() + " ready release lock");
sync.release(1);
System.out.println(Thread.currentThread().getName() + " already released lock");
}
//7
@Override
public Condition newCondition() {
return sync.newCondition();
}
上面的1、2、3、4、5、6、7都是我们要实现的方法,上面是实现了一个独占锁的例子,首先实现Lock接口,然后做一个内部类来实现AQS抽象类,重写了tryAcquire(int arg) 、tryRelease(int arg)、isHeldExclusively() 方法,看AQS源码,发现其内部实现了一些模板方法,所以我们要实现自己的锁,需要重写这些方法;tryAcquire()方法是去获取锁,尝试用一次CAS操作改变状态,如果成功,就将当前线程设置成独占模式,如果不成功,查看当前线程是不是和独占模式的线程是同一个(可重入的实现),如果都不是,则会调用AQS内部的方法,将当前线程加入同步队列的尾部,阻塞,等待锁释放之后唤醒;tryRelease 方法是 释放锁,里面没有任何CAS操作,是因为需要先判断是不是拿锁的线程,如果是,就对自己进行操作,如果状态为0,就释放锁;isHeldExclusively 判断线程是否被占用,这个实现很简单,就是看同步状态;newCondition 这个就类似JAVA提供的wait()、notify()的实现,其内部维护的是一个Condition同步队列,当调用await()方法时候,当前线程释放锁,然后进入阻塞状态;当被signal()方法唤醒时,继续去获取锁,然后执行下一步操作;5、6、7、8就很简单了,调用AQS内部的方法来实现的;
下面再写一个实现共享锁的实现
public class TrinityLockDemo implements Lock {
private Sync sync = new Sync(3);
private static class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
if (count <= 0) {
throw new IllegalMonitorStateException();
}
setState(count);
}
@Override
protected int tryAcquireShared(int arg) {
for (;;) {
int current = getState();
int newCount = current - arg;
if (newCount < 0 || compareAndSetState(current, newCount)) {
return newCount;
}
}
}
@Override
protected boolean tryReleaseShared(int arg) {
for (;;) {
int current = getState();
int newCount = current + arg;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
final Condition newCondition() {
return new ConditionObject();
}
}
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquireShared(1) >= 0;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
共享锁的区别就是实现的方法不同,tryAcquireShared、tryReleaseShared,这个类容许一定数量的线程获取锁。而这些方法内部的实现都是循环调用CAS来操作的;和独占锁有所不同,是因为AQS源码里面,对独占锁的内部实现里面是循环调用CAS来操作的;
下面来说下JAVA实现的ReentrantLock(里面包含的公平锁和非公平锁)ReentrantReadWriteLock,读写锁的实现;
第一个:在AQS源码里面,是没有公平和非公平概念的。公平锁的概念是 先等待的线程先获取到锁,所以要实现公平锁,就需要在tryAcquire方法里面,对每一个线程都需要检查是否在队列里面,如果不在,需要将后来的线程加入到队列内部来实现公平;而对于非公平锁,先去尝试获取一次锁,如果获取到了,就直接使用,获取不到,就进入队列继续进行;
第二个:读写锁
因为AQS只提供了一个int成员变量来同步状态,而读写锁有两个锁,需要两种状态,所以JDK里面就用int 32位的高16位来代表读锁,低16位代表写锁;但是有个问题,读锁可以同时有多个线程进入,而int 的 16位不能代表各个线程进入的次数的,这里就引入了ThreadLocal线程本地变量,每个线程获得读锁之后,就在ThreadLocal里面维护数据,这样保证了读锁的正确计算;而对于写锁,因为是独占的,所以每次只有一个线程可以获取,直接进行位运算就可以了。
至于锁的升降级,写锁可以退化为读锁,而读锁不能升级为写锁。因为写锁的操作需要对所有读锁可见,如果读锁升级为写锁,那么某一数据被更改之后,是不能及时通知其他线程的,所以 读写锁只能支持写锁降级,不支持读锁升级。