背景
近来多有涉及,但多数时候都是浅尝辄止,其具体实现原理及应用未有系统总结。近日特地学习了一下相关源码。对AtomicInteger, ReentrantLock, Semaphore这几部分做个简单总结。
演化历程
Java 5之前多线程通过synchronized关键字来实现,简单回顾,其使用方式分为:
The synchronized keyword can be used to mark four different types of blocks:
- Instance methods
- Static methods
- Code blocks inside instance methods
public void add(int value){
synchronized(this){
this.count += value;
}
}
- Code blocks inside static methods
几种使用也都很常见了,整体这种方式的concurrency model被叫做shared state concurrency model
当然,由于动辄block掉一整块代码的执行,这种方式的concurrency还是在性能上有很大的劣势。只是日常工作中有时候相应的concurrency情景对性能要求不是那么高,所以简单稳妥的方式,比如整个method加上synchronized keyword也是可接受的。
其他劣势还包括:
- 不同线程不能保证按照请求时间顺序执行(ReentrantLock的公平锁)
- 对于两个及以上thread希望同时进行读不能很好的支持
- 多个thread希望进入同一个代码块(Semaphore
)
Synchronized blocks in Java have several limitations. For instance, a synchronized block in Java only allows a single thread to enter at a time. However, what if two threads just wanted to read a shared value, and not update it? That might be safe to allow. As alternative to a synchronized block you could guard the code with a Read / Write Lock which as more advanced locking semantics than a synchronized block. Java actually comes with a built in ReadWriteLock class you can use.What if you want to allow N threads to enter a synchronized block, and not just one? You could use a Semaphore to achieve that behaviour. Java actually comes with a built-in Java Semaphore class you can use.Synchronized blocks do not guarantee in what order threads waiting to enter them are granted access to the synchronized block. What if you need to guarantee that threads trying to enter a synchronized block get access in the exact sequence they requested access to it? You need to implement Fairness yourself. What if you just have one thread writing to a shared variable, and other threads only reading that variable? Then you might be able to just use a volatile variable without any synchronization around
Java 5以后针对这些问题,对concurrency做出了很多提高。以下就针对AtomicInteger, ReentrantLock, Semaphore这几部分简单总结。
AtomicInteger
位于Package java.util.concurrent.atomic, 顾名思义,希望保证integer的原子性,官方介绍是
A small toolkit of classes that support lock-free thread-safe programming on single variables.
其常用用例就是实现一个计数器Counter, 多线程下多个线程都在修改计数器进行递增,典型的race condition。
要解决这个问题:
The Old School
通过synchronized关键字也能解决这个问题,就是把递增对应的method锁住,同一时间只有同一个线程可以进入该code block。
public class Counter{
long count = 0;
public synchronized void add(long value){
this.count += value;
}
}
public class CounterThread extends Thread{
protected Counter counter = null;
public CounterThread(Counter counter){
this.counter = counter;
}
public void run() {
for(int i=0; i<10; i++){
counter.add(i);
}
}
}
AtomicInteger实现方式
前一部分介绍过这种方式不好,而AtomicInteger恰好能同时保证thread-safe和lock-free。
通过AtomicInteger可以不block住其他线程同时实现counter的thread-safe.
public class SafeCounterWithoutLock {
private final AtomicInteger counter = new AtomicInteger(0);
public int getValue() {
return counter.get();
}
public void increment() {
while(true) {
int existingValue = getValue();
int newValue = existingValue + 1;
if(counter.compareAndSet(existingValue, newValue)) {
return;
}
}
}
}
AtomicInteger如何做到的
通过阅读AtomicInteger源代码可以发现其多数method都是基于这个compareAndSet method实现的
/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* @param expect the expected value
* @param update the new value
* @return true if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
可以看到其底层又是依赖sun.misc.Unsafe实现的。Unsafe是一个单例,通过Reflection拿到然后进行一些内存等底层操作。这里AtomicInteger采用的是Unsafe的compare-and-swap指令级操作。也有观点认为compare-and-swap本质上就是乐观锁。
public final native boolean compareAndSwapObject(Object o, long offset, Object expected, Object update);
public final native boolean compareAndSwapInt(Object o, long offset, int expected,int update);
public final native boolean compareAndSwapLong(Object o, long offset, long expected, long update);
关于Unsafe更多的介绍参考链接。
ReentrantLock
顾名思义,相形于传统的Lock, ReentrantLock实现了锁的可重入。其关键的好处在于增加了灵活性。下面是官方文档对其总结
A reentrant mutual exclusion
Lock
with the same basic behavior and semantics as the implicit monitor lock accessed usingsynchronized
methods and statements, but with extended capabilities.AReentrantLock
is owned by the thread last successfully locking, but not yet unlocking it. A thread invokinglock
will return, successfully acquiring the lock, when the lock is not owned by another thread. The method will return immediately if the current thread already owns the lock. This can be checked using methodsisHeldByCurrentThread()
, andgetHoldCount()
.The constructor for this class accepts an optional fairness parameter. When settrue
, under contention, locks favor granting access to the longest-waiting thread. Otherwise this lock does not guarantee any particular access order. Programs using fair locks accessed by many threads may display lower overall throughput (i.e., are slower; often much slower) than those using the default setting, but have smaller variances in times to obtain locks and guarantee lack of starvation. Note however, that fairness of locks does not guarantee fairness of thread scheduling. Thus, one of many threads using a fair lock may obtain it multiple times in succession while other active threads are not progressing and not currently holding the lock. Also note that the untimedtryLock
method does not honor the fairness setting. It will succeed if the lock is available even if other threads are waiting.
简单实例
public void some_method()
{
reentrantlock.lock();
try
{
//Do some work
}
catch(Exception e)
{
e.printStackTrace();
}
finally
{
reentrantlock.unlock();
}
}
典型的问题是,现有两个线程,交替进行打印操作。
public void dao(Runnable printDao) throws InterruptedException {
lock.lock();
for (int i = 0; i < n; i++) {
while (isEmpty) {
condition.await();
}
isEmpty = true;
printDao.run();
condition.signal();
}
lock.unlock();
}
public void fish(Runnable printFish) throws InterruptedException {
lock.lock();
for (int i = 0; i < n; i++) {
while (!isEmpty) {
condition.await();
}
isEmpty = false;
printFish.run();
condition.signal();
}
lock.unlock();
}
公平性问题
ReentrantLock为了效率考量默认是不保证公平性的。需要保证公平性,则通过调用构造函数的时候专门设置。
ReentrantLock(boolean fair)
Creates an instance ofReentrantLock
with the given fairness policy.
而公平(fairness)一词其实就是各个线程调用lock方法有先后顺序,谁等的最久就给谁。
原理学习
ReentrantLock的实现是基于AQS(AbstractQueuedSynchronizer)的,AQS做的事情其实就是,如果请求的资源未被占用,则将该资源让当前线程独占,如果不是的话,则让线程阻塞放入队列中(基于Craig、Landin and Hagersten队列),等待资源释放(lock.unlock())后从队列中唤醒相应的线程将资源让该线程独占。
ReentrantLock实现了一个内部类Sync, 基于是否公平,则会调用NonfairSync 或者FairSync。
至于前面实例中的lock.newCondition方法则实现了传统的用Object当Lock的时候的那些个wait, notify, notifyAll方法。
Condition newCondition() Returns a
Condition
instance for use with thisLock
instance.The returnedCondition
instance supports the same usages as do theObject
monitor methods (wait
,notify
, andnotifyAll
) when used with the built-in monitor lock. If this lock is not held when any of theCondition
waiting or signalling methods are called, then anIllegalMonitorStateException
is thrown.When the condition waiting methods are called the lock is released and, before they return, the lock is reacquired and the lock hold count restored to what it was when the method was called. If a thread is interrupted while waiting then the wait will terminate, anInterruptedException
will be thrown, and the thread's interrupted status will be cleared. Waiting threads are signalled in FIFO order. The ordering of lock reacquisition for threads returning from waiting methods is the same as for threads initially acquiring the lock, which is in the default case not specified, but for fair locks favors those threads that have been waiting the longest.
Semaphore
Semaphore的应用场景就是有多个线程,我们支持里面n个同时占用资源,而不是一个线程独占。那么,Semaphore则很好的支持了这个情况。
哲学家用餐问题
典型的哲学家用餐问题,我们希望防止死锁,回顾一下,5个哲学家做一个圆桌,有五个叉子,一个人同时左右手拿到两个叉子才能吃饭,如果五个人各自拿一个叉子且等待另外一个,那么谁也吃不到,也就是deadlock问题。
转化一下这个问题,如果我们限制一下同一时间只能有4个人拿叉子,deadlock问题是不是就迎刃而解了呢。
这里Semaphore的实现方式。
public DiningPhilosophers() {
locks = new ReentrantLock[5];
for (int i = 0; i < 5; i++) {
locks[i] = new ReentrantLock();
}
semaphore = new Semaphore(4);
}
public void wantsToEat(int philosopher,
Runnable pickLeftFork,
Runnable pickRightFork,
Runnable eat,
Runnable putLeftFork,
Runnable putRightFork) throws InterruptedException {
int leftIdx = (philosopher - 1 + 5) % 5;
int rightIdx = philosopher;
semaphore.acquire();
locks[leftIdx].lock();
locks[rightIdx].lock();
pickLeftFork.run();
pickRightFork.run();
eat.run();
putLeftFork.run();
putRightFork.run();
locks[leftIdx].unlock();
locks[rightIdx].unlock();
semaphore.release();
}
注意这里五个folk用了5个ReentrantLock, semaphore则保证最多同时四个人有folk.
原理
也是基于AQS实现的。