Java同步工具类

1. CountDownlatch(计数器)

描述:

一个同步工具类,允许一个或多个线程等待其它线程完成操作

类图

image

通过指定的count值进行初始化,调用await方法的线程将被阻塞,直到count值通过countDown()方法减小到0,所有等待的线程才会被释放继续执行。另外CountDownLatch不可能重新初始化或者修改CountDownLatch对象的内部计数器的值

事例:

package com.lkf.concurrent;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;

import static java.util.concurrent.Executors.newFixedThreadPool;


public class CountDownlatchTest {
    /**
     * 计数器,用来控制线程数量,传入参数2,表示计数器计数为2
     */
    private final static CountDownLatch M_COUNT_DOWN_LATCH = new CountDownLatch(2);

    /**
     * 示例工作线程类
     */
    private static class WorkerThreadA implements Runnable {
        private final String mThreadName;
        private final int mSleepTime;

        public WorkerThreadA(String name, int sleepTime) {
            mThreadName = name;
            mSleepTime = sleepTime;
        }

        @Override
        public void run() {
            System.out.println("[" + mThreadName + "] started!");
            try {
                Thread.sleep(mSleepTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            M_COUNT_DOWN_LATCH.countDown();
            System.out.println("[" + mThreadName + "] end!");
        }
    }

    /**
     * 工作线程类
     */
    private static class WorkerThreadB implements Runnable {

        @Override
        public void run() {
            System.out.println("[WorkerThread] started!");
            try {
                // 阻塞在这里等待 mCountDownLatch 里的count变为0;                
                M_COUNT_DOWN_LATCH.await();
            } catch (InterruptedException e) {

            }
            System.out.println("[WorkerThread] end!");
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = newFixedThreadPool(3);
        // 最先run WorkerThread
        executorService.submit(new WorkerThreadB());
        // 运行两个工作线程
        // 工作线程1运行3秒        
        executorService.submit(new WorkerThreadA("WorkingThread1", 3000));
        // 工作线程2运行2秒
        executorService.submit(new WorkerThreadA("WorkingThread2", 2000));
    }
}

2. CyclicBarrier(同步屏障)

描述

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是CyclicBarrier(intparties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞

类图

image

事例

package com.lkf.concurrent;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {
    public static void main(String[] args) {
        //初始化四个线程
        int threadNum = 4;
        CyclicBarrier barrier = new CyclicBarrier(threadNum, new WorkerThreadA());
        for (int i = 0; i < threadNum; i++) {
            new WorkerThread(barrier).start();
        }
    }

    static class WorkerThread extends Thread {
        private CyclicBarrier cyclicBarrier;

        public WorkerThread(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println("线程" + Thread.currentThread().getName() + "正在执行");
            try {
                Thread.sleep(5000);      //以睡眠来模拟操作
                System.out.println("线程" + Thread.currentThread().getName() + "执行完毕,等待其他线程执行完成");
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println("所有线程执行完成,继续处理其他任务...");
        }
    }

    static class WorkerThreadA extends Thread {
        @Override
        public void run() {
            System.err.println("我是特殊任务");
        }
    }
}

有一个高级构造函数,当一组线程执行完毕后,优先执行某个方法,CyclicBarrier(int parties, Runnable barrierAction),可以用来处理特殊的任务

应用场景

CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。比如一个1000万行数据的大文件,统计数据最大的钱五个数,假如我们用五个线程,将大文件分成5份,分别计算每一份中最大的数,最后,barrierAction用这些线程的计算结果,计算出整个文件中最大的五个数。

CyclicBarrier和CountDownLatch的区别

CountDownLatch的计数器只能使用一次,CyclicBarrier的计数器可以使用reset() 方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。比如以下代码执行完之后会返回true。

3. Semaphore(信号量)

描述

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源,就是控制并发线程的数量

类图

image

应用场景

Semaphore可以用于做流量控制,特别公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发的读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有十个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,我们就可以使用Semaphore来做流控

事例

public class SemaphoreTest {

    private static final int THREAD_COUNT = 30;

    private static ExecutorService threadPool = newFixedThreadPool(THREAD_COUNT);

    private static Semaphore s = new Semaphore(10);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        s.acquire();
                        System.out.println("save data");
                        s.release();
                    } catch (InterruptedException e) {
                    }
                }
            });
        }

        threadPool.shutdown();
    }
}

其他方法

Semaphore还提供一些其他方法:

int availablePermits() :返回此信号量中当前可用的许可证数。

int getQueueLength():返回正在等待获取许可证的线程数。

boolean hasQueuedThreads() :是否有线程正在等待获取许可证。

void reducePermits(int reduction) :减少reduction个许可证。是个protected方法。

Collection getQueuedThreads() :返回所有等待获取许可证的线程集合。是个protected方法。

4. Exchanger(线程间数据交换)

描述

Exchanger(交换器)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据, 如果第一个线程先执行exchange方法,它会一直等待第二个线程也执行exchange,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方

类图

image

应用场景

Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换两人的数据,并使用交叉规则得出2个交配结果。

Exchanger也可以用于校对工作。比如我们需要将纸制银流通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行录入,录入到Excel之后,系统需要加载这两个Excel,并对这两个Excel数据进行校对,看看是否录入的一致

事例

public class ExchangerTest {
    //交换器
    private static final Exchanger<String> exgr = new Exchanger<String>();

    //线程池
    private static ExecutorService threadPool = newFixedThreadPool(2);

    public static void main(String[] args) {

        threadPool.execute(() -> {
            try {
                String threadAData = "线程A的数据";
                exgr.exchange(threadAData);
            } catch (InterruptedException e) {
            }
        });

        threadPool.execute(() -> {
            try {
                String threadBData = "线程B的数据";
                String threadAData = exgr.exchange("B");
                System.out.println("A和B数据是否一致:" + threadAData.equals(threadBData) + ",A录入的是:"
                        + threadAData + ",B录入是:" + threadBData);
            } catch (InterruptedException e) {
            }
        });

        threadPool.shutdown();

    }
}

其它方法

如果两个线程有一个没有到达exchange方法,则会一直等待,如果担心有特殊情况发生,避免一直等待,可以设置最大等待时长。

public V exchange(V x, long timeout, TimeUnit unit)
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 222,252评论 6 516
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,886评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,814评论 0 361
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,869评论 1 299
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,888评论 6 398
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,475评论 1 312
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 41,010评论 3 422
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,924评论 0 277
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,469评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,552评论 3 342
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,680评论 1 353
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,362评论 5 351
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,037评论 3 335
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,519评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,621评论 1 274
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 49,099评论 3 378
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,691评论 2 361

推荐阅读更多精彩内容