浅谈Collections线程安全包装和ConcurrentHashMap

青春须早为,岂能长少年

Collections.synchronizedXXX

线程安全包装类是怎么实现的?

        ArrayList<Integer> list = new ArrayList<>();
        List<Integer> integers = Collections.synchronizedList(list);

ArrayList 实现了RandomAccess


public class ArrayList<E> extends AbstractList<E>
        implements List<E>, RandomAccess, Cloneable, java.io.Serializable
...

Collections.synchronizedList 是这么实现的:

    //根据指定的list返回一个线程安全的list。
    public static <T> List<T> synchronizedList(List<T> list) {
        return (list instanceof RandomAccess ?
                new SynchronizedRandomAccessList<>(list) :
                new SynchronizedList<>(list));
    }

然后去看看SynchronizedRandomAccessList,这个是一个静态内部类来着,父类是SynchronizedList

    static class SynchronizedRandomAccessList<E>
        extends SynchronizedList<E>
        implements RandomAccess {

        SynchronizedRandomAccessList(List<E> list) {
            super(list);
        }
        private static final long serialVersionUID = 1530674583602358482L;
        // 省略...

    }

去看看父类SynchronizedList


    /**
     * @serial include
     */
    static class SynchronizedList<E>
        extends SynchronizedCollection<E>
        implements List<E> {
        private static final long serialVersionUID = -7754090372962971524L;

        final List<E> list;

        SynchronizedList(List<E> list) {
            super(list);
            this.list = list;
        }
        SynchronizedList(List<E> list, Object mutex) {
            super(list, mutex);
            this.list = list;
        }
        //省略...
    }

可以看到还是用synchronized来实现的,那么mutex这把锁默认是什么呢?
去看一下父类:SynchronizedCollection

/**
     * @serial include
     */
    static class SynchronizedCollection<E> implements Collection<E>, Serializable {
        private static final long serialVersionUID = 3053995032091335093L;

        final Collection<E> c;  // Backing Collection
        final Object mutex;     // Object on which to synchronize

        SynchronizedCollection(Collection<E> c) {
            this.c = Objects.requireNonNull(c);
            mutex = this;
        }

        SynchronizedCollection(Collection<E> c, Object mutex) {
            this.c = Objects.requireNonNull(c);
            this.mutex = Objects.requireNonNull(mutex);
        }

        //省略...
    }

好,现在可以知道默认的锁就是this,也就是自己,还可以在创建时订制锁。

一言蔽之,Collections.synchronizedList 是通过把list所有已知的方法都用synchronized关键字包装起来,从而达到串行访问。

ConcurrentHashMap

ConcurrentHashMap 如何实现线程安全?

  • 大量的利用了volatile,final,CAS等lock-free技术来减少锁竞争对于性能的影响,

JDK1.7的实现

    public V put(K key, V value) {

        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();

        //得到hash值
        int hash = hash(key);
        // 获取到段号
        int j = (hash >>> segmentShift) & segmentMask;


        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
             //段为空,创建段
            s = ensureSegment(j);
        // 
        return s.put(key, hash, value, false);
    }

为了保证创建时的线程安全,采用了volatile和CAS操作


    private Segment<K,V> ensureSegment(int k) {
        //省略...

            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }

        final V put(K key, int hash, V value, boolean onlyIfAbsent) {

            //尝试获取锁
            //tryLock:如果当前线程可以得到锁,返回true
            //scanAndLockForPut:
            HashEntry<K,V> node = tryLock() ? null :   scanAndLockForPut(key, hash, value);
            
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    //省略...
                }
            } finally {
                //释放锁
                unlock();
            }
            return oldValue;
        }

总结
大概总结一下就是:

  • 利用volatile:UNSAFE.getObjectVolatile
    • 保证读取对象的可见性
  • 利用原子操作CAS:UNSAFE.compareAndSwapObject
    • 保证写入的原子性
  • ConcurrentHashMap并不允许key或者value为null
  • UNSAFE是一个面向底层的类,技能比较逆天,相当于c的malloc和指针,可以直接操作内存。

