线程串行并行调度实现

线程串行并行调度实现

问题描述

问题描述:线程A、B、C并行执行,然后和线程D串行执行,如何实现。

问题具体化:现在有A、B、C三个线程,每一个线程分别完成打印0~4的任务,有一个线程D,在A、B、C完成打印之后打印“Hello world!”。

解决思路

我们都知道线程就是用来实现并发执行的,而要实现的结果是在A、B、C三个线程并行执行完之后才执行线程D的任务,那么就需要获取等到A、B、C三个线程的结果,之后再调用线程D的执行方法,就需要想办法让线程D暂不执行,等待A、B、C三个线程执行完才执行,根据这个思路,我们可以想到的解决方案方法如下:

  • 通过一些flag变量控制判断A、B、C三个线程是否执行完

这是一个理解起来最简单也是最原始的方法,通过不断循环判断A、B、C三个线程对应的标识符是否完成执行来决定是否该执行线程D。

  • 在线程D执行前阻塞线程,等待A、B、C三个线程执行完在执行

这个思路和上面差不多,只不过一个是循环判断,一个是阻塞线程。阻塞线程的方法有很多,sleep了,上锁了之类的,但是我们除了阻塞线程外,还需要有一个时机去唤醒阻塞,这个时机的触发点就是A、B、C三个线程执行完,综合考虑,能满足这样条件的阻塞方式有以下几种:

1、thread.join() 阻塞等待当前线程执行完

2、利用FeatureTask实现的线程,通过get()方法可以阻塞等待线程结果返回

3、CountDownLatch,通过闭锁计数器的方式,通过await方法阻塞线程,在A、B、C三个线程执行完之后减少计数,唤醒阻塞

4、CyclicBarrier,栅栏原理,通过其await方法阻塞线程,在A、B、C三个线程执行完之后触发到达栅栏数,唤醒阻塞

以上就是能想到的实现方式,接下来一个一个用代码的方式实现并简要介绍其原理和特性。

解决方案及原理分析

通过flag变量控制

  • 代码实现
//线程类
class MyRunnable implements Runnable {
    Flag flag;//变量
    CountDownLatch countDownLatch;//闭锁
    CyclicBarrier cyclicBarrier;//栅栏

    /**
     * 变量控制的方法
     *
     * @param flag
     */
    public MyRunnable(Flag flag) {
        this.flag = flag;
    }

    /**
     * 通过其他阻塞的方法时调用
     */
    public MyRunnable() {

    }
    /**
     * 通过CountDownLatch的方式
     */
    public MyRunnable(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }
    /**
     * 通过CyclicBarrier的方式
     */
    public MyRunnable(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            try {
                Thread.sleep(new Random().nextInt(100));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("当前线程:" + Thread.currentThread().getName() + " >> " + i);
        }
        if (countDownLatch != null) {//通过闭锁方式
            countDownLatch.countDown();
        }
        if (cyclicBarrier != null) {//通过栅栏方式
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
        if (flag != null) {//通过flag变量方式
            flag.setFlag(true);
            System.out.println("当前线程:" + Thread.currentThread().getName() + " >> " + flag.isFlag());
        }
    }
}

class Flag {
    boolean flag = false;

    public void setFlag(boolean flag) {
        this.flag = flag;
    }

    public boolean isFlag() {
        return flag;
    }
}

/**
 * 线程调度方法1 flag变量控制
 * Created by anonyper on 2019/10/17.
 */
public class ThreadTest {
    Flag flagA = new Flag();//考虑一下这里为什么用一个对象包装一个boolean而不是直接用boolean对象来传递呢
    Flag flagB = new Flag();
    Flag flagC = new Flag();

