任务的流转控制与监测-spring cloud stream

背景:

  • 公司系统上有很多任务流程要一连串的执行,而且任务之间还有很多依赖关系。比如,A任务执行完了以后,B任务开始执行,但B跟C同时完成的情况下,才能执行D任务。简单的办法,我们可以写几个定时任务,各自分好时段,你跑完了,我再跑。但是这样的话,一整套流程跑下来,执行等待的时间会很长很长。那我们可以不间断的,像加了三通的水管里的水一样,流下去执行吗?答案是,可以的。spring家族的事件驱动型框架有好几个,结合MQ(官方首推kafka跟rabbit)之后,还是能玩得转的。
    其实这东西还是比较好理解,任务之间的关系,通过MQ消息队列来进行传递维系。我执行完了往队列里放一条消息,告诉下一个,你可以开始跑了。实现这些都比较简单,利用官方提供的手册,研究研究还是能很快捯饬出来。但是,只有当B任务跟C任务同时完成后,哪怕B先完成了,也得等C执行完,这种情况下,才能往下执行D,这个怎么控制呢?我们知道,一般mq就两种模式,发布订阅跟生产消费,想了半天也没弄明白怎么利用这两模式完成这个控制动作。后来想了想,既然控制不了消息的订阅与消费,那就控制方法的执行,退而求其次呗。
    脑子转到这,就想到了aop,利用环绕通知来进行控制,结合redis做个中转,再统计下各个任务的执行情况后进行判断代码是否继续往下执行。嗯.....,感觉行得通,来试试效果怎么样。
    首先,我们得把这个依赖关系先解决,很显然,D是依赖B跟C的。在下面代码里,步骤4的执行,是依赖步骤2跟步骤3的,这个依赖关系设置,为了方便我做了个注解,往下看代码。我们把任务先枚举出来。
public enum TaskEnum {
        STEPONEININIT(0,"步骤:1"),
        STEPTWOINIT(1,"步骤:2"),
        STEPTHREEINIT(2,"步骤:3"),
        STEPFOURINIT(3,"步骤:4")
        ;

        private int taskId;
        private String taskName;

        TaskEnum(int taskId, String taskName) {
            this.taskId = taskId;
            this.taskName = taskName;
        }

        public int getTaskId() {
            return taskId;
        }

        public void setTaskId(int taskId) {
            this.taskId = taskId;
        }

        public String getTaskName() {
            return taskName;
        }

        public void setTaskName(String taskName) {
            this.taskName = taskName;
        }
}

接下来是注解,


/**
 * 
 * @author yangpin
 *
 */
@Target({ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface TaskDependsOn {

    @NotNull
    TaskEnum[] taskEnum();

}

然后是关键的aop,用的环绕通知,


/**
* @Author yangpin
* @Desciption aop处理
* @Date 1:58 2020-03-26
**/
@Aspect
@Component
public class TaskListenerAspect {
 
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    public static final String TaskStreamMonitorQueue = "TaskMonitor:TaskStreamMonitorQueue";


    @Autowired
    StringRedisTemplate stringRedisTemplate;

    /**
    * @Author yangpin
    * @Desciption 参数处理
    * @Date 22:16 2020-03-18
    * @Param [joinPoint]
    * @return com.demo.aop.TaskListenerAspect.Params
    **/
    public Params getAnnotationValue(JoinPoint joinPoint) throws Exception {
        String targetName = joinPoint.getTarget().getClass().getName();
        String methodName = joinPoint.getSignature().getName();
        Object[] arguments = joinPoint.getArgs();
        Class targetClass = Class.forName(targetName);
        Method[] methods = targetClass.getMethods();
        Params params = new Params();
        for (Method method : methods) {
            if (method.getName().equals(methodName)) {
                Class[] classes = method.getParameterTypes();
                if (classes.length == arguments.length) {
                    TaskEnum[] taskEnums = method.getAnnotation(TaskDependsOn.class).taskEnum();
                    Task task = (Task) arguments[0];
                    params.setTask(task);
                    params.setTaskEnums(taskEnums);
                    break;
                }
            }
        }
        return params;
    }


    @Pointcut("execution(* com.demo..*(..))  && @annotation(com.demo.aop.TaskDependsOn)")
    public void TaskListener() {}
 
    @Around("TaskListener()")
    public void round(ProceedingJoinPoint joinPoint) throws Throwable {
        Params params = getAnnotationValue(joinPoint);
        //判断方法是否执行
        if (processInputArg(params))
            joinPoint.proceed();
    }

    /**
    * @Author yangpin
    * @Desciption 利用redis暂存任务的执行情况
    * @Date 1:52 2020-03-26
    * @Param [params]
    * @return boolean
    **/
    private boolean processInputArg(Params params) {
        boolean proceed = false;

        List<String> keys = new ArrayList<>();

        //为了确保任务批次唯一,可以依照自己的业务规则,进行key值设定
        String key = TaskStreamMonitorQueue+params.getTask().getStreamId() + "_" + params.getTask().getTaskId();

        stringRedisTemplate.opsForValue().set(key,params.getTask().toString());

        TaskEnum[] taskEnums = params.getTaskEnums();

        for (int i = 0; i < taskEnums.length; i++) {
            String tmpKey = TaskStreamMonitorQueue+params.getTask().getStreamId() + "_" + taskEnums[i].getTaskId();
            keys.add(tmpKey);
        }

        List<String> strings = stringRedisTemplate.opsForValue().multiGet(keys);

        strings.removeAll(Collections.singleton(null));

        //TODO 从注解上获取任务之间的依赖关系,当最后一个依赖的任务执行完毕并放入redis后,取出所有依赖的任务进行统计
        //TODO 注解依赖的任务数 == redis中已经执行完的任务数 ---> 可以放行  
        if (strings.size() == taskEnums.length) proceed = true;

        return proceed;
    }



    class Params{

        TaskEnum[] taskEnums;

        Task task;

        public TaskEnum[] getTaskEnums() {
            return taskEnums;
        }

        public void setTaskEnums(TaskEnum[] taskEnums) {
            this.taskEnums = taskEnums;
        }

        public Task getTask() {
            return task;
        }

        public void setTask(Task task) {
            this.task = task;
        }

        public Params() {
        }

        public Params(TaskEnum[] taskEnums, Task task) {
            this.taskEnums = taskEnums;
            this.task = task;
        }
    }


}

我们来看下注解的用法。这是任务“步骤:4”的处理类,为了能在页面实时监控每一步任务执行了多少,用了redis的队列进行处理。


@EnableBinding(DemoBinding.class)
public class TaskSink {

    private static final Logger logger = LoggerFactory.getLogger(TaskSink.class);


    @Autowired
    StringRedisTemplate stringRedisTemplate;

    protected static ExecutorService executorService = Executors.newSingleThreadExecutor();

        //填入该任务所依赖的两个任务枚举值,分别是"步骤:2"跟"步骤:3"
    @TaskDependsOn(taskEnum = {TaskEnum.STEPTWOINIT,TaskEnum.STEPTWOINIT})
    @StreamListener(DemoBinding.INPUT_3)
    public void process(Task task) {
        logger.info("stream-demo3收到消息,"+ task.toString()+",开始步骤:4!" );
        task.setTaskId(3);
        task.setTaskName("步骤:4");
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    stringRedisTemplate.opsForList().leftPush(TaskMonitorQueue.StepfourCreateQueue,task.toString());
                    stringRedisTemplate.convertAndSend(TaskMonitorQueue.StepfourCreateQueue,task.toString());
                } catch (Exception e) {
                    logger.error("步骤:4任务队列发送失败:" + task.toString());
                }
            }
        });
    }

}

