探秘Java并发模块:容器与工具类

并发与多线程是每个人程序员都头疼的内容,幸好Java库所提供了丰富并发基础模块,这些多线程安全的模块作为并发工具类将帮助大家来应对并发开发的各种需求。

扩展阅读:

  1. 多线程安全性:每个人都在谈,但是不是每个人都谈地清
  2. 对象共享:Java并发环境中的烦心事
  3. 从Java内存模型角度理解安全初始化
  4. 从任务到线程:Java结构化并发应用程序
  5. 关闭线程的正确方法:“优雅”的中断
  6. 驾驭Java线程池:定制与扩展

1. 同步容器类

在谈及同步容器之前,必须要说说他们的老前辈同步容器类。同步容器类的代表就是VectorHashTable,这是早期JDK中提供的类。此外Collections.synchronizedXXX等工厂方法也可以把普通的容器(如HashMap)封装成同步容器。这些同步容器类的共同点就是:使用同步Synchronized)方法来封装容器的操作方法,以保证容器多线程安全,但这样也使得容器的每次操作都会对整个容器上锁,所以同一时刻只能有一个线程访问容器。

1.1 同步容器的复合操作问题

同步容器类虽然对于单一操作是线程安全的,但是对于复合操作(即由多个操作组合而成,如迭代,跳转),就不一定能保证线程安全。如下面的代码:

public class UnsafeVectorHelpers {
    //复合操作,先检查再运行,并不是经常安全的,
    public static Object getLast(Vector list) {
        int lastIndex = list.size() - 1;
        return list.get(lastIndex);
    }
}

getLast方法中存在“先检查再运行”的情况:先去获得容器大小,再去获得容器中最后一个元素。虽然这两个操作单独都是同步的,但是复合在一起并不能保证整个方法的原子性,所以还需要额外的同步操作。线程安全的代码如下:

public class SafeVectorHelpers {
    public static Object getLast(Vector list) {
        //额外的同步操作
        synchronized (list) {
            int lastIndex = list.size() - 1;
            return list.get(lastIndex);
        }
    }
}

1.2 同步容器类与迭代器

正因为同步容器类没有解决复合操作的线程安全问题,所以在使用迭代器时,其也不能避免迭代器被修改。甚至同步容器类的迭代器在设计时就没有考虑并发修改的问题,而是采用快速失败(fail-fast)的处理方法,即在容器迭代的过程中,发现容器被修改了,就抛出异常ConcurrentModificationException

虽然也可以通过给容器上锁解来决迭代器被并发修改的问题,但是这样做也会带来性能问题:如果迭代的过程很费事,其他访问容器的操作都会被拥塞。

除此之外,一些隐式调用迭代器的情况让同步容器的使用情况更为复杂。

public class HiddenIterator {
    //应该使用并发容器
    @GuardedBy("this") private final Set<Integer> set = new HashSet<Integer>();

    public synchronized void add(Integer i) {
        set.add(i);
    }

    public synchronized void remove(Integer i) {
        set.remove(i);
    }

    public void addTenThings() {
        Random r = new Random();
        for (int i = 0; i < 10; i++)
            add(r.nextInt());
        // 隐式地调用了迭代器,
        // 连接字符串操作会调用StringBuilder.append(Object),
        // 而这个方法又会调用容器Set的toString(),
        // 标准容器(不仅是Set)的ToString方法会使用迭代器依次使用容器内元素的toString方法。
        System.out.println("DEBUG: added ten elements to " + set);
    }
}

注释中已经解释了容器的toString()方法是如何迭代调用容器元素的toString方法。同样的,容器的hashCodeequals方法都是隐式调用迭代器。

2. 并发容器

从Java 5开始,JDK中提供了并发容器类来改进同步容器类的不足。Java 5 中提供了ConcurrentHashMap来代替同步的HashMap,提供了CopyOnWriteArrayList来代替同步都是List。

Java 6 中又继续引入了ConcurrentSkipListMapConcurrentSkipLIstSet来分别代替同步的SortedMapSortedList

并发容器并不对整个容器上锁,故而允许多个线程同时访问容器,改进了同步容器因串行化而效率低的问题。

2.1 ConcurrentHashMap

ConcurrentHashMap也是基于散列的Map,但是并不是在操作的过程中对整个容器上锁,而是使用一种粒度更细的锁,即分段锁

在ConcurrentHashMap的实现中,其使用了16锁来分段保护容器,每个锁保护着散列表的1/16,其第N个散列桶的位置由第(N mod 16)个锁来保护。如果访问的元素不是由同一个锁来保护,则允许并发被访问。这样做虽然增加了维护和管理的开销,但是提高并发性。不过,ConcurrentHashMap中也存在对整个容器加锁的情况,比如容器要扩容,需要重新计算所有元素的散列值, 就需要获得全部的分段锁。

