同步容器类
同步容器类包括Vector和Hashtable以及由Collections.synchronizedXxx等工厂方法创建的同步封装器类。这些类实现线程安全的方式是:将它们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能访问容器的状态。同步容器对所有容器状态的访问都串行化,严重降低了并发性;当多个线程竞争锁时,吞吐量严重下降。
同步容器类存在的问题
同步容器类都是线程安全的,但是在某些情况下可能需要额外的客户端加锁来保护复合操作。
比如,在Vecotr中,getLast()和deleteLast()操作,如果是在多线程的环境下运行,如果不加锁,会产生异常情况。一个线程在getLast()后,另一个线程deleteLast(),然后该线程继续执行,进行deleteLast()操作,此时会抛出下标越界的异常。
又比如,在迭代的过程中,使用get(index)的操作,如果有多个线程运行,可能会删除其中元素,同样会造成异常。
对于如上的情况,我们需要通过客户端加锁来解决线程安全的问题。如在迭代时加锁:
synchronized(vector){for(inti=0;i
迭代器
在迭代或者for-each循环语法时,对容器类进行迭代的标准方式都是使用Iterator。然而,在设计同步容器类的迭代器时并没有考虑到并发修改的问题,并且它们表现出的行为时“及时失败”的,也就是当它们发现容器在迭代过程中被修改时,就会抛出ConcurrentModificationException。
如果在迭代期间,对容器加锁,首先会降低效率,提高线程的等待时间;然后还可能会产生死锁;降低了吞吐量和CPU的利用率。
如果不希望在迭代期间加锁,可以使用克隆容器的方法,并在克隆副本上进行迭代。
加锁可以防止迭代器抛出ConcurrentModificationException,但是要在所有对容器进行迭代的地方都要加锁。如hashCode,equals,containsAll,removeAll,retainAll等方法,在以容器为参数时,都会对容器进行迭代。这些间接的迭代操作可能抛出ConcurrentModificationException。
并发容器
Java 5.0提供了多种并发容器类来改进同步容器的性能。同步容器对所有容器状态的访问都串行化,严重降低了并发性;当多个线程竞争锁时,吞吐量严重下降。
并发容器是针对多个线程并发访问设计的。通过并发容器来替代同步容器,可以极大地提高伸缩性并降低风险。并发容器包括ConcurrentHashMap(替代Map),CopyOnWriteArrayList(替代List),ConcurrentLinkedQueue,BlockingQueue等等。
ConcurrentHashMap
同步容器类在执行每个操作期间都持有一个锁。ConcurrentHashMap采用了不同的加锁策略来提供更高的并发性和伸缩性。它并不是将每个方法都在同一个锁上同步,而是使用一种粒度更细的加锁机制来实现更大程度的共享,这种机制称为分段锁。
分段锁机制使得任意数量的读取线程可以并发访问Map,执行读取操作的线程和执行写入操作的线程可以并发访问Map,并且一定数量的写入线程可以并发地修改Map,因此提高了并发访问的吞吐量。
并发容器增强了同步容器类,它们提供的迭代器不会抛出ConcurrentModificationException,因此不需要在迭代过程中对容器加锁。其迭代器具有弱一致性,可以容忍并发的修改,在创建迭代器时会遍历已有元素,并可以(但是不保证)在迭代器被构造后将修改操作反映给容器。size(),isEmpty()等方法返回的是一个近似值。
由于ConcurrentHashMap与Hashtable和synchronizedMap有更多的优势,因此大多数情况应该使用并发容器类,至于当需要对整个容器加锁进行独占访问时,才应该放弃使用并发容器。
注意,此时不能再通过客户端加锁新建新的原子操作了,客户端只能对并发容器自身加锁,但并发容器内部使用的并不是自身锁。
CopyOnWriteArrayList
写入时复制容器,在每次修改时都会加锁并创建和重新发布一个新的容器副本,直接修改容器引用,从而实现可见性。
写操作在一个复制的数组上进行,读操作还是在原始数组中进行,读写分离,互不影响。写操作需要加锁,防止并发写入时导致写入数据丢失。写操作结束之后需要把原始数组指向新的复制数组。
CopyOnWriteArrayList 在写操作的同时允许读操作,大大提高了读操作的性能,因此很适合读多写少的应用场景。
但是 CopyOnWriteArrayList 有其缺陷:
内存占用:在写操作时需要复制一个新的数组,使得内存占用为原来的两倍左右;
数据不一致:读操作不能读取实时性的数据,因为部分写操作的数据还未同步到读数组中。
阻塞队列
阻塞队列支持生产者-消费者模式。简化了开发过程,消除了生产者和消费者之间的代码依赖性。阻塞队列简化了生产者-消费者设计的实现过程。一种常见的生产者-消费者设计模式就是线程池与工作队列的组合。
阻塞队列提供了四种处理方法:
抛出异常,使用add(e)插入,remove()删除,element()查询。当阻塞队列满时,插入元素;当队列空,删除元素都会抛出异常。
返回特殊值,使用offer(e)插入,poll()删除,peek()查询。插入时,如果成功返回true,移除时,如果没有对应的元素返回null。
阻塞,使用put(e)插入,take()删除。队列满,插入元素时会阻塞;队列空,取元素会阻塞。
超时退出:使用offer(e,time,unit)插入,poll(time,unit)删除。当队列满时,会阻塞,超过一定的时间,线程会退出。
阻塞队列有多种实现。
ArrayBlokcingQueue和LinkedBlockingQueue分别是数组和链表结构组成的有界的FIFO阻塞队列。
PriorityBlockingQueue是一个支持优先级排序的无界阻塞队列。
SynchronousQueue是一个不存储元素的阻塞队列,它不会为队列中元素维护存储空间。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
双端队列与工作密取
Java 6提供了Dqueue和BlockingDeque,是双端队列,实现了在队列头和队列尾的高效插入和移除。双端队列适用于工作密取模式。在工作密取中,每个消费者都有各自的双端队列。如果一个消费者完成了自己的双端队列的全部工作,可以从其他消费者双端队列末尾秘密的获取工作。因为工作者线程不会再单个共享的任务队列上发生竞争。适用于既是生产者又是消费者问题。
阻塞方法与中断方法
线程会阻塞或暂停执行。被阻塞的线程必须等待某个不受它控制的事件发生后才能继续执行。当在代码中调用一个可以抛出InterruptedException的方法时,自己的方法就编程了阻塞方法,必须处理中断的响应。如果这个方法被中断,那么它将努力提前结束状态。
处理中断的响应有两种基本选择:
传递InterruptedException,把该异常抛出给方法的调用者。
恢复中断,捕获异常,并调用当前线程的interrupt方法恢复中断,引发更高层的代码中断。
publicvoidrun(){try{ something(); }catch(InterruptedException e){ Thread.currentThread().interrupt(); }}
同步工具类
同步工具类可以是任何一个对象,只要它根据其自身的状态来协调线程的控制流。包括阻塞队列,信号量,栅栏以及闭锁。
闭锁
闭锁用来确保某些活动直到其他活动都完成了才继续执行。如果有多个线程,其中一个线程需要等到其他所有线程活动结束后才继续执行,使用闭锁。
CountDownLatch是一种闭锁的实现,可以使得一个或者多个线程等待一组事情发生。包括一个计数器,表示需要等待的事件数量;countDown方法用来递减计数器,表示有一个事件已经发生了;await方法等待计数器为0,表示所有需要等待的事情已经发生。
// 初始化闭锁,并设置资源个数CountDownLatch latch =newCountDownLatch(2);Thread t1 =newThread(newRunnable(){publicvoidrun(){// 加载资源1加载资源的代码……// 本资源加载完后,闭锁-1latch.countDown(); }} ).start();Thread t2 =newThread(newRunnable(){publicvoidrun(){// 加载资源2资源加载代码……// 本资源加载完后,闭锁-1latch.countDown(); }} ).start();Thread t3 =newThread(newRunnable(){publicvoidrun(){// 本线程必须等待所有资源加载完后才能执行latch.await();// 当闭锁数量为0时,await返回,执行接下来的任务任务代码…… }} ).start();复制代码
栅栏(同步屏障)
闭锁是一次性对象,一旦进入终止状态,就不能被重置。栅栏类似于闭锁,能阻塞一组进程直到某个时间发生。栅栏与闭锁的区别在于,所有线程必须同时到达栅栏位置,才能继续执行。
若有多条线程,他们到达屏障时将会被阻塞,只有当所有线程都到达屏障时才能打开屏障,所有线程同时执行,若有这样的需求可以使用同步屏障。此外,当屏障打开的同时还能指定执行的任务。
闭锁只会阻塞一条线程,目的是为了让该条任务线程满足条件后执行;
而同步屏障会阻塞所有线程,目的是为了让所有线程同时执行(实际上并不会同时执行,而是尽量把线程启动的时间间隔降为最少)。
// 创建同步屏障对象,并制定需要等待的线程个数 和 打开屏障时需要执行的任务CyclicBarrier barrier =newCyclicBarrier(3,newRunnable(){publicvoidrun(){//当所有线程准备完毕后触发此任务}});// 启动三条线程for(inti=0; i<3; i++ ){newThread(newRunnable(){publicvoidrun(){// 等待,(每执行一次barrier.await,同步屏障数量-1,直到为0时,打开屏障)barrier.await();// 任务任务代码…… } } ).start();}复制代码
信号量
信号量用于控制同时访问某个特定资源的操作数量,或者执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。
信号量可以用于实现资源池,也可以用于将容器变为有界阻塞容器。信号量管理着一组虚拟的许可,在执行操作时首先获取许可,并在使用以后释放许可。如果没有许可,将阻塞直到有许可或被中断,超时。
信号量的使用场景是,有m个资源,n个线程,且n>m,同一时刻只能允许m条线程访问资源。
// 创建信号量对象,并给予3个资源Semaphore semaphore =newSemaphore(3);// 开启10条线程for(inti=0; i<10; i++ ) {newThread(newRunnbale(){publicvoidrun(){// 获取资源,若此时资源被用光,则阻塞,直到有线程归还资源semaphore.acquire();// 任务代码……// 释放资源semaphore.release(); } } ).start();}
FutureTask
可以用作闭锁,是一种可以生成结果的Runnable,可以处于以下三种状态:等待运行,正在运行和运行完成。当FutureTask进入完成状态后,它会停止在这个状态上。
FutureTask在Executor框架中表示异步任务,此外还可以用来表示一些时间较长的运算,这些计算可以在使用计算结构之前启动。
实战:构建缓存
首先,使用HashMap和同步机制来初始化缓存。
publicinterfaceComputable{Vcompute(A arg)throwsInterruptedException;}publicclassExpensiveFuncimplementsComputable{@OverridepublicBigIntegercompute(String arg)throwsInterruptedException{returnnewBigInteger(arg); }}publicclassMemoizer1implementsComputable{privatefinalMap cache=newHashMap<>();privatefinalComputable c;publicMemoizer1(Computable<A,V> c){this.c=c; }@OverridepublicsynchronizedVcompute(A arg)throwsInterruptedException{ V result=cache.get(arg);if(result==null){ result=c.compute(arg); cache.put(arg,result); }returnresult; }}
在这种实现方法中,使用HashMap保存之前计算的结果。首先检查需要的结果是否已经在缓存中,如果存在则返回之前计算,否则将计算结果缓存到HashMap再返回。
为了确保线程安全,将整个compute方法进行同步。但是这样伸缩性差,缓存的性能并没有得到提升。
下面使用ConcurrentHashMap替换HashMap。但是,这种方法存在一些不足,当两个线程同时调用compute时,可能会导致计算得到相同的值。这样是低效的,因为缓存的作用就是避免相同的数据被计算多次。其问题在于,如果某个线程启动了一个计算,而其他线程并不知道这个计算正在进行,很可能会重复这个计算。
针对如上问题,我们考虑可以使用FutureTask来解决。使用该类来表示计算的过程,如果有结果可用,则返回结果,否则一直阻塞。
publicclassMemo2implementsComputable{privatefinalMap> cache=newConcurrentHashMap<>();privatefinalComputablec;publicMemo2(Computable<A,V>c){this.c=c; }@OverridepublicVcompute(A arg)throwsInterruptedException{ Future future=cache.get(arg);if(future==null){ Callable eval=newCallable() {@OverridepublicVcall()throwsException{returnc.compute(arg); } }; FutureTask ft=newFutureTask<>(eval); future=cache.putIfAbsent(arg,ft);if(future==null){ future=ft; ft.run(); } }try{returnfuture.get(); }catch(ExecutionException e){ e.printStackTrace(); }returnnull; }}
在此我向大家推荐一个架构学习交流群。交流学习群号:938837867 暗号:555 里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构等这些成为架构师必备