基础知识模块
很长时间没有写笔记了,把最近看的一点知识分享给大家,主要是并发编程相关的基础知识,准备年前把这一部分都更新了,还请各位大佬多多关照,多多指点。
主要内容的鱼骨图如下:
1、同步容器类
同步容器类包括Vector和Hashtable,二者是早期JDK的一部分,此外还包括1.2版本中添加的一些相似功能的类,这些同步类的封装是由Collections.synchronizedXxx等工厂方法创建的。这些类实现线程安全的方式为:将它们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能够访问容器的状态
1.1、同步容器类的问题
同步容器类都是线程安全的,但在某些情况下可能需要额外的客户端加锁来保护复合操作,常见的复合操作有:迭代,跳转,以及条件运算。在同步容器类中,这些复合操作在没有客户端加锁的情况下是线程安全的,但当其他线程并发的修改容器时,他们可能会表现出意料之外的问题。
“先检查再运行”
public static Object getLast(Vector list){
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
public static void deleteLast(Vector list){
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
上边的代码看似没有问题,无论多少个线程调用他们,也不会破坏Vector。但是如果出现以下交替调用getLast和deleteLast这种情况,就会抛出异常,因为两个方法的size不一致了。怎么解决呢?通过使用synchronized方法或者lock方法对list加锁(客户端加锁)。
“元素迭代”
for(int i = 0;i < vector.size();i++)
doSomething(vector.get(i))
这里的原因与上边一样,都是size与get交替执行,导致方法抛出异常。
synchronized(vector){
for(int i = 0;i < vector.size();i++)
doSomething(vector.get(i))
}
1.2、迭代器
无论是直接迭代还是使用for-each循环,对容器迭代的标准方式都是使用Iterator。如果有其他线程并发的修改容器,那么即使是使用迭代器也无法避免应在迭代期间对容器加锁;否则就会抛出ConcurrentModificationException异常。
list<widget> widgetList = Collections.synchronizedList(new ArrayList<Widget>());
for(Widget w : widgetList)
doSomthing(w);
如果你不希望在迭代期间加锁,那么另外一种替代方法是“克隆”容器,在副本上进行迭代。由于副本被封闭在线程内,因此其他线程不会在迭代期间对其进行修改。
1.3、隐藏迭代
有时存在隐藏的遍历,比如toString,hashCode和equals这种方法,会间接的践行迭代操作。
2、并发容器
这里介绍一下更高级一点的应用,同步容器将所有的对容器状态的访问都串行化,来实现他们的线程安全,这种方法的代价是严重的降低并发性,多个线程竞争时,吞吐量将严重减低。并发容器是针对多个线程并发访问设计的。JDK中新增了ConcurrentHashMap,用来替换同步且基于散列的Map,以及CopyOnWriteArrayList,用于在遍历操作为主要操作的情况下代替同步的List。在新的ConcurrentMap接口中增加了一下常见的复合操作的支持,例如“若没有则添加”、替换以及有条件删除等。ConcurrentSkipListMap并发的替代sortedMap,ConcurrentSkipListSet并发的替换SortedSet。
新增了两种新的容器类型:Queue和BlockingQueue。Queue用来临时保存一组等待处理的元素。他们提供了几种实现,包括ConcurrentLinkedQueue,这是一个传统的先进先出队列,以及PriorityQueue,这是一个(非并发)优先队列。Queue上的操作不会阻塞,如果队列为空,那么获取元素的操作将返回空。
BlockingQueue扩展了Queue,增加了可阻塞的插入和获取等操作;如果队列为空,那么获取元素的操作将一直阻塞,知道队列中出现一个可用元素。如果队列已满,那么插入元素的操作将一直阻塞,直到队里中出现可用的空间。
2.1、ConcurrentHashMap
与之前不同,ConcurrentHashMap并不是将每个方法都在同一个锁上同步并使得每次只能有一个线程访问容器。而是采用一种分段机制,任意数量的读取线程可以并发的访问Map,执行读取操作的线程和执行写入操作的线程可以并发的访问Map,并且一定数量的写入线程可以并发的修改Map。它们提供的迭代不会抛出异常,具有“弱一致性”,可以容忍并发的修改,当创建迭代器时会遍历已有的元素,并可以(不保证)在迭代器被构造后将修改操作反应给容器。
由于是并发的访问,size和empty方法遭到了削弱,它们将返回一个近似值而不是一个精确值。因为在并发时,它们总是在不断变化。当然,get,put,containKey和remove等操作得到了性能上的优化。与同步容器相比,ConcurrentHashMap能够提高代码的可伸缩性,只有当应用程序需要加锁Map来进行独占访问时,才应该放弃ConcurrentHashMap的使用。
2.2、额外的原子Map操作
由于ConcurrentHashMap不能被加锁来进行独占访问,因此我们无法使用客户端加锁来创建新的原子操作。但是ConcurrentMap提供了“若没有则添加”,“若相等则移除”和“若相等则替换”的复合操作,都已经实现原子操作,并且在接口中声明。
2.3、CopyOnWriteArrayList
“写入时复制”(copy-On-Write)容器的线程安全性在于,只要正确的发布一个事实不可变的对象,那么在访问该变量对象时就不再需要进一步的同步;每次在修改时,都会创建并重新发布一个新的容器副本,从而实现可变性。“写入时复制”容器的迭代器会保留一个指向底层基础数组的引用,这个数组当前位于迭代器的起始位置,由于它不会被修改,因此在对其进行同步时只需要确保数组内容的可见性。迭代也不会抛出异常,返回的元素与迭代器创建时的元素一样。
每次修改容器时都需要复制底层数组,这样以来当容器规模特别大的时候,开销是很大的。仅当迭代操作远远多于修改操作时,才应该使用“写入时复制”容器。
3、阻塞队列和生产者-消费者模式
阻塞队列提供了可阻塞的put和take方法,以及支持特定的offer和poll方法。如果队列空间已满,那么put方法将阻塞直到有空间可用;如果队列为空,那么take方法将会阻塞直到有元素可用。队列可以是有界的也可以是无界的,无界队列永远不会充满。
下面以洗盘子为例,二者的劳动分工也是一种生产者-消费者模式:其中一个人把洗好的盘子放在架子上,而另一个人从盘架上取出盘子并把它们烘干。在这个过程中,盘架相当于阻塞队列。阻塞队列提供一个offer方法,如果数据项不能被添加到队列中,那么将返回一个失败状态。这样就能够灵活的处理负荷过载的情况。
BlockingQueue有多种实现,LinkedBlockingQueue和ArrayBlockingQueue是FIFO队列,PriorityBlockingQueue是一个按照优先级排序的队列;SynchronousQueue实际上不是一个真正的队列,它不维护队列中元素的存贮空间,它维护一组线程;相当于在洗盘子的例子中,不要盘架一样。因此SynchronousQueue中put和take会一直阻塞。
3.1、双端队列与工作密取
Deque和BlockingDeque,它们分别对Queue和BlockingQueue进行了扩展。Deque是一个双端队列,实现了在队列头和队列尾的高效插入和移除。具体实现包括ArrayDeque和LinkedBlockingDeque。
双端队列适用于工作密取模式:每个消费者都有各自的双端队列。如果一个消费者完成了自己双端队列中的全部工作,那么它可以从其他消费者双端队列的尾部秘密地获取工作。
4、阻塞与中断
线程可能会阻塞或者暂停执行。当线程被中断时,应该恢复被中断的状态。在出现InterruptedException异常时,应该捕获它做相应处理,否则更高层的代码将无法处理中断。
public class TaskRunnable implements Runnable {
BlockingQueue<Yask> queue;
public void run(){
try{
proceddtask(queue.take());
}catch(InterruptedException e){
//恢复被中断的状态
Thread.currentThread().interrupt();
}
}
}
5、同步工具类
同步工具类可以是任何一个对象,只要它根据其自身的状态来协调线程的控制流。
5.1、闭锁
闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态。
CountDownLatch是一种灵活的闭锁实现,可以使一个或者多个线程等待一组事件的发生。闭锁状态包括一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量,countDown方法递减计数器,表示一个事件发生了;而await方法等待计数器达到零,表示所有的事件都已经发生。
//初始化
fianl CountDownLatch startGate = new CountDownLatch(1);
//阻塞,直到为0
startGate.await();
//减少
startGate.countDown();
5.2、FutureTask
FutureTask也可以用作闭锁,表示一种抽象的可生成结果的计算。FutureTask表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,并且可以处于三种状态:等待运行,正在运行和运行完成(正常结束,取消结束和异常结束)。当FutureTask进入完成状态后,它会永远停留在这一状态。
FutureTask.get的行为取决于任务的状态,如果任务完成,那么get会立即返回结果,否则将会阻塞直到任务进入完成状态,然后返回结果或者抛出异常。FutureTask将计算结果从执行计算的线程传递到获取这个结果的线程,而FutureTask的规范确保了这种传递的安全性。
5.3、信号量
计数信号量用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。
public class BoundedHashSet<T>{
private final Set<T> set;
private final Semaphore sem;
public BoundedHashSet(int bound){
this.set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound);//初始化容量
}
public boolean add(T o) throws InterruptedException {
sem.acquire();//获得一个许可
boolean wasAdded = false;
try{
wasAdded = set.add(o);
return wasAdded;
}finally {
if(!wasAdded){
sem.release();//释放许可
}
}
}
public boolean remove(Object o){
boolean wasRemoved = sem,remove(o);
if(wasRemoved)sem.release();
return wasRemoved();
}
}
5.4、栅栏
闭锁是一次对象,一旦进入终止状态,就不能被重置。栅栏与闭锁的不同在于,所有线程必须同时到达栅栏位置才能继续执行。闭锁用于等待事件,栅栏用于等待线程。
CyclicBarrier可以使一定数量的参与方反复的在栅栏处汇集,它在并行迭代算法中非常有用。
Exchanger是一种两房栅栏,各方在栅栏位置上交换数据。