Java并发-从同步容器到并发容器

引言

容器是Java基础类库中使用频率最高的一部分,Java集合包中提供了大量的容器类来帮组我们简化开发,我前面的文章中对Java集合包中的关键容器进行过一个系列的分析,但这些集合类都是非线程安全的,即在多线程的环境下,都需要其他额外的手段来保证数据的正确性,最简单的就是通过synchronized关键字将所有使用到非线程安全的容器代码全部同步执行。这种方式虽然可以达到线程安全的目的,但存在几个明显的问题:首先编码上存在一定的复杂性,相关的代码段都需要添加锁。其次这种一刀切的做法在高并发情况下性能并不理想,基本相当于串行执行。JDK1.5中为我们提供了一系列的并发容器,集中在java.util.concurrent包下,用来解决这两个问题,先从同步容器说起。

同步容器Vector和HashTable

为了简化代码开发的过程,早期的JDK在java.util包中提供了Vector和HashTable两个同步容器,这两个容器的实现和早期的ArrayList和HashMap代码实现基本一样,不同在于Vector和HashTable在每个方法上都添加了synchronized关键字来保证同一个实例同时只有一个线程能访问,部分源码如下:

//Vector

public synchronized int size() {};

public synchronized E get(int index) {};

//HashTable

public synchronized V put(K key, V value) {};

public synchronized V remove(Object key) {};

通过对每个方法添加synchronized,保证了多次操作的串行。这种方式虽然使用起来方便了,但并没有解决高并发下的性能问题,与手动锁住ArrayList和HashMap并没有什么区别,不论读还是写都会锁住整个容器。其次这种方式存在另一个问题:当多个线程进行复合操作时,是线程不安全的。可以通过下面的代码来说明这个问题:

public static void deleteVector(){

int index = vectors.size() - 1;

vectors.remove(index);

}

代码中对Vector进行了两步操作,首先获取size,然后移除最后一个元素,多线程情况下如果两个线程交叉执行,A线程调用size后,B线程移除最后一个元素,这时A线程继续remove将会抛出索引超出的错误。

那么怎么解决这个问题呢?最直接的修改方案就是对代码块加锁来防止多线程同时执行:

public static void deleteVector(){

synchronized (vectors) {

int index = vectors.size() - 1;

vectors.remove(index);

}

}

如果上面的问题通过加锁来解决没有太直观的影响,那么来看看对vectors进行迭代的情况:

public static void foreachVector(){

synchronized (vectors) {

for (int i = 0; i < vectors.size(); i++) {

System.out.println(vectors.get(i).toString());

}

}

}

为了避免多线程情况下在迭代的过程中其他线程对vectors进行了修改,就不得不对整个迭代过程加锁,想象这么一个场景,如果迭代操作非常频繁,或者vectors元素很大,那么所有的修改和读取操作将不得不在锁外等待,这将会对多线程性能造成极大的影响。那么有没有什么方式能够很好的对容器的迭代操作和修改操作进行分离,在修改时不影响容器的迭代操作呢?这就需要java.util.concurrent包中的各种并发容器了出场了。

并发容器CopyOnWrite

CopyOnWrite--写时复制容器是一种常用的并发容器,它通过多线程下读写分离来达到提高并发性能的目的,和前面我们讲解StampedLock时所用的解决方案类似:任何时候都可以进行读操作,写操作则需要加锁。不同的是,在CopyOnWrite中,对容器的修改操作加锁后,通过copy一个新的容器来进行修改,修改完毕后将容器替换为新的容器即可。

这种方式的好处显而易见:通过copy一个新的容器来进行修改,这样读操作就不需要加锁,可以并发读,因为在读的过程中是采用的旧的容器,即使新容器做了修改对旧容器也没有影响,同时也很好的解决了迭代过程中其他线程修改导致的并发问题。

JDK中提供的并发容器包括CopyOnWriteArrayList和CopyOnWriteArraySet,下面通过CopyOnWriteArrayList的部分源码来理解这种思想:

//添加元素

public boolean add(E e) {

//独占锁

final ReentrantLock lock = this.lock;

lock.lock();

try {

Object[] elements = getArray();

int len = elements.length;

//复制一个新的数组newElements

Object[] newElements = Arrays.copyOf(elements, len + 1);

newElements[len] = e;

//修改后指向新的数组

setArray(newElements);

return true;

} finally {

lock.unlock();

}

}