    public void testThread() {
        Thread threadA = new Thread(new MyRunnable(flagA), "A");
        Thread threadB = new Thread(new MyRunnable(flagB), "B");
        Thread threadC = new Thread(new MyRunnable(flagC), "C");
        Thread threadD = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("当前线程:" + Thread.currentThread().getName() + " >> Hello World!");
            }
        }, "D");
        threadA.start();
        threadB.start();
        threadC.start();
        while (!flagA.isFlag() || !flagB.isFlag() || !flagC.isFlag()) {
            Thread.yield();//释放CPU资源
            //this.wait(0L); //这样也可以等待 释放CPU资源
            //sleep(0L);//这样也可以等待 释放CPU资源
        }
        threadD.start();
    }

    public static void main(String[] args) {
        new ThreadTest().testThread();
    }
}

//执行结果:
当前线程:A >> 0
当前线程:B >> 0
当前线程:C >> 0
当前线程:A >> 1
当前线程:A >> 2
当前线程:B >> 1
当前线程:C >> 1
当前线程:B >> 2
当前线程:A >> 3
当前线程:B >> 3
当前线程:A >> 4
当前线程:A >> true
当前线程:B >> 4
当前线程:B >> true
当前线程:C >> 2
当前线程:C >> 3
当前线程:C >> 4
当前线程:C >> true
当前线程:D >> Hello World!
  • 原理分析

通过wihle循环以及变量控制,让当前线程等待A、B、C三个线程执行完之后在执行D线程。这种方法是理解简单,但是实现挺麻烦的,线程越多要控制的变量就越多,非常不便。

注意实现Runnable中参数传递,这里涉及到值传递和对象传递的知识点。

Thread.join方法

  • 代码实现
/**
 * 线程调度方法1 flag变量控制
 * Created by anonyper on 2019/10/17.
 */
public class ThreadTest {

    public void testThread() {
        Thread threadA = new Thread(new MyRunnable(), "A");
        Thread threadB = new Thread(new MyRunnable(), "B");
        Thread threadC = new Thread(new MyRunnable(), "C");
        Thread threadD = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("当前线程:" + Thread.currentThread().getName() + " >> Hello World!");
            }
        }, "D");
        threadA.start();
        threadB.start();
        threadC.start();
        try {
            threadA.join();//阻塞等待线程A运行完
            threadB.join();//阻塞等待线程B运行完
            threadC.join();//阻塞等待线程C运行完
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        threadD.start();
    }

    public static void main(String[] args) {
        new ThreadTest().testThread();
    }
}

//执行结果
当前线程:A >> 0
当前线程:C >> 0
当前线程:B >> 0
当前线程:B >> 1
当前线程:A >> 1
当前线程:C >> 1
当前线程:B >> 2
当前线程:C >> 2
当前线程:A >> 2
当前线程:C >> 3
当前线程:B >> 3
当前线程:C >> 4
当前线程:A >> 3
当前线程:A >> 4
当前线程:B >> 4
当前线程:D >> Hello World!
  • 原理分析

线程的join方法会阻塞等待当前线程执行完成,其源代码如下:

public final void join() throws InterruptedException {
    this.join(0L);
}
public final synchronized void join(long var1) throws InterruptedException {
        long var3 = System.currentTimeMillis();//当前时间
        long var5 = 0L;
        if (var1 < 0L) {//等待时间不能小于0
            throw new IllegalArgumentException("timeout value is negative");
        } else {
            if (var1 == 0L) {//如果为零,则等待执行完成
                while(this.isAlive()) {//判断线程是否激活,涉及到线程状态,执行完之后返回false
                    this.wait(0L);//等待0秒后唤醒
                }
            } else {//等待一定时间后判断是否执行完
                while(this.isAlive()) {
                    long var7 = var1 - var5;
                    if (var7 <= 0L) {
                        break;
                    }

                    this.wait(var7);
                    var5 = System.currentTimeMillis() - var3;
                }
            }

        }
    }

我们看到join方法如果不传时间则默认等待线程执行完,但是其过程可能会被Interrupted,所以使用该方法需要针对异常做好判断。