ConcurrentHashMap所提供的迭代器也不会抛出ConcurrentModificationException异常,所以不需要为其加锁。并发容器的迭代器具有弱一致性(Weakly Consistent),容忍并发的修改,可以(但是不保证)将迭代器上的修改操作反映给容器。

需要注意的是,为了提高对元素访问的并发性,ConcurrentHashMap中对容器整体操作的语义被消弱,比如size和isEmpty等方法,其返回的结果都是估计值,可能是过期的。

2.2 CopyOnWriteArrayList

CopyOnWriteArrayList用于代替同步的List,其为“写时复制(Copy-on-Write)”容器,本质为事实不可变对象,一旦需要修改,就会创建一个新的容器副本并发布。容器的迭代器会保留一个指向底层基础数组的引用,这个数组是不变的,且其当前位置位于迭代器的起始位置。

由于每次修改CopyOnWriteArrayList都会有容器元素复制的开销,所以其更适合迭代操作远远多于修改操作的使用场景中。

2.3 拥塞队列

Java 5 还新增了两种容器类型:QueueBlockingQueue

  • 队列Queue,其实现有ConcurrentLinkedQueue(并发的先进先出队列)和PriorityQueue(非并发的优先级队列);Queue上的操作不会被拥塞,如果队列为空 ,会立刻返回null,如果队列已满,则会立刻返回失败;
  • 拥塞队列BlockingQueue,是Queue的一种扩展,其上的操作是可拥塞的:如果队列为空,则获取元素的操作将被拥塞直到队列中有可用元素,同理如果队列已满,则放入元素的操作也会被用塞到队列有可用的空间。

队列的相关内容在前文中已经介绍过了,这里不再展开。

此外Java 6 还提供了双端队列 DequeBlockingDeque,即队列头尾都可以都可以插入和移除元素。双端队列适用于一种特殊的生产者-消费者模式——密取模式:即每个消费者都有一个双端队列,当自己队列中的元素被消费完之后,就可以秘密地从别的消费者队列的末端取出元素使用。

3. 同步工具类

Java中还提供了同步工具类,这些同步工具类可以根据自身的状态来协调线程的控制流,上面提到的拥塞队列就是一种同步工具类,除此之外还有闭锁(Latch)信号量(Semaphore)栅栏(Barrier)

3.1 闭锁

闭锁是一种同步工具类 ,可以延迟线程的进度直到其到达终止状态。闭锁的作用就像一扇门:在闭锁到达结束状态之前,这扇门处于关闭状态,所有的线程都不能通过;当闭锁达到终止状态后,这扇门打开,所有线程都可以通过。闭锁一旦到达终止状态后,其状态就不会再被改变。

闭锁可以用来保证一些活动在其所依赖的活动执行完毕之后再继续执行,如等待资源初始化,等待依赖的服务完毕等等。

CountDownLatch是闭锁的一种实现,其包括一个计数器,其被初始化为一个正整数,表示要等到事件数量。countDown方法表示一个事件已经放生了,await方法表示等到闭锁达到终止状态(拥塞方法,支持中断和超时)。

下面是一个使用闭锁的实例,来实现任务计时功能:

public class TestHarness {
    public long timeTasks(int nThreads, final Runnable task)
            throws InterruptedException {
        // 开始锁
        final CountDownLatch startGate = new CountDownLatch(1);
        // 结束锁
        final CountDownLatch endGate = new CountDownLatch(nThreads);

        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread() {
                public void run() {
                    try {
                        // 等待主线程初始化完毕
                        startGate.await();
                        try {
                            task.run();
                        } finally {
                            // 结束锁释放一个
                            endGate.countDown();
                        }
                    } catch (InterruptedException ignored) {
                    }
                }
            };
            t.start();
        }

        // 记录当前时间为开始时间
        long start = System.nanoTime();
        // 初始化完毕,开启开始锁,子线程可以运行
        startGate.countDown();
        // 等到个子线程运行完毕
        endGate.await();
        // 统计执行时间
        long end = System.nanoTime();
        return end - start;
    }
}

3.2 FutureTask

之前讨论过的FutureTask其实也可以作为闭门使用,Future.get方法会被拥塞直到对应的任务完成。

下面的例子中使用FutureTask来等到预加载任务的完成。

public class Preloader {
    ProductInfo loadProductInfo() throws DataLoadException {
        return null;
    }

    //FutureTask 实现了Runnable和Future
    private final FutureTask<ProductInfo> future =
        new FutureTask<ProductInfo>(new Callable<ProductInfo>() {
            public ProductInfo call() throws DataLoadException {
                return loadProductInfo();
            }
        });
    private final Thread thread = new Thread(future);

    //预先开始加载任务
    public void start() { thread.start(); }

