countdownlatch源码分析

countdownlatch是JDK提供的一个线程控制的工具类,虽然代码短少,实现简单,但是它的作用却十分的大。

1.从一个例子开始####

1.现有一文件,文件的大小超过100G,现在的需求是,计算文件中每一行数据的MD5值。
2.现在要实现一个RPC请求模型,要求实现,RPC过程中的请求超时的判断和处理流程。

先说第一个场景,第一个场景需要计算所有文件的MD5,但是100G文件处理相对较大,那么我们势必要利用多线程去并行处理大文件,并将最后的结果输出。但是如何控制主线程在所有线程结束之后结束,是一个需要思考的过程。

第二个场景,在RPC请求发出后,我们需要在一定时间内等待请求的响应,在超时之后没有响应的,我们需要抛出异常。

上面两种场景,其实用wait notify都可以解决,但是实现起来是比较麻烦的,但是用countdownlatch解决起来十分简单。

拿第一个例子来说,我们简单的实现一下:

package countdownlatch;

import com.google.common.base.Charsets;
import com.google.common.hash.HashCode;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 多线程处理一个文件
 */
public class MultiThread {
    private static ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(10);
    private static CountDownLatch latch;
    private static final int ThreadNum = 10;

    static {
        for (int i = 0; i < 10; i++) {
            blockingQueue.add("test" + i);
        }
        latch = new CountDownLatch(10);
    }

    /**
     * 用blockQueue中的元素模拟文件分片
     * @return
     */
    public static String getFileSplit() {
        return blockingQueue.poll();
    }

    static class myThread implements Runnable {

        public void run() {
            System.out.println(Thread.currentThread().getName() + "begin running...");
            String m = getFileSplit();
            HashFunction hf = Hashing.md5();
            HashCode hc = hf.newHasher()
                    .putString(m, Charsets.UTF_8)
                    .hash();
            System.out.println(hc.toString());
            try {
                Thread.currentThread().sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            latch.countDown();
            System.out.println(Thread.currentThread().getName() + "ended");
        }
    }

    public static void main(String args[]){
        System.out.println("主线程开始运行");
        ExecutorService service = Executors.newFixedThreadPool(10);
        for (int i=0;i<ThreadNum;i++){
            service.execute(new Thread(new myThread()));
        }
        service.shutdown();
        System.out.println("线程已经全部运行");
        System.out.println("等待所有线程运行结束");
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("主线程退出");
    }
}

输出是这样的:
主线程开始运行
线程已经全部运行
等待所有线程运行结束
pool-1-thread-2begin running...
pool-1-thread-6begin running...
pool-1-thread-1begin running...
pool-1-thread-3begin running...
pool-1-thread-5begin running...
pool-1-thread-9begin running...
pool-1-thread-8begin running...
pool-1-thread-10begin running...
pool-1-thread-7begin running...
pool-1-thread-4begin running...
b04083e53e242626595e2b8ea327e525
5e40d09fa0529781afd1254a42913847
8ad8757baa8564dc136c1e07507f4a98
86985e105f79b95d6bc918fb45ec7727
739969b53246b2c727850dbb3490ede6
5a105e8b9d40e1329780d62ea2265d8a
4cfad7076129962ee70c36839a1e3e15
ad0234829205b9033196ba818f7a872b
f6f4061a1bddc1c04d8109b39f581270
e3d704f3542b44a621ebed70dc0efe13
pool-1-thread-3ended
pool-1-thread-2ended
pool-1-thread-10ended
pool-1-thread-4ended
pool-1-thread-7ended
pool-1-thread-5ended
pool-1-thread-6ended
pool-1-thread-8ended
pool-1-thread-1ended
pool-1-thread-9ended
主线程退出

从输出我们可以看出,主线程确实是等所有线程结束后才退出的,这也正是我们预期的结果,有的童鞋就说了,我可以利用join实现和你一样的效果,但是Join是基于wait实现的,每一个线程join另一个线程就会有一个线程进入wait状态,也就是说同一时刻只有一个线程在运行,多余的CPU都浪费掉了,这显然不是很合理。

2.说说countdownlatch的API####

countdownlatch的API真的很少,下图是这个工具类的完整结构。


Paste_Image.png

我们可以看到主要方法有三个:await(),await(long, TimeUnit),countDown()

await():阻塞当前线程,直到latch的值为0,或当前线程被中断

     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.

await(long, TimeUnit):阻塞当前线程,知道latch为0,线程被中断,或者超时。