FeatureTask的get方法

  • 代码实现
/**
 * 实现Callable接口,重写call方法
 */
class MyCall implements Callable<Boolean> {
    @Override
    public Boolean call() {
        for (int i = 0; i < 5; i++) {
            try {
                Thread.sleep(new Random().nextInt(100));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("当前线程:" + Thread.currentThread().getName() + " >> " + i);
        }
        return true;
    }
}

/**
 * 线程调度方法1 flag变量控制
 * Created by anonyper on 2019/10/17.
 */
public class ThreadTest {

    public void testThread() {
        FutureTask futureTaskA = new FutureTask(new MyCall());
        FutureTask futureTaskB = new FutureTask(new MyCall());
        FutureTask futureTaskC = new FutureTask(new MyCall());
        Thread threadA = new Thread(futureTaskA, "A");
        Thread threadB = new Thread(futureTaskB, "B");
        Thread threadC = new Thread(futureTaskC, "C");
        Thread threadD = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("当前线程:" + Thread.currentThread().getName() + " >> Hello World!");
            }
        }, "D");
        threadA.start();
        threadB.start();
        threadC.start();
        try {
            futureTaskA.get();
            futureTaskB.get();
            futureTaskC.get();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        threadD.start();
    }

    public static void main(String[] args) {
        new ThreadTest().testThread();
    }
}

//运行结果
当前线程:A >> 0
当前线程:C >> 0
当前线程:B >> 0
当前线程:A >> 1
当前线程:A >> 2
当前线程:C >> 1
当前线程:A >> 3
当前线程:B >> 1
当前线程:C >> 2
当前线程:C >> 3
当前线程:B >> 2
当前线程:A >> 4
当前线程:B >> 3
当前线程:C >> 4
当前线程:B >> 4
当前线程:D >> Hello World!
  • 原理分析

1、Callable接口

public interface Callable<V> {
    V call() throws Exception;
}

2、Future接口

public interface Future<V> {
    boolean cancel(boolean var1);//取消

    boolean isCancelled();//是否取消

    boolean isDone();//是否完成

    V get() throws InterruptedException, ExecutionException;//返回结果

    V get(long var1, TimeUnit var3) throws InterruptedException, ExecutionException, TimeoutException;//等待具体时间内返回结果
}

2、RunnableFuture接口实现了Runnable、Future接口

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

3、FutureTask实现了RunnableFuture接口,构造函数接受一个Callable对象

public class FutureTask<V> implements RunnableFuture<V> {
    private Callable<V> callable;
    public FutureTask(Callable<V> var1) {
        if (var1 == null) {
            throw new NullPointerException();
        } else {
            this.callable = var1;
            this.state = 0;
        }
    }
    public void run() {
      ...
      Callable var1 = this.callable;
      var2 = var1.call;
      ...
    }
        public V get() throws InterruptedException, ExecutionException {//get方法
        int var1 = this.state;
        if (var1 <= COMPLETING) {//状态未完成时等待完成
            var1 = this.awaitDone(false, 0L);
        }

        return this.report(var1);//完成后返回结果
    }
    private int awaitDone(boolean var1, long var2) throws InterruptedException {
        long var4 = var1 ? System.nanoTime() + var2 : 0L;
        FutureTask.WaitNode var6 = null;
        boolean var7 = false;

        while(!Thread.interrupted()) {
            int var8 = this.state;
            if (var8 > COMPLETING) {//完成或被打断,返回结果
                if (var6 != null) {
                    var6.thread = null;
                }
                return var8;
            }

            if (var8 == COMPLETING) {//快要完成了,等待一会
                Thread.yield();
            } else if (var6 == null) {
                var6 = new FutureTask.WaitNode();
            } else if (!var7) {
                var7 = UNSAFE.compareAndSwapObject(this, waitersOffset, var6.next = this.waiters, var6);
            } else if (var1) {//有等待时间限制
                var2 = var4 - System.nanoTime();
                if (var2 <= 0L) {
                    this.removeWaiter(var6);
                    return this.state;
                }

                LockSupport.parkNanos(this, var2);
            } else {
                LockSupport.park(this);//通过LockSupport.park方法阻塞
            }
        }
                //最后没有等待结果时被唤醒(LockSupport.uppark())抛出打断异常
        this.removeWaiter(var6);
        throw new InterruptedException();
    }

4、FutureTask对象作为runnable实现类,传入Thread中,调用run方法调用的是callable的call方法,get方法在call方法没有返回时阻塞线程,等待结果返回。

该方法学会了理解起来也简单,但是相对写的类比较多,如果比较在意线程结果返回值来做判断条件的话,可以使用。

CountDownLatch实现