    public ProductInfo get()
            throws DataLoadException, InterruptedException {
        try {
            //等待任务完成
            return future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            //已知异常
            if (cause instanceof DataLoadException)
                throw (DataLoadException) cause;
            else //未知异常
                throw LaunderThrowable.launderThrowable(cause);
        }
    }

    interface ProductInfo {
    }
}

//自定义的异常类型
class DataLoadException extends Exception { }

5.3 信号量

Semaphore是信号量的实现,用来控制的特定资源的操作数,也就是一组虚拟的资源许可:得到资源的同时获得信号量,使用完资源时释放信号量,如果当前没有可用信号量就得等待。如果是二值信号量,也就是一种互斥锁。

下面的例子使用信号量将普通的容器变为有界阻塞的容器

public class BoundedHashSet <T> {
    private final Set<T> set;
    // 信号量
    private final Semaphore sem;

    public BoundedHashSet(int bound) {
        // 获得同步容器
        this.set = Collections.synchronizedSet(new HashSet<T>());
        sem = new Semaphore(bound);
    }

    public boolean add(T o) throws InterruptedException {
        // 请求获得信号量,可能拥塞
        sem.acquire();
        boolean wasAdded = false;
        try {
            wasAdded = set.add(o);
            return wasAdded;
        } finally {
            if (!wasAdded)
                // 无论添加操作是否成功,都释放信号量
                sem.release();
        }
    }

    public boolean remove(T o) {
        boolean wasRemoved = set.remove(o);
        // 移除成功之后,会释放一个信号量
        if (wasRemoved)
            sem.release();
        return wasRemoved;
    }
}

5.3 栅栏

栅栏(Barrier)和闭锁是类似的,能拥塞一种线程直到某个事件的发生,只有当所有的线程都达到栅栏的位置,才能继续执行。栅栏用于等待其他线程,而闭锁用于等待某个事件。

栅栏的使用场景类似于“明天早上八点,所有人学校操场集合(栅栏),然后再去春游”。

CyclicBarrier是栅栏的一种实现,其可以让一定数量的参与方反复在栅栏的位置汇聚,其await方法表示某个方法到达栅栏。这个模型在并行迭代算法中很有意思,以下是《java concurrency in practive》中给出的使用范例。

public class CellularAutomata {
    private final Board mainBoard;
    //栅栏
    private final CyclicBarrier barrier;
    //子任务
    private final Worker[] workers;

    public CellularAutomata(Board board) {
        this.mainBoard = board;
        //环境中CPU的个数
        int count = Runtime.getRuntime().availableProcessors();
        this.barrier = new CyclicBarrier(count,
                new Runnable() {
                    public void run() {
                        //当所有子任务完成,更新数值
                        mainBoard.commitNewValues();
                    }});
        this.workers = new Worker[count];
        //划分子任务;
        for (int i = 0; i < count; i++)
            workers[i] = new Worker(mainBoard.getSubBoard(count, i));
    }

    private class Worker implements Runnable {
        private final Board board;

        public Worker(Board board) { this.board = board; }
        public void run() {
            while (!board.hasConverged()) {
                for (int x = 0; x < board.getMaxX(); x++)
                    for (int y = 0; y < board.getMaxY(); y++)
                        //设置当前子任务的结果
                        board.setNewValue(x, y, computeValue(x, y));
                try {
                    //完成计算,等待其他任务完成
                    barrier.await();
                } catch (InterruptedException ex) {
                    return;
                } catch (BrokenBarrierException ex) {
                    return;
                }
            }
        }

        private int computeValue(int x, int y) {
            // Compute the new value that goes in (x,y)
            return 0;
        }
    }

    public void start() {
        for (int i = 0; i < workers.length; i++)
            new Thread(workers[i]).start();
        mainBoard.waitForConvergence();
    }

    interface Board {
        int getMaxX();
        int getMaxY();
        int getValue(int x, int y);
        int setNewValue(int x, int y, int value);
        void commitNewValues();
        boolean hasConverged();
        void waitForConvergence();
        Board getSubBoard(int numPartitions, int index);
    }
}

要说明的是,CyclicBarrier的构造器中可以传进一个Runnable对象,表示当所有线程到达栅栏之后要执行什么任务。

栅栏的一种特殊形式是Exchange,它是一种两方栅栏(Two-party Barrier) ,双方会在栅栏处交换数据,这是一种线程间安全交互数据的方法。具体交换数据的时机取决于程序的响应需求,最简单的方案为:当缓冲区被填满时,由填充任务进行数据交换;当缓冲区为空时,由读取任务交换数据。这样的模型在双方执行不对等操作时很有用,比如一个任务向缓冲区A写数据,另一个从缓冲区B读数据,然后使用Exchange来汇合两个任务,将被写满或是被读空的缓冲区相互交换。

