深耕 JDK 源码 - Phaser

Phaser 是 Java 8 引入的一种同步工具,用于协调多个线程之间的同步操作。它提供了更灵活和高级的同步功能,可以替代传统的 CountDownLatch 和 CyclicBarrier,用于更复杂的并发场景。本文将详细介绍 Phaser 的实现原理、常见用法以及代码示例。

Phaser 的实现原理

Phaser 的实现原理基于一个 phase(阶段)的概念,线程可以在这个阶段中等待其他线程的到达,然后一起继续执行下一阶段的任务。Phaser 内部维护了一个参与者(parties)的计数器,表示当前参与 Phaser 同步的线程数量。每当一个线程调用 Phaser 的 arriveAndAwaitAdvance() 方法时,它会将自己标记为已到达,并等待其他线程的到达;当所有线程都到达时,Phaser 会自动进入下一个阶段,并唤醒所有等待的线程。

Phaser 支持多个阶段的连续同步操作,每个阶段都有一个参与者计数器,控制着当前阶段的线程数量。当一个阶段的参与者数量为 0 时,表示当前阶段的所有线程都已完成任务,Phaser 会自动进入下一个阶段。Phaser 还支持对参与者的动态注册和注销,从而在运行时动态地调整同步的线程数量。

Phaser 还提供了可选的注册和注销回调,使得在阶段的开始和结束时可以执行自定义的操作,从而更加灵活地控制同步的行为。Phaser 内部使用了一些高效的同步机制,如 CAS 操作和 volatile 变量,来保证多线程间的同步和顺序性。

Phaser 的常见用法

Phaser 可以用于各种并发场景,如多线程任务的协同工作、分阶段的计算、循环执行任务等。下面是一些 Phaser 的常见用法:

1.多线程任务的协同工作

Phaser 可以用于将多个线程分成多个阶段进行协同工作。比如一个任务需要多个子任务依次完成,每个子任务都需要等待其他子任务的完成才能继续进行,这时可以使用 Phaser 来进行同步。示例代码如下:

class Worker implements Runnable {
    private final Phaser phaser;

    public Worker(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
        System.out.println("Worker " + Thread.currentThread().getName() + " starts");
        phaser.arriveAndAwaitAdvance(); // 等待其他线程到达
        System.out.println("Worker " + Thread.currentThread().getName() + " continues");
        phaser.arriveAndAwaitAdvance(); // 等待其他线程到达
        System.out.println("Worker " + Thread.currentThread().getName() + " finishes");
        phaser.arriveAndDeregister(); // 注销自己
    }
}

public class PhaserExample {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(3); // 创建 Phaser,设置参与者数量为 3
        ExecutorService executorService = Executors.newFixedThreadPool(3); // 创建线程池

        // 创建 3 个 Worker 实例,并提交到线程池中执行
        for (int i = 0; i < 3; i++) {
            executorService.execute(new Worker(phaser));
        }

        // 关闭线程池
        executorService.shutdown();
    }
}

在这个示例中,我们创建了一个 Phaser 实例,并将参与者数量设置为 3。然后,我们创建了 3 个 Worker 实例,并提交到线程池中执行。每个 Worker 实例在执行时都会调用 Phaser 的 arriveAndAwaitAdvance() 方法,等待其他线程的到达。当所有线程都到达时,Phaser 会自动进入下一个阶段,从而实现了多线程任务的协同工作。

2.分阶段的计算

Phaser 可以用于将计算任务分成多个阶段进行处理。比如一个复杂的计算任务需要经过多个阶段,每个阶段都有不同的处理逻辑,这时可以使用 Phaser 来进行分阶段的计算。示例代码如下:

class CalculationTask implements Runnable {
    private final int phase;
    private final Phaser phaser;

    public CalculationTask(int phase, Phaser phaser) {
        this.phase = phase;
        this.phaser = phaser;
    }

    @Override
    public void run() {
        System.out.println("CalculationTask " + Thread.currentThread().getName() + " starts phase " + phase);
        // 根据阶段号执行不同的计算逻辑
        switch (phase) {
            case 0:
                // 第一阶段的计算逻辑
                // ...
                break;
            case 1:
                // 第二阶段的计算逻辑
                // ...
                break;
            case 2:
                // 第三阶段的计算逻辑
                // ...
                break;
            // ...
            default:
                break;
        }
        System.out.println("CalculationTask " + Thread.currentThread().getName() + " finishes phase " + phase);
        phaser.arriveAndAwaitAdvance(); // 等待其他线程到达
    }
}