  • 代码实现
//MyRunnable的代码
        /**
     * 通过CountDownLatch的方式
     */
    public MyRunnable(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }
        @Override
    public void run() {
        ...
        if (countDownLatch != null) {//完成后减1
            countDownLatch.countDown();
        }
        ...
    }

public class ThreadTest {

    public void testThread() {
        CountDownLatch countDownLatch = new CountDownLatch(3);//闭锁的数量 三个线程
        //MyRunnable的实现见第一种方式代码,此处不在重复
        Thread threadA = new Thread(new MyRunnable(countDownLatch), "A");
        Thread threadB = new Thread(new MyRunnable(countDownLatch), "B");
        Thread threadC = new Thread(new MyRunnable(countDownLatch), "C");
        Thread threadD = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("当前线程:" + Thread.currentThread().getName() + " >> Hello World!");
            }
        }, "D");
        threadA.start();
        threadB.start();
        threadC.start();
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        threadD.start();
    }

    public static void main(String[] args) {
        new ThreadTest().testThread();
    }
}

//执行结果
当前线程:A >> 0
当前线程:B >> 0
当前线程:C >> 0
当前线程:A >> 1
当前线程:B >> 1
当前线程:A >> 2
当前线程:A >> 3
当前线程:C >> 1
当前线程:B >> 2
当前线程:A >> 4
当前线程:C >> 2
当前线程:B >> 3
当前线程:C >> 3
当前线程:B >> 4
当前线程:C >> 4
当前线程:D >> Hello World!
  • 原理分析

CountDownLatch的核心方法有三个:

1、构造方式传入一个计数数量

private final CountDownLatch.Sync sync;

public CountDownLatch(int var1) {
    if (var1 < 0) {
        throw new IllegalArgumentException("count < 0");
    } else {
        this.sync = new CountDownLatch.Sync(var1);
    }
}

2、countDown计数减1

public void countDown() {
    this.sync.releaseShared(1);
}

3、await阻塞等待,还有一个方法是await(long var1, TimeUnit var3),等待具体的时间

public void await() throws InterruptedException {
    this.sync.acquireSharedInterruptibly(1);
}

public boolean await(long var1, TimeUnit var3) throws InterruptedException {
    return this.sync.tryAcquireSharedNanos(1, var3.toNanos(var1));
}

这三个方法都关联一个类:Sync,该类继承AbstractQueuedSynchronizer类,简称AQS,AQS是用来构建锁或者其他同步组件(信号量、事件等)的基础框架类,JDK中许多并发工具类的内部实现都依赖于AQS,如ReentrantLock, Semaphore, CountDownLatch等等,具体源码此处不做分析。

CountDownLatch简单理解就是await方法阻塞线程,在等待计数为0时唤醒等待,减少计数的方法可以是在不同的线程中,也可以在同一个线程中。使用也比较简单,理解也容易。

CyclicBarrier实现

  • 代码实现
//MyRunnable中的代码
/**
  * 通过CyclicBarrier的方式
  */
public MyRunnable(CyclicBarrier cyclicBarrier) {
  this.cyclicBarrier = cyclicBarrier;
}


