08. 并发工具-JUC中的并发

并发工具-JUC中的并发

  • Lock
  • Semaphore
  • CountdownLatch
  • CyclicBarrier

1. AQS原理

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable

AQS,AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架,特点有

  • 用state属性来表示资源状态,分为独占模式和共享模式,子类需要定义如何维护这个状态,控制如何获取锁和释放锁
    • getState,获取状态
    • setState,设置状态
    • compareAndSetState,cas机制设置state状态
  • 独占模式只允许一个线程访问资源,共享模式允许多个线程访问资源
  • 提供了基于FIFO的等待队列,类似于Monitor的EntryList
  • AQS的等待队列是CLH
  • The wait queue is a variant of a "CLH" (Craig, Landin, and Hagersten) lock queue. CLH locks are normally used for spinlocks.
  • 条件变量实现等待和唤醒机制,支持多个条件变量,类似于Monitor的WaitSet
  • AQS使用park和unpark实现阻塞和释放

子类要实现以下方法

  • tryAcquire,尝试获取锁
  • tryRelease,尝试释放锁
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

使用AQS自定义不可重入锁,

class MyLock implements Lock {

    // 独占锁
    class MySync extends AbstractQueuedSynchronizer {

        @Override // 尝试获取锁
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                // 加上了锁,并设置上锁线程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override // 尝试释放锁
        protected boolean tryRelease(int arg) {
            setExclusiveOwnerThread(null);
            // state使用volatile修饰,放在后面,可以使得设置线程语句对其他线程可见
            setState(0);
            return true;
        }

        @Override // 是否持有独占锁
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        public Condition newCondition() {
            return new ConditionObject();
        }
    }

    private MySync sync = new MySync();

    // 加锁,不成功等待
    @Override
    public void lock() {
        sync.acquire(1);
    }

    // 加锁,可打断
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    // 尝试加锁
    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    // 尝试在限定时间加锁
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    // 解锁
    @Override
    public void unlock() {
        sync.release(0);
    }

    // 创建条件变量
    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

基于AQS的锁,需要在内部定义一个AQS的子类,是一种典型的模板设计模式。

2. ReentrantLock原理

ReentrantLock原理

有了synchronized,为什么还需要JDK自定义AQS? AQS源码分析

3. ReentrantReadWriteLock

当读操作远远高于写操作时,这时候使用读写锁让读操作可以并发,提高性能。类似于数据库中的乐观锁,select ... from ... lock in share mode

使用如下

class DataContainer {
    private Object data;
    private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private ReentrantReadWriteLock.ReadLock rLock = rwLock.readLock();
    private ReentrantReadWriteLock.WriteLock wLock = rwLock.writeLock();

    public Object read() {
        log.debug("获取读锁...");
        Object res = null;
        rLock.lock();
        try {
            log.debug("读取");
            Thread.sleep(2000);
            res = data;
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            log.debug("释放读锁...");
            rLock.unlock();
        }
        return res;
    }

    public void write() {
        log.debug("获取写锁...");
        wLock.lock();
        try {
            log.debug("写入");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            log.debug("释放写锁...");
            wLock.unlock();
        }
    }
}
  • 读锁是乐观锁,写锁是悲观锁
  • 读锁不支持条件变量,写锁支持
  • 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致写锁永久等待
  • 重入时降级支持:即持有写锁的情况下,可以获得读锁

ReentrantReadWriteLock源码中提供了这部分的示例

  • 读写锁使用的是同一个Sync同步器,因此,等待队列和state也是同一个
  • 写锁占据state的低16位,读锁占据state的高16位

4. StampedLock

该类自JDK 8加入,是为了进一步优化读性能,它的特点是在使用读写锁时都必须配合【戳】使用。

  • 使用stamp验证锁的有效性
  • 在调用方法tryOptimisticRead时,并没有真正的加锁
  • 之后,需要验证数据是否有修改,调用validate,验证stamp
  • validate失败后,需要尝试添加真正的读锁,调用readLock
  • 源码中给出了锁升级的例子
class DataContainerStamped {
    private int data;
    private final StampedLock lock = new StampedLock();

    public DataContainerStamped(int data) {
        this.data = data;
    }

    public int read(long readTime) {
        long stamp = lock.tryOptimisticRead();
        log.debug("尝试获取乐观读锁 {}", stamp);
        if (lock.validate(stamp)) {
            log.debug("数据读取成功 {}", stamp);
            return data;
        }
        log.debug("尝试获取乐观读锁失败 {}", stamp);
        stamp = lock.readLock();
        int result = 0;
        try {
            log.debug("获取读锁 {}", stamp);
            Thread.sleep(readTime);
            result = data;
            log.debug("读取成功 {}", stamp);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            log.debug("释放读锁 {}", stamp);
            lock.unlockRead(stamp);
        }
        return result;
    }

    public void write(int newData) {
        long stamp = lock.writeLock();
        log.debug("添加写锁 {}", stamp);
        try {
            Thread.sleep(2000);
            data = newData;
            log.debug("写入成功 {}", stamp);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            log.debug("释放写锁 {}", stamp);
            lock.unlockWrite(stamp);
        }
    }
}

缺陷:

  • StampedLock不支持条件变量
  • StampedLock不支持可重入

5. Semaphore

信号量,用来限制能同时访问共享资源的线程上限。

  • 构造方法传入允许的线程数量
  • 线程中需要调用acquire方法,获取锁
  • 调用完成后,释放锁,release

应用:

  • 使用Semaphore限流,它仅仅是限制线程数,而不是限制资源数(例如连接数,可以对比Tomcat中LimitLatch的实现)
  • 可以用Semaphore实现简单的连接池,用于线程数与资源数一一对应的情形(例如数据库连接池),性能和可读性显然更好

6. CountdownLatch

倒计时锁,用来进行线程同步协作,等待所有线程完成倒计时。

join方法无法应用于可复用线程的线程池中,CountdownLatch搭配线程池,可以完成线程协作,如下

ExecutorService pool = Executors.newFixedThreadPool(4);
CountDownLatch latch = new CountDownLatch(3);
pool.submit(() -> {
    log.debug("begin...");
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    latch.countDown();
    log.debug("end...");
});
pool.submit(() -> {
    log.debug("begin...");
    try {
        Thread.sleep(1500);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    latch.countDown();
    log.debug("end...");
});
pool.submit(() -> {
    log.debug("begin...");
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    latch.countDown();
    log.debug("end...");
});
pool.submit(() -> {
    try {
        log.debug("wait...");
        latch.await();
        log.debug("end wait...");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    log.debug("do something...");
    log.debug("end...");
});

存在的问题:

  • CountdownLatch无法重用,不能重新设置次数

7. CyclicBarrier

循环栅栏,用来进行线程协作,等待线程满足某个计数。与CountdownLatch类似,区别是,CyclicBarrier可以重用。

在计数变为0后,再次调用await方法,计数重新开始。

ExecutorService service = Executors.newFixedThreadPool(2);
CyclicBarrier barrier = new CyclicBarrier(2, () -> {
    log.debug("task1 task2 finish...");
});
for (int i = 0; i < 3; i++) {
    service.submit(() -> {
        log.debug("task1 begin...");
        try {
            Thread.sleep(1000);
            barrier.await(); // 类似于countDown,返回值大于0时,阻塞
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        log.debug("task1 end...");
    });
    service.submit(() -> {
        log.debug("task2 begin...");
        try {
            Thread.sleep(2000);
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        log.debug("task2 end...");
    });
}
service.shutdown();

构造方法可以添加一个Runnable参数,在计数为0时运行。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容