java多线程-线程调度

java多线程编程中为了满足一些应用场景往往需要对线程进行调度,jdk提供了多种调度方法,接下来我们一一来举例说明。

  • 信号量同步 主要是为了限流控制线程的并发数,这时我们可以采用这种方法,信号量简单理解为小区入口的闸门,刷卡一次获取一个信号,当然也可以一次占用多个信号,总之信号的总量不变,占用多了能通过的线程就少了,具体我们可以从代码中来进行分析
public class SemaphoreDemo {


    private Semaphore semaphore = new Semaphore(10);

    public void printNum(int i){
        try {
            semaphore.acquire(1);
            System.out.println("queue = " + semaphore.getQueueLength());
            Thread.sleep(3000);
            System.out.println(Thread.currentThread().getName()+" semaphore = " + i);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            semaphore.release(1);
        }
    }

    public static void main(String[] args){
        final SemaphoreDemo demo = new SemaphoreDemo();
        for(int i =0; i<100; i++){
            final int finalI = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    demo.printNum(finalI);
                }
            }).start();
        }
    }
}

我们初始化了一个总量为10的信号量池,每一个线程在执行的时候都会获取一个信号 ,其中semaphore.acquire(1); 方法就是获取信号,当然也可以一次获取多个。拿到信号量后,线程开始执行自身的业务逻辑,当任务执行完成或者发生异常后我们需要释放信号量,不然会一直占用信号量,semaphore.acquire方法是阻塞执行,如果没有获取到信号量会一直阻塞代码的执行直到获取信号量成功,如果我们需要让代码异步执行可以尝试semaphore.tryAcquire,如果没有获取到信号量则会返回false告诉调用者,这样我们可以根据结果处理对应的逻辑,当然也可以设置一个超时时间,如果在指定的时间内没有获取到信号量则返回false。

  • CountDownLatch 让指定的线程全部都执行完成后在接着做后续的逻辑,可以简单理解为,运动会的时候赛场上裁判必须让运动员就位以后才开始进行比赛,CountDownLatch就是那个场上的裁判。我们来看看代码实现
public class CountDownLatchDemo {



    public void race(CountDownLatch latch){
        try {
            System.out.println(Thread.currentThread().getName()+ " is here");
            Thread.sleep(3000);
            System.out.println(Thread.currentThread().getName()+ " is done");
            latch.countDown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {

        }
    }


    public static void main(String[] args){
        final CountDownLatch latch = new CountDownLatch(4);
        final CountDownLatchDemo demo = new CountDownLatchDemo();
        for(int i=0;i<4;i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    demo.race(latch);
                }
            }).start();
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("all is done");
    }
}
Thread-0 is here
Thread-1 is here
Thread-2 is here
Thread-3 is here
Thread-1 is done
Thread-3 is done
Thread-2 is done
Thread-0 is done
all is done

通过运行结果我们可以看到,4个线程各自等待3s后在执行最后的代码,每个线程在启动的时候会进行计数,然后latch.await方法就一直处于等待状态,只有计数器为0时才会执行之后的的代码,latch.await方法还可以设置超时时间,超时后会自动执行后续的代码。

  • CyclicBarrier 过程同步,简单理解就是每个线程阶段完成各自任务后等待其他线程执行完成,然后在执行下一阶段的任务,完成后在等待其他线程,如此循环往复,就好比赛场上四名运动员跑步,哪个人领先来就停下来等其他的运动员赶上来,最后一起冲刺终点。我们看看在代码中是如何实现的
public class CyclicBarrierDemo {
    private Random random = new Random();

    public void race(CyclicBarrier barrier){
        try {
            Thread.sleep(random.nextInt(5)*1000);
            System.out.println(Thread.currentThread().getName()+"到达地点1");
            barrier.await();
            Thread.sleep(random.nextInt(5)*1000);
            System.out.println(Thread.currentThread().getName()+"到达地点2");
            barrier.await();
            Thread.sleep(random.nextInt(5)*1000);
            System.out.println(Thread.currentThread().getName()+"到达地点3");
            barrier.await();
            Thread.sleep(random.nextInt(5)*1000);
            System.out.println(Thread.currentThread().getName()+"到达地点4");
            barrier.await();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args){
        final CyclicBarrierDemo demo = new CyclicBarrierDemo();
        final CyclicBarrier barrier = new CyclicBarrier(4);
        for(int i=0;i<4;i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    demo.race(barrier);
                }
            }).start();
        }
    }

}

