并发编程之原子性、可见性和有序性
Volatile关键字:因为Java内存模型(JMM)即每个线程都会有一份本地缓存,当读取共享变量时可能会读取缓存而无法读到最新的值,Java语言提供了一种销弱的同步机制,即volatile变量,用来确保将变量的更新操作通知到其他线程。当把变量声明为volatile类型后,编译器与运行时都会注意到这个变量是共享的,因此不会将该变量上的操作与其他 内存操作一起重排序。volatile变量不会被缓存在寄存器或者其他处理器不可见的地方,因此在读取volatile类型的变量时总会返回最新写入的值。
Volatile关键字保证了可见性和有序性,但无法保证原子性
AbstractQueuedSynchronizer(AQS)
AQS是抽象类,内置自旋锁实现的同步队列,封装入队和出队的操作,提供独占、共享、中断等特性的方法。AQS的子类可以定义不同的资源实现不同性质的方法。比如可重入锁ReentrantLock,定义state为0时可以获取资源并置为1。若已获得资源,state不断加1,在释放资源时state减1,直至为0;CountDownLatch初始时定义了资源总量state=count,countDown()不断将state减1,vstate=0时才能获得锁。释放衙state就一直为0,所有线程调用await()都不会等待,所以CountDownLatch是一次性的,用完后如果再想用就只能重新创建一个;如果希望循环使用,推荐使用基于ReentrantLock实现的CyclicBarrier。Semaphore与CountDownLatch略有不同,同样也是定义了资源总量state=permits,当state>0时就能获得锁,并将state减1,当state=0时只能等待其他线程释放锁,当释放锁时state加1,其他等待线程又能获取这个锁。当Semphore的permits定义为1时,就是互斥锁,当permits>1就是共享锁。
CAS(比较并交换指令)
CAS无锁机制(乐观锁),包含了3个操作数(需要读写的内存位置V、读取内存V的值的值A(将当时读取到的V值赋值给A)和新值B)。当且仅当V的值等于A时,CAS才会通过原子方式用新值B来更新V的值,否则不会执行任务操作。无论位置V的值是否等于A,都将返回V原有的值。如何更新失败,CAS会不断循环重试进行更新操作。使用while(true)的方式不断重试,比锁操作的挂起与唤醒效率要高。但如果一个线程操作时间长,其他线程不断的重试会大量消耗CPU资源。
CAS存在ABA问题,如果V的值首先由A变成B,再由B变成A,其实是已经发生了变化,需要重新执行算法中的某些步骤,但程序无法感知,解决ABA的方案:加版本号,对比V与A的值和对比版本号是否相同,通过两个属性值来确认。
重入锁
当某个线程请求一个由其他线程持有的锁时,发出请求的线程就会阻塞。然而,由于内置锁(synchronized等)是可重入的,因此如果某个线程试图获得一个已经由它自己持有的锁,那么这个请求就会成功。“重入”意味着获取锁的操作的粒度是"线程",而不是"调用"。
重入的一种实现方法是,为每个锁关联一个获取计数值和一个所有者线程
。当计数值为0时,这个锁就被认为是没有被任何线程持有。当线程请求一个未被持有的锁时,JVM将记下锁的持有者,并且将获取计数值置为1.如果同一个线程再次获取这个锁,计数值将递增(1+1=2)
,而当线程退出同步代码块时,计数器会相应地递减。当计数值为0时,这个锁将被释放。
重入锁进一步提升了加锁行为的封装性,因此简化了面向对象并发代码的开发。例如以下两个同步方法
doSomething方法调用doSomething2()的方法,此时如果没有可重入的锁,那么这段代码将产生死锁。由于两个方法都是synchronized方法,因此每个doSomething方法在执行前都会获取synchronized (this)的锁。然而,如果内置锁不是可重入的,那么在调用doSomething2()时将无法获得Widget上的锁,因为这个锁已经被持有,从而线程将永远停顿下去,等待一个永远 也无法获取的锁(死锁)。重入锁则避免了这种鲜红锁情况的发生
。调用doSomething()方法时计数值为1,再调用在doSomething()方法里调用doSomething2()方法计数值为2,当执行完doSomething2()方法后计数值变为1,再执行完doSomething()方法后,计数值变为0。此时其他线程就能获取锁。
public class Widget{
public void doSomething(){
synchronized (this){
System.out.println("synchronized");
doSomething2();
}
}
public void doSomething2(){
synchronized (this){
System.out.println("synchronized2");
}
}
}
ReentrabtLock(轻量级锁)对于Lock接口的实现主要依赖了Sync,而Sync继承了AbstractQueuedSynchronizer(AQS),它是JUC包实现同步的基础工具。在AQS中定义了一个volatile int state变量作为共享资源,如果线程获取资源失败,则进入同步FIFO队列中等待;如果成功获取资源就执行临界区代码。执行完释放资源时,会通知同步队列中的等靠墙主线程来获取资源后出队并执行。
阻塞队列
阻塞队列提供了可阻塞的put和take方法,以及支持定时的offer和poll方法。如果队列已经满了,那么put(添加)方法将阻塞直到有空间可用;如果队列为空,那么take(获取 )方法将会阻塞直到有元素可用。队列可以是有界的也可以是无界的,无界队列永远都不会充满,因此无界队列上的put方法也永远不会阻塞。
阻塞对队的好处是我们不需要判断什么时候需要阻塞线程,什么时候需要唤醒线程,因为BlockQueue阻塞队列已经做好了。
使用队列需要合理调整生产者线程数据和消息者线程数量之间的比率,从而实现更高的资源利用率。如果不合理比如生产者每秒put10000个,消费者每秒take100个,那消费者就处理不过来,从而会降低程序的效率
在concurrent类库中包含了BlockingQueue的多种实现,其中LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,二者分别与LinkedList和ArrayList类似,但比同步List摇篮有更好的并发性能。PriorityBlockingQueue是一个按优先级排序的队列 ,当需要按照某种顺序而不是FIFO来处理元素时,就可以使用这个队列。PriorityBlockingQueue既可以根据元素的自然顺序来比较元素(如果它们实现了Comparable方法),也可以使用Comparator来比较
Java6增加了Deque和BlockingDeque,它们分别对Queue和BlockingQueue进行了扩展。Deque是一个双端队列,烊现了在队列头和队列尾的高效插入和移除。具休实现包括ArrayDeque和LinkedBlockingDeque
ArrayBlockingQueue:由数组结构组成的有界阻塞队列。
LinkedBlockingQueue:由链表结构组成的有界(但大小默认为Integer.MAX_VALUE)阻塞队列。
PriorityBlockingQueue:支持优先级排序的无界阻塞队列。
DelayQueue:使用优先级队列实现的延迟无界阻塞队列。
SynchronousQueue:不存储元素的阻塞队列,每一个put操作必须要等待一个take操作,否则不能继续添加元素。反过来take也是。
LinkedTransferQueue:由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:由链表结构组成的双向阻塞队列。
并发包常用类(CountDownLatch、FutureTask、)
CountDownLatch(主要方法await()和countDown())
CountDownLatch是一种灵活的闭锁实现,闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而await方法等待计数器达到零,这表示所有需要等待的事件都已经发生,就可以执行await后面的代码。
如果计数器的值非零,那么await会一直阻塞直到计数器计为零,或者await(timeout,timeUnit)设置的时间已到,或者等待中的线程中断,才会执行await下后面的代码
。
下面的代码设置new CountDownLatch(1)的计数器为1,主线程countDownLatch.await(),子线程睡5秒后执行countDownLatch.countDown(),此时计数器为0,那么System.out.println("6666666666")将会在5秒后执行。
如果将countDownLatch.await()换成countDownLatch.await(1, TimeUnit.SECONDS),那么1秒后不管计算器是否为0,都会执行await(1, TimeUnit.SECONDS)后面的代码。
public class CountDownLatchDemo implements Runnable{
private static CountDownLatch countDownLatch = new CountDownLatch(1);
public static void main(String[] args) throws InterruptedException {
CountDownLatchDemo countDownLatchDemo = new CountDownLatchDemo();
Thread thread = new Thread(countDownLatchDemo);
thread.start();
/*// 设置await的等待时长为1秒
countDownLatch.await(1, TimeUnit.SECONDS);*/
countDownLatch.await();
System.out.println("6666666666");
}
@Override
public void run() {
try {
// 睡5秒
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
countDownLatch.countDown();
}
}
FutureTask
FutureTask可取消的异步任务
Semaphore(主要方法acquire()和release()),可用于控制并发
Semaphore计数信息量,用来控制同时访问某个特定资源的操作数量。new Semaphore(3),计数信息量设置为3,通过acquire()方法请求操作,如果计数信息量不为0,则能往下执行,并且计数信息量会减1;如果为0,则会阻塞。当执行release()时计数信息量会+1。
比如只有3间厕所,同一时间只能允许3个人上厕所,如果同时有10个人都想上,那么他们执行acquire()方法,谁先执行谁就先上,当有3个人执行了acquire()方法,计数信息量变成0,其他人就会阻塞。等有人执行了release()(从厕所出来了)计数信息量会+1,此时其他人就可以抢厕所了。
public class SemaphoreDemo extends Thread {
final static Semaphore semaphore = new Semaphore(3);
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "等待上厕所");
// 抢厕所
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "有位置可以上");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName() + "舒服");
// 释放
semaphore.release();
}
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new SemaphoreDemo().start();
}
}
}
CyclicBarrier(主要方法:await())
CyclicBarrier栅栏可以使一定数量的参与方在栅栏位置汇集,当线程到达栅栏位置时将调用await方法,这个方法将阻塞直接所有线程都到达栅栏位置。如果所有线程都到达了栅栏位置,那么栅栏将打开,此时所有线程都被释放,而栅栏将被生活地以便下次使用。如果对await的调用超时,或者await阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierException。如果成功地通过栅栏,那么await将为每个线程返回一个唯一的到达索引号,我们可以利用这些索引来"选举"产生一个领导线程,并在下一次迭代中由该领导线程执行一些特殊的工作。
比如约了五个人一起吃饭,必须要等5个人都到齐才能开始吃饭。只要有人没来就一直阻塞等待
public class CyclicBarrierDemo extends Thread {
private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"就位");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName()+"人齐开始吃饭");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new CyclicBarrierDemo().start();
}
}
}
线程池
线程池是指管理一组工作线程的资源池,通过重用现有的线程而不是创建新线程,复用线程可以减少创建和销毁过程中前产生的巨大的性能开销。当请求到达时,工作线程已经存在,因此不会由于等待创建线程而延迟任务的执行,从而提高 了响应性。通过适当调整线程池的大小,可以创建足够多的线程以便使处理器保持忙碌状态,同时调用线程池大小还可以防止无限的创建线程使应用程序耗尽内存,可控制最大并发数。
总结就是:线程复用,控制最大并发数,管理线程生命周期。
concurrent类库提供了一个灵活的线程池以及一些有用的默认配置。可以通过调用Executors中的静态工厂方法之一来创建一个线程池。
Executors.newFixedThreadPool(int nThreads) 创建一个固定长工的线程池
newFixedThreadPool将创建一个固定长工的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化(如果某个线程)。
Executors.newCachedThreadPool() 创建一个可缓存的线程池
newCachedThreadPool将创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程(空闲线程存活时间为60秒),而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。
Executors.newScheduledThreadPool(int corePoolSize)
newScheduledThreadPool创建一个可设置核心线程数线程池(默认最大线程数为Integer.MAX_VALUE),而且以延迟或定时的方式来执行任务,类似于Timer
Executors.newSingleThreadExecutor() 是一个单线程的Executor
newSingleThreadExecutor是一个单线程的Executor,它创建单个工作者线程来执行任务,如果这个线程异常结束 ,会创建另一个线程来替代,newSingleThreadExecutor能确保依照任务在队列中的顺序来串行执行(例如FIFO、LIFO、优先级)
上面的4种线程池底层都是通过ThreadPoolExecutor类来实现的,但阿里巴巴手册讲到,生产上不能使用Executors来创建线程池,FixedThreadPoll和SingleTreadPool的队列都是设置成Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM,CachedTreadPool和ScheduledThreadPool的最大线程数为Integer.MAX_VALUE,可能会创建大量线程,导致OOM
我们可以通过ThreadPoolExecutor来自定义线程池
ExecutorService executor = new ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
ThreadPoolExecutor共有7个参数
int corePoolSize:核心线程数,(当有任务提交时就会创建,创建后一直存活,直到线程池被关闭)
int maximumPoolSize:最大线程数,必须大于等于1
long keepAliveTime:多余的空闲线程存活时间
TimeUnit unit :keepAliveTime存活时间的时间单位
BlockingQueue<Runnable> workQueue:任务队列,被提交但尚未初执行的任务存放到队列中
ThreadFactory threadFactory:当前线程池的线程工厂,用于创建线程,设置初始参数,可自定义线程的名称
RejectedExecutionHandler handler:拒绝策略,表示当队列满了并且工作线程大于等于最大线程数时,后面的线程如何拒绝提交任务。 (默认报异常)
假设corePoolSize=2 maximumPoolSize=10 keepAliveTime = 1 unit=TimeUnit.SECONDS workQueue=LinkedBlockingQueue<>(5) threadFactory和handler使用默认值
结合这7个参数说说原理:当有请求来时,首先会创建核心线程(核心线程创建后是伴随着线程池存活的)。当访问的线程数超过2时,会把后面的线程任务放到队列中,如果当并发访问的线程到16时,此时核心线程被占用,队列也满了,就会通过设置的最大线程数来创建新的线程,减去核心线程数,最多能创建8个新的线程。当队列满了,工作线程还大于10时,就会执行拒绝策略,比如后面来的线程直接报异常。
当任务处理完了,之前除了2核心线程还创建了8个线程。这8个线程没有任务执行,这8个线程就会空闲1秒后被销毁
线程池的拒绝策略有4种
AbortPolicy(默认):直接抛出RejectedExecutionException异常
CallerRunsPolicy:不会抛弃任务,也不会抛出异常,而是将某些任务退回到调用者(让调用者的线程自己执行)
DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务。
DiscardPolicy:直接丢弃任务,不给予任务处理也不抛出异常。如果允许丢失,是最好的方案
自定义线程数设置线程池的大小
线程不宜设置的过大或过小,如果线程池过大,那么大量的线程将在相对很少的CPU和内存资源上发生竞争,这不仅会导致更高的内存使用量,而且还可能耗尽资源。如果线程池过小,那么将导致许多空闲的处理器无法执行工作,从而降低吞吐率。
要想正确地设置线程池的大小,必须分析计算环境、资源预算和任务的特性。
对于CPU密集型的任务
,配置成系统CPU(处理器)核数量+1,通常能实现最优的利用率。
对于IO密集型
,由于线程执行完成速度比较慢,因此线程池的大小应该更大,第一种方式可配置成CPU核数*2。要正确地设置线程池的大小,必须估算出任务的等待时间与计算时间的比值(阻塞系数,在0.8~0.9之间),并且可以通过一些分析或监控工具来获得。cpu核数/(1-0.9) 8核就是 8/0.1 = 80
如果计算密集型的任务
和IO密集型或其他阻塞操作的任务
都有,并且它他之间的行为相差很大,那么应该考虑使用多个线程池,从而使每个线程池可以根据各自的工作负载来调整。
什么是CPU密集型:用java代码写的循环
什么是IO密集型:需要用到IO流来请求的,比如访问数据库和redis等