5.4 实例:高效的结果缓存

最后展示一个并发容器类的使用实例:计算结果缓存,即将已经计算完的结果保存起来,如果调用有缓存的计算结果,则直接返回,如果没有缓存再进行计算。

以下是同步方法的实现方式:

public class Memoizer1 <A, V> implements Computable<A, V> {
    @GuardedBy("this") private final Map<A, V> cache = new HashMap<>();
    private final Computable<A, V> c;

    public Memoizer1(Computable<A, V> c) {
        this.c = c;
    }

    // 该方法对整个容器上锁,如果容器过大可能导致操作时间比没有缓存的情况更久
    // 建议使用并发容器;
    public synchronized V compute(A arg) throws InterruptedException {
        V result = cache.get(arg);
        if (result == null) {
            result = c.compute(arg);
            cache.put(arg, result);
        }
        return result;
    }
}

由于同步方法是对整个容器上锁,所以并发的效率不好,因此要使用并发容器作为计算结果的缓存,改进代码如下:

public class Memoizer2 <A, V> implements Computable<A, V> {
    private final Map<A, V> cache = new ConcurrentHashMap<>();
    private final Computable<A, V> c;

    public Memoizer2(Computable<A, V> c) {
        this.c = c;
    }

    // cache是并发容器,支持多线程同时访问,
    // 但是不能表示出某个结果正在被计算
    public V compute(A arg) throws InterruptedException {
        V result = cache.get(arg);
        if (result == null) {
            result = c.compute(arg);
            cache.put(arg, result);
        }
        return result;
    }
}

这样代码的并发效率就可以被大大提升了。不过这样使用并发容器类还有一点小问题:缓存仅仅记录下那些结果被计算出来,但是不能反映出那些结果正在被计算,如果计算的过程很漫长,也会照成重复计算,而浪费大量时间。这时就可以使用Future来表示任务的生命周期,存进缓存中。完善的代码如下:

public class Memoizer <A, V> implements Computable<A, V> {
    // 记录那些结果的计算已经开始
    private final ConcurrentMap<A, Future<V>> cache
            = new ConcurrentHashMap<>();
    private final Computable<A, V> c;

    public Memoizer(Computable<A, V> c) {
        this.c = c;
    }

    public V compute(final A arg) throws InterruptedException {
        while (true) {
            Future<V> f = cache.get(arg);
            if (f == null) { // 没有缓存结果,添加计算结果
                Callable<V> eval = new Callable<V>() {
                    public V call() throws InterruptedException {
                        return c.compute(arg);
                    }
                };
                FutureTask<V> ft = new FutureTask<>(eval);
                // 如果不存在缓存则提交任务, 
                // 如果已经缓存则得到缓存值;
                f = cache.putIfAbsent(arg, ft);
                if (f == null) { 不存在缓存结果
                    f = ft;
                    ft.run(); //开始计算
                }
            }
            try {
                // 获得计算结果,如果已经计算完毕,则会立刻返回
                // 如果计算还在进行中,就会拥塞
                return f.get(); 
            } catch (CancellationException e) {
                cache.remove(arg, f);
            } catch (ExecutionException e) {
                throw LaunderThrowable.launderThrowable(e.getCause());
            }
        }
    }
}

扩展阅读:

  1. 多线程安全性:每个人都在谈,但是不是每个人都谈地清
  2. 对象共享:Java并发环境中的烦心事
  3. 从Java内存模型角度理解安全初始化
  4. 从任务到线程:Java结构化并发应用程序
  5. 关闭线程的正确方法:“优雅”的中断
  6. 驾驭Java线程池:定制与扩展
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容

  • 一.线程安全性 线程安全是建立在对于对象状态访问操作进行管理,特别是对共享的与可变的状态的访问 解释下上面的话: ...
    黄大大吃不胖阅读 834评论 0 3
  • 从三月份找实习到现在,面了一些公司,挂了不少,但最终还是拿到小米、百度、阿里、京东、新浪、CVTE、乐视家的研发岗...
    时芥蓝阅读 42,216评论 11 349
  • 一、设计线程安全的类 在设计线程安全类的过程中,需要包含以下三个基本要素: 找出构成对象状态的所有变量 找出约束状...
    端木轩阅读 580评论 1 3
  • 我一直在试图寻找一个更加理性的角度,用非常客观的态度去解读微商,但很可惜,恐怕很难做得到,因为微商这种生态夹杂着商...
    任凯晔阅读 302评论 0 1
  • 浪花 我坐在岸边的礁石上 望着泛白的海岸线 海鸥在海面上低璇 仰头冲对天空喊话 我猜想 它也许是在对碧天说情话 礁...
    鹤起阅读 321评论 0 1