同步容器类
- Vector、Hashtable,及由Collections.synchronizedXxx等工程方法创建的类。
- 这些类实现线程安全方式是:将它们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能方法容器状态。
同步容器类的问题
- 客户端新建的复合操作需要通过容器类的锁来保证原子性。
- 在迭代操作中,多线程情况下回出现ArrayIndexOutOfBoundsException异常,如果要解决,则需要在客户端加锁,但是牺牲了一些伸缩性。
迭代器与CncurrrentModificationException
- 发现容器在迭代过程中被修改过,会抛出CME异常。实现原理:将容器计数器与容器关联起来:如果在迭代期间计数器被修改,那么hasNext或next将抛出CME异常。然后,这种检查在没有同步的情况下进行的,因此可能看到失效的计数值:ABA问题。
隐藏的迭代器
- 容器的toString、hashCode、equals、containsAll、removeAll、retainAll以及包容器作为参数的构造函数都会对容器进行迭代,在并发下就可能出现CME异常。所以在并发情况下,在迭代期间需要对容器加锁。
并发容器
- 并发容器为了改变同步容器的性能,同步容器对容器状态的访问时串行化,以实现线程安全,这种方法降低了并发性,和吞吐量。
- 并发容器针对多线程并发访问设计的,如果需要提高并发性使用以下替代方案。
- ConcurrentHashMap代替同步且散列的Map.
- CopyOnWriteArrayList用于在遍历操作为主要操作的情况下代替同步的List。
- ConcurrentMap接口中增加了常见复合操作的支持:“若没有则添加”、替换、有条件删除。
- ConcurrentSkipListMap代替SortedMap
- ConcurrentSkipListSet代替SortedSet
*并发容器类提供的迭代器不会抛出CME异常。
Queue、BlockingQueue容器接口
- Queue用来临时保存一组待处理元素,实现类包括:ConcurrentLinkedQueue(先进先出队列)、PriorityQueue(优先队列,非并发)
- Queue上的操作不会阻塞,如果队列为空,返回空值。
- Queue通过LinkedList来实现的。
- BlockingQueue扩展Queue,增加了可阻塞的插入和获取等操作,如果队列为空,那么获取元素操作将一直阻塞,直到队列中出现一个可用元素。插入时,如果队列已满(对于有界队列来说,无界队列除外),插入操作将一直阻塞,知道队列中出现可用空间。适合“生产者-消费者模式”
- BlockingQueue提供了offer方法,当队列满时offer添加元素失败返回false,此时可以做相应策略调整(减少生产线程或序列化数据到磁盘),同样也提供了poll方法,在一定时间内如果没有获取到元素则返回null。
ConcurrentHashMap
- 使用分段锁(Lock Striping)来提高并发性和伸缩性。
- size和isEmpty返回的结果可能会过期,实际上他只是一个估值,但这两个方法用的不多。
- 在CHM中没有实现对Map加锁以提供独占方法,在Hashtable、synchronizedMap中获得Map的锁能防止其他线程访问这个Map
- 与Hashtable、synchronizedMap相比,CHM有更多优势,大多数情况下使用CHM来提高代码的伸缩性,只有当程序需要加锁Map以进行独占访问,或者需要依赖同步Map带来的一些其他作用是才应该放弃CHM.
CopyOnWriteArrayList、CopyOnWriteArraySet
- 写入时复制容器,在修改的时候会创建并发布一个新的容器副本。
- 修改方法(删除、添加)是加锁的,避免多个线程调用产生多个副本。
- 修改时会复制底层数组,所以需要一定开销,所以适合读远大于写的业务场景。
Java并发编程:并发容器之CopyOnWriteArrayList(转载)
阻塞队列和生产者-消费者模式
- BlockingQueue的多种实现:LinkedBlockingQueue、ArrayBlockingQueue是FIFO队列。PriorityBlockingQueue是按优先级排序队列,即可根据元组自然排序来比较元素(如果他们实现Comparable方法),也可以使用Comparator来比较。
双端队列
- Deque(发音“deck”)和BlockingDeque对Queue、BlockingQueue进行了扩展。Deque是一个双端队列,实现了在队列头和尾高效插入和移除。
- 实现为ArrayDeque、LinkedBlockingQueue
阻塞方法与中断方法
阻塞:等待I/O操作结束、等待获取一个锁,等待从Thread.sleep方法中醒来、等待另一个线程的计算结果。阻塞线程处于阻塞状态,等待某个不受它控制的事件发生后才能继续执行,当外部事件发生时,线程被置回Runnable(就绪状态)。
LinkedBlockingQueue的take、put等方法会抛出InterruptedException与Thread.sleep一样。当方法抛出InterruptedException异常,表示该方法时一个阻塞方法,如果这个方法被中断,那么它将努力提前结束阻塞状态。(只是努力不是保证)
LinkedBlockingQueue的take阻塞方法源码
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
- 中断,Thread类提供了interrupt方法,用于中断线程或者查询线程是否已经被中断。每个线程都有一个布尔类型的属性,标识线程的中断状态,当中断线程时将设置这个状态。
- 中断是一种协作机制,一个线程不能强制其它线程停止正在执行的操作而去执行其它的操作。
- A线程中断B时,A仅仅是要求B在执行到某个可以暂停的地方停止正在执行的动作---前提是如果线程B愿意停止下来。