01 Phaser
Phaser 与 CountDownLatch、CyclicBarrier 做的事情类似,只是Phaser 提供了更加丰富的API,左右处理同样一个场景时,使用一个 Phaser 对象,并通过调用不同的 API 即可用更简单的方式实现相同的功能。
Phaser 主要有以下功能:
(1)提供了多个阻塞节点,因此可以组织一个流程中可以组织多个阻塞节点。
(2)控制线程直接通过阻塞节点还是被阻塞节点阻塞。
(3)动态添加 parties
(4)动态控制 Phaser 对象的阻塞功能失效。
02 常用方法
arriveAndAwaitAdvance()
与 CountDownLatch 的 await()相似,到达阻塞节点之后,等待条件满足后继续向下执行,否则则一直等待。
arriveAndDeregister()
假设 parties = n,首先执行该 arriveAndDeregister()方法的线程将总的 parties 数减一,即阻塞节点原本需要等待n个线程到达,现在阻塞节点只需要等待(n-1)个节点到达就继续可以向下执行。执行 arriveAndDeregister()方法的线程并不会因为执行此方法而推出执行。
getPhase()
获取到达了第几个阻塞点。
onAdvance()
每通过一次阻塞节点就会调用一次。
该方法返回true,表示不再阻塞,即 phaser 呈无效的状态。
该方法返回false,表示 phaser 继续工作。
getReqisteredRarties()
获取注册的 parties 数量。
register()
每执行一次该方法总的 parties 数量就会 + 1。
blukRegister(int parties)
批量一次性添加多个 parties 数量。
getArrivedParties()
获取已经到达阻塞节点的线程数量。
getUnarrivedParties()
获取尚未到达阻塞节点的数量。
arrive()
使 当前阻塞节点的 parties 值加 1( phaser 中总的 parteis 并不会变化), 并切不在阻塞节点等待,直下向下执行下面的代码。
awaitAdvance(int phaseIndex)
如果传入的 phaseIndex 的数值与 getPhase()的数值一样则在阻塞节点等待,否则继续向下执行。这样能够灵活的控制一个线程应该在第几个阻塞节点被阻塞,而不是此次都需要在阻塞节点等待。å
awaitAdvanceInterruptibly(int phaseIndex)
与 awaitAdvance(int phaseIndex) 类似,区别在于当中断时会抛出相关的 InterruptiblyException 异常,并可捕获。
awaitAdvanceInterruptibly(int phaseIndex, long timeout, TimeUnit unit);
如果传入的 phaseIndex 的数值与 getPhase()的数值一样则在阻塞节点等待设定的时间,否则继续向下执行,当超时之后继续向下执行。
forTermination()
将 Phaser 对象的阻塞功能销毁
isTerminated()
判断 Phaser 对象的阻塞功能是否已经被销毁
03 案例
(1) 通过 arriveAndAwaitAdvance() 来建立多个阻塞点,并一次集体功过阻塞点。
Runner.java
package com.page.concurrent.phaser;
import java.util.concurrent.Phaser;
public class Runner extends Thread {
private final Phaser phaser;
private final long index;
public Runner(Phaser phaser, long index) {
this.phaser = phaser;
this.index = index;
}
@Override
public void run() {
String threadName = Thread.currentThread().getName();
System.out.println(threadName + " first sprint");
sprint();
System.out.println(threadName + " arrive A and waiting others");
phaser.arriveAndAwaitAdvance();
System.out.println(threadName + " second sprint");
sprint();
System.out.println(threadName + " arrive B and waiting others");
phaser.arriveAndAwaitAdvance();
}
private void sprint() {
try {
Thread.sleep(index * 2 * 1000);
} catch (InterruptedException e) {
System.out.println("Had error when running " + e.getMessage());
}
}
}
Game.java
package com.page.concurrent.phaser;
import java.util.concurrent.Phaser;
public class Game {
public static void main(String[] args) {
Phaser phaser = new Phaser(3);
for (int i = 1; i <= 3; i++) {
Runner runner = new Runner(phaser, i);
runner.start();
}
}
}
(2) 3 名运动员赛跑,连续进行三次赛跑的,首先到达阻塞节点的则不用往下跑,跑的越慢经过的阻塞节点越多。
Runner.java
package com.page.concurrent.phaser;
import java.util.concurrent.Phaser;
public class Runner extends Thread {
private final Phaser phaser;
private final long index;
public Runner(Phaser phaser, long index) {
this.phaser = phaser;
this.index = index;
}
@Override
public void run() {
String threadName = Thread.currentThread().getName();
System.out.println(threadName + " first sprint");
sprint();
System.out.println(threadName + " arrive A and waiting others " + phaser.getRegisteredParties());
if (arriveAndAwaitAdvanceOrDeregister(threadName, 3, "A")) {
return;
}
System.out.println(threadName + " second sprint");
sprint();
System.out.println(threadName + " arrive B and waiting others");
if (arriveAndAwaitAdvanceOrDeregister(threadName, 2, "B")) {
return;
}
System.out.println(threadName + " still sprint");
}
private boolean arriveAndAwaitAdvanceOrDeregister(String threadName,
int currentParties,
String address) {
if (phaser.getRegisteredParties() == currentParties) {
phaser.arriveAndDeregister();
System.out.println(threadName + " had arrived " + address);
return true;
} else {
phaser.arriveAndAwaitAdvance();
}
return false;
}
private void sprint() {
try {
Thread.sleep(index * 2 * 1000);
} catch (InterruptedException e) {
System.out.println("Had error when running " + e.getMessage());
}
}
}