调度中心集群参考

  1. 总体思路
  • 利用kafka单播模式:开启一个定时任务
  • 利用kafka广播模式:关闭某个定时任务
  1. 实现代码
  • WcsTimerManager类

     package com.isoftstone.hig.wmsck.adapters.jd.common.timermanager;
     
     import com.alibaba.fastjson.JSONObject;
     import com.isoftstone.hig.common.constants.KafkaConstant;
     import com.isoftstone.hig.common.model.MQEvent;
     import com.isoftstone.hig.common.model.ResultMode;
     import com.isoftstone.hig.common.utils.*;
     import com.isoftstone.hig.wmsck.adapters.jd.api.param.timermanager.TimerTaskParam;
     import com.isoftstone.hig.wmsck.adapters.jd.api.param.timermanager.WcsTimerTaskEnum;
     import com.isoftstone.hig.wmsck.adapters.jd.common.kafka.KafkaClientManager;
     import org.quartz.CronExpression;
     import org.springframework.beans.factory.annotation.Autowired;
     import org.springframework.context.annotation.Bean;
     import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
     import org.springframework.scheduling.support.CronTrigger;
     import org.springframework.stereotype.Component;
     
     import javax.validation.constraints.NotNull;
     import java.util.Date;
     import java.util.HashMap;
     import java.util.Map;
     import java.util.concurrent.ScheduledFuture;
     
     @Component
     public class WcsTimerManager {
     
         private ScheduledFuture future;
     
         private static Map<String,ScheduledFuture> map = new HashMap<>();
     
         @Autowired
         private TimerTask timerTask;
     
         @Autowired
         private ThreadPoolTaskScheduler threadPoolTaskScheduler;
     
         @Bean
         public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
             return new ThreadPoolTaskScheduler();
         }
     
         /**
          * 开启定时器
          */
         public ResultMode startTask(TimerTaskParam timerTask) {
             ResultMode resultMode = new ResultMode();
             try {
                 if (PublicUtil.isEmpty(timerTask.getTaskNameNo())){
                     resultMode.setSucceed(false);
                     resultMode.setErrMsg("任务名称编号不能为空");
                     return resultMode;
                 }
                 if (PublicUtil.isEmpty(timerTask.getTimeCycle())){
                     resultMode.setSucceed(false);
                     resultMode.setErrMsg("任务时间周期不能为空");
                     return resultMode;
                 }
                 if (PublicUtil.isEmpty(timerTask.getTimeType())){
                     resultMode.setSucceed(false);
                     resultMode.setErrMsg("cron表达式类型不能为空");
                     return resultMode;
                 }
                 //获取cron表达式
                 String cronTrigger = this.getCron(timerTask);
                 //校验cron表达式是否合法
                 if (!CronExpression.isValidExpression(cronTrigger)){
                     LogHelper.writeInfo("cron表达式非法!");
                     resultMode.setSucceed(false);
                     resultMode.setErrMsg("cron表达式非法!");
                     return resultMode;
                 }
                 //如果存在Redis标识
                 if (RedisUtil.hexists("future",timerTask.getTaskNameNo())){
                     //判断任务是否关闭,已关闭=true
                     boolean bool = Boolean.valueOf(RedisUtil.hget("future",timerTask.getTaskNameNo()));
                     //如果未关闭,等待其关闭
                     if (!bool){
                         //先关闭上个任务,开启新的任务!!!
                         this.stopTask(timerTask);
                         LogHelper.writeInfo("先关闭上个任务,开启新的任务!!!");
     
                         //如果值为:false-未关闭,一直等待其关闭
                         while (!Boolean.valueOf(RedisUtil.hget("future",timerTask.getTaskNameNo()))){
                             Thread.sleep(1000);
                             LogHelper.writeWarn("休眠1秒!");
                         }
                     }
                 }
                 ExecuteTimerTask executeTimerTask = new ExecuteTimerTask();
                 //任务名称编号
                 executeTimerTask.setTaskNameNo(timerTask.getTaskNameNo());
                 //入参对象
                 executeTimerTask.setData(timerTask.getData());
                 future = threadPoolTaskScheduler.schedule(executeTimerTask,new CronTrigger(cronTrigger));
                 //放入map中,停止任务时需要释放对应的任务
                 map.put(timerTask.getTaskNameNo(),future);
                 //写入标识到Redis
                 long result = RedisUtil.hset("future",timerTask.getTaskNameNo(), "false");
                 LogHelper.writeInfo("Redis标识改为false-未关闭:"+result);
                 LogHelper.writeInfo("策略已经启动");
                 resultMode.setErrMsg("策略已经启动!");
             } catch (Exception e) {
                 LogHelper.writeError("策略启动失败{}", e);
                 resultMode.setSucceed(false);
                 resultMode.setErrMsg("策略启动失败!");
             }
             return resultMode;
         }
     
         /**
          * 关闭定时器-发送kafka消息
          */
         public ResultMode stopTask(TimerTaskParam timerTask) {
             ResultMode resultMode = new ResultMode();
             try {
     
                 if (PublicUtil.isEmpty(timerTask.getTaskNameNo())) {
                     resultMode.setSucceed(false);
                     resultMode.setErrMsg("任务名称编号不能为空");
                     return resultMode;
                 }
                 //发布kafka
                 MQEvent mqEvent = new MQEvent<>(UtilityClass.uuid(), KafkaConstant.EVENT_WCS_TIMER_MANAGER, timerTask);
                 String topic = SpringContextUtil.getKafkaTopicPrefix() + UtilityEnum.KafkaTopicNameEnum.TOPIC_WMSCK_JDA_TIMER_MANAGER.getTopicName();
                 LogHelper.writeInfo("生产消息:" + JSONObject.toJSONString(mqEvent));
                 KafkaClientManager.sendMessage(topic, null, JSONObject.toJSONString(mqEvent));
                 resultMode.setErrMsg("策略关闭请求发送成功");
             }catch (Exception e){
                 LogHelper.writeError("策略关闭失败{}", e);
                 resultMode.setSucceed(false);
                 resultMode.setErrMsg("策略关闭失败!");
             }
             return resultMode;
         }
     
         /**
          * 关闭定时器-消费kafka消息
          */
         public void stopTimerManager(MQEvent mqEvent) {
             LogHelper.writeInfo("定时器,监听测试---------" + JSONObject.toJSONString(mqEvent));
             LogHelper.writeInfo("消费消息:" + JSONObject.toJSONString(mqEvent));
             TimerTaskParam timerTask = JSONObject.parseObject(JSONObject.toJSONString(mqEvent.getData()), TimerTaskParam.class);
             //关闭定时器
             if (!this.stop(timerTask).getSucceed()){
                 LogHelper.writeInfo("关闭定时器失败!");
             }
             LogHelper.writeInfo("关闭定时器成功!");
         }
         //关闭定时任务
         private ResultMode stop(TimerTaskParam timerTask) {
             ResultMode resultMode = new ResultMode();
             try {
                 if (future != null) {
                     future=map.get(timerTask.getTaskNameNo());
                     future.cancel(true);
                     LogHelper.writeInfo("future:"+future);
                 }
                 //Redis标识改为true-已关闭
                 long result = RedisUtil.hset("future",timerTask.getTaskNameNo(),"true");
                 LogHelper.writeInfo("Redis标识改为true-已关闭:"+result);
     
                 LogHelper.writeInfo("策略已经停止");
                 resultMode.setErrMsg("策略已经停止");
                 return resultMode;
             } catch (Exception e) {
                 LogHelper.writeError("策略停止失败{}", e);
                 resultMode.setSucceed(false);
                 resultMode.setErrMsg("策略停止失败!");
                 return resultMode;
     
             }
         }
     
         //获取cron表达式
         private String getCron(TimerTaskParam timerTask){
             //间隔时间或cron表达式
             String time = timerTask.getTimeCycle();
             //按秒执行
             if (WcsTimerTaskEnum.TimerTypeEnum.WCS_TimerType_10.getCode().equals(timerTask.getTimeType())){
                 return "0/"+time+" * * * * ?";
             }
             //按分钟执行
             if (WcsTimerTaskEnum.TimerTypeEnum.WCS_TimerType_20.getCode().equals(timerTask.getTimeType())){
                 return "0 0/"+time+" * * * ?";
             }
             //按小时执行
             if (WcsTimerTaskEnum.TimerTypeEnum.WCS_TimerType_30.getCode().equals(timerTask.getTimeType())){
                 return "0 0 0/"+time+" * * ?";
             }
             //自定义时间cron表达式
             return time;
         }
     
         //执行任务
         private class ExecuteTimerTask implements Runnable{
     
             /**
              * 任务名称编号
              */
             @NotNull(message = "任务名称编号不能为空")
             private String taskNameNo;
     
             /**
              * 入参对象
              */
             private Object data;
     
     
             public String getTaskNameNo() {
                 return taskNameNo;
             }
     
             public void setTaskNameNo(String taskNameNo) {
                 this.taskNameNo = taskNameNo;
             }
     
             public Object getData() {
                 return data;
             }
     
             public void setData(Object data) {
                 this.data = data;
             }
     
             @Override
             public void run() {
                 if (this.taskNameNo.equals(WcsTimerTaskEnum.TimerTaskEnum.WCS_TIMER_TASK_10.getCode())) {
                     LogHelper.writeInfo("定时生成设备摘果单-启动中:" + new Date());
                     //定时生成设备摘果单
                     timerTask.buildPickingTask(this.data);
                 } else {
                     LogHelper.writeInfo("待定任务-我在启动中:" + new Date());
                 }
     
             }
         }
     
     }
     
    
  • TimerTask类

    package com.isoftstone.hig.wmsck.adapters.jd.common.timermanager;
    
    import com.alibaba.fastjson.JSON;
    import com.isoftstone.hig.common.model.ResultMode;
    import com.isoftstone.hig.wmsck.adapters.jd.client.WcsLocalCallServiceBusiness;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    //定时任务类
    @Service
    public class TimerTask {
    
        @Autowired
        private WcsLocalCallServiceBusiness wcsLocalCallServiceBusiness;
    
        /**
         * 自动生成设备摘果单
         */
        public ResultMode<String> buildPickingTask(Object data){
            //LogHelper.writeInfo("入参:"+JSON.toJSONString(data));
            String warehouseNo = JSON.parseObject(JSON.toJSONString(data),String.class);
            //生成设备摘果单
            return wcsLocalCallServiceBusiness.buildPickingTask(warehouseNo);
        }
    
    }
    
    
  • TimerTaskParam类

    package com.isoftstone.hig.wmsck.adapters.jd.api.param.timermanager;
    
    import io.swagger.annotations.ApiModel;
    import io.swagger.annotations.ApiModelProperty;
    
    import java.io.Serializable;
    
    @ApiModel("定时器入参")
    public class TimerTaskParam implements Serializable {
    
        private static final long serialVersionUID = 2168115660376016205L;
    
        /**
         * 任务名称编号
         */
        @ApiModelProperty(value = "任务名称编号", name ="taskNameNo")
        private String taskNameNo;
    
        /**
         * 任务时间周期或cron表达式
         */
        @ApiModelProperty(value = "任务时间周期或cron表达式", name ="timeCycle")
        private String timeCycle;
    
        /**
         * cron表达式类型:参考WcsTimerTaskEnum.TimerTypeEnum枚举类型
         */
        @ApiModelProperty(value = "cron表达式类型", name ="timeType")
        private String timeType;
    
        /**
         * 入参对象
         */
        private Object data;
    
        public String getTaskNameNo() {
            return taskNameNo;
        }
    
        public void setTaskNameNo(String taskNameNo) {
            this.taskNameNo = taskNameNo;
        }
    
        public String getTimeCycle() {
            return timeCycle;
        }
    
        public void setTimeCycle(String timeCycle) {
            this.timeCycle = timeCycle;
        }
    
        public String getTimeType() {
            return timeType;
        }
    
        public void setTimeType(String timeType) {
            this.timeType = timeType;
        }
    
        public Object getData() {
            return data;
        }
    
        public void setData(Object data) {
            this.data = data;
        }
    }
    
    
  • WcsTimerTaskEnum

    package com.isoftstone.hig.wmsck.adapters.jd.api.param.timermanager;
    
    public class WcsTimerTaskEnum {
    
        /**
         * 定时任务列表枚举类
         */
        public enum TimerTaskEnum {
    
            WCS_TIMER_TASK_10("10", "定时生成设备摘果单"),
            WCS_TIMER_TASK_20("20", "待定");
    
            private String code;
    
            private String desc;
    
            public String getCode() {
                return code;
            }
    
            public String getDesc() {
                return desc;
            }
    
            private TimerTaskEnum(String code, String desc) {
                this.code = code;
                this.desc = desc;
            }
        }
    
        /**
         * cron表达式类型
         */
        public enum TimerTypeEnum {
    
            WCS_TimerType_10("10", "秒"),
            WCS_TimerType_20("20", "分钟"),
            WCS_TimerType_30("30", "小时"),
            WCS_TimerType_40("40", "自定义cron表达式");
    
            private String code;
    
            private String desc;
    
            public String getCode() {
                return code;
            }
    
            public String getDesc() {
                return desc;
            }
    
            private TimerTypeEnum(String code, String desc) {
                this.code = code;
                this.desc = desc;
            }
        }
    
    }
    
    
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,826评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,968评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,234评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,562评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,611评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,482评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,271评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,166评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,608评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,814评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,926评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,644评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,249评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,866评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,991评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,063评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,871评论 2 354

推荐阅读更多精彩内容