案例
public class AtomicDemo {
private static int count=0;
static Lock lock=new ReentrantLock(true);
public static void inc(){
lock.lock(); //获得锁(互斥锁) ThreadA 获得了锁
try {
Thread.sleep(1);
count++;
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();//释放锁 ThreadA释放锁 state=1-1=0
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 1000; i++) {
new Thread(()->AtomicDemo.inc()).start();
}
Thread.sleep(4000);
System.out.println("result:"+count);
}
}
lock加锁
lock()
public void lock() {
sync.lock();
}
sync有两个实现,ReentrantLock默认构造方法使用的是NonfairSync非公平锁,也可以使用公平锁,使用boolean参数的构造方法,传入true。
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
先试用cas乐观锁方式保证只有一个线程获取到锁,此时设置state为1,并且将挡ReentrantLock的属性exclusiveOwnerThread设置为当前线程,表示由哪个线程获取的锁。
final void lock() {
//并发控制
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());//保存当前的线程
else
//未获取到锁
acquire(1);
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
protected final void setExclusiveOwnerThread(Thread thread) {
//设置为获取到锁的线程
exclusiveOwnerThread = thread;
}
未获取到锁的线程走到acquire(1)方法,该方法中使用if判断,注意只有第一个条件为true时,才会进行第二个条件的判断。
- tryAcquire:再次使用cas尝试获取锁,或者为锁的重入,state状态+1.
- addWaiter:将未获得锁的线程加入到队列
- acquireQueued:去抢占锁或者阻塞.
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
//获得当前的线程
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {//无锁
if (compareAndSetState(0, acquires)) {//cas
setExclusiveOwnerThread(current);//设置为当前线程
return true;//成功就不需要入队及阻塞
}
}
//是当前线程,表示重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;//state+1
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);//不需要cas,因为只有当前线程在操作state的值。
return true;
}
return false;//又没抢到,倒霉。。。。。。
}
ddWaiter(Node.EXCLUSIVE), arg),创建node节点并加入到队列。
创建一个新的node,如果有尾节点,就cas尝试放入末尾,失败了调用enq方法,enq首先判断有没有node双向链表,没有的话创建一个空node,将head和tail指向他,然后将这次的node放入末尾,并将tail指向他。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);//新建node
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
//尝试放入末尾,cas,因为此处tail有竞争
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//双向链表时空的
if (t == null) { // Must initialize
//空node
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
//cas尝试放到尾部。
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued抢占或者阻塞
此处是做Node节点线程的自旋过程,自旋过程主要检查当前节点是不是head节点的next节点,如果是,则尝试获取锁,如果获取成功,那么释放当前节点,同时返回。至此一个非公平锁的锁获取过程结束。
当然不是无限自旋,某个条件达成,当前线程将会阻塞。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {//如果当前node时head的next,尝试获取一次锁
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
检查一下当前Node的前置节点pred是否是SIGNAL,如果是SIGNAL,那么证明前置Node的线程已经Park了,如果pre节点状态>0,表示节点退出或者中断,将该节点从队列移除,如果waitStatus<=0,那么设置waitStatus为SIGNAL,因为是自旋操作,所以最终一定会返回true。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt使当前的线程park,即暂停了线程的轮询。当Unlock时会做后续节点的Unpark唤醒线程继续争抢锁。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
如果线程意外中断,将调用Thread.interrupted(),判断是否中断,若中断返回true,导致interrupted = true运行,此时failed也为true,触发cancelAcquire移除退出或中断的节点,最后执行selfInterrupt(),将线程再次中断。
unlock释放锁
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
代码相对lock简单,只说一些重点,tryRelease释放锁,如果为true并且head不为空,等待状态不为0,表示有后续等待的node,则唤醒后一个node。
tryRelease
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())//是否是当前线程
throw new IllegalMonitorStateException();
int nextc = getState() - releases;//state值-1
boolean free = exclusiveCount(nextc) == 0;
if (free)//重入锁是否已经都释放了
setExclusiveOwnerThread(null);//置为空
setState(nextc);//设置state
return free;
}
unparkSuccessor将head状态设置为0,找寻有效的next节点,唤醒next的node节点。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
唤醒后acquireQueued进行自旋转获取锁,并移除head,将head设置为当前节点,至此lock和unlock已经全部阅读完毕。