public E get(int index) {

//未加锁,直接获取

return get(getArray(), index);

}

代码很简单,在add操作中通过一个共享的ReentrantLock来获取锁,这样可以防止多线程下多个线程同时修改容器内容。获取锁后通过Arrays.copyOf复制了一个新的容器,然后对新的容器进行了修改,最后直接通过setArray将原数组引用指向了新的数组,避免了在修改过程中迭代数据出现错误。get操作由于是读操作,未加锁,直接读取就行。CopyOnWriteArraySet类似,这里不做过多讲解。

CopyOnWrite容器虽然在多线程下使用是安全的,相比较Vector也大大提高了读写的性能,但它也有自身的问题。

首先就是性能,在讲解ArrayList的文章中提到过,ArrayList的扩容由于使用了Arrays.copyOf每次都需要申请更大的空间以及复制现有的元素到新的数组,对性能存在一定影响。CopyOnWrite容器也不例外,每次修改操作都会申请新的数组空间,然后进行替换。所以在高并发频繁修改容器的情况下,会不断申请新的空间,同时会造成频繁的GC,这时使用CopyOnWrite容器并不是一个好的选择。

其次还有一个数据一致性问题,由于在修改中copy了新的数组进行替换,同时旧数组如果还在被使用,那么新的数据就不能被及时读取到,这样就造成了数据不一致,如果需要强数据一致性,CopyOnWrite容器也不太适合。

并发容器ConcurrentHashMap

ConcurrentHashMap容器相较于CopyOnWrite容器在并发加锁粒度上有了更大一步的优化,它通过修改对单个hash桶元素加锁的达到了更细粒度的并发控制。在了解ConcurrentHashMap容器之前,推荐大家先阅读我之前对HashMap源码分析的文章--Java集合(5)一 HashMap与HashSet,因为在底层数据结构上,ConcurrentHashMap和HashMap都使用了数组+链表+红黑树的方式,只是在HashMap的基础上添加了并发相关的一些控制,所以这里只对ConcurrentHashMap中并发相关代码做一些分析。

还是先从ConcurrentHashMap的写操作开始,这里就是put方法:

final V putVal(K key, V value, boolean onlyIfAbsent) {

if (key == null || value == null) throw new NullPointerException();

int hash = spread(key.hashCode()); //计算桶的hash值

int binCount = 0;

//循环插入元素,避免并发插入失败

for (Node<K,V>[] tab = table;;) {

Node<K,V> f; int n, i, fh;

if (tab == null || (n = tab.length) == 0)

tab = initTable();

else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {

//如果当前桶无元素,则通过cas操作插入新节点

if (casTabAt(tab, i, null,

new Node<K,V>(hash, key, value, null)))

break;

}

//如果当前桶正在扩容,则协助扩容

else if ((fh = f.hash) == MOVED)

tab = helpTransfer(tab, f);

else {

V oldVal = null;

//hash冲突时锁住当前需要添加节点的头元素,可能是链表头节点或者红黑树的根节点

synchronized (f) {

if (tabAt(tab, i) == f) {

if (fh >= 0) {

binCount = 1;

for (Node<K,V> e = f;; ++binCount) {

K ek;

if (e.hash == hash &&

((ek = e.key) == key ||

(ek != null && key.equals(ek)))) {

oldVal = e.val;

if (!onlyIfAbsent)

e.val = value;

break;

}

Node<K,V> pred = e;

if ((e = e.next) == null) {

pred.next = new Node<K,V>(hash, key,

value, null);

break;

}

}

}

else if (f instanceof TreeBin) {

Node<K,V> p;

binCount = 2;

if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,

value)) != null) {

oldVal = p.val;

if (!onlyIfAbsent)

p.val = value;

}

}

}

}

if (binCount != 0) {

if (binCount >= TREEIFY_THRESHOLD)

treeifyBin(tab, i);

if (oldVal != null)

return oldVal;

break;

}

}

}

addCount(1L, binCount);

return null;

}

在put元素的过程中,有几个并发处理的关键点:

