并发(六)

八、AQS(AbstractQueuedSynchronizer)

从JDK1.5开始,引入了java并发包JUC,JUC大大提高了java的并发性能,而AQS就是JUC的核心
AQS提供了FIFO(先进先出)的队列,这个队列可以用来构建锁等其它同步的基础框架, 它的底层使用了双向链表,是队列的一种实现,因此我们也可以把它当做一个队列。


image.png

image.png

image.png

AQS使用方法是基础,采用了模板方法设计模式 ,使用AQS需要覆写其中的方法。


AQS的同步组件就是AQS的实现类

image.png

image.png

1、CountDownLatch(闭锁 )


CountDownLatch是一个同步辅助类,通过它可以完成阻塞当前线程的功能,换句话说就是,一个线程或多个线程一直等待,直到其它线程执行的操作完成。CountDownLatch通过一个给定的计数器来完成初始化, 该计数器的操作是原子操作,只有一个线程能操作该计数器, 调用该类await()方法的线程会一直处于阻塞状态, 直到其它线程调用countDown()方法,每次调用减一,直到当前计数器的值变为0,之后所有因调用await()方法而阻塞的线程,就会往下执行。这种操作只能执行一次,因为这种计数器是不能重置的。如果有需要重置计数器要求的需求,可以考虑CyclicBarrier

CountDownLatch使用场景
程序执行需要等待某个条件完成后,才能继续执行后续的操作。典型的应用比如并行计算,当某个计算很大时,可以将这个任务拆分成多个小的子任务,等待所有子任务都执行完成后,父任务拿到所有子任务的执行结果,进行汇总计算。

public class CountDownLatchExample1 {

   private final static int threadCount = 200;

   public static void main(String[] args) throws Exception {

       ExecutorService exec = Executors.newCachedThreadPool();

       final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

       for (int i = 0; i < threadCount; i++) {
           final int threadNum = i;
           exec.execute(() -> {
               try {
                   test(threadNum);
               } catch (Exception e) {
                   log.error("exception", e);
               } finally {
                   countDownLatch.countDown();
               }
           });
       }
       countDownLatch.await();
       log.info("finish");
       exec.shutdown();
   }

   private static void test(int threadNum) throws Exception {
       Thread.sleep(100);
       log.info("{}", threadNum);
       Thread.sleep(100);
   }
}
public class CountDownLatchExample2 {

   private final static int threadCount = 10;

   public static void main(String[] args) throws Exception {

       ExecutorService exec = Executors.newCachedThreadPool();

       final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

       for (int i = 0; i < threadCount; i++) {
           final int threadNum = i;
           exec.execute(() -> {
               try {
                   test(threadNum);
               } catch (Exception e) {
                   log.error("exception", e);
               } finally {
                   countDownLatch.countDown();
               }
           });
       }
       countDownLatch.await(10, TimeUnit.MILLISECONDS); //超过10毫秒,就不等了
       log.info("finish");
       exec.shutdown();  //未执行完的子线程并不是都关闭了,会等子线程都执行完才关闭
   }

