并发工具-JUC中的并发
- Lock
- Semaphore
- CountdownLatch
- CyclicBarrier
1. AQS原理
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable
AQS,AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架,特点有
- 用state属性来表示资源状态,分为独占模式和共享模式,子类需要定义如何维护这个状态,控制如何获取锁和释放锁
- getState,获取状态
- setState,设置状态
- compareAndSetState,cas机制设置state状态
- 独占模式只允许一个线程访问资源,共享模式允许多个线程访问资源
- 提供了基于FIFO的等待队列,类似于Monitor的EntryList
- AQS的等待队列是CLH
- The wait queue is a variant of a "CLH" (Craig, Landin, and Hagersten) lock queue. CLH locks are normally used for spinlocks.
- 条件变量实现等待和唤醒机制,支持多个条件变量,类似于Monitor的WaitSet
- AQS使用park和unpark实现阻塞和释放
子类要实现以下方法
- tryAcquire,尝试获取锁
- tryRelease,尝试释放锁
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively
使用AQS自定义不可重入锁,
class MyLock implements Lock {
// 独占锁
class MySync extends AbstractQueuedSynchronizer {
@Override // 尝试获取锁
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
// 加上了锁,并设置上锁线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override // 尝试释放锁
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
// state使用volatile修饰,放在后面,可以使得设置线程语句对其他线程可见
setState(0);
return true;
}
@Override // 是否持有独占锁
protected boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
public Condition newCondition() {
return new ConditionObject();
}
}
private MySync sync = new MySync();
// 加锁,不成功等待
@Override
public void lock() {
sync.acquire(1);
}
// 加锁,可打断
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
// 尝试加锁
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
// 尝试在限定时间加锁
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
// 解锁
@Override
public void unlock() {
sync.release(0);
}
// 创建条件变量
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
基于AQS的锁,需要在内部定义一个AQS的子类,是一种典型的模板设计模式。
2. ReentrantLock原理
有了synchronized,为什么还需要JDK自定义AQS? AQS源码分析
3. ReentrantReadWriteLock
当读操作远远高于写操作时,这时候使用读写锁让读操作可以并发,提高性能。类似于数据库中的乐观锁,select ... from ... lock in share mode
。
使用如下
class DataContainer {
private Object data;
private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock rLock = rwLock.readLock();
private ReentrantReadWriteLock.WriteLock wLock = rwLock.writeLock();
public Object read() {
log.debug("获取读锁...");
Object res = null;
rLock.lock();
try {
log.debug("读取");
Thread.sleep(2000);
res = data;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("释放读锁...");
rLock.unlock();
}
return res;
}
public void write() {
log.debug("获取写锁...");
wLock.lock();
try {
log.debug("写入");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("释放写锁...");
wLock.unlock();
}
}
}
- 读锁是乐观锁,写锁是悲观锁
- 读锁不支持条件变量,写锁支持
- 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致写锁永久等待
- 重入时降级支持:即持有写锁的情况下,可以获得读锁
ReentrantReadWriteLock源码中提供了这部分的示例
- 读写锁使用的是同一个Sync同步器,因此,等待队列和state也是同一个
- 写锁占据state的低16位,读锁占据state的高16位
4. StampedLock
该类自JDK 8加入,是为了进一步优化读性能,它的特点是在使用读写锁时都必须配合【戳】使用。
- 使用stamp验证锁的有效性
- 在调用方法
tryOptimisticRead
时,并没有真正的加锁 - 之后,需要验证数据是否有修改,调用
validate
,验证stamp -
validate
失败后,需要尝试添加真正的读锁,调用readLock
- 源码中给出了锁升级的例子
class DataContainerStamped {
private int data;
private final StampedLock lock = new StampedLock();
public DataContainerStamped(int data) {
this.data = data;
}
public int read(long readTime) {
long stamp = lock.tryOptimisticRead();
log.debug("尝试获取乐观读锁 {}", stamp);
if (lock.validate(stamp)) {
log.debug("数据读取成功 {}", stamp);
return data;
}
log.debug("尝试获取乐观读锁失败 {}", stamp);
stamp = lock.readLock();
int result = 0;
try {
log.debug("获取读锁 {}", stamp);
Thread.sleep(readTime);
result = data;
log.debug("读取成功 {}", stamp);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("释放读锁 {}", stamp);
lock.unlockRead(stamp);
}
return result;
}
public void write(int newData) {
long stamp = lock.writeLock();
log.debug("添加写锁 {}", stamp);
try {
Thread.sleep(2000);
data = newData;
log.debug("写入成功 {}", stamp);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
log.debug("释放写锁 {}", stamp);
lock.unlockWrite(stamp);
}
}
}
缺陷:
- StampedLock不支持条件变量
- StampedLock不支持可重入
5. Semaphore
信号量,用来限制能同时访问共享资源的线程上限。
- 构造方法传入允许的线程数量
- 线程中需要调用
acquire
方法,获取锁 - 调用完成后,释放锁,
release
应用:
- 使用Semaphore限流,它仅仅是限制线程数,而不是限制资源数(例如连接数,可以对比Tomcat中LimitLatch的实现)
- 可以用Semaphore实现简单的连接池,用于线程数与资源数一一对应的情形(例如数据库连接池),性能和可读性显然更好
6. CountdownLatch
倒计时锁,用来进行线程同步协作,等待所有线程完成倒计时。
join
方法无法应用于可复用线程的线程池中,CountdownLatch搭配线程池,可以完成线程协作,如下
ExecutorService pool = Executors.newFixedThreadPool(4);
CountDownLatch latch = new CountDownLatch(3);
pool.submit(() -> {
log.debug("begin...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
log.debug("end...");
});
pool.submit(() -> {
log.debug("begin...");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
log.debug("end...");
});
pool.submit(() -> {
log.debug("begin...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
log.debug("end...");
});
pool.submit(() -> {
try {
log.debug("wait...");
latch.await();
log.debug("end wait...");
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("do something...");
log.debug("end...");
});
存在的问题:
- CountdownLatch无法重用,不能重新设置次数
7. CyclicBarrier
循环栅栏,用来进行线程协作,等待线程满足某个计数。与CountdownLatch类似,区别是,CyclicBarrier可以重用。
在计数变为0后,再次调用await
方法,计数重新开始。
ExecutorService service = Executors.newFixedThreadPool(2);
CyclicBarrier barrier = new CyclicBarrier(2, () -> {
log.debug("task1 task2 finish...");
});
for (int i = 0; i < 3; i++) {
service.submit(() -> {
log.debug("task1 begin...");
try {
Thread.sleep(1000);
barrier.await(); // 类似于countDown,返回值大于0时,阻塞
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
log.debug("task1 end...");
});
service.submit(() -> {
log.debug("task2 begin...");
try {
Thread.sleep(2000);
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
log.debug("task2 end...");
});
}
service.shutdown();
构造方法可以添加一个Runnable参数,在计数为0时运行。