基本概念:
锁:控制多线程并发访问资源;
队列同步器:管理同步状态,实现锁;
同步状态:同步器的操作对象,int类型;
同步队列:同步器通过同步队列管理同步状态;
同步器实现锁:
1.自定义同步器;
2.同步器定义如何获取、释放同步状态;
3.锁通过同步器来实现语义;
public class Mutex implements Lock {
//自定义的同步器
static class MySyncer extends AbstractQueuedSynchronizer {
//独占式获取同步状态
@Override
protected boolean tryAcquire(int arg) {
if(compareAndSetState(0, 1)){
//设置当前线程独占同步器
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//独占式释放同步状态
@Override
protected boolean tryRelease(int arg) {
if(getState() == 0)
throw new IllegalMonitorStateException();
//设置同步器无占用线程
setExclusiveOwnerThread(null);
//设置同步状态为0
setState(0);
return true;
}
//同步器是否被独占
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
Condition newCondition(){
return new ConditionObject();
}
}
//同步器对象,用来实现锁
private final MySyncer syncer = new MySyncer();
@Override
public void lock() {
syncer.acquire(1);
}
@Override
public boolean tryLock() {
return syncer.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return syncer.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void lockInterruptibly() throws InterruptedException {
syncer.acquireInterruptibly(1);
}
@Override
public void unlock() {
syncer.release(1);
}
@Override
public Condition newCondition() {
return syncer.newCondition();
}
}
同步器get/update同步状态:
getState(); //获取同步状态
setState(); //设置同步状态
compareAndSetState(int expect, int update); //CAS设置同步状态,保证原子性
同步器acquire/release同步状态(可重写):
//模板方法调用,非阻塞
tryAcquire(int arg); //独占式占用同步状态
tryRelease(int arg); //独占式释放同步状态
tryAcquireShared(int arg); //共享式占用同步状态
tryReleaseShared(int arg); //共享式释放同步状态
注:单词get与acquire区别,get指直接获取,acquire指通过努力后获取/占用;
同步器的模板方法(不可重写):
锁通过同步器对象直接调用模板方法来实现语义;
模板方法调用上面tryXxxx(int arg)方法来实现状态占用/释放;
//独占式,阻塞式
acquire(int arg); //独占式占用同步状态
acquireInterruptibly(int arg); //独占式,可响应中断
tryAcquireNanos(int arg, long nanosTimesout); //独占式,可响应中断,超时等待
release(int arg); //独占式释放同步状态
//共享式,阻塞式
acquireShared(int arg); //共享式占用同步状态
acquireSharedInterruptibly(int arg); //共享式,可响应中断
tryAcquireSharedNanos(int arg, long nanosTimesout); //共享式, 可响应中断,超时等待
releaseShared(int arg);//共享式释放同步状态
注:阻塞式指同步器未获取同步状态时线程会阻塞,非阻塞指线程不会阻塞;
同步队列:
1.同步队列是由Node组成的链表;
2.当线程获取锁失败时,就把thread及等待状态信息包装成node插入队列尾部,并更改tail节点,通过CAS操作保证并发性;
3.当head节点的线程释放锁时,就唤醒其next节点,next节点获取同步状态成功时就把自己设为head节点;
独占式(共享式)占用和释放同步状态:
区别:同一时刻,独占式只有一个线程可以获取同步状态,而共享式可以有多个线程获取同步状态;
相同点:第一次获取同步状态失败后都生成节点,插入同步队列尾部;都是通过自旋来获取同步状态;前驱为头节点时才能获取到同步状态;
自定义同步主键TwinsLock-双线程锁:
同一时刻有两个线程可以持有锁,即同步器共享式占用同步状态,且同步状态数为2;
public class TwinsLock implements Lock {
//同步器对象
private Syncer syncer = new Syncer(2);
static class Syncer extends AbstractQueuedSynchronizer {
//初始化同步状态的数量
public Syncer(int stateCount) {
setState(stateCount);
}
//共享式获取同步状态,非阻塞
@Override
protected int tryAcquireShared(int arg) {
for (; ; ) {
int currentState = getState();
int newState = currentState - 1;
if (newState < 0 || compareAndSetState(currentState, newState))
return newState;
}
}
//共享式释放同步状态,非阻塞
@Override
protected boolean tryReleaseShared(int arg) {
for (; ; ) {
int currentState = getState();
if (compareAndSetState(currentState, currentState + 1))
return true;
}
}
}
//加锁
@Override
public void lock() {
//acquireShared()会调用tryAcquireShared()
syncer.acquireShared(1);
}
//释放锁
@Override
public void unlock() {
syncer.releaseShared(1);
}
}
lock()调用模板方法acquireShared()共享式获取同步状态, red()调用用户自定义的tryAcquireShared()来获取同步状态,若获取成功,则线程拥有了锁;若tryAcquireShared()获取同步状态失败,则把线程及其状态包装成node插入同步队列的尾部,并进行自旋来获取同步状态;自旋过程中,当node的前置节点是head节点且释放同步状态后,当前节点就调用tryAcquireShared()来获取同步状态,若获取成功则表示线程拥有了锁,若获取失败则继续自旋;
重入锁:
重入锁指线程获取到锁后能够再次获取该锁,而不会被锁阻塞;
公平模式:
锁的获取顺序符合请求的绝对时间顺序;
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//同步状态
int c = getState();
if (c == 0) {
//锁未被任何线程占用
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
//没有更早的线程处于WAITTING状态,且当前线程获取同步状态成功
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
//锁被当前线程占用
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
非公平模式:
有新线程请求锁时,先争夺一下锁,没成功再去排队;排队之后依然满足FIFO规则,即前面节点的线程先获取锁;
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//同步状态
int c = getState();
if (c == 0) {
//锁未被任何线程占用
if (compareAndSetState(0, acquires)) {
//当前线程获取同步状态成功
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
//锁被当前线程占用
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
总结:公平和非公平模式下调用lock()获取锁时,调用acquire()模板方法,acquire()方法首先都调用tryAcquire(1)第一次尝试获取同步状态,若锁此刻锁未被占用,则公平模式下会判断是否有更早的线程处于WAITTING状态,若有则第一次尝试获取锁失败,若没有则尝试获取同步状态;而非公平模式下则直接尝试获取同步状态,不考虑是否有更早的线程是否处于WAITTING状态;公平模式和非公平模式在第一次尝试获取同步状态失败后,都会把线程及其状态包装成node插入FIFO同步队列尾部,之后通过自旋来获取同步状态,公平和非公平模式下都需要等待前置节点来唤醒自己;
性能对比:公平锁具有大量的线程切换,因此其吞吐性不如非公平锁;
读写锁:
排它锁:同一时刻只能有一个线程获取锁(如ReentrantLock);
1.读锁是非排它锁,同一时刻可以有多个读线程获取锁;但是写线程获取锁时,所有读线程和其它写线程均被阻塞;
2.读写锁是一对锁,包括读锁(可重入共享锁)和写锁(可重入排它锁);
3.包括公平模式和非公平模式;
4.支持重进入,且在获取写锁后还能获取读锁,但获取读锁后不能获取写锁;
5.在读多于写的情况下,读写锁比排它锁具有更好的吞吐性;
读写状态的设计:
整型变量的高16位表示读、低16位表示写,则线程获取读锁后,同步状态S=S+(1<<16),线程获取写锁后,同步状态S=S+1;
写锁的获取与释放:
写锁获取成功的条件:1.写锁此刻被当前线程拥有;2.读锁or写锁此刻没被任何线程拥有;否则获取写锁失败,线程进入WAITTING状态;
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
//存在读锁or写锁
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
//存在读锁or不是当前线程获取写锁,则获取写锁失败
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
//之前是当前线程获取的写锁,因此不存在并发问题,不需要CAS操作
setState(c + acquires);
return true;
}
//之前没任何线程获取读锁or写锁,此刻可能有多个写线程并发请求写锁,需要CAS操作设置同步状态
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
//当前线程首次获取同步状态成功
setExclusiveOwnerThread(current);
return true;
}
读锁的获取与释放:
读锁获取成功的条件:1.写锁此刻被当前线程拥有;2.写锁此刻没被任何线程拥有;否则获取写锁失败,线程进入WAITTING状态;
只要"其它线程拥有写锁"的情况不出现,则当前线程就不断尝试获取同步状态,而不是进入等待状态,是因为当前线程此刻仍然具有获取读锁的资格,而不用等待资格;但是当其它线程拥有写锁时,则当前线程获取同步状态失败,失去获取读锁的资格,退出for循环,进入WAITTING状态,并把线程及其状态包装成node插入FIFO同步队列尾部,之后通过自旋来获取同步状态
protected final int tryAcquireShared(int unused) {
//只要"其它线程拥有写锁"的情况不出现,则当前线程就不断尝试获取同步状态,
for (; ; ) {
Thread current = Thread.currentThread();
int c = getState();
int nextC = c + (1 << 16);
if (nextC < c)
throw new Error("Maximum lock count exceeded");
if (exclusiveCount(c) != 0 && owner != Thread.currentThread())
//存在写锁(写状态不为0),且写锁拥有线程不是当前线程,则获取读锁失败
return -1;
if (compareAndSetState(c, nextC))
//获取读锁成功
return 1;
}
}
锁降级:
锁降级的过程:拥有写锁->预处理(写/改)数据->拥有读锁->释放写锁->使用(读)数据->释放读锁,即读锁降级到写锁;
占用读锁,释放写锁,读数据,这样做有两个好处:1.使用(读)数据期间,其它数据只读的线程可以获取到读锁,而不至于堵塞,提高了效率;2.保证使用(读)数据的过程中数据是没有发生变化的,因为在释放读锁前,其它线程无法获取写锁来更改数据;
public class LockDown {
static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
static Lock readLock = lock.readLock();
static Lock writeLock = lock.writeLock();
static class Thread1 implements Runnable{
//线程是否完成了数据准备的更新
static volatile boolean update = false;
@Override
public void run() {
}
public void processData(){
readLock.lock();
if(update == false){
readLock.unlock();
writeLock.lock();
try {
//预处理(写/改)数据
prepareData();
update = true;
//先获取读锁
readLock.lock();
}finally {
//然后释放写锁
writeLock.unlock();
}
}
try {
//使用(读)数据
useData();
}finally {
//最后释放读锁
readLock.unlock();
}
}
public void prepareData(){}
public void useData(){}
}
}
注:Java不支持锁升级,锁升级会引起死锁;
LockSupport工具:
LockSupport工具类定义了一组public static方法,用来阻塞当前线程、唤醒被阻塞的线程;
void park(); //阻塞当前线程
void unpark(Thread thread); //唤醒阻塞的线程thread
void park(Object blocker); //blocker是当前线程等待的对象(阻塞对象,锁对象??)
Condition接口:
提供锁的监视器方法:await()、signal()、signalAll(),用于线程间通信;
//Condition对象通过Lock对象创建
Condition condition = lock.newCondition();
等待队列:
当线程调用condition.await()方法时,就把线程及其状态包装成node插入等待队列的尾部,此刻线程进入WAITTING状态;
等待队列与同步队列不同之处:
1.同步队列是线程获取获取锁(同步状态失败)时插入的队列,等待队列是线程调用wait()方法时插入的队列;2.同步队列是双链表,而等待队列是单链表;3.一个Condition对象对应一个等待队列,一个Lock对象可以有多个Condition对象,即可以拥有多个等待队列,但只能拥有一个同步队列;
等待队列与与同步队列相同之处:
1.共用同步器的静态内部类AbstractQueuedSynchronized.Node来生成节点;2.链表结构相似;
等待队列的结构:
单向FIFO链表,head节点指向链表的头,tail节点指向链表的尾;
线程进入等待状态:
1.同步队列head节点的线程构造新节点并加入等待队列;2.释放同步状态;3.唤醒后继节点;4.进入等待状态;
线程被通知:
1.等待队列的head节点移动到同步队列的尾节点上,线程从WAITTING状态进入BLOCKED状态;2.唤醒线程去竞争同步状态;3.非同步队列的首节点,获取同步状态失败,再次阻塞;4.最终通过自旋获取同步状态成功后从await()返回,此刻线程成功获取了锁;