   private static void test(int threadNum) throws Exception {
       Thread.sleep(7);
       log.info("{}", threadNum);
   }
}
执行结果:
[pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 3
[main] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - finish
[pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 4
[pool-1-thread-9] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 8
[pool-1-thread-10] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 9
[pool-1-thread-7] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 6
[pool-1-thread-6] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 5
[pool-1-thread-8] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 7
[pool-1-thread-1] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 0
[pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 2
[pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 1

2、Semaphore

Semaphore可以控制并发访问某个资源的线程个数。
使用场景比如:数据库访问(最大访问线程数20,但现实访问量会远远大于20,不加限制同时访问数据库,会造成获取不到连接而产生异常)
当Semaphore将并发访问控制到1,就是单线程环境

public class SemaphoreExample1 {

   private final static int threadCount = 20;

   public static void main(String[] args) throws Exception {

       ExecutorService exec = Executors.newCachedThreadPool();

       final Semaphore semaphore = new Semaphore(3);

       for (int i = 0; i < threadCount; i++) {
           final int threadNum = i;
           exec.execute(() -> {
               try {
                   semaphore.acquire(); // 获取一个许可
                   test(threadNum);
                   semaphore.release(); // 释放一个许可
               } catch (Exception e) {
                   log.error("exception", e);
               }
           });
       }
       exec.shutdown();
   }

   private static void test(int threadNum) throws Exception {
       log.info("{}", threadNum);
       Thread.sleep(1000);
   }
}
执行结果,注意时间: 3个3个执行
10:11:55.888 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.SemaphoreExample1 - 2
10:11:55.888 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.SemaphoreExample1 - 1
10:11:55.888 [pool-1-thread-1] INFO com.mmall.concurrency.example.aqs.SemaphoreExample1 - 0
10:11:56.896 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.SemaphoreExample1 - 3
10:11:56.896 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.SemaphoreExample1 - 4
10:11:56.896 [pool-1-thread-6] INFO com.mmall.concurrency.example.aqs.SemaphoreExample1 - 5
10:11:57.898 [pool-1-thread-7] INFO com.mmall.concurrency.example.aqs.SemaphoreExample1 - 6
10:11:57.898 [pool-1-thread-8] INFO com.mmall.concurrency.example.aqs.SemaphoreExample1 - 7
10:11:57.898 [pool-1-thread-9] INFO com.mmall.concurrency.example.aqs.SemaphoreExample1 - 8
10:11:58.900 [pool-1-thread-10] INFO com.mmall.concurrency.example.aqs.SemaphoreExample1 - 9
10:11:58.900 [pool-1-thread-11] INFO com.mmall.concurrency.example.aqs.SemaphoreExample1 - 10
10:11:58.900 [pool-1-thread-12] INFO com.mmall.concurrency.example.aqs.SemaphoreExample1 - 11
10:11:59.902 [pool-1-thread-13] INFO com.mmall.concurrency.example.aqs.SemaphoreExample1 - 12
10:11:59.902 [pool-1-thread-15] INFO com.mmall.concurrency.example.aqs.SemaphoreExample1 - 14
10:11:59.902 [pool-1-thread-14] INFO com.mmall.concurrency.example.aqs.SemaphoreExample1 - 13
 semaphore.acquire(3); // 获取多个许可
 test(threadNum);
 semaphore.release(3); // 释放多个许可

执行结果,和单线程相似
10:14:13.294 [pool-1-thread-1] INFO com.mmall.concurrency.example.aqs.SemaphoreExample2 - 0
10:14:14.298 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.SemaphoreExample2 - 1
10:14:15.300 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.SemaphoreExample2 - 2
10:14:16.304 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.SemaphoreExample2 - 3
10:14:17.305 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.SemaphoreExample2 - 4
public class SemaphoreExample3 {

   private final static int threadCount = 20;

   public static void main(String[] args) throws Exception {

       ExecutorService exec = Executors.newCachedThreadPool();

       final Semaphore semaphore = new Semaphore(3);

       for (int i = 0; i < threadCount; i++) {
           final int threadNum = i;
           exec.execute(() -> {
               try {
                   if (semaphore.tryAcquire()) { // 尝试获取一个许可,获取不到的线程就丢弃
                       test(threadNum);
                       semaphore.release(); // 释放一个许可
                   }
               } catch (Exception e) {
                   log.error("exception", e);
               }
           });
       }
       exec.shutdown();
   }

   private static void test(int threadNum) throws Exception {
       log.info("{}", threadNum);
       Thread.sleep(1000);
   }
}
执行结果,只打印了三个线程,其它全被丢弃了
10:17:44.122 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.SemaphoreExample3 - 2
10:17:44.122 [pool-1-thread-1] INFO com.mmall.concurrency.example.aqs.SemaphoreExample3 - 0
10:17:44.122 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.SemaphoreExample3 - 1
public class SemaphoreExample4 {

   private final static int threadCount = 20;

   public static void main(String[] args) throws Exception {

       ExecutorService exec = Executors.newCachedThreadPool();

       final Semaphore semaphore = new Semaphore(3);

       for (int i = 0; i < threadCount; i++) {
           final int threadNum = i;
           exec.execute(() -> {
               try {
                   if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 尝试获取一个许可,等5000毫秒,后面的就丢弃
                       test(threadNum);
                       semaphore.release(); // 释放一个许可
                   }
               } catch (Exception e) {
                   log.error("exception", e);
               }
           });
       }
       exec.shutdown();
   }

   private static void test(int threadNum) throws Exception {
       log.info("{}", threadNum);
       Thread.sleep(1000);
   }
}
执行结果:3个3个执行,20个线程,5000毫秒执行了15个,其它就不执行了
10:20:53.151 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.SemaphoreExample4 - 2
10:20:53.151 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.SemaphoreExample4 - 1
10:20:53.151 [pool-1-thread-1] INFO com.mmall.concurrency.example.aqs.SemaphoreExample4 - 0
10:20:54.158 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.SemaphoreExample4 - 4
10:20:54.158 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.SemaphoreExample4 - 3
10:20:54.158 [pool-1-thread-6] INFO com.mmall.concurrency.example.aqs.SemaphoreExample4 - 5
10:20:55.160 [pool-1-thread-7] INFO com.mmall.concurrency.example.aqs.SemaphoreExample4 - 6
10:20:55.160 [pool-1-thread-8] INFO com.mmall.concurrency.example.aqs.SemaphoreExample4 - 7
10:20:55.160 [pool-1-thread-9] INFO com.mmall.concurrency.example.aqs.SemaphoreExample4 - 8
10:20:56.165 [pool-1-thread-10] INFO com.mmall.concurrency.example.aqs.SemaphoreExample4 - 9
10:20:56.166 [pool-1-thread-11] INFO com.mmall.concurrency.example.aqs.SemaphoreExample4 - 10
10:20:56.166 [pool-1-thread-12] INFO com.mmall.concurrency.example.aqs.SemaphoreExample4 - 11
10:20:57.169 [pool-1-thread-13] INFO com.mmall.concurrency.example.aqs.SemaphoreExample4 - 12
10:20:57.169 [pool-1-thread-14] INFO com.mmall.concurrency.example.aqs.SemaphoreExample4 - 13
10:20:57.169 [pool-1-thread-15] INFO com.mmall.concurrency.example.aqs.SemaphoreExample4 - 14

3、CyclicBarrier

image.png

CyclicBarrier也是一个同步辅助类,它允许一组线程相互等待,直到到达某个工作屏障点,通过它可以完成多个线程之间相互等待,只有当每个线程都准备就绪后,才能各自向下继续执行。它和CountDownLatch有些相同的地方,都是用计数器实现的,当某个线程调用了await()方法之后,该线程就进入了等待状态,而且计数器执行是+1操作。当计数器的值达到了我们设置的初始值时,因为调用await()方法而进入等待状态的线程会被唤醒,继续执行他们的操作。由于CycliBarrier在释放线程后还可以重用(使用reset()方法),所以我们也称它为循环屏障。
CyclicBarrier的使用场景和CountDownLatch相似
CyclicBarrier强调的是多个线程间的相互等待

public class CyclicBarrierExample1 {

   private static CyclicBarrier barrier = new CyclicBarrier(5);

   public static void main(String[] args) throws Exception {

       ExecutorService executor = Executors.newCachedThreadPool();

       for (int i = 0; i < 10; i++) {
           final int threadNum = i;
           Thread.sleep(1000);
           executor.execute(() -> {
               try {
                   race(threadNum);
               } catch (Exception e) {
                   log.error("exception", e);
               }
           });
       }
       executor.shutdown();
   }

   private static void race(int threadNum) throws Exception {
       Thread.sleep(1000);
       log.info("{} is ready", threadNum);
       barrier.await();  
       log.info("{} continue", threadNum);
   }
}
执行结果:每等够五个线够一组,就一组一起往下执行
10:39:53.964 [pool-1-thread-1]  0 is ready
10:39:54.965 [pool-1-thread-2]  1 is ready
10:39:55.970 [pool-1-thread-3]  2 is ready
10:39:56.973 [pool-1-thread-4]  3 is ready
10:39:57.978 [pool-1-thread-5]  4 is ready
10:39:57.978 [pool-1-thread-5]  4 continue
10:39:57.978 [pool-1-thread-1]  0 continue
10:39:57.978 [pool-1-thread-2]  1 continue
10:39:57.979 [pool-1-thread-3]  2 continue
10:39:57.979 [pool-1-thread-4]  3 continue
10:39:58.983 [pool-1-thread-6]  5 is ready
10:39:59.987 [pool-1-thread-4]  6 is ready
10:40:00.988 [pool-1-thread-3]  7 is ready
10:40:01.988 [pool-1-thread-1]  8 is ready
10:40:02.990 [pool-1-thread-2]  9 is ready
10:40:02.990 [pool-1-thread-2]  9 continue
10:40:02.990 [pool-1-thread-6]  5 continue
10:40:02.991 [pool-1-thread-4]  6 continue
10:40:02.991 [pool-1-thread-1]  8 continue
10:40:02.991 [pool-1-thread-3]  7 continue
public class CyclicBarrierExample2 {

   private static CyclicBarrier barrier = new CyclicBarrier(5);

   public static void main(String[] args) throws Exception {

       ExecutorService executor = Executors.newCachedThreadPool();

       for (int i = 0; i < 10; i++) {
           final int threadNum = i;
           Thread.sleep(1000);
           executor.execute(() -> {
               try {
                   race(threadNum);
               } catch (Exception e) {
                   log.error("exception", e);
               }
           });
       }
       executor.shutdown();
   }

   private static void race(int threadNum) throws Exception {
       Thread.sleep(1000);
       log.info("{} is ready", threadNum);
       try {
           barrier.await(2000, TimeUnit.MILLISECONDS);
       } catch (Exception e) {
           log.warn("BarrierException");
       }
       log.info("{} continue", threadNum);
   }
}
执行结果:最多等待2000毫秒,超时的就抛出异常,不执行
10:44:13.480 [pool-1-thread-1]  0 is ready
10:44:14.480 [pool-1-thread-2]  1 is ready
10:44:15.484 [pool-1-thread-3]  2 is ready
10:44:15.490 [pool-1-thread-1]  BarrierException
10:44:15.490 [pool-1-thread-1]  0 continue
10:44:15.490 [pool-1-thread-2]  BarrierException
10:44:15.490 [pool-1-thread-3]  BarrierException
10:44:15.490 [pool-1-thread-2]  1 continue
10:44:15.490 [pool-1-thread-3]  2 continue
10:44:16.486 [pool-1-thread-4]  3 is ready
10:44:16.487 [pool-1-thread-4]  BarrierException
10:44:16.487 [pool-1-thread-4]  3 continue
10:44:17.490 [pool-1-thread-3]  4 is ready
10:44:17.491 [pool-1-thread-3]  BarrierException
10:44:17.491 [pool-1-thread-3]  4 continue
10:44:18.492 [pool-1-thread-4]  5 is ready
10:44:18.492 [pool-1-thread-4]  BarrierException
10:44:18.492 [pool-1-thread-4]  5 continue
10:44:19.496 [pool-1-thread-3]  6 is ready
10:44:19.496 [pool-1-thread-3]  BarrierException
10:44:19.496 [pool-1-thread-3]  6 continue
10:44:20.500 [pool-1-thread-4]  7 is ready
10:44:20.500 [pool-1-thread-4]  BarrierException
10:44:20.500 [pool-1-thread-4]  7 continue
public class CyclicBarrierExample3 {

   private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
       log.info("callback is running");
   });

   public static void main(String[] args) throws Exception {

       ExecutorService executor = Executors.newCachedThreadPool();

       for (int i = 0; i < 10; i++) {
           final int threadNum = i;
           Thread.sleep(1000);
           executor.execute(() -> {
               try {
                   race(threadNum);
               } catch (Exception e) {
                   log.error("exception", e);
               }
           });
       }
       executor.shutdown();
   }

   private static void race(int threadNum) throws Exception {
       Thread.sleep(1000);
       log.info("{} is ready", threadNum);
       barrier.await();
       log.info("{} continue", threadNum);
   }
}
执行结果:每一组准备就绪,优先执行new CyclicBarrier(5, () -> { log.info("callback is running"); });
10:47:41.673 [pool-1-thread-1]  0 is ready
10:47:42.674 [pool-1-thread-2]  1 is ready
10:47:43.677 [pool-1-thread-3]  2 is ready
10:47:44.682 [pool-1-thread-4]  3 is ready
10:47:45.685 [pool-1-thread-5]  4 is ready
10:47:45.685 [pool-1-thread-5]  callback is running
10:47:45.685 [pool-1-thread-5]  4 continue
10:47:45.685 [pool-1-thread-1]  0 continue
10:47:45.685 [pool-1-thread-2]  1 continue
10:47:45.685 [pool-1-thread-3]  2 continue
10:47:45.685 [pool-1-thread-4]  3 continue
10:47:46.690 [pool-1-thread-6]  5 is ready
10:47:47.694 [pool-1-thread-4]  6 is ready
10:47:48.695 [pool-1-thread-3]  7 is ready
10:47:49.698 [pool-1-thread-1]  8 is ready
10:47:50.702 [pool-1-thread-2]  9 is ready
10:47:50.702 [pool-1-thread-2]  callback is running
10:47:50.702 [pool-1-thread-2]  9 continue
10:47:50.703 [pool-1-thread-6]  5 continue
10:47:50.703 [pool-1-thread-4]  6 continue
10:47:50.703 [pool-1-thread-1]  8 continue
10:47:50.703 [pool-1-thread-3]  7 continue

4、ReetrantLock


java主要分两类锁,一种是 synchronized(jvm实现 【操作系统实现 】)关键字修饰的锁,一种就是JUC提供的ReetrantLock(JDK实现【代码实现】)。

ReetrantLock实现是一种自旋锁,通过循环调用CAS操作来实现加锁,它的性能比较好,也就是优于此能避免线程进入内核态,形成阻塞状态

synchronized在优化前,它的性能被ReetrantLock差很多,但是自从synchronized引入了偏向锁、轻量级锁(自旋锁)后 ,他们两者的性能就差不多了,synchronized其实也是借鉴了ReetrantLock的CAS技术,都是试图在用户态加锁进行解决,避免进入内核态的线程阻塞。在两种锁都可用的状态下,官方建议使用synchronized,因为它的写法更简单简洁,由编译器保证锁的加锁和释放。ReetrantLock需要手动加锁和释放锁,特别是释放锁,为了避免因为忘记或者异常而导致未释放锁,因将释放锁操作,放到finally中。

ReetrantLock在使用锁的细腻度和灵活度上,是优于synchronized的。

ReetrantLock有自己独有的功能:
1、ReetrantLock能指定公平锁(先等待的线程先获得锁)还是非公平锁(竞争)默认是非公平的锁。而synchronized只能是非公平锁

Lock lock = new  ReentrantLock(boolean fair) 
//fair == true ? new FairSync() : new NonfairSync();

2、ReetrantLock提供Condition(条件)类,实现分组唤醒需要唤醒的线程。而synchronized只能随机唤醒一个线程,或者唤醒所有线程。
3、提供能够中断等待锁的线程的机制,lock.lockInterruptibly()。

synchronized和ReetrantLock都是可重入锁,都是同一个线程进入一次,锁的计数器就自增1。什么是 “可重入”,就是可以重复获取相同的锁,就是说某个线程已经获得某个锁,可以再次获取锁而不会出现死锁。当然,这样也只有锁的计数器下降为0时,才会释放锁。

       new Thread(new Runnable() {
           @Override
           public void run() {
               synchronized (this) {
                   System.out.println("第1次获取锁,这个锁是:" + this);
                   int index = 1;
                   while (true) {
                       synchronized (this) {
                           System.out.println("第" + (++index) + "次获取锁,这个锁是:" + this);
                       }
                       if (index == 10) {
                           break;
                       }
                   }
               }
           }
       }).start();
      ReentrantLock lock = new ReentrantLock();

      new Thread(new Runnable() {
          @Override
          public void run() {
              try {
                  lock.lock();
                  System.out.println("第1次获取锁,这个锁是:" + lock);

                  int index = 1;
                  while (true) {
                      try {
                          lock.lock();
                          System.out.println("第" + (++index) + "次获取锁,这个锁是:" + lock);

                          try {
                              Thread.sleep(new Random().nextInt(200));
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }

                          if (index == 10) {
                              break;
                          }
                      } finally {
                          lock.unlock();
                      }

                  }

              } finally {
                  lock.unlock();
              }
          }
      }).start();

ReetrantLock

public class LockExample2 {

   // 请求总数
   public static int clientTotal = 5000;

   // 同时并发执行的线程数
   public static int threadTotal = 200;

   public static int count = 0;

   private final static Lock lock = new ReentrantLock();

   public static void main(String[] args) throws Exception {
       ExecutorService executorService = Executors.newCachedThreadPool();
       final Semaphore semaphore = new Semaphore(threadTotal);
       final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
       for (int i = 0; i < clientTotal ; i++) {
           executorService.execute(() -> {
               try {
                   semaphore.acquire();
                   add();
                   semaphore.release();
               } catch (Exception e) {
                   log.error("exception", e);
               }
               countDownLatch.countDown();
           });
       }
       countDownLatch.await();
       executorService.shutdown();
       log.info("count:{}", count);
   }

   private static void add() {
       lock.lock();
       try {
           count++;
       } finally {
           lock.unlock();
       }
   }
}

5、ReetrantReadWriteLock

image.png

在它里面有两个锁,readerLoack和writerLock,这个类最核心的要求是它要在没有任何读写锁的时候,才能取得写入锁。
它可以用于实现悲观读取,既如果我们执行中进行读取时,经常有另一个要写入的需求,为了保持锁定,ReetrantReadWriteLock的读取锁定就可以派上用场了,然而如果读取执行情况很多,写入很少的情况下,持有ReetrantReadWriteLock可能使写入线程遭遇饥饿,也就是写入线程迟迟无法竞争到锁定,一直处于等待状态。

public class LockExample3 {

   private final Map<String, Data> map = new TreeMap<>();

   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

   private final Lock readLock = lock.readLock();

   private final Lock writeLock = lock.writeLock();

   /**
    * 封装map的get、getAllKeys、put方法,并在其中加上锁,解决并发问题
    */
   public Data get(String key) {
       readLock.lock(); //读锁
       try {
           return map.get(key);
       } finally {
           readLock.unlock();
       }
   }

   public Set<String> getAllKeys() {
       readLock.lock();
       try {
           return map.keySet();
       } finally {
           readLock.unlock();
       }
   }

   /**
    * 在没有任何读写锁的时候,才能进行写入操作。(悲观锁,只有在读写都做完了,才能进行写操作)
    * 所以在读取很多的时候,写入线程就必须一直等,永远没法执行,写入线程饥饿
    */
   public Data put(String key, Data value) {
       writeLock.lock(); //写锁
       try {
           return map.put(key, value);
       } finally {
           readLock.unlock();
       }
   }

   class Data {

   }
}

6、StampedLock

StampedLock控制锁有三种模式:分别是写、读、还有乐观读。重点是乐观读,一个StampedLock的状态由版本和模式两个部分组成,锁获取方法是返回一个数字(stamp)作为票据,它用相应的锁状态来表示并控制相关的访问 ,数字0表示获取锁失败。在读锁上分为悲观锁和乐观锁。
乐观读:乐观的认为同时读取和写入发生的几率很小,因此不悲观的使用完全的读取锁定,程序可以在查看读取完成之后,是否有写入的变更,再采取后续的措施,这一个小小的改进,可以大幅提高程序的吞吐量

public class LockExample4 {

   class Point {
       private double x, y;
       private final StampedLock sl = new StampedLock();

       void move(double deltaX, double deltaY) { // an exclusively locked method
           long stamp = sl.writeLock();
           try {
               x += deltaX;
               y += deltaY;
           } finally {
               sl.unlockWrite(stamp);
           }
       }

       //下面看看乐观读锁案例
       double distanceFromOrigin() { // A read-only method
           long stamp = sl.tryOptimisticRead(); //获得一个乐观读锁
           double currentX = x, currentY = y;  //将两个字段读入本地局部变量
           if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生?
               stamp = sl.readLock();  //如果没有,我们再次获得一个读悲观锁
               try {
                   currentX = x; // 将两个字段读入本地局部变量
                   currentY = y; // 将两个字段读入本地局部变量
               } finally {
                   sl.unlockRead(stamp);
               }
           }
           return Math.sqrt(currentX * currentX + currentY * currentY);
       }

       //下面是悲观读锁案例
       void moveIfAtOrigin(double newX, double newY) { // upgrade
           // Could instead start with optimistic, not read mode
           long stamp = sl.readLock();
           try {
               while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合
                   long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁
                   if (ws != 0L) { //这是确认转为写锁是否成功
                       stamp = ws; //如果成功 替换票据
                       x = newX; //进行状态改变
                       y = newY;  //进行状态改变
                       break;
                   } else { //如果不能成功转换为写锁
                       sl.unlockRead(stamp);  //我们显式释放读锁
                       stamp = sl.writeLock();  //显式直接进行写锁 然后再通过循环再试
                   }
               }
           } finally {
               sl.unlock(stamp); //释放读锁或写锁
           }
       }
   }
}
public class LockExample5 {

