实战Java高并发程序设计笔记第三章

JDK并发包

本章内容:
1、关于同步控制的工具
2、线程池
3、JDK的一些并发容器

多线程的团队协作:同步控制

synchronized的功能扩展:重入锁

  • 可以完全替代synchronized,使用java.util.concurrent.locks.ReentrantLock类来实现

    public class ReenterLock implements Runnable {
      public static ReentrantLock lock = new ReentrantLock();
      public static int i = 0;
    
      @Override
      public void run() {
          for (int j = 0; j < 1000000; j++) {
              lock.lock();
              try {
                  i++;
              } finally {
                  lock.unlock();
              }
    
          }
      }
    
      public static void main(String args[]) throws InterruptedException {
          ReenterLock reenterLock = new ReenterLock();
          Thread thread1 = new Thread(reenterLock);
          Thread thread2 = new Thread(reenterLock);
    
          thread1.start();
          thread2.start();
    
          thread1.join();
          thread2.join();
    
          System.out.println(i);
      }
    
    }
    
    • 执行结果: 2000000
  • 如何理解重入?

这种锁可以反复进入,一个线程连续两次获得同一把锁

  • 中断响应:lockInterruptibly
    public class IntLock implements Runnable {
      public static ReentrantLock lock1 = new ReentrantLock();
      public static ReentrantLock lock2 = new ReentrantLock();
      int lock;
    
      public IntLock(int lock) {
          this.lock = lock;
      }
    
      @Override
      public void run() {
          try {
              if (lock == 1) {
                  lock1.lockInterruptibly();
                  Thread.sleep(500);
                  lock2.lockInterruptibly();
              } else {
                  lock2.lockInterruptibly();
                  Thread.sleep(500);
                  lock1.lockInterruptibly();
              }
          } catch (InterruptedException e) {
              e.printStackTrace();
          } finally {
              if (lock1.isHeldByCurrentThread()) {
                  lock1.unlock();
              }
              if (lock2.isHeldByCurrentThread()) {
                  lock2.unlock();
              }
              System.out.println(Thread.currentThread().getId() + ":线程退出");
          }
    
      }
    
      public static void main(String args[]) throws InterruptedException {
          IntLock r1 = new IntLock(1);
          IntLock r2 = new IntLock(2);
    
          Thread thread1 = new Thread(r1);
          Thread thread2 = new Thread(r2);
    
          thread1.start();
          thread2.start();
    
          Thread.sleep(1000);
    
          thread2.interrupt();
    
      }
    }
    
    • 执行结果:
    java.lang.InterruptedException
      at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:944)
      at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1263)
      at java.base/java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:317)
      at chapter3.IntLock.run(IntLock.java:27)
      at   java.base/java.lang.Thread.run(Thread.java:835)
    15:线程退出
    14:线程退出
    
  • 锁申请等待限时trylock
    带参的trylock
    
    public class TimeLock implements Runnable {
      public static ReentrantLock lock = new ReentrantLock();
    
      @Override
      public void run() {
          try {
              if (lock.tryLock(5, TimeUnit.SECONDS)) {
                  System.out.println(Thread.currentThread().getName());
                  System.out.println("get lock success");
                  Thread.sleep(60000);
              } else {
                  System.out.println(Thread.currentThread().getName());
                  System.out.println("get lock failed");
              }
          } catch (InterruptedException e) {
              e.printStackTrace();
          } finally {
              if (lock.isHeldByCurrentThread()) {
                  lock.unlock();
              }
          }
      }
    
    
      public static void main(String args[]) {
          TimeLock timeLock = new TimeLock();
          Thread thread1 = new Thread(timeLock);
          Thread thread2 = new Thread(timeLock);
    
          thread1.start();
          thread2.start();
      }
    }
    
    不带参的trylock
    public class TryLock implements Runnable {
      public static ReentrantLock lock1 = new ReentrantLock();
      public static ReentrantLock lock2 = new ReentrantLock();
      int lock;
    
      public TryLock(int lock) {
          this.lock = lock;
      }
    
      @Override
      public void run() {
          if (lock == 1) {
              while (true) {
                  if (lock1.tryLock()) {
                      try {
                          try {
                              Thread.sleep(500);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          if (lock2.tryLock()) {
                              try {
                                  System.out.println(Thread.currentThread().getId() + ":My Job done;");
                                  return;
                              } finally {
                                  lock2.unlock();
                              }
                          }
                      } finally {
                          lock1.unlock();
                      }
                  }
              }
          } else {
              while (true) {
                  if (lock2.tryLock()) {
                      try {
                          try {
                              Thread.sleep(500);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                          if (lock1.tryLock()) {
                              try {
                                  System.out.println(Thread.currentThread().getId() + ":My Job done;");
                                  return;
                              } finally {
                                  lock1.unlock();
                              }
                          }
                      } finally {
                          lock2.unlock();
                      }
                  }
              }
    
          }
      }
    
      public static void main(String args[]) {
          TryLock r1 = new TryLock(1);
          TryLock r2 = new TryLock(2);
          Thread thread1 = new Thread(r1);
          Thread thread2 = new Thread(r2);
    
          thread1.start();
          thread2.start();
      }
    
    }
    
  • 公平锁

大多数情况下,锁的申请都是非公平的,系统只是从等待队列中随机地的选出一个线程。公平类似于一种先到先服务策略,不会导致某些线程一直得不到执行从而产生饥饿的现象

重入锁可以对公平性进行设置

public ReentrantLock(boolean fair)

公平锁案例:


public class FairLock implements Runnable {

  public static ReentrantLock fairLock = new ReentrantLock(true);//设置true指定锁是公平的,也可以不设置,分别运行观察公平锁与非公平锁间的区别
  //public static ReentrantLock unfairLock = new ReentrantLock();

  @Override
  public void run() {
      while (true) {
          try {
              fairLock.lock();
              // unfairLock.lock();
              System.out.println(Thread.currentThread().getName() + "获得锁");
          } finally {
              fairLock.unlock();
              // unfairLock.unlock();
          }
      }
  }

  /**
   * 公平锁的一个特点是:不会产生饥饿现象,只要排队最终都会得到资源.
   * <p/>
   * 但是实现公平锁要求系统维护一个有序队列,因此公平锁的实现成本较高,性能相对低下.
   *
   * @param args
   */
  public static void main(String args[]) {
      FairLock r1 = new FairLock();
      Thread thread1 = new Thread(r1, "Thread_t1");
      Thread thread2 = new Thread(r1, "Thread_t2");
      Thread thread3 = new Thread(r1, "Thread_t3");

      thread1.start();
      thread2.start();
      thread3.start();
  }

}

ReetrantLock()的几个重要方法:

lock()  //获得锁,如果锁已经被占有,则等待
lockInterruptibly()   //获得锁,但优先响应中断
tryLock()  //尝试获得锁,如果成功,返回true,失败返回false,不等待,立即返回
tryLock(long time,TimeUnit unit)//在给定的时间尝试获取锁
unlock()    //释放锁

重入锁的实现主要包含三个要素:
1、原子状态

使用CAS操作来村粗当前锁状态,判断锁是否已经被别的线程持有
2、等待队列
所有没有请求到锁的线程,会进入等待队列进行等待。待有线程释放锁后,系统从等待队列中唤醒一个线程,继续工作
3、阻塞原语park()和unpark(),用来挂起和恢复线程

重入锁的好搭档:Condition条件(与wait()和notify()的作用大致相同)

  • Condition接口提供的基本方法如下:
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long tine,TimeUnit unit) throws InterruptedException;
    boolean awaiUtilt(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
    
    Condition功能案例:参照wait与notify的工作流程
    
    public class ReenterLockCondition implements Runnable {
      public static ReentrantLock lock = new ReentrantLock();
      public static Condition condition = lock.newCondition();
    
      @Override
      public void run() {
    
          try {
              lock.lock();
              condition.await();
              System.out.println("Thread is going on");
          } catch (InterruptedException e) {
              e.printStackTrace();
          } finally {
              lock.unlock();
          }
    
      }
    
      public static void main(String args[]) throws InterruptedException {
          ReenterLockCondition reenterLockCondition = new ReenterLockCondition();
          Thread thread1 = new Thread(reenterLockCondition);
          thread1.start();
          System.out.println("睡眠2秒钟");
          Thread.sleep(2000);
          lock.lock();
          condition.signal();
          lock.unlock();
      }
    }
    
    ArrayBlockingQueue使用重入锁和Condition对象的案例:
private final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();

public void put(E e) throws InterruptedException{
   if(e==null) throw new NullPointerException;
   final E[] items = this.items;
   final ReentrantLock lock = this.lock;
   lock.lockInterruptibly();
   try{
       try{
           while(count==items.length){
               notFull.await();
           }catch(InterruptedException e){
               notFull.signal();
               throw e;
           }
       }
       insert(e);
   }finally{
       lock.unlock();
   }
}

private void insert(E x){
   items[putIndex] = x;
   putIndex = inc(putIndex);
   ++count;
   notEmpty.signal();
}


public E take(E e) throws InterruptedException{
   final ReentrantLock lock = this.lock;
   lock.lockInterruptibly();
   try{
       try{
           while(count==0){
               notEmpty.await();
           }catch(InterruptedException e){
               notEmpty.signal();
               throw e;
           }
       }
       extract(e);
   }finally{
       lock.unlock();
   }
}

private E extract(){
   final E[] items = this.items;
   E x = items[takeIndex];
   items[takeIndex] = null;
   takeIndex = inc(takeIndex);
   --count;
   notFull.signal();
   return x;
}

允许多个线程同时访问:信号量(Semphore)

什么是信号量?
信号量是对锁的扩展,对于内部锁synchronized和重入锁ReentrantLock,一次只允许一个线程访问一个资源,而信号量可以指定多个线程,同时访问同一个资源

  • 信号量的构造函数
public Semaphore(int permits)
public Semaphore(int permits,boolean fair)

许可是什么意思?
许可也就是准入数,表示一次可以有多少个线程同时访问同一个资源

  • 信号量的主要逻辑方法:
public void acquire()
public void acquireUninterruptibly()
public boolean tryAcquire()
public boolean tryAcquire(long timeout,TimeUnit unit)
public void release()

信号量使用实例:程序不会停止???


  public class SemapDemo implements Runnable {
    final Semaphore semp = new Semaphore(5);

    @Override
    public void run() {
        try {
            semp.acquire();
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getId() + ":done!");
            semp.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 总共20个线程,系统会以5个线程一组为单位,依次执行并输出
     *
     * @param args
     */
    public static void main(String args[]) {
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        final SemapDemo demo = new SemapDemo();
        for (int i = 0; i < 20; i++) {
            executorService.submit(demo);
        }
    }
  }

ReadWriteLock读写锁

什么是读写锁?
读写锁准确的来说,是读写分离的锁,当使用内部锁或重入锁时,所有的读与写之间的线程都是串行执行,然而实际上,读与读之间不存在线程安全的问题,可以同时操作,读写锁就是为了解决这个问题而出现的

  • 读写锁的访问约束
    非阻塞 阻塞
    阻塞 阻塞
  • 适用场景:系统中读的次数远远多于写的次数

读写锁ReadWriteLock与非读写锁Lock的对比案例:

    public Object handleRead(Lock lock) throws InterruptedException {
        try {
            lock.lock();
            Thread.sleep(1000);//模拟读操作
            System.out.println("读操作:" + value);
            return value;
        } finally {
            lock.unlock();
        }
    }

    public void handleWrite(Lock lock, int index) throws InterruptedException {
        try {
            lock.lock();
            Thread.sleep(1000);//模拟写操作
            System.out.println("写操作:" + value);
            value = index;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String args[]) {
        final ReadWriteLockDemo demo = new ReadWriteLockDemo();

        Runnable readRunnable = new Runnable() {
            @Override
            public void run() {
                //分别使用两种锁来运行,性能差别很直观的就体现出来,使用读写锁后读操作可以并行,节省了大量时间
                try {
                    demo.handleRead(readLock);
                    //demo.handleRead(lock);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };

        Runnable writeRunnable = new Runnable() {
            @Override
            public void run() {
                //分别使用两种锁来运行,性能差别很直观的就体现出来
                try {
                    demo.handleWrite(writeLock, new Random().nextInt(100));
                    //demo.handleWrite(lock, new Random().nextInt(100));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
        };
        for (int i = 0; i < 18; i++) {
            new Thread(readRunnable).start();
        }
        for (int i = 18; i < 20; i++) {
            new Thread(writeRunnable).start();
        }
    }
  }

倒计时器:CountDownLatch

什么是CountDownLatch?
是一种多线程并发控制工具,类比于与火箭的检查工作,在倒计时结束后,线程才开始执行

  • 构造函数:
    public CountDownLatch(int count)
    
  • 案例:
    public class CountDownLatchDemo implements Runnable {
      static final CountDownLatch end = new CountDownLatch(10);
      static final CountDownLatchDemo demo = new CountDownLatchDemo();
    
      @Override
      public void run() {
    
          try {
              Thread.sleep(new Random().nextInt(3) * 1000);
              System.out.println("check complete");
              end.countDown();
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }
    
      public static void main(String args[]) throws InterruptedException {
          ExecutorService executorService = Executors.newFixedThreadPool(10);
          for (int i = 0; i < 10; i++) {
              executorService.submit(demo);
          }
          //等待检查
          end.await();
          //发射火箭
          System.out.println("Fire!");
          executorService.shutdown();
      }
    }
    

循环栅栏:CyclicBarrier

什么是循环栅栏?
另外一种多线程并发控制实用工具,与CountDownlatch类似,比其更强大
实例:

线程阻塞工具类:LockSupport

类似于wait()和notify()

线程复用:线程

在实际生产环境中,线程的数量必须得以控制。盲目的大量创造线程是对系统有伤害的,所以就出现了线程池,对线程加以控制和管理

什么是线程池

当需要使用线程时,从线程池获取一个线程执行,当执行完毕后,将线程返还给线程池

  • 线程池的作用.PNG

不重复发明轮子:JDK 对线程池的支持

  • Executor框架结构图
  • Executor 提供了各种类型的线程池
  • 1.固定大小的线程池案例:
  • 2.计划任务newScheduledThreadPool

刨根究底:核心线程池的内部实现

无论是newFixedThreadPool()方法、newSingleThreadExecutor还是newCachedThreadPool()方法,均使用ThreadPoolExecutor实现

public static ExecutorService newFixedThreadPool(int nThreads)
{ return new ThreadPoolExecutor(nThreads,
nThreads,
0L,
TimeUnit.MILLSECONDS,
new LinkedBlockingQueue<Runanble>());
}
public static ExecutorService newSingleThreadPool(int nThreads){ return new FinalizableDelegatedExecutorService( new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLSECONDS,new LinkedBlockingQueue<Runanble>()));
}
public static ExecutorService newCachedThreadPool(int nThreads){ return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.SECONDS,new SynchronousQueue<Runanble>());
}

ThreadPoolExecutor最重要的构造函数:

public ThreadPoolExecutor(int corePoolSize,//指定线程池中的线程数量
int maximumPoolSize,//指定线程池中最大线程数量
long keepAliveTime,//超过corePoolSize的空闲线程,在多长时间内,会被销毁
TimeUnit unit,//keepAliveTime的单位
BlockingQueue<Runanble> workQueue,//任务队列,被提交但尚未被执行的任务
ThreadFactory threadFactory,//线程工厂
RejecttedExecutionHandler handler)//拒绝策略,当任务太多来不及处理,如何拒绝任务

workQueue:指被提交但未执行的任务队列,是一个BlockingQueue接口的对象,ThreadPoolExecutor可使用以下几种BlockingQueue

  • 直接提交的队列:SynchronousQueue
  • 有界的任务队列:ArrayBlockingQueue
  • 无界的任务队列:LinkedBlockingQueue
  • 优先任务队列:PriorityBlockingQueue
    ThreadPoolExecutor线程池的核心调度代码

调度逻辑可总结为:

  • ThreadPoolExecutor的调度逻辑.PNG

超负载了怎么办:拒绝策略

RejecttedExecutionHandler handler
JDK 内置提供了四种拒绝策略:

  • AbortPolicy :直接抛出异常,阻止系统正常工作
  • CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务
  • DiscardOldestPolicy:丢弃最老的一个请求(也就是即将被执行的任务),并尝试再次提交当前任务
  • DiscardPolicy:默默丢弃,不做任何处理

自定义线程池和拒绝策略的使用:

public class RejectThreadPoolDemo{
   public static class MyTask implements Runnable{
       @Override
       public void run(){
           System.out.println(System.currentTimeMills()+":Thread ID:"
               + Thread.currentThread().getId());
           try{
               Thread.sleep(100);
           }catch(InterruptedException e){
               e.printStackTrace();
           }
       }
   }
   public static void main(String[] args) {
       MyTask task = new MyTask();
       ExecutorService es = new ThreadPoolExecutor(5,5,
           0L,TimeUnit.MILLSECONDS,
           new LinkedBlockingQueue<Runnable>(10),
           Executors.defaultThreadFactory(),
           new RejectExecutionHandler(){
               @Override
               public void rejectedExecution(Runnable r,
                   ThreadPoolExecutor executor){
                       System.out.println(r.toString()+ is disacrd"");
               }
       });
       for (int i=0;i<Integer.MAX_VALUE ;i++ ) {
           es.submit(task);
           Thread.sleep(10);
       }
   }

}

自定义线程创建:ThreadFactory

ThreadFactory是一个接口,只有一个方法,用来创建线程:

Thread newThread(Runnable r);

自定义线程池的好处:

  • 可以跟踪线程池究竟在何时创建了线程,也饥饿自定义线程的名称、组以及优先级等信息,甚至可以任性地将所有的线程设置为守护线程
    如下:main主线程运行2s后,JVM自动退出,将会强制销毁线程池
public static void main(String args[]) throws InterruptedException {
        MyTask myTask = new MyTask();

        ExecutorService executorService = new ThreadPoolExecutor(5, 5, 
            0L, TimeUnit.SECONDS, 
            new SynchronousQueue<Runnable>(),
            new ThreadFactory(){
                @Override
                public Thread new Thread(){
                    Thread t = new Thread(r);
                    t.setDaemon(true);
                    System.out.println("create" + t);
                    return t;
                }
            }, 
        });

        for (int i = 0; i < 5; i++) {
            es.submit(task);
        }
        Thread.sleep(2000);
    }

我的应用我做主:扩展线程池

什么时候需要扩展线程池?
当想监控每个任务的开始和结束时间,或者其他一些自定义的增强功能时(动态代理),就需要对线程池进行扩展

ThreadPoolExecutor也是一个可扩展的线程池,提供了beforeExecute()、afterExecutor()和terminated()三个方法用于增强
ThreadPoolExecutor.Worker.runTask()方法的内部实现:

  • runTask方法.PNG

    线程池扩展的例子:

public class ExtThreadPool {

    public static class MyTask implements Runnable {
        public String name;

        public MyTask(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println("正在执行:Thread ID:" + Thread.currentThread().getId() + ",Task Name:" + name);

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


    public static void main(String args[]) throws InterruptedException {
        ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>()) {
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备执行:" + ((MyTask) r).name);
            }

            protected void afterExecute(Thread t, Runnable r) {
                System.out.println("执行完成:" + ((MyTask) r).name);
            }

            protected void terminated() {
                System.out.println("线程池退出!");
            }
        };

        for (int i = 0; i < 5; i++) {
            MyTask task = new MyTask("TASK-GEYM-" + i);
            executorService.execute(task);
            Thread.sleep(10);
        }
        executorService.shutdown();
    }
}

注意:

1.此处用的是execute()方法提交任务,没有用submit()
2.shutdown()关闭线程池时,并不是暴力终止所有线程,而是发给一个关闭信号给线程池,线程池此时不能再接收其他任务,执行完池内剩余的线程

合理的选择:优化线程池数量

  • 线程池的数量.PNG

堆栈去哪里了:在线程池中寻找堆栈

submit()竟然没有异常提示:


public class NoTraceDivTaskDemo {
    public static class DivTask implements Runnable {
        int a, b;

        public DivTask(int a, int b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public void run() {
            double re = a / b;
            System.out.println(re);
        }
    }


    public static void main(String args[]) {
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        for (int i = 0; i < 5; i++) {
            poolExecutor.submit(new DivTask(100, i)); //没有报错提示
//            poolExecutor.execute(new DivTask(100, i));//有报错提示
        }
    }
}

很明显,当当我执行上述程序时,应该后出现除零异常,但是程序运行结果实际上如下:

100.0
25.0
33.0
50.0

注释submit,使用execute提交,会有错误提示:

Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
    at chapter3.NoTraceDivTaskDemo$DivTask.run(NoTraceDivTaskDemo.java:21)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:835)
100.0
25.0
33.0
50.0

自己查看提交任务线程的堆栈信息:


public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
    
    public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public void execute(Runnable task) {
        super.execute(wrap(task, clientTrace(), Thread.currentThread().getName()));
    }

    private Runnable wrap(final Runnable task, final Exception clientTrace, String name) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    task.run();
                } catch (Exception e) {
                    clientTrace.printStackTrace();
                    throw e;
                }
            }
        };
    }
    private Exception clientTrace() {
        return new Exception("Client stack trace");
    }
}
public static void main(String args[]) {
        ThreadPoolExecutor threadPoolExecutor = new TraceThreadPoolExecutor(0, Integer.MAX_VALUE, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        for (int i = 0; i < 5; i++) {
            threadPoolExecutor.execute(new NoTraceDivTaskDemo.DivTask(100, i));
        }
    }
  • 运行结果:
java.lang.Exception: Client stack trace
    at chapter3.TraceThreadPoolExecutor.clientTrace(TraceThreadPoolExecutor.java:35)
    at chapter3.TraceThreadPoolExecutor.execute(TraceThreadPoolExecutor.java:17)
    at chapter3.TraceDivTaskDemo.main(TraceDivTaskDemo.java:14)
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
    at chapter3.NoTraceDivTaskDemo$DivTask.run(NoTraceDivTaskDemo.java:21)
    at chapter3.TraceThreadPoolExecutor$1.run(TraceThreadPoolExecutor.java:25)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:835)
100.0
25.0
50.0
33.0

分而治之:Fork/Join框架

Fork和Join是什么意思?
Fork:创建子线程
Join:等待
使用fork后系统多了一个执行分支(线程),因此需要等待这个执行分支完成,才能得到最终的结果
分而治之:把一个计算任务分成许多小任务,将这些小任务的计算结果再合成

  • Fork/Join的执行逻辑如下图:


    fork-join.PNG
  • 互相帮助的线程:
  • 互相帮助的线程.PNG

    当线程试图帮助别人是,总是从其任务队列的底部开始拿数据,而线程执行自己的任务时,则是从相反的顶部开始拿。有利于避免数据竞争
    Fork/Join框架的使用,计算数列之和:

package chapter3;

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * Created by 13 on 2017/5/5.
 */
public class CountTask extends RecursiveTask {
    private static final int THRESHOLD = 10000;

    private long start;
    private long end;

    public CountTask(long start, long end) {
        this.start = start;
        this.end = end;
    }


    @Override
    protected Long compute() {
        long sum = 0;
        boolean canCompute = (end - start) < THRESHOLD;
        if (canCompute) {
            for (long i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            long step = (start+end) / 100;
            ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
            long pos = start;
            for (int i = 0; i < 100; i++) {
                long lastOne = pos + step;
                if (lastOne > end) {
                    lastOne = end;
                }
                CountTask subTask = new CountTask(pos, lastOne);
                pos += step + 1;
                subTasks.add(subTask);
                subTask.fork();
            }
            for (CountTask t : subTasks) {
                sum += (Long) t.join();
            }
        }
        return sum;
    }
    public static void main(String args[]) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask task = new CountTask(0, 200000L);
        ForkJoinTask<Long> result = forkJoinPool.submit(task);
        long res = 0;
        try {
            res = result.get();
            System.out.println("sum=" + res);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

不要重复发明轮子:JDK的并发容器

超好用的工具类:并发集合简介

JDK提供的这些容器大部分都在java.util.concurrent包中

  • ConcurrentHashMap:线程安全的HashMap
  • CopyOnWriteArrayList:在读多写少的场合,性能远好于Vector
  • ConcurrentLinkedQueue:高效的并发队列,使用链表实现,线程安全的LinkedList
  • BlockingQueue:一个接口,JDK内部通过链表、数组等方式实现了这个接口
  • ConcurrentSkipListMap:跳表的实现,Map,使用跳表的数据结构进行快速查找
  • Vector和Collections工具类可以将任何集合包装成线程安全带集合

线程安全的HashMap

  • 两种实现方式:
  • 使用Collections.synchronizedMap()包装HashMap,如下得到的m就是线程安全的HashMap:
public static Map m = Collections.synchronizedMap(new HashMap());

高效读取:不变模式下的CopyOnWriteArrayList

数据共享通道:BlockingQueue

“中间媒介”

  • ArrayBlockingQueue
  • LinkedBlockingQueue

随机数据结构:跳表

什么是跳表?
一种用来快速查找的数据结构,类似于平衡树。但是平衡树的插入和删除可能导致平衡树进行一次全局的调整,而对跳表的插入和删除只需要对整个数据结构的局部进行操作即可。因此在高并发时,平衡树需要一个全局锁来保证线程安全,跳表只需要局部锁即可,跳表的查找时间复杂度同样为O(logn).JDK 使用跳表实现一个Map.

  • 随机。本质是同时维护了多个链表,并且链表是分层的


    跳表的数据结构.PNG
  • 所有链表都是有序的
  • 使用跳表实现Map和使用哈希算法实现Map的一个区别:哈希并不会保存元素的顺序,而跳表内所有元素都是有序的
  • ConcurrentSkipListMap
    • Node:每个节点存放键值对和指向下一个节点的指向
    static final class Node<K,V>{
      final K key;
      volatile Object value;
      volatile Node<K,V> next;
    }
    
    • Index:包装了Node,同时增加了向下和向右的引用
    static class Index<K,V>{
      final Node<K,V> node;  
      final Index<K,V> down;
      final Index<K,V> right;
    }
    
    • HeadIndex
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容