Java并发学习笔记——第六章 Java并发容器和框架
ConcurrentHashMap的实现原理与使用
ConcurrentHashMap
是高效且线程安全的HashMap
。
使用ConcurrentHashMap的原因
-
HashMap
线程不安全-
HashMap
在并发执行put操作时会引起死循环,因为多线程会导致HashMap
的Entry
链表形成环形数据结构,则链表永远没有节点的next
为null
。
-
-
HashTable
效率低下- 使用
synchronized
保证线程安全,在多线程情况下效率非常低。当一个线程访问HashTable
的同步方法,其他线程也访问同步方法时则会进入阻塞或轮询。如线程1访问put
,线程2无法访问get
。
- 使用
-
ConcurrentHashMap
使用锁分段技术- 相比所有线程访问
HashTable
竞争一把锁,ConcurrentHashMap
先将数据分段存储,然后为每段数据配置一把锁,当一个线程访问其中一段数据时,其他段数据也能被正常访问。
- 相比所有线程访问
ConcurrentHashMap的结构
ConcurrentHashMap
由Segment
数组构成。
Segment
由HashEntry
数组构成,Segment
继承了可重入锁ReentrantLock
,在ConcurrentHashMap
充当数据分段锁的角色。当对ConcurrentHashMap
的一个操作需要对HashEntry
中的节点进行修改时,会获取对应的Segment
锁。
HashEntry
则为链表节点,存储键值对。
ConcurrentHashMap的初始化
默认容量大小为16,负载因子0.75 。
Segment
数组个数为ssize
,ssize
由初始变量concurrencyLevel
(默认为16)计算得出,大于等于concurrencyLevel
且为2的N次方。
sshift
等于ssize
从1向左移位的次数。若ssize
为16则sshift
为4。(1 << 4 = 16;默认为4)
segmentShift
用于定位参与散列运算的位数,等于32减sshift
。该参数用于取再散列后的用于做定位的hash位数。(默认为28)
segmentMask
为散列运算掩码,等于ssize
减1 。用于对再散列后的hash取高位做定位。(默认为15)
定位Segment
ConcurrentHashMap
通过对hashcode
使用 Wang/Jenkins hash 的变种算法进行再散列,目的是减少散列冲突。
通过变种算法的再散列后的数最大是32位二进制数据,
如存入"0001111"、"0011111"、"0111111"、"1111111",对这些数进行再散列将生成:
0100|0111|0110|0111|1101|1010|0100|1110
1111|0111|0100|0011|0000|0001|1011|1000
0111|0111|0110|1001|0100|0110|0011|1110
1000|0011|0000|0000|1100|1000|0001|1010
由于segmentShift
为28,则取再散列数右移28位,将得到的数取segmentMask
模,得4、15、7、8,可以发现没有冲突。若直接对15取模,低位相同的数输出都会为15 。
ConcurrentHashMap的操作
get操作
步骤:再散列→定位Segment
→散列算法定位所在HashEntry
链表。
get
高效的原因在于不需要加锁,除非读到值为空才加锁重读。之所以不用加锁,原因在于:
- 对表示
Segment
大小的count
字段定义为volatile
。 - 对存储值的
HashEntry
的value
定义为volatile
。
由JMM的先行发生原则可以保证get
操作不会读到过期的值。
put操作
步骤:再散列→定位到Segment
→判断是否需要对HashEntry
数组进行扩容→定位添加元素位置,将其放在HashEntry
数组里
ConcurrentHashMap
在插入前判断扩容,HashMap
在插入后判断扩容。若HashMap
扩容后不再插入新值,则进行了一次无用的扩容。
扩容时,会创建一个是原来容量两倍的数组,然后将原数组的元素再散列后插入到新数组里。ConcurrentHashMap
不会对整个容器进行扩容,而只对某个Segment
进行扩容。
size操作
Segment
中的modCount
在put
、remove
、clean
操作后都会加1 ,用来标记Segment
的版本。
ConcurrentHashMap
并没有采用直接锁住所有Segment
的方式获取count
和,它会先尝试2次不锁住Segment
的方式统计各个Segment
大小,若统计过程中容器的count
发生变化,再将所有Segment
锁住来求count
和。
而在统计size
前后比较modCount
是否变化来获得count
是否改变。
ConcurrentLinkedQueue
一种线程安全的队列。采用“wait-free“算法实现(即CAS)。
ConcurrentLinkedQueue的结构
入队列(优化入队列效率)
入队列就是将入队节点添加到队列的尾部。
入队主要做两件事:
- 将入队节点设置为当前队列尾结点的
next
节点。 - 使用CAS更新
tail
节点:若tail
节点的next
节点不为空,则将入队节点设置成tail
节点;否则将入队节点设置为tail
的next
节点。所以tail节点不总是尾节点。
不让tail
永远作为队列尾节点的原因在于,若每次都需要使用循环CAS更新tail
节点,会降低入队效率。Doug Lea大师的方法则是每添加HOPS+1(默认为1)个节点才调用一次Node.casNext()
以此减少CAS更新tail的次数。tail
和尾节点距离越长,使用CAS更新tail
次数越少,但每次定位尾节点的时间就越长。但这样本质上是通过增加对volatile
变量的读操作减少对volatile
变量的写操作,而写操作的开销要远大于读操作,因此入队效率仍然能得到提升。
出队列(优化出队列效率)
出队列就是将队列头部节点弹出。
与入队列相同,出队列使用CAS保证更新head
节点,同样不会每次出队都更新head
节点。
首先获取头节点,判断头节点是否为空。若为空,证明已有线程执行出队操作拿走元素;若不为空,则使用CAS将头节点引用设置为null
。若CAS成功,则直接返回头节点元素;若不成功,表示已有线程进行了出队操作并更新了head
节点,需要重新获取头节点。
源码中使用Node.casItem()
获取节点是否为最新;使用updateHead()
更新head
节点。
Java的阻塞队列
阻塞队列定义
阻塞队列是一个支持两个附加操作的队列。这两个附加操作支持阻塞的插入和移除方法。
- 支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,直到队列不满。
- 支持阻塞的移除方法:当队列为空时,获取元素的线程会等待队列变为非空。
阻塞队列常用于生产者/消费者场景。
Java中的阻塞队列
JDK7提供了7个阻塞队列:
-
ArrayBlockingQueue
:由数组结构组成的有界阻塞队列。- 可以通过增加一个重入锁(内含同步队列)使队列变得具有公平性。
-
LinkedBlockingQueue
:由链表结构组成的有界阻塞队列。- 默认和最大长度为
Integer.MAX_VALUE
。
- 默认和最大长度为
-
PriorityBlockingQueue
:支持优先级排序的无界阻塞队列。- 默认采取自然升序。
- 可以指定构造参数
Comparator
类更改排序方式。
-
DelayQueue
:使用优先级队列实现的支持延时获取元素的无界阻塞队列。队列中元素必须实现
Delayed
接口。在创建元素时可以指定多久才能从队列中获取当前元素。
-
运用场景:
- 缓存系统的设计:用
DelayQueue
保存缓存元素的有效期,用一个线程循环查询DelayQueue
,能获取到元素则意味缓存有效期到了。 - 定时任务调度:用
DelayQueue
保存当天将执行的任务和执行时间,一旦从DelayQueue
中获取到任务就开始执行,如TimerQueue
。
- 缓存系统的设计:用
-
使用方法:
-
以
ScheduledThreadPoolExecutor
中的ScheduledFutureTask
类的实现为例,共有三步:///第一步,创建对象时初始化基本数据。用time记录当前对象延迟到什么时候可以使用,使用sequenceNumber表示元素在队列的先后顺序。 private static final AtomicLong sequencer = new AtomicLong(0); ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); } //第二步,实现getDelay方法,该方法返回当前元素还需要延长多少时间,单位为纳秒 public long getDelay(TimeUnit unit) { return unit.convert(time - now(), TimeUnit.NANOSECONDS); } //第三步,实现compareTo方法指定元素顺序。 public int compareTo(Delayed other) { if (other == this) return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)); return (d == 0) ? 0 : ((d < 0) ? -1 : 1); }
-
实现延时阻塞队列。
//当消费者获取元素发现元素没有达到延时时,就阻塞当前线程 long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) return q.poll(); else if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } }
-
-
SynchronousQueue
:不存储元素的阻塞队列。- 每一个
put
操作必须等待一个take
。 - 支持公平访问,默认非公平。
- 每一个
-
LinkedTransferQueue
:由链表结构组成的无界阻塞队列。- 相比其他阻塞队列,多了
tryTransfer
和transfer
方法。-
transfer
方法:若有消费者正在等待接收元素,该方法可以把生产者传入的元素立刻传输给消费者。立即返回。 -
tryTransfer
方法:试探生产者传入的元素是否能直接给消费者。等到消费者了才会返回。
-
- 相比其他阻塞队列,多了
-
LinkedBlockingDeque
:由链表结构组成的双向阻塞队列。- 可以运用在“工作窃取”模式中。
阻塞队列实现原理
使用通知模式实现。即生产者往满的队列添加元素时会阻塞住生产者,当消费者消费了一个队列元素时,通知生产者队列可用。
阻塞主要通过LockSupport.park(this)
实现。
unsafe.park
是个native方法。
Fork/Join 框架
Fork/Join框架定义
是Java7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干小任务,最终汇总每个小任务结果后得到大任务结果的框架。
工作窃取算法
工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。
- 优点:充分利用线程进行并行计算,减少了线程间的竞争。
- 缺点:当双端队列中仅存一个任务时,还是会存在线程间的竞争;该算法消耗了更多的系统资源,如创建多个线程完成多个双端任务。
Fork/Join框架设计
Fork/Join框架需要完成:
- 分割任务。直到将任务分割的足够小。
- 执行任务并合并结果。分割的子任务分别放在双端列表中。
Fork/Join使用两个类完成以上事情:
-
ForkJoinTask
:使用ForkJoin
框架首先需要创建一个ForkJoin任务,它提供在任务中执行fork()
和join()
操作的机制。通常实现时继承该类的子类:-
RecursiveAction
:用于没有返回结果的任务。 -
RecursiveTask
:用于有返回结果的任务。
-
-
ForkJoinPool
:ForkJoinTask
需要ForkJoinPool
来执行。
当一个工作线程的双端队列中没有任务时,它会随机从一个双端队列获取任务进行执行。
Fork/Join框架原理
ForkJoinPool
由ForkJoinTask
数组和ForkJoinWorkerThread
数组组成,前者负责存放程序提交给ForkJoinPool
的任务,后者负责执行这些任务。
ForkJoinTask.fork()
实现原理:
当调用ForkJoinTask.fork()
时,程序会调用ForkJoinWorkerThread.pushTask()
异步执行该任务,并立即返回结果。
ForkJoinWorkerThread.pushTask()
把当前任务存放在ForkJoinTask
数组队列中,再调用ForkJoinPool.singalWork()
唤醒或创建一个工作线程来执行任务。
ForkJoinTask.join()
实现原理:
阻塞当前线程并等待获取结果。
总结
ConcurrentHashMap
通过分段加锁提高了并发效率,且每次扩容只对某一段数据扩容。
ConcurrentLinkedQueue
通过增加对volatile
变量的读操作以减少对volatile
的写操作以提高并发队列的入队、出队效率。
阻塞队列实现原理类似为生产者/消费者通信。
Fork/Join原理类似为分治算法,将分治算法中的递归上升到开新线程执行,且执行线程在执行完后会帮其他线程执行任务。