   // 请求总数
   public static int clientTotal = 5000;

   // 同时并发执行的线程数
   public static int threadTotal = 200;

   public static int count = 0;

   private final static StampedLock lock = new StampedLock();

   public static void main(String[] args) throws Exception {
       ExecutorService executorService = Executors.newCachedThreadPool();
       final Semaphore semaphore = new Semaphore(threadTotal);
       final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
       for (int i = 0; i < clientTotal ; i++) {
           executorService.execute(() -> {
               try {
                   semaphore.acquire();
                   add();
                   semaphore.release();
               } catch (Exception e) {
                   log.error("exception", e);
               }
               countDownLatch.countDown();
           });
       }
       countDownLatch.await();
       executorService.shutdown();
       log.info("count:{}", count);
   }

   private static void add() {
       long stamp = lock.writeLock();
       try {
           count++;
       } finally {
           lock.unlock(stamp);
       }
   }
}

总结:

  • synchronized
    在JVM层面实现的,不但可以通过一些监控工具监控线程锁定状态(jstack、jconsole),而且在执行时,代码出现异常,JVM也会自动释放锁定,JVM会自动做加锁与解锁。
  • ReentrantLockReetrantReadWriteLockStampedLock
    都是对象层面的锁定,要保证锁一定会被释放,就要放到finally里面,不然可能造成死锁。
    StampedLock对吞吐量有巨大改进,特别是在读线程越来越多的场景下