好,再来看下“步骤:2”跟“步骤:3”是怎么处理的,


@EnableBinding(DemoBinding.class)
public class TaskProcessor {

    private static final Logger logger = LoggerFactory.getLogger(TaskProcessor.class);


    @Autowired
    StringRedisTemplate stringRedisTemplate;

    protected static ExecutorService executorService = Executors.newSingleThreadExecutor();


    @StreamListener(DemoBinding.INPUT_1)
    @SendTo({DemoBinding.OUTPUT_3})
    public Task process(Task task) {
        logger.info("stream-demo1,收到消息" + task.toString() + ",处理完毕!");
        //任务id-1
        task.setTaskId(1);
        task.setTaskName("步骤:2");
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    stringRedisTemplate.opsForList().leftPush(TaskMonitorQueue.SteptwoCreateQueue,task.toString());
                    stringRedisTemplate.convertAndSend(TaskMonitorQueue.SteptwoCreateQueue,task.toString());
                } catch (Exception e) {
                    logger.error("步骤:2任务队列发送失败:" + task.toString());
                }
            }
        });
        return task;
    }

    @StreamListener(DemoBinding.INPUT_2)
    @SendTo({DemoBinding.OUTPUT_3})
    public Task process2(Task task) {
        logger.info("stream-demo2,收到消息" + task.toString() + ",处理完毕!");
        //任务id-2
        task.setTaskId(2);
        task.setTaskName("步骤:3");
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    stringRedisTemplate.opsForList().leftPush(TaskMonitorQueue.StepthreeCreateQueue,task.toString());
                    stringRedisTemplate.convertAndSend(TaskMonitorQueue.StepthreeCreateQueue,task.toString());
                } catch (Exception e) {
                    logger.error("步骤:3任务队列发送失败:" + task.toString());
                }
            }
        });
        return task;
    }
}

我们继续往下看“步骤:1”,也就是源头任务,怎么处理的。


@EnableScheduling
@EnableBinding(DemoBinding.class)
public class TaskSender {

    private static final Logger logger = LoggerFactory.getLogger(TaskSender.class);

    @Autowired
    private DemoBinding source;

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    protected static ExecutorService executorService = Executors.newSingleThreadExecutor();

    @Scheduled(initialDelay = 10000,fixedDelay = 3000)
    public void sendEvents() {
        Task task = new Task();
        //任务id-0,为保证任务流程批次的唯一性,利用uuid设置streamId进行甄别
        task.setStreamId(UUID.randomUUID().toString().replace("-",""));
        task.setTaskId(0);
        task.setTaskName("步骤:1");
        task.setOrgansin("ABCD" + new Random().nextInt(700));
        //完成,通知步骤1跟步骤2
        this.source.output1().send(MessageBuilder.withPayload(task).build());
        this.source.output2().send(MessageBuilder.withPayload(task).build());
        logger.info("向stream-demo1,stream-demo2发送消息:"+ task.toString());

        executorService.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    stringRedisTemplate.opsForList().leftPush(TaskMonitorQueue.SteponeCreateQueue,task.toString());
                    stringRedisTemplate.convertAndSend(TaskMonitorQueue.SteponeCreateQueue,task.toString());
                } catch (Exception e) {
                    e.printStackTrace();
                    logger.error("步骤:1任务队列发送失败:" + task.toString());
                }
            }
        });
    }
}

为了实时掌握任务的执行情况并反馈到页面,用了websocket,在这就不赘述了。
任务关系:1先执行完,通知2、3,然后2、3都完成了,跑4,看下最后的效果:


spring-cloud-stream-demo.gif

看起来是实现了这个效果。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,907评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,987评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,298评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,586评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,633评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,488评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,275评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,176评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,619评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,819评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,932评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,655评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,265评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,871评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,994评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,095评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,884评论 2 354

推荐阅读更多精彩内容