public class PhaserExample {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(3); // 创建 Phaser,设置参与者数量为 3
        ExecutorService executorService = Executors.newFixedThreadPool(3); // 创建线程池

        // 创建 3 个 CalculationTask 实例,并提交到线程池中执行
        for (int i = 0; i < 3; i++) {
            executorService.execute(new CalculationTask(i, phaser));
        }

        // 关闭线程池
        executorService.shutdown();
    }
}

在这个示例中,我们创建了一个 Phaser 实例,并将参与者数量设置为 3。然后,我们创建了 3 个 CalculationTask 实例,并提交到线程池中执行。每个 CalculationTask 实例在不同的阶段执行不同的计算逻辑,然后调用 Phaser 的 arriveAndAwaitAdvance() 方法等待其他线程的到达。当所有线程都完成当前阶段的计算后,Phaser 会自动进入下一个阶段,从而实现了分阶段的计算。

3.动态添加和删除参与者

Phaser 还支持动态添加和删除参与者。在某些场景下,可能需要在任务执行过程中动态地添加或删除参与者。Phaser 提供了 register()、arriveAndDeregister() 和 bulkRegister() 等方法来支持动态参与者的管理。示例代码如下:

class DynamicTask implements Runnable {
    private final Phaser phaser;
    private final String name;

    public DynamicTask(Phaser phaser, String name) {
        this.phaser = phaser;
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println(name + " starts");
        phaser.arriveAndAwaitAdvance(); // 等待其他线程到达

        // 动态添加一个参与者
        phaser.register();
        System.out.println(name + " is added as a participant");

        // 执行任务逻辑
        System.out.println(name + " is working");

        phaser.arriveAndAwaitAdvance(); // 等待其他线程到达

        // 动态删除一个参与者
        phaser.arriveAndDeregister();
        System.out.println(name + " is removed as a participant");
    }
}

public class PhaserExample {
    public static void main(String[] args) {
        Phaser phaser = new Phaser(1); // 创建 Phaser,初始参与者数量为 1
        ExecutorService executorService = Executors.newFixedThreadPool(3); // 创建线程池

        // 创建 3 个 DynamicTask 实例,并提交到线程池中执行
        for (int i = 0; i < 3; i++) {
            executorService.execute(new DynamicTask(phaser, "Task " + i));
        }

        phaser.arriveAndAwaitAdvance(); // 等待所有线程注册

        // 动态添加一个参与者
        phaser.register();
        System.out.println("Additional participant is added");

        phaser.arriveAndAwaitAdvance(); // 等待所有线程执行任务

        executorService.shutdown();
    }
}

在这个示例中,我们创建了一个 Phaser 实例,并将初始参与者数量设置为 1。然后,我们创建了 3 个 DynamicTask 实例,并提交到线程池中执行。在任务执行过程中,我们使用了 register() 方法动态添加了一个参与者,并使用了 arriveAndDeregister() 方法动态删除了一个参与者。这种方式可以在任务执行过程中灵活地管理参与者数量。

总结:

JDK 8 中引入的 Phaser 类提供了一种强大的多线程协同工作的机制,可以用于同步多个线程之间的执行流程,并支持分阶段的任务处理和动态管理参与者。Phaser 的实现原理基于分层的树状结构,使用了类似于 CyclicBarrier 和 CountDownLatch 的概念,并且提供了丰富的方法来满足不同的需求。通过使用 Phaser,我们可以简化多线程编程中的同步和协调操作,从而提高多线程应用程序的性能和可维护性。

在常见的使用场景中,Phaser 可以用于解决需要多个线程协同工作的问题,例如多阶段的并行计算、游戏引擎中的场景切换、分布式系统中的任务协调等。Phaser 提供了丰富的方法和灵活的特性,可以满足不同场景下的需求。

在使用 Phaser 时,需要注意一些注意事项,如合理设计阶段数量、合理使用 awaitAdvance() 方法和避免出现死锁等。此外,Phaser 也并不适合所有的多线程场景,对于简单的同步需求,使用其他的同步工具如 CountDownLatch 和 CyclicBarrier 可能更加简单和适用。

总的来说,Phaser 是 JDK 8 提供的一个强大的多线程协同工作工具,通过其分阶段的任务处理和动态管理参与者的特性,可以在复杂的多线程场景中简化同步和协调操作,提高多线程应用程序的性能和可维护性。在实际应用中,合理使用 Phaser 可以充分发挥其优势,提升多线程编程的效果。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容