平时使用如何抉择:
当只有少量竞争者的时候,synchronized是一个很好的通用锁实现
竞争者不少,但是线程增长的趋势是可以预估的,这时候ReentrantLock是一个很好的通用锁实现

7、Condition:

Condition是多线程间,协调通信的工具类。使得某个线程等待某个条件(信号),继续执行

public class LockExample6 {

   public static void main(String[] args) {
       ReentrantLock reentrantLock = new ReentrantLock();
       Condition condition = reentrantLock.newCondition();

       new Thread(() -> {
           try {
               reentrantLock.lock();  //1、线程1获取锁,往下执行
               log.info("wait signal");
               condition.await(); //2、线程1释放锁,加入到Condition队列中去(等待被唤醒)。
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           log.info("get signal");
           reentrantLock.unlock(); //6、线程1执行完,释放锁
       }).start();

       new Thread(() -> {
           reentrantLock.lock(); //3、当线程1释放锁后,线程2获取锁,线程2往下执行
           log.info("get lock");
           try {
               Thread.sleep(3000);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           condition.signalAll();//4、线程2发送唤醒信号,这个时候,Condition队列中有线程1的NODE节点。于是线程1就被取出来了,加入到AQS等待队列中去
           log.info("send signal ~ ");
           reentrantLock.unlock(); //5、线程2释放锁,此时AQS队列中只有线程1一个线程,于是AQS按照FIFO获取唤醒线程,线程1的线程就被唤醒了。于是线程1开始继续执行
       }).start();
   }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,734评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,931评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,133评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,532评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,585评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,462评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,262评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,153评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,587评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,792评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,919评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,635评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,237评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,855评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,983评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,048评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,864评论 2 354

推荐阅读更多精彩内容