1.概览
在这篇文章中,我们将看一下java.util.Phaser的概念,Phaser和CountDownLatch很相似,它允许我们协调线程的执行。和CountDownLatch相比,它还有一些其他功能。Phaser是一个屏障器,许多线程在继续执行之前,必须先等待该屏障器。在CountDownLatch中,数量是不能够动态配置的,它需要在我们创建CountDownLatch实例的时候就提供。
2.Phaser API
Phaser 允许我们在 线程等待屏障器barrier中构建逻辑。
我们可以协调多个执行的阶段,为每一个程序阶段复用Phaser实例。每个阶段都可以有不同数量的等待进入下一阶段线程,我们接下来将看一个使用phase的例子。为了参与协调工作,线程需要把它自己注册到Phaser实例上。注意:这仅仅是增加了已注册的数量,我们并不能检查当前线程是否已经被注册了-为了支持这一点,我们必须继承相应的实现。
线程可以调用arriveAndAwaitAdvance()方法来发出一个信号,来说明它已经到达了屏障器。这个arriveAndAwaitAdvance()方法是一个阻塞方法。
当到达方的数量等于注册方的数量时,程序的运行将继续,并且 phase的数量将增加。通过调用getPhase()方法,我们可以获取到当前的phase number。
当线程结束了它的工作,我们就应该调用arriveAndDeregister()方法,去说明一下,在特定的阶段,当前线程不应该再被统计了。
3.使用Phaser API实现相关逻辑
假设说,我们需要协调多个行动阶段。有三个线程将处理第一个阶段,另有俩个线程将处理第二个阶段。
我们将创建一个实现了Runnable接口的LongRunningAction类:
class LongRunningAction implements Runnable {
private String threadName;
private Phaser ph;
LongRunningAction(String threadName, Phaser ph) {
this.threadName = threadName;
this.ph = ph;
ph.register();
}
@Override
public void run() {
ph.arriveAndAwaitAdvance();
try{
Thread.sleep(20);
} catch(InterruptedException e) {
e.printStackTrace();
}
ph.arriveAndDeregister();
}
}
当我们的action类被实例化之后,我们就调用 register()方法向Phaser实例注册。这会导致 使用特定Phaser的线程数量增加。
调用 arriveAndAwait()方法会导致当前线程等待屏障器(barrier)。正如已经提到的那样,当到达放的数量和注册方的数量相同时,就会继续执行。在处理完成之后,当前线程就可以通过调用arriveAndDeregister()方法把自己注销掉。
我们来创建一个测试案例。在这个案例中,我们将创建三个 LongRunningAction线程并且阻塞在屏障器(barrier)上.紧接着,当相应的动作结束之后,我们将创建2个额外的longRunningAction线程,这俩个额外的线程将执行下一阶段的处理。
当在主线程中创建Phaser实例时,我们传递了一个int 值1 作为参数,这等效于在当前线程中调用register()方法。我们之所以这样做,是因为在我们创建三个工作线程时,主线程就是个协调器,因此,该变相1器就需要有四个线程向它注册:
ExecutorService executorService = Executors.newCachedThreadPool();
Phaser ph = new Phaser(1);
assertEquals(0, ph.getPhase());
在初始化之后,phase的值为0。
Phaser类有一个构造器,该构造器可以接收一个父类实例。 当我们有大量同步竞争的消耗时,这很有用。在这种情况下,可以设置Phaser实例,这样sub-phaser的组就都能共享一个公共的父类。
下一步,我们启动了三个 LongRunningAction线程,这三个线程会一直等待屏障器(barrier)直到我们在主线程中调用arriveAndAwaitAdvance()方法。
请记住,我们已经用1初始化了我们的Phaser,并且调用了多次register()方法。现在,三个动作线程都声明它们已经到达了屏障器处,因此,我们需要至少调用一次arriveAndAwaitAdvance()方法。---这里,我们在主线程中调用:
executorService.submit(new LongRunningAction("thread-1", ph));
executorService.submit(new LongRunningAction("thread-2", ph));
executorService.submit(new LongRunningAction("thread-3", ph));
ph.arriveAndAwaitAdvance();
assertEquals(1, ph.getPhase());
在那一阶段结束之后,getPhase()方法将返回1,因为程序结束了第一阶段的处理工作。
在这之后,getPhase()方法返回的phase number将会是2.当我们想要结束我们的程序时,尚且需要调用arriveAndDeregister()方法,因为主线程仍然还注册在Phaser中。当注册方由于注销的原因变为0之后,Phaser就会终止。所有对同步方法的调用都将不再被阻塞,相反,这些调用都会立即返回。
运行该程序将产生如下输出:
This is phase 0
This is phase 0
This is phase 0
Thread thread-2 before long running action
Thread thread-1 before long running action
Thread thread-3 before long running action
This is phase 1
This is phase 1
Thread thread-4 before long running action
Thread thread-5 before long running action
我们已经看到了,所有的线程都正在等待执行,直到屏障器打开。只有当前一次的执行结束时,才会执行下一阶段的运行。
4.总结
在这篇教程中,我们看到了java.util.concurrent包中的Phaser变相器的构造,并且我们可以使用Phaser类来实现多个阶段的协调逻辑。所有这些案例的代码片段都能在github上找到,源码地址: sourceCode