如果当前桶对应的节点还没有元素插入,通过典型的无锁cas操作尝试插入新节点,减少加锁的概率,并发情况下如果插入不成功,很容易想到自旋,也就是for (Node<K,V>[] tab = table;;)。

如果当前桶正在扩容,则协助扩容((fh = f.hash) == MOVED)。这里是一个重点,ConcurrentHashMap的扩容和HashMap不一样,它在多线程情况下或使用多个线程同时扩容,每个线程扩容指定的一部分hash桶,当前线程扩容完指定桶之后会继续获取下一个扩容任务,直到扩容全部完成。扩容的大小和HashMap一样,都是翻倍,这样可以有效减少移动的元素数量,也就是使用2的幂次方的原因,在HashMap中也一样。

在发生hash冲突时仅仅只锁住当前需要添加节点的头元素即可,可能是链表头节点或者红黑树的根节点,其他桶节点都不需要加锁,大大减小了锁粒度。

通过ConcurrentHashMap添加元素的过程,知道了ConcurrentHashMap容器是通过CAS + synchronized一起来实现并发控制的。这里有个额外的问题:为什么使用synchronized而不使用ReentrantLock?前面我的文章也对synchronized以及ReentrantLock的实现方式和性能做过分析,在这里我的理解是synchronized在后期优化空间上比ReentrantLock更大。

并发容器ConcurrentSkipListMap

java.util中对应的容器在java.util.concurrent包中基本都可以找到对应的并发容器:List和Set有对应的CopyOnWriteArrayList与CopyOnWriteArraySet,HashMap有对应的ConcurrentHashMap,但是有序的TreeMap或并没有对应的ConcurrentTreeMap。

为什么没有ConcurrentTreeMap呢?这是因为TreeMap内部使用了红黑树来实现,红黑树是一种自平衡的二叉树,当树被修改时,需要重新平衡,重新平衡操作可能会影响树的大部分节点,如果并发量非常大的情况下,这就需要在许多树节点上添加互斥锁,那并发就失去了意义。所以提供了另外一种并发下的有序map实现:ConcurrentSkipListMap。

ConcurrentSkipListMap内部使用跳表(SkipList)这种数据结构来实现,他的结构相对红黑树来说非常简单理解,���现起来也相对简单,而且在理论上它的查找、插入、删除时间复杂度都为log(n)。在并发上,ConcurrentSkipListMap采用无锁的CAS+自旋来控制。

跳表简单来说就是一个多层的链表,底层是一个普通的链表,然后逐层减少,通常通过一个简单的算法实现每一层元素是下一层的元素的二分之一,这样当搜索元素时从最顶层开始搜索,可以说是另一种形式的二分查找。

一个简单的获取跳表层数概率算法实现如下:

int random_level() {

K = 1;

while (random(0,1))

K++;

return K;

}

通过简单的0和1获取概率,1层的概率为50%,2层的概率为25%,3层的概率为12.5%,这样逐级递减。

一个三层的跳表添加元素的过程如下:

插入值为15的节点:

插入后:

维基百科中有一个添加节点的动图,这里也贴出来方便理解:

通过分析ConcurrentSkipListMap的put方法来理解跳表以及CAS自旋并发控制:

private V doPut(K key, V value, boolean onlyIfAbsent) {

Node<K,V> z; // added node

if (key == null)

throw new NullPointerException();

Comparator<? super K> cmp = comparator;

outer: for (;;) {

for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) { //查找前继节点

if (n != null) { //查找到前继节点

Object v; int c;

Node<K,V> f = n.next; //获取后继节点的后继节点

if (n != b.next) //发生竞争,两次节点获取不一致,并发导致

break;

if ((v = n.value) == null) { // 节点已经被删除

n.helpDelete(b, f);

break;

}

if (b.value == null || v == n)

break;

if ((c = cpr(cmp, key, n.key)) > 0) { //进行下一轮查找,比当前key大

b = n;

n = f;

continue;

}

if (c == 0) { //相等时直接cas修改值

if (onlyIfAbsent || n.casValue(v, value)) {

@SuppressWarnings("unchecked") V vv = (V)v;

return vv;

}

break; // restart if lost race to replace value

}

// else c < 0; fall through

}

z = new Node<K,V>(key, value, n); //9. n.key > key > b.key

if (!b.casNext(n, z)) //cas修改值

break; // restart if lost race to append to b

break outer;

}

}