pool-1-thread-2到达地点1
pool-1-thread-1到达地点1
pool-1-thread-3到达地点1
pool-1-thread-4到达地点1
pool-1-thread-1到达地点2
pool-1-thread-4到达地点2
pool-1-thread-3到达地点2
pool-1-thread-2到达地点2
pool-1-thread-3到达地点3
pool-1-thread-1到达地点3
pool-1-thread-2到达地点3
pool-1-thread-4到达地点3
pool-1-thread-2到达地点4
pool-1-thread-3到达地点4
pool-1-thread-1到达地点4
pool-1-thread-4到达地点4

通过设置屏障点,我们可以看到四个线程都到达屏障点后才执行后续的逻辑,哪一个线程到达屏障点后就自动等待,一直到所有的线程都到达后方才开始执行。

  • Phaser是一个更强大的、更复杂的同步辅助类,可以代替CyclicBarrier CountDownLatch的功能,但是比他们更强大。
    Phaser类机制是在每一步结束的位置对线程进行同步,当所有的线程都完成了这一步,才能进行下一步。
    当我们有并发任务并且需要分解成几步执行的时候,这种机制就非常适合。
    CyclicBarrier CountDownLatch 只能在构造时指定参与量,而phaser可以动态的增减参与量。
    phaser 使用说明:
    使用phaser.arriveAndAwaitAdvance(); //等待参与者达到指定数量,才开始运行下面的代码
    使用phaser.arriveAndDeregister(); //注销当前线程,该线程就不会进入休眠状态,也会从phaser的数量中减少
    模拟代替CountDownLatch功能,只需要当前线程arriveAndAwaitAdvance()之后运行需要的代码之后,就arriveAndDeregister()取消当前线程的注册。
    phaser有一个重大特性,就是不必对它的方法进行异常处理。置于休眠的线程不会响应中断事件,不会抛出interruptedException异常, 只有一个方法会响应:AwaitAdvanceInterruptibly(int phaser).
    其他api

arrive():这个方法通知phase对象一个参与者已经完成了当前阶段,但是它不应该等待其他参与者都完成当前阶段,必须小心使用这个方法,因为它不会与其他线程同步。
awaitAdvance(int phase):如果传入的阶段参数与当前阶段一致,这个方法会将当前线程至于休眠,直到这个阶段的所有参与者都运行完成。如果传入的阶段参数与当前阶段不一致,这个方法立即返回。
awaitAdvanceInterruptibly(int phaser):这个方法跟awaitAdvance(int phase)一样,不同处是:该访问将会响应线程中断。会抛出interruptedException异常
将参与者注册到phaser中:
register():将一个新的参与者注册到phase中,这个新的参与者将被当成没有执完本阶段的线程。
bulkRegister(int parties):将指定数目的参与者注册到phaser中,所有这些新的参与者都将被当成没有执行完本阶段的线程。
减少参与者
只提供了一个方法来减少参与者:arriveAndDeregister():告知phaser对应的线程已经完成了当前阶段,并它不会参与到下一阶段的操作中。
强制终止
当一个phaser么有参与者的时候,它就处于终止状态,使用forceTermination()方法来强制phaser进入终止状态,不管是否存在未注册的参与线程,当一个线程出现错误时,强制终止phaser是很有意义的。
当phaser处于终止状态的时候,arriveAndAwaitAdvance() 和 awaitAdvance() 立即返回一个负数,而不再是一个正值了,如果知道phaser可能会被终止,就需要验证这些方法的值,以确定phaser是不是被终止了。
被终止的phaser不会保证参与者的同步

public class PhaserDemo {


    private Random random = new Random();

    public void race(Phaser phaser){
        try {
            Thread.sleep(random.nextInt(5)* 1000);
            System.out.println(Thread.currentThread().getName()+ "aaaaa");
            phaser.arriveAndAwaitAdvance();
            Thread.sleep(random.nextInt(5)* 1000);
            System.out.println(Thread.currentThread().getName()+ "bbbbb");
            phaser.arriveAndAwaitAdvance();
            Thread.sleep(random.nextInt(5)* 1000);
            System.out.println(Thread.currentThread().getName()+ "cccc");
            phaser.arriveAndAwaitAdvance();

        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args){
        final PhaserDemo demo = new PhaserDemo();
        ExecutorService pool = Executors.newCachedThreadPool();
        final Phaser phaser = new Phaser(3);
        for(int i=0;i<3;i++){
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    demo.race(phaser);
                }
            });
        }
        pool.shutdown();
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容