     * Causes the current thread to wait until the latch has counted down to
     * zero, unless the thread is {@linkplain Thread#interrupt interrupted},
     * or the specified waiting time elapses.

countDown():使latch的值减小1

       Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.

3.说说countdownlatch的实现

countdownlatch其实是基于同步器AbstractQueuedSynchronizer实现的,ReentrantLock其实也是基于AbstractQueuedSynchronizer实现的,那么好像预示了什么。

首先看构造函数:

    /**
     * Constructs a {@code CountDownLatch} initialized with the given count.
     *
     * @param count the number of times {@link #countDown} must be invoked
     *        before threads can pass through {@link #await}
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

构造函数的参数是一个整数值,意思是说需要多少个latch。
实体化Sync,sync是countdownlatch的内部类,它继承了AbstractQueuedSynchronizer。

 Sync(int count) {
            setState(count);
        }

主要是将latch的值赋予AbstractQueuedSynchronizer的State
再看await()方法:

 public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

await()内调用了 sync.acquireSharedInterruptibly(1) ;

/**
     * Acquires in shared mode, aborting if interrupted.  Implemented
     * by first checking interrupt status, then invoking at least once
     * {@link #tryAcquireShared}, returning on success.  Otherwise the
     * thread is queued, possibly repeatedly blocking and unblocking,
     * invoking {@link #tryAcquireShared} until success or the thread
     * is interrupted.
     * @param arg the acquire argument
     * This value is conveyed to {@link #tryAcquireShared} but is
     * otherwise uninterpreted and can represent anything
     * you like.
     * @throws InterruptedException if the current thread is interrupted
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

这里先检测了线程中断状态,中断了则抛出异常,接下来调用tryAcquireShared,tryAcquireShared是Syn的实现的:

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

其实就是简单的获取了同步器的state,判断是否为0,之前博客里面有写ReentrantLock,两者的机制是一样的。因为countDownLacth实例化之后的State一般不是0,那么此方法返回-1.进入doAcquireSharedInterruptibly:

/**
/**
    * Acquires in shared interruptible mode.
    * @param arg the acquire argument
    */
   private void doAcquireSharedInterruptibly(int arg)
       throws InterruptedException {
       final Node node = addWaiter(Node.SHARED);
       boolean failed = true;
       try {
           for (;;) {
               final Node p = node.predecessor();
               if (p == head) {
                   int r = tryAcquireShared(arg);
                   if (r >= 0) {
                       setHeadAndPropagate(node, r);
                       p.next = null; // help GC
                       failed = false;
                       return;
                   }
               }
               if (shouldParkAfterFailedAcquire(p, node) &&
                   parkAndCheckInterrupt())
                   throw new InterruptedException();
           }
       } finally {
           if (failed)
               cancelAcquire(node);
       }
   }

这段代码是比较熟悉的在ReentrantLock中分析过,这里关键的点是parkAndCheckInterrupt()

/**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     * 
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

执行到此处时,线程会阻塞,知道有其他线程唤醒此线程,执行await之后,上文中的主线程阻塞在这。
接下来分析下countDown():

    /**
     * Decrements the count of the latch, releasing all waiting threads if
     * the count reaches zero.
     *
     * <p>If the current count is greater than zero then it is decremented.
     * If the new count is zero then all waiting threads are re-enabled for
     * thread scheduling purposes.
     *
     * <p>If the current count equals zero then nothing happens.
     */
    public void countDown() {
        sync.releaseShared(1);
    }

调用了Sync的releaseShared:

    /**
     * Releases in shared mode.  Implemented by unblocking one or more
     * threads if {@link #tryReleaseShared} returns true.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryReleaseShared} but is otherwise uninterpreted
     *        and can represent anything you like.
     * @return the value returned from {@link #tryReleaseShared}
     */
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

接下来是tryReleaseShared

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

此方法是用CAS减小State的值。如果State=0那么尝试唤醒等待线程,执行doReleaseShared:

    /**
     * Release action for shared mode -- signal successor and ensure
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

这里需要关注一点就是unparkSuccessor,这个方法是唤醒上文中的主线程。至此countdownlatch的主流程就走通了。

不得不说countdownlatch是一个很高的线程控制工具,极大的方便了我们开发。由于知识能力有限,上面是自己的一点见识,有什么错误还望提出,便于我及时改进。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,699评论 6 513
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,124评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 167,127评论 0 358
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,342评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,356评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,057评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,654评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,572评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,095评论 1 318
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,205评论 3 339
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,343评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,015评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,704评论 3 332
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,196评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,320评论 1 271
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,690评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,348评论 2 358

推荐阅读更多精彩内容