JDK 1.8的实现

  • ConcurrentHashMap中有一个内部类Node
    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }

        public final K getKey()       { return key; }
        public final V getValue()     { return val; }
        public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
        public final String toString(){ return key + "=" + val; }
        public final V setValue(V value) {
            throw new UnsupportedOperationException();
        }

        public final boolean equals(Object o) {
            Object k, v, u; Map.Entry<?,?> e;
            return ((o instanceof Map.Entry) &&
                    (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                    (v = e.getValue()) != null &&
                    (k == key || k.equals(key)) &&
                    (v == (u = val) || v.equals(u)));
        }

这个是线程安全的,也就是说节点是线程安全的


    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        
        if (key == null || value == null) 
            throw new NullPointerException();

        //获得hash
        int hash = spread(key.hashCode());

        int binCount = 0;
        
        //table是volitate的,垃圾箱,第一次使用时创建
        //transient volatile Node<K,V>[] table;
        for (Node<K,V>[] tab = table;;) {
            
            Node<K,V> f;
            int n, i, fh;
            
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

    private transient volatile int sizeCtl;

控制表初始化和resize

  • 当sizeCtl<0:
    • sizeCtl=-1:table 正在初始化
    • sizeCtl<-1: 有 -(sizeCtl+1)个线程在扩容
  • 否则:
    • 当table==null: 表示初始化的大小,0 或者默认
    • table!=null: 表示下次扩容的大小

size()获取大小是估算值,不是实时的,直接拿历史每个槽位的统计值相加

    public int size() {
        //不加任何锁,直接数一遍,是不精确的
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }

    final long sumCount() {
        
        //当前槽位数
        CounterCell[] as = counterCells; 
        CounterCell a;
        long sum = baseCount;
        if (as != null) {
            //直接把每一个槽位拥有的数量加起来
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

    /**
     * counterCells 的长度就是表长度
     * counterCells[k]=n;表示第n个节点,存在hash相同的元素为n个
     * 这n个对象以红黑树存储
     */
    private transient volatile CounterCell[] counterCells;


    //内部类,volatile保证可见性
    @sun.misc.Contended static final class CounterCell {

        volatile long value;
        CounterCell(long x) { value = x; }
    }

    /**
     * Returns the value to which the specified key is mapped,
     * or {@code null} if this map contains no mapping for the key.
     *
     * <p>More formally, if this map contains a mapping from a key
     * {@code k} to a value {@code v} such that {@code key.equals(k)},
     * then this method returns {@code v}; otherwise it returns
     * {@code null}.  (There can be at most one such mapping.)
     *
     * @throws NullPointerException if the specified key is null
     */
    public V get(Object key) {

        Node<K,V>[] tab; 
        Node<K,V> e, p;
        int n, eh;
        K ek;
        //获得hash
        int h = spread(key.hashCode());
        //如果表已创建
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }


    //保证读取可见性
    @SuppressWarnings("unchecked")
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }


    public V put(K key, V value) {
        return putVal(key, value, false);
    }

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        
        //不允许key,value为空
        if (key == null || value == null) 
            throw new NullPointerException();
        //得到hash
        int hash = spread(key.hashCode());
        int binCount = 0;

        //死循环,直到成功才退出
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; 
            int n, i, fh;

            //table 是懒初始化的,没初始化就初始化一遍
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            // 计算出table中对象的位置i,获取对象f,找不到
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //CAS赋值到table[i]
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }// 对象
            // MOVED: 前置节点的hash
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                //在node上加锁,就是对应一个hash 加锁
                synchronized (f) {

                    if (tabAt(tab, i) == f) {
                    /*
                    MOVED     = -1; // hash for forwarding nodes
                    TREEBIN   = -2; // hash for roots of trees
                    RESERVED  = -3; // hash for transient reservations
                    HASH_BITS = 0x7fffffff; // usable bits of normal node hash
                    */  
                        // 链表节点
                        if (fh >= 0) {
                            //省略...
                        }//树节点
                        else if (f instanceof TreeBin) {
                            //省略...
                        }
                    }
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }


    //CAS操作
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

方法总结:

  • put方法在插入时,对一个节点加锁,锁的粒度是节点
  • put方法调用 casTabAt,对应compareAndSwapObject:CAS保证原子操作
  • get方法调用tabAt,对应getObjectVolatile,volatile来保证可见性
  • clear 方法调用setTabAt 将table[i]置为空,原子操作
  • 尽量少,短的syn同步代码块保证锁定是短暂的

对比

JDK1.7 和JDK1.8 在ConcurrentHashMaps上的对比:

  • 实现原理:
    • 1.7
      • 是基于段(Segment)实现并发的,并发受到段数量限制
      • Segment 继承ReentrantLock来实现锁
    • 1.8
      • 基于UNSAFE提供的原子操作CAS,volitile
      • 使用synchronized实现锁[1]
  • 锁的粒度:

    • 1.7: 锁在Segment 上,相对1.8比较粗糙
    • 1.8: 锁在Node上,对应一个hash,更加精细
  • 复杂程度:

    • 1.7: 简单,代码量1600+
    • 1.8: 复杂,代码量6300+
  • 退化问题:

    • 1.7
      • 同一hash以链表解决冲突,
      • 过多equals不同而hash相同的对象时,查找退化为二维数组查找
      • 节点上查找时间复杂度O(n),
    • 1.8
      • 同一个hash上链表长度过大(默认>8)时,会转为红黑树,
      • 红黑树查找时间复杂度在O(log(n))级别

synchronized和ReentrantLock 比较

  • 对象:
    • synchronized 单锁,一次锁一个,嵌套锁来实现复合锁
    • ReentrantLock 支持同时锁住多个对象
  • 公平:
    • synchronized: 不公平
    • ReentrantLock
      • 支持公平锁,默认不公平
      • 按照加锁的时间
  • 级别
    • synchronized 是关键字
    • ReentrantLock 是类级别,提供了诸如{查询请求锁的次数}等操作,更加灵活
  • 效率
    • synchronized JDK1.7 之前比ReentrantLock 差,1.7或之后效率明显提高
    • 大多数情况下,应当优先考虑synchronized
        final Lock lock = new ReentrantLock(true);
        final Condition notFull = lock.newCondition();
        final Condition notEmpty = lock.newCondition();

  1. JDK1.8对 synchronized的优化已经等于或优于ReentrantLock了

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容

  • Java8张图 11、字符串不变性 12、equals()方法、hashCode()方法的区别 13、...
    Miley_MOJIE阅读 3,697评论 0 11
  • 我,抄起了镰 开始收割昨天 我,看见了今天 太阳又开始落下 以及那些急着生长的根 我,知道明天 太阳又将升起 而根...
    讷者阅读 195评论 0 0
  • 人到中年,往往会面临很多无奈的焦虑。这个时候,向上,无依靠,向下,有人正依靠着你。所以,你是一个无可依靠,同时又被...
    神农人者阅读 498评论 6 4
  • 首先, 我是一个堂堂正正的中国人,我爱这个国家,其次,我是一个日语学习者。大学四年间,我只是学习它的文字,学...
    公瑾当年小乔初嫁阅读 776评论 1 1
  • 如果我渡过了深邃的海洋 如果我翻越了高耸的山川 如果我穿梭人山人海只为相遇 如果我愿不顾一切追寻你的踪迹 如果我再...
    万一lxq阅读 236评论 0 4