@Override
public void run() {
  ...
    if (cyclicBarrier != null) {//通过栅栏方式
    try {
        cyclicBarrier.await();
      } catch (InterruptedException e) {
        e.printStackTrace();
      } catch (BrokenBarrierException e) {
        e.printStackTrace();
      }
    }
  ...
}

public class ThreadTest {

    public void testThread() {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(4);//这个地方是4,不是3
        Thread threadA = new Thread(new MyRunnable(cyclicBarrier), "A");
        Thread threadB = new Thread(new MyRunnable(cyclicBarrier), "B");
        Thread threadC = new Thread(new MyRunnable(cyclicBarrier), "C");
        Thread threadD = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("当前线程:" + Thread.currentThread().getName() + " >> Hello World!");
            }
        }, "D");
        threadA.start();
        threadB.start();
        threadC.start();
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        threadD.start();
    }

    public static void main(String[] args) {
        new ThreadTest().testThread();
    }
}

//执行结果
当前线程:B >> 0
当前线程:B >> 1
当前线程:C >> 0
当前线程:B >> 2
当前线程:C >> 1
当前线程:B >> 3
当前线程:B >> 4
当前线程:A >> 0
当前线程:C >> 2
当前线程:C >> 3
当前线程:A >> 1
当前线程:C >> 4
当前线程:A >> 2
当前线程:A >> 3
当前线程:A >> 4
当前线程:D >> Hello World!
  • 原理分析

CyclicBarrier是通过ReentrantLock锁来实现的,具体源码就不分析了,其里面也有几个重要的方法:

1、构造方法

public CyclicBarrier(int var1){//传入等待条件数
  ...
}

2、等待方法,当wait方法调用次数达到设定的次数之后,统一唤醒所有等待地方。

public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return this.dowait(false, 0L);
    } catch (TimeoutException var2) {
        throw new Error(var2);
    }
}

CyclicBarrier和CountDownLatch有点类似,但是也有点不一样,用在跑步比赛上的区别就是:

1、CountDownLatch 计数减到0之后,阻塞的地方就可以开始跑了(阻塞了一个地方)

2、CyclicBarrier 当有运动员没有准备好(调用await方法)时,其他的运动员都等着(await阻塞),只有都准备好了再开始跑。

3、CountDownLatch计数只能用一次,CyclicBarrier可以循环使用。

所以用CyclicBarrier的实现思路,相当于让A、B、C三个线程阻塞在最后一步,然后线程D就绪,A、B、C三个线程收个尾之后线程D开始运行,严格意义上并不是A、B、C三个线程和D串行,在理论上实现了串行。

总结

以上就是我能想到的实现A、B、C三个线程并发然后和线程D串行执行的几种方法,代码都以贴出。如果有其他方法,欢迎提出补充。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,137评论 6 511
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,824评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,465评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,131评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,140评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,895评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,535评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,435评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,952评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,081评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,210评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,896评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,552评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,089评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,198评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,531评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,209评论 2 357

推荐阅读更多精彩内容

  • 专业考题类型管理运行工作负责人一般作业考题内容选项A选项B选项C选项D选项E选项F正确答案 变电单选GYSZ本规程...
    小白兔去钓鱼阅读 9,003评论 0 13
  • 正常情况下,每个子线程完成各自的任务就可以结束了。不过有的时候,我们希望多个线程协同工作来完成某个任务,这时就涉及...
    野梦M阅读 496评论 0 2
  • 《爱情公寓4 》在2014年年初完结了,它陪伴我度过了完整的大学时光!所有美好的记忆都在江城这座长江边上的城市就此...
    夏目心叶阅读 928评论 0 6
  • 本周学习了歌谣和故事 一,歌谣 歌谣在英语启蒙阶段是贯穿始终的。 因为它有一个很重要的作用,培养孩子的声韵觉识。所...
    旻mn阅读 167评论 0 0