1.为什么需要锁
解决多个线程访问同一个可变的状态变量时的安全问题。
2.内置锁 - synchronized
任何一个非null对象都可以作为锁
。内置锁是可重入的非公平锁。在JVM中,内置锁也叫对象监视器
;
-
使用方法
-
1.作用在方法上,持有对象实例this的锁
public synchronized void doSomething() { //方法体 }
-
2.作用在代码块上,持有对象实例lockObject的锁
public int doSomething(){ synchronized(lockObject) { //一次只能有一个线程进入 } }
-
3.作用在静态方法上,持有类(this.getClass)的锁
public synchronized static void doSomething() { // todo }
-
4.作用在类上,持有类(class)的锁
class ClassName { public void method() { synchronized(ClassName.class) { // todo } } }
-
-
实现原理
synchronized的加锁和释放都是由JVM提供,在软件层面依赖JVM,当多个线程同时请求某个对象监视器时,对象监视器会设置几种状态用来区分请求的线程:◆
Contention List
:所有请求锁的线程将被首先放置到该竞争队列
。◆
Entry List
:Contention List中那些有资格成为候选人的线程被移到Entry List。◆
Wait Set
:那些调用wait方法被阻塞的线程被放置到Wait Set。◆
OnDeck
:任何时刻最多只能有一个线程正在竞争锁,该线程称为OnDeck。◆
Owner
:获得锁的线程称为Owner。◆
!Owner
:释放锁的线程。
新请求锁的线程将首先被加入到ConetentionList中,当某个拥有锁的线程(Owner状态)调用unlock之后,如果发现 EntryList为空则从ContentionList中移动线程到EntryList,下面说明下ContentionList和EntryList 的实现方式:
- [ ] ContentionList 虚拟队列
ContentionList并不是一个真正的Queue,而只是一个虚拟队列,原因在于ContentionList是由Node及其next指 针逻辑构成,并不存在一个Queue的数据结构。ContentionList是一个后进先出(LIFO)的队列,每次新加入Node时都会在队头进行, 通过CAS改变第一个节点的的指针为新增节点,同时设置新增节点的next指向后续节点,而取得操作则发生在队尾。显然,该结构其实是个Lock- Free的队列.因为只有Owner线程才能从队尾取元素,也即线程出列操作无争用,当然也就避免了CAS的ABA问题。
- [ ] EntryList
EntryList与ContentionList逻辑上同属等待队列,ContentionList会被线程并发访问,为了降低对 ContentionList队尾的争用,而建立EntryList。Owner线程在unlock时会从ContentionList中迁移线程到 EntryList,并会指定EntryList中的某个线程(一般为Head)为Ready(OnDeck)线程。Owner线程并不是把锁传递给 OnDeck线程,只是把竞争锁的权利交给OnDeck,OnDeck线程需要重新竞争锁。这样做虽然牺牲了一定的公平性,但极大的提高了整体吞吐量,在 Hotspot中把OnDeck的选择行为称之为“竞争切换”。
OnDeck线程获得锁后即变为owner线程,无法获得锁则会依然留在EntryList中,考虑到公平性,在EntryList中的位置不 发生变化(依然在队头)。如果Owner线程被wait方法阻塞,则转移到WaitSet队列;如果在某个时刻被notify/notifyAll唤醒, 则再次转移到EntryList。
- [ ] 自旋锁
那些处于ContetionList、EntryList、WaitSet中的线程均处于阻塞状态,阻塞操作由操作系统完成(在Linxu下通 过pthread_mutex_lock函数)。线程被阻塞后便进入内核(Linux)调度状态,这个会导致系统在用户态与内核态之间来回切换,严重影响 锁的性能
缓解上述问题的办法便是自旋,其原理是:当发生争用时,若Owner线程能在很短的时间内释放锁,则那些正在争用线程可以稍微等一等(自旋), 在Owner线程释放锁后,争用线程可能会立即得到锁,从而避免了系统阻塞。但Owner运行的时间可能会超出了临界值,争用线程自旋一段时间后还是无法 获得锁,这时争用线程则会停止自旋进入阻塞状态(后退)。基本思路就是自旋,不成功再阻塞,尽量降低阻塞的可能性,这对那些执行时间很短的代码块来说有非 常重要的性能提高。自旋锁有个更贴切的名字:自旋-指数后退锁,也即复合锁。很显然,自旋在多处理器上才有意义。
线程在进入等待队列ContentionList时,也即第一步操作前.首先进行自旋尝试获得锁,如果不成功再进入等待队列。这对那些已经在等待队列中的线程来说,稍微显得不公平。还有一个不公平的地方是自旋线程可能会抢占了 Ready线程的锁。
- [ ] 偏向锁
在JVM1.6中引入了偏向锁,偏向锁主要解决无竞争下的锁性能问题.
现在几乎所有的锁都是可重入的,也即已经获得锁的线程可以多次锁住/解锁监视对象,按照之前的HotSpot设计,每次加锁/解锁都会涉及到一些CAS操 作(比如对等待队列的CAS操作),CAS操作会延迟本地调用,因此偏向锁的想法是一旦线程第一次获得了监视对象,之后让监视对象“偏向”这个 线程,之后的多次调用则可以避免CAS操作,说白了就是置个变量,如果发现为true则无需再走各种加锁/解锁流程。
- Mark Word
HotSpot虚拟机的对象头包括两部分信息,第一部分是“Mark Word”,用于存储对象自身的运行时数据, 如哈希码(HashCode)、GC分代年龄、锁状态标志、线程持有的锁、偏向线程ID、偏向时间戳等等.
-
[ ] 未被锁定状态:
-
[ ] 其他状态:
openjdk\hotspot\src\share\vm\oops\oop.hpp下oopDesc类是JVM对象的顶级基类,故每个object都包含markOop。
class oopDesc {
friend class VMStructs;
private:
volatile markOop _mark;//markOop:Mark Word标记字段
union _metadata {
Klass* _klass;//对象类型元数据的指针
narrowKlass _compressed_klass;
} _metadata;
// Fast access to barrier set. Must be initialized.
static BarrierSet* _bs;
public:
markOop mark() const { return _mark; }
markOop* mark_addr() const { return (markOop*) &_mark; }
void set_mark(volatile markOop m) { _mark = m; }
void release_set_mark(markOop m);
markOop cas_set_mark(markOop new_mark, markOop old_mark);
// Used only to re-initialize the mark word (e.g., of promoted
// objects during a GC) -- requires a valid klass pointer
void init_mark();
Klass* klass() const;
Klass* klass_or_null() const volatile;
Klass** klass_addr();
narrowKlass* compressed_klass_addr();
....省略...
}
在HotSpot虚拟机中,最终采用ObjectMonitor类实现monitor。
openjdk\hotspot\src\share\vm\runtime\objectMonitor.hpp源码如下:
ObjectMonitor() {
_header = NULL;//markOop对象头
_count = 0;
_waiters = 0,//等待线程数
_recursions = 0;//重入次数
_object = NULL;//监视器锁寄生的对象。锁不是平白出现的,而是寄托存储于对象中。
_owner = NULL;//指向获得ObjectMonitor对象的线程或基础锁
_WaitSet = NULL;//处于wait状态的线程,会被加入到wait set;
_WaitSetLock = 0 ;
_Responsible = NULL ;
_succ = NULL ;
_cxq = NULL ;
FreeNext = NULL ;
_EntryList = NULL ;//处于等待锁block状态的线程,会被加入到entry set;
_SpinFreq = 0 ;
_SpinClock = 0 ;
OwnerIsThread = 0 ;// _owner is (Thread *) vs SP/BasicLock
previous_owner_tid = 0;// 监视器前一个拥有者线程的ID
}
3.显示锁 - Lock
synchronized存在局限性,例如:
- 占有锁的线程执行时间较长,不想让其他线程无期限地等待下去
- 读写锁中,读操作线程之间不发生冲突
- 无法得知线程有没有成功获取到锁
-
Lock
public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; // 可以响应中断 boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; // 可以响应中断 void unlock(); Condition newCondition(); }
-
[ ] lock()
如果锁已被其他线程获取,则进行等待.如果采用Lock,必须主动去释放锁,在发生异常时,不会自动释放锁。private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }
-
[ ] tryLock() & tryLock(long time, TimeUnit unit)
尝试获取锁,如果获取成功,则返回true;如果获取失败(即锁已被其他线程获取),则返回false. 也就是说,这个方法无论如何都会立即返回(在拿不到锁时不会一直在那等待).private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
[ ] lockInterruptibly()
获取锁方式与lock()方法类似,不同的是,若线程未获取到锁等待过程中被中断,则线程中断抛出InterruptedException.AQS
ReentrantLock实现了Lock接口,查看源码可知其实现都是基于AbstractQueuedSynchronizer.[ ] lock
ReentrantLock.lock->sync.lock()->AbstractQueuedSynchronizer.acquire(1)
-->FairSync.tryAcquire()/Sync.nonfairTryAcquire(int acquires) --> compareAndSetState(0, acquires)
成功: --> AbstractQueuedSynchronizer.setExclusiveOwnerThread(current)
失败: --> AQS.addWaiter(Node.EXCLUSIVE)->AQS.acquireQueued()
ReentrantLock->>AQS:lock
AQS->>Sync:acquire(1)
Sync->>Sync:tryAcquire()
Sync->>Sync:compareAndSetState(0, acquires)
Sync->>AQS:False
AQS->>AQS:setExclusiveOwnerThread(current)
AQS->>ReentrantLock:True
获取资源失败的情况
ReentrantLock->>AQS:lock
AQS->>Sync:acquire(1)
Sync->>Sync:tryAcquire()
Sync->>Sync:compareAndSetState(0, acquires)
Sync->>AQS:True
AQS->>AQS:addWaiter(Node.EXCLUSIVE)加入队尾
AQS->>AQS:acquireQueued等待资源
AQS->>ReentrantLock:True
-
acquire
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
addWaiter
//注意:该入队方法的返回值就是新创建的节点
private Node addWaiter(Node mode) {
//基于当前线程,节点类型(Node.EXCLUSIVE)创建新的节点
//由于这里是独占模式,因此节点类型就是Node.EXCLUSIVE
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//这里为了提搞性能,首先执行一次快速入队操作,即直接尝试将新节点加入队尾
if (pred != null) {
node.prev = pred;
//这里根据CAS的逻辑,即使并发操作也只能有一个线程成功并返回,其余的都要执行后面的入队操作。即enq()方法
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
//完整的入队操作
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//如果队列还没有初始化,则进行初始化,即创建一个空的头节点
if (t == null) {
//同样是CAS,只有一个线程可以初始化头结点成功,其余的都要重复执行循环体
if (compareAndSetHead(new Node()))
tail = head;
} else {
//新创建的节点指向队列尾节点,毫无疑问并发情况下这里会有多个新创建的节点指向队列尾节点
node.prev = t;
//基于这一步的CAS,不管前一步有多少新节点都指向了尾节点,这一步只有一个能真正入队成功,其他的都必须重新执行循环体
if (compareAndSetTail(t, node)) {
t.next = node;
//该循环体唯一退出的操作,就是入队成功(否则就要无限重试)
return t;
}
}
}
}
- acquireQueued
final boolean acquireQueued(final Node node, int arg) {
//锁资源获取失败标记位
boolean failed = true;
try {
//等待线程被中断标记位
boolean interrupted = false;
//这个循环体执行的时机包括新节点入队和队列中等待节点被唤醒两个地方
for (;;) {
//获取当前节点的前置节点
final Node p = node.predecessor();
//如果前置节点就是头结点,则尝试获取锁资源
if (p == head && tryAcquire(arg)) {
//当前节点获得锁资源以后设置为头节点,这里继续理解我上面说的那句话
//头结点就表示当前正占有锁资源的节点
setHead(node);
p.next = null; //帮助GC
//表示锁资源成功获取,因此把failed置为false
failed = false;
//返回中断标记,表示当前节点是被正常唤醒还是被中断唤醒
return interrupted;
}
//如果没有获取锁成功,则进入挂起逻辑
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//最后会分析获取锁失败处理逻辑
if (failed)
cancelAcquire(node);
}
//首先说明一下参数,node是当前线程的节点,pred是它的前置节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取前置节点的waitStatus
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//如果前置节点的waitStatus是Node.SIGNAL则返回true,然后会执行parkAndCheckInterrupt()方法进行挂起
return true;
if (ws > 0) {
//由waitStatus的几个取值可以判断这里表示前置节点被取消
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//这里我们由当前节点的前置节点开始,一直向前找最近的一个没有被取消的节点
//注,由于头结点head是通过new Node()创建,它的waitStatus为0,因此这里不会出现空指针问题,也就是说最多就是找到头节点上面的循环就退出了
pred.next = node;
} else {
//根据waitStatus的取值限定,这里waitStatus的值只能是0或者PROPAGATE,那么我们把前置节点的waitStatus设为Node.SIGNAL然后重新进入该方法进行判断
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
- [ ] tryLock
ReentrantLock.tryLock->Sync.nonfairTryAcquire() 直接尝试获取锁
- [ ] lockInterruptibly
ReentrantLock.lockInterruptibly->sync.lockInterruptibly(1)
->FairSync.tryAcquire()/Sync.nonfairTryAcquire(int acquires) --> compareAndSetState(0, acquires)
成功: --> AQS.setExclusiveOwnerThread(current)
失败: --> AQS.doAcquireInterruptibly -> AQS.addWaiter(Node.EXCLUSIVE) ->等待资源 tryAcquire & parkAndCheckInterrupt
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- [ ] tryLock(long timeout, TimeUnit unit)
ReentrantLock.tryLock(long timeout, TimeUnit unit)
-> AQS.tryAcquireNanos(1, unit.toNanos(timeout))
-> FairSync.tryAcquire()/Sync.nonfairTryAcquire(int acquires)
失败: ->doAcquireNanos(arg, nanosTimeout); ->addWaiter(Node.EXCLUSIVE) ->tryAcquire(arg) & LockSupport.parkNanos(this, nanosTimeout);
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
long lastTime = System.nanoTime();
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
if (nanosTimeout <= 0)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = now;
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- [ ] unLock
ReentrantLock.unlock()
-> Sync.release(1)
-> AQS.release(1)
-> Sync.tryRelease
成功: ->unparkSuccessor(head); ->LockSupport.unpark(head.next.thread)
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
- 独占锁模式流程示意图:
-
共享锁
ReentrantLock使用的是独占锁,Semaphore,CountDownLatch,ReentrantReadWriteLock使用的是共享锁。
独占锁与共享锁的实现大同小异。- 独占锁:
state为0代表有资源,state大于1代表锁已经被其他线程获取。
独占锁是只有头节点获取锁,其余节点的线程继续等待,等待锁被释放后,才会唤醒下一个节点的线程;
独占锁的同步状态state值在0和1之间切换,保证同一时间只能有一个线程是处于活动的,其他线程都被阻塞,参考ReentranLock。当owner线程再次重入时,state在1到N间切换。
独占锁是一种悲观锁。- 共享锁:
state值在整数区间内(自定义实现),如果state值<0则阻塞,否则不阻塞。
共享锁是只要头节点获取锁成功,若剩余资源大于0,就在唤醒自身节点对应的线程的同时,继续唤醒AQS队列中的下一个节点的线程,每个节点在唤醒自身的同时还会唤醒下一个节点对应的线程,以实现共享状态的“向后传播”,从而实现共享功能。
共享锁是一种乐观锁,允许多个线程同时访问共享资源。