int rnd = ThreadLocalRandom.nextSecondarySeed(); //获取随机数

if ((rnd & 0x80000001) == 0) { // test highest and lowest bits

int level = 1, max;

while (((rnd >>>= 1) & 1) != 0) // 获取跳表层级

++level;

Index<K,V> idx = null;

HeadIndex<K,V> h = head;

if (level <= (max = h.level)) { //如果获取的调表层级小于等于当前最大层级,则直接添加,并将它们组成一个上下的链表

for (int i = 1; i <= level; ++i)

idx = new Index<K,V>(z, idx, null);

}

else { // try to grow by one level //否则增加一层level,在这里体现为Index<K,V>数组

level = max + 1; // hold in array and later pick the one to use

@SuppressWarnings("unchecked")Index<K,V>[] idxs =

(Index<K,V>[])new Index<?,?>[level+1];

for (int i = 1; i <= level; ++i)

idxs[i] = idx = new Index<K,V>(z, idx, null);

for (;;) {

h = head;

int oldLevel = h.level;

if (level <= oldLevel) // lost race to add level

break;

HeadIndex<K,V> newh = h;

Node<K,V> oldbase = h.node;

for (int j = oldLevel+1; j <= level; ++j) //新添加的level层的具体数据

newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);

if (casHead(h, newh)) {

h = newh;

idx = idxs[level = oldLevel];

break;

}

}

}

// 逐层插入数据过程

splice: for (int insertionLevel = level;;) {

int j = h.level;

for (Index<K,V> q = h, r = q.right, t = idx;;) {

if (q == null || t == null)

break splice;

if (r != null) {

Node<K,V> n = r.node;

// compare before deletion check avoids needing recheck

int c = cpr(cmp, key, n.key);

if (n.value == null) {

if (!q.unlink(r))

break;

r = q.right;

continue;

}

if (c > 0) {

q = r;

r = r.right;

continue;

}

}

if (j == insertionLevel) {

if (!q.link(r, t))

break; // restart

if (t.node.value == null) {

findNode(key);

break splice;

}

if (--insertionLevel == 0)

break splice;

}

if (--j >= insertionLevel && j < level)

t = t.down;

q = q.down;

r = q.right;

}

}

}

return null;

}

这里的插入方法很复杂,可以分为3大步来理解:第一步获取前继节点后通过CAS来插入节点;第二步对level层数进行判断,如果大于最大层数,则插入一层;第三步插入对应层的数据。整个插入过程全部通过CAS自旋的方式保证并发情况下的数据正确性。

总结

JDK中提供了丰富的并发容器供我们使用,文章中介绍的也并不全面,重点是要通过了解各种并发容器的原理,明白他们各自独特的使用场景。这里简单做个总结:当并发读远多于修改的场景下需要使用List和Set时,可以考虑使用CopyOnWriteArrayList和CopyOnWriteArraySet;当需要并发使用<Key, Value>键值对存取数据时,可以使用ConcurrentHashMap;当要保证并发<Key, Value>键值对有序时可以使用ConcurrentSkipListMap。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,928评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,192评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,468评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,186评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,295评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,374评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,403评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,186评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,610评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,906评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,075评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,755评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,393评论 3 320
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,079评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,313评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,934评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,963评论 2 351

推荐阅读更多精彩内容

  • 1.HashMap是一个数组+链表/红黑树的结构,数组的下标在HashMap中称为Bucket值,每个数组项对应的...
    谁在烽烟彼岸阅读 1,020评论 2 2
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,605评论 18 399
  • 并发编程实践中,ConcurrentHashMap是一个经常被使用的数据结构,相比于Hashtable以及Coll...
    天草二十六_简村人阅读 784评论 0 3
  • 假如生活欺骗了你咋办 痴心难改篡写复兴经典 铁心不变寻觅珍爱无害 纵有悲情乌云总会散开 假如现实生活欺骗了你 请不...
    铁娃娇涯阅读 168评论 3 3
  • 今天期中考试了,爸爸说了,不要给他压力,不要问,嗯嗯,就不问,明天他爸爸又要出发了,爷爷说想吃烤肉,那么爸爸请...
    奋斗16年阅读 227评论 0 0