一.synchronized存在的问题
在Java并发编程学习(二)——synchronized关键字一文中,我们重点分析了synchronized的用法和实现,我们知道synchronized可以解决多线程时的内存可见性问题,保证线程安全,并且使用非常简单,但是synchronized同样存在一些问题:
- 不可中断:当两个线程竞争进入临界区时,如果一个线程先进入,另一个线程就需要不停的尝试争夺锁,在这个过程中不响应中断。
- 不公平:当一个获得锁的线程释放锁的时候,其他线程不管先来后到都有可能获得锁,这就有可能造成先开始等待的线程一直得不到执行
- 读写不分离:有的时候,某个临界区内只是读取共享状态,当获得锁的线程进入只读的临界区时,其他线程依赖不能进入,要一直等到当前线程释放锁,这就造成不必要的等待,降低吞吐量
- 不灵活:synchronized只能控制一个代码块或一个方法,有时,我们可能想在一个方法中加锁,在另外一个方法中释放锁,这时,synchronized就无能为力了。
jdk1.5中引入了著名的java.util.concurrent包,其中有一个子包locks,提供了更为强大的锁机制,可以解决上面所有问题。
今天我们先来看一下locks包中的ReentrantLock(可重入锁)。
二.ReentrantLock的好处
俗话说:是骡子是马,拉出来溜溜。ReentrantLock真的能提供比synchronized更强大的功能吗?我们拭目以待。
我们先来看一个早点摊卖油条的需求:老王开了一个早点摊卖油条,每天街坊邻居都排着队买,为了简化逻辑,我们假设老王每天只卖100根油条,老王每天起早炸好100根油条,然后开始卖。
由于油条是有限的,但是买的人很多,因此必须对油条这个共享资源进行同步操作,否则,就可能出现一根油条同时卖给两个人的情况。
问题似乎不难,使用synchronized就可以解决这个问题了,代码如下:
// 油条
class FriedBreadStick {
}
class FriedBreadStickQueue {
private Queue<FriedBreadStick> stickQueue = new ArrayBlockingQueue<FriedBreadStick>(10);
// 炸好油条后放入
public void putStick(FriedBreadStick stick) {
if (stickQueue.size() < 10) {
stickQueue.add(stick);
}
}
// 有人买油条时获取油条
public synchronized List<FriedBreadStick> getStick(int num) {
List<FriedBreadStick> list = new ArrayList<FriedBreadStick>();
if (stickQueue.size() >= num) {
for (int i = 0; i < num; i++) {
list.add(stickQueue.poll());
}
}
try {
Thread.sleep(200); // 造成其他线程等待的情况
} catch (InterruptedException e) {
e.printStackTrace();
}
return list;
}
}
class Customer extends Thread {
private FriedBreadStickQueue queue;
public Customer(String name, FriedBreadStickQueue queue) {
super(name);
this.queue = queue;
}
@Override
public void run() {
List<FriedBreadStick> list = queue.getStick(2); // 买两根油条
if (list.size() == 2) {
System.out.println(Thread.currentThread().getName() + " buy 2 friedBreadStick");
} else {
System.out.println(Thread.currentThread().getName() + " buy no friedBreadStick");
}
}
}
public class BreakfastShop {
public static void main(String[] args) {
FriedBreadStickQueue queue = new FriedBreadStickQueue();
for (int i = 0; i < 10; i++) {
queue.putStick(new FriedBreadStick());
}
for (int i = 0; i < 6; i++) {
new Customer("customer" + i, queue).start();
try {
Thread.sleep(10); // 每个线程启动后,等待10ms,造成不同线程等待时间不同的情况
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
上面代码中,我们使用synchronized
修饰了卖油条的方法,使卖油条的过程是同步的,同一时刻,只能由一个人买油条。为了造成线程等待的情况,我们故意使每次卖油条的过程持续200ms以上,并且买油条的人每隔10ms多一个,这样,就会造成线程等待,而且不同线程等待的时候不同。
执行结果如下:
customer0 buy 2 friedBreadStick
customer5 buy 2 friedBreadStick
customer4 buy 2 friedBreadStick
customer3 buy 2 friedBreadStick
customer2 buy 2 friedBreadStick
customer1 buy no friedBreadStick
我们可以看到,有5个人买到了油条,没有造成一根油条卖给多个人的情况,但是customer1明显来的更早,却没有买到油条,反而被来的最晚的customer抢去了,这明显不公平。
我们再来看看使用ReentrantLock的情况。
class FriedBreadStickQueue {
private Queue<FriedBreadStick> stickQueue = new ArrayBlockingQueue<FriedBreadStick>(10);
private final Lock lock = new ReentrantLock(true); // 参数为true表示希望获得一个公平锁
// 炸好油条后放入
public void putStick(FriedBreadStick stick) {
if (stickQueue.size() < 10) {
stickQueue.add(stick);
}
}
// 有人买油条时获取油条
public List<FriedBreadStick> getStick(int num) {
List<FriedBreadStick> list = new ArrayList<FriedBreadStick>();
lock.lock();
if (stickQueue.size() >= num) {
for (int i = 0; i < num; i++) {
list.add(stickQueue.poll());
}
}
try {
Thread.sleep(200); // 造成其他线程等待的情况
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unlock();
return list;
}
}
再次执行,结果如下:
customer0 buy 2 friedBreadStick
customer1 buy 2 friedBreadStick
customer2 buy 2 friedBreadStick
customer3 buy 2 friedBreadStick
customer4 buy 2 friedBreadStick
customer5 buy no friedBreadStick
可以看到,顾客们按照先来后到的顺序买油条,这样就公平了,大家都没有怨言。
不过值得注意的是,ReentrantLock并不能完全保证线程执行的公平性,但是会尽量保证。
我们再来看另一个例子,我们经常会去银行取钱,当有多个人进行取钱时,就需要排队,同一个窗口同一时间只能服务以为顾客,因此柜台窗口成为了共享资源,对共享资源的获取需要进行同步:
// 银行业务
class Business {
public synchronized void getMoney() throws InterruptedException {
System.out.println(Thread.currentThread().getName() + " begin getMoney");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " was interrupted during get Money");
return;
}
System.out.println(Thread.currentThread().getName() + " finish getMoney");
}
}
class Customer extends Thread {
private Business business;
public Customer(String name, Business business) {
super(name);
this.business = business;
}
@Override
public void run() {
try {
business.getMoney();
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " was interrupted");
}
}
}
public class Bank {
public static void main(String[] args) {
Business business = new Business();
Customer customer1 = new Customer("customer1", business);
Customer customer2 = new Customer("customer2", business);
customer1.start();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
customer2.start(); // 顾客1开始取钱100ms后顾客2开始排队等待
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
customer2.interrupt(); // 顾客2排队等待100ms后,被打断
}
}
我们这个例子主要是为了测试线程在等待锁的过程中能否响应中断,运行结果如下:
customer1 begin getMoney
customer1 finish getMoney
customer2 begin getMoney
customer2 was interrupted during get Money
在customer2 等待100ms时,就已经收到了中断信号,此时,customer1仍然没有完成取钱的操作,但是customer2 依然执着的等待着锁,并没有响应中断,直到获得锁之后,才响应。
下面修改一下程序:
class Business {
private Lock lock = new ReentrantLock();
public void getMoney() throws InterruptedException {
lock.lockInterruptibly();
System.out.println(Thread.currentThread().getName() + " begin getMoney");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " was interrupted");
return;
}
lock.unlock();
System.out.println(Thread.currentThread().getName() + " finish getMoney");
}
}
注意,我们这次使用了Lock,并且在加锁时使用了lockInterruptibly
方法,运行结果如下:
customer1 begin getMoney
customer2 was interrupted
customer1 finish getMoney
显然,在customer2 获得锁之前,就响应了中断,不再竞争锁了。
以上,我们看了两个例子,一个是有关锁获取时公平性的,一个是有关竞争锁时是否响应中断的,可以看出,ReentrantLock相比synchronized确认可以更加灵活:
- 提供了公平锁和非公平锁的选项
- 提供了可中断加锁以及不可中断加锁的选项
- 此外,ReentrantLock还可以试探性的获取锁,如果获取成功,则进入临界区,否则可以自由选择是继续等待还是放弃
可见,synchronized之于ReentrantLock,就相当于匕首与瑞士军刀,一个锋利无比,一个功能多样;也相当于沙县小吃和自助餐,一个简单美味,一个玲琅满目,任君挑选。
三.具体实现
下面我们分析一下ReentrantLock
的核心实现逻辑。
通过查看源代码,我们可以看到,ReentrantLock
基本上是在AbstractQueuedSynchronizer
类的基础上实现的,AbstractQueuedSynchronizer
是真正的幕后英雄。
AbstractQueuedSynchronizer
提供了一个支持FIFO(先进先出)等待队列的阻塞锁同步框架,我们借着分析ReentrantLock
的过程也顺便了解AbstractQueuedSynchronizer
。
ReentrantLock类中定义了一个内部类Syn
,它继承自AbstractQueuedSynchronizer
,并添加了一些支持锁的常用操作,如lock方法(加锁),tryRelease(释放锁)方法等。
而Syn
根据是否公平,又有不同的子类,包括FairSync
(用于实现公平锁)、NonfairSync
(用于实现非公平锁)。通过ReentrantLock类构造函数中的boolean fair
参数,如果fair为true,则将ReentrantLock实例中的sync初始化为FairSync
,否则设置为NonfairSync
。
1.lock()&unlock()
这两个方法是ReentrantLock
中最基础的方法,我们先来看一下非公平的实现:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
这里涉及到三个方法:compareAndSetState
、setExclusiveOwnerThread
、acquire
,这三个方法都是AbstractQueuedSynchronizer
中的方法。
compareAndSetState
:简称cas,是一种无锁算法,底层对应的是一个CPU指令,该指令接收3个参数(内存地址、预期值、新值),执行过程为判断内存地址的值是否是预期值,如果是的话,就将该内存地址的值更新为新值,否则什么都不做。
setExclusiveOwnerThread
:设置当前锁被哪个线程占有。
acruire
:具体实现如下:
// 获得锁
public final void acquire(int arg) {
// 如果tryAcquire成功,则直接返回
// 如果tryAcquire失败,则会执行addWaiter将当前线程入队,然后执行acquireQueued,返回true说明当前线程被唤醒了,已经拿到了锁,但是需要被中断,返回false说明已经成功获得了锁,并且没有被中断
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt(); // 执行中断
}
// 尝试获得锁
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// 非公平获得锁过程
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); // 获取当前同步状态
if (c == 0) { // 0表示当前锁未被占有
if (compareAndSetState(0, acquires)) { // 执行cas操作抢占锁
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 锁被当前线程占有,可重入
int nextc = c + acquires;
if (nextc < 0) // 溢出,超过int型的容量限制
throw new Error("Maximum lock count exceeded");
setState(nextc); // 将status更新
return true;
}
return false;
}
// 将当前线程加入到等待队列,队列基于链表实现
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 如果可以一次操作成功,则操作完成后立即返回
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// compareAndSetTail失败,说明存在竞争,重新执行插入(无限循环尝试)
enq(node);
return node;
}
// 线程将会被阻塞在这里,直到获得锁时结束,返回的boolean值表示是否需要被中断
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false; // 默认没有中断
for (;;) {
final Node p = node.predecessor();
// 前继节点是head,并且当前线程成功获得锁,说明前继节点已经释放了锁,可以出队了
if (p == head && tryAcquire(arg)) {
setHead(node); // 将当前节点设置为head
p.next = null; // help GC
return interrupted;
}
// 获取锁失败时需要被阻塞,并且检查当前线程需要被中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}
上面涉及到多个方法,逻辑较为复杂,简单总结一下:当一个线程尝试获得锁时,先执行一次cas指令,如果成功,则获得锁,如果失败,则进入等待队列,直到被唤醒,在等待队列中自旋等待时,不响应中断,但是会被是否有中断记录下来,等拿到锁后再响应中断。
我们再看unlock方法。
public void unlock() {
sync.release(1);
}
只有一行代码,再看看具体实现:
public final boolean release(int arg) {
if (tryRelease(arg)) { // 释放成功
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 唤醒头节点的后继节点
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 当前线程不是占有锁的线程,理论上不应该出现这种情况,抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // c=0说明此次释放之后,已经没有线程占有锁了
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
上面的lock与unlock我们看的都是非公平的实现,下面我们再来看一下公平锁的实现:
final void lock() {
acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (isFirst(current) &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
我们可以看到,公平锁与非公平锁的区别其实只有一句:
isFirst(current)
公平锁,只有当isFirst返回true,并且cas操作成功,才会获得锁。看来公平性的关键就在这里了,我们再来看一下isFirst
的实现。
final boolean isFirst(Thread current) {
Node h, s;
// head为空说明队列为空,当前线程是第一个节点
// head不为空,但是current是head的第一个后继,也说明current是实际有效的第一个节点(head中的thread为null)
return ((h = head) == null ||
((s = h.next) != null && s.thread == current) ||
fullIsFirst(current));
}
// 从tail开始一直往前找,直到节点的thread为nul,如果最终发现的目标为current,说明current为第一个节点
final boolean fullIsFirst(Thread current) {
Node h, s;
Thread firstThread = null;
if (((h = head) != null && (s = h.next) != null &&
s.prev == head && (firstThread = s.thread) != null))
return firstThread == current;
Node t = tail;
while (t != null && t != head) {
Thread tt = t.thread;
if (tt != null)
firstThread = tt;
t = t.prev;
}
return firstThread == current || firstThread == null;
}
整理一下上面的逻辑:当当前线程处于队列当中第一个实际有效的线程时,isFirst才会返回true,而队列是按照FIFO来排队的,这样就保证了锁的公平性。而非公平的锁,每个线程都在不停的竞争锁,一旦acs操作成功,不管该线程处于队列中哪个位置,都可以获得锁。
2.lockInterruptibly
我们再来看一下lockInterruptibly,它支持线程在等待锁的过程中放弃等待,响应中断,我们看一下具体实现:
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted()) // 如有中断,立即抛出中断异常
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
// 可中断的锁获取过程
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) //注意这里,和非中断模式的区别,非中断模式只是标记,这里会break
break;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
// 退出锁竞争
cancelAcquire(node);
// 抛出中断异常
throw new InterruptedException();
}
ReentrantLock
类代码较多,我们这里重点分析了公平锁、非公平锁、中断与非中断锁的实现逻辑,读者有兴趣也可以阅读更多的代码逻辑哈。在下一篇文章中,我们将分析ReentrantReadWriteLock
读写锁,敬请期待!
参考资料
本文已迁移至我的博客:http://ipenge.com/1252.html