Phaser用法

Phaser是java 7 引入的新的并发API。他引入了新的Phaser的概念,我们可以将其看成一个一个的阶段,每个阶段都有需要执行的线程任务,任务执行完毕就进入下一个阶段。所以Phaser特别适合使用在重复执行或者重用的情况。

示例1:

需求:1个任务分为2个步骤,步骤之间没有先后顺序,返回两个步骤结果。任一步骤完成后5s内另一步骤未完成,也返回结果。

import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;

 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Phaser;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 @RestController
 @RequestMapping("")
 @Slf4j
 public class PhaserController {
     private static final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
     private static ScheduledFuture<?> future;

     private static List<String> twoPhase = new ArrayList<>();

     private static Phaser phaser;

     private static Phaser getPhaser() {
         if (phaser != null) {
             return phaser;
         }
         phaser = new Phaser() {
             @Override
             protected boolean onAdvance(int phase, int registeredParties) {
                 switch (phase) {
                     case 0:
                         log.info("arrived phase:{}.", phase);
                         future = executorService.schedule(() -> {
                             log.info("获取phaser1超时");
                             phaser.arriveAndAwaitAdvance();
                         }, 5, TimeUnit.SECONDS);
                         return false;
                     case 1:
                         log.info("arrived phase:{}.", phase);
                         if (future != null) {
                             future.cancel(true);
                             future = null;
                         }
                         reset();
                         return true;
                     default:
                         return true;
                 }

             }
         };
         phaser.register();
         return phaser;
     }

     private static void reset() {
         if (phaser != null) {
             phaser = null;
         }
         try {
             log.info("result:{}.",new ObjectMapper().writeValueAsString(twoPhase));
         } catch (JsonProcessingException e) {
             e.printStackTrace();
         }
         twoPhase.clear();
     }

     @GetMapping("/phase0")
     public void phase0(String phase0) {
         twoPhase.add(phase0);
         getPhaser().arriveAndAwaitAdvance();
     }

     @GetMapping("/phase1")
     public void phase1(String phase1) {
         twoPhase.add(phase1);
         getPhaser().arriveAndAwaitAdvance();
     }
 }

正常输出:

步骤二在步骤一之后:
arrived phase:0.
arrived phase:1.
result:["phase0","phase1"].
步骤一在步骤二之后:
arrived phase:0.
arrived phase:1.
result:["phase1","phase0"].

超时输出:

arrived phase:0.
获取phaser1超时
arrived phase:1.
result:["phase1"].
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容