XXL-JOB日常实用进阶,包括分片任务,阻塞处理策略,路由策略,运行模式

主要包括XXL-JOB日志清理,包括分片广播任务,阻塞处理策略,路由策略,运行模式,创建子任务
如果查看XXL-JOB基本使用和整合SpringBoot,请参考我另一篇文章:XXL-JOB基本配置使用
导语:XLL-JOB是分布式任务调度平台,常见功能特性:
1、简单:支持通过Web页面对任务进行CRUD操作,操作简单,容易上手
2、动态:支持动态修改任务状态,启动/停止任务,以及终止运行中的任务,即时生效
3、调度中心HA(中心式):调度中心式设计,并支持集群部署,保证调度平台高可用
4、执行器HA(分布式):任务分布执行,任务执行器支持集群部署,可保证任务执行高可用
5、弹性扩容缩容:一旦有新执行器机器上下线,下次调度执行时,将会重新分配任务执行

一、XXL-JOB任务类型:

1、BEAN模式: ①类形式 ②方法形式
2、GLUE模式:Java / Shell / Python / Nodejs / Php
1、Bean模式任务,支持基于方法的开发模式,每个任务对应一个方法
优点:

每个任务只需要开发一个方法,并添加@XxlJob注解即可,方便简单快捷
支持自动扫描并添加至执行器容器中

缺点:

要求spring开发环境,基本现在项目spring必备,所以无伤大雅
新定时任务的CRUD需要项目的重新构建和项目启动,如果遇到未执行完毕的情况,可能会多次执行,但是保证多次执行和一次执行的结果不影响,对系统也不会有影响

2、GLUE模式

定时任务以源码方式维护在调度中心,不需要在本地编写任何代码,我们在使用过程中,经常是在本地编码完毕后,直接复制到线上维护中心中

优点:

支持通过Web IDE在线更新,实时编译和生效,因此不需要指定JobHandler和重启项目

缺点:

如果你依赖了某个框架和服务,需要先依赖到自己项目中,然后在Web IDE中才能依赖,否则会执行报错,正常可以理解为,把代码从项目中搬到线上,可以实时编辑,但是和自己在本地写代码的要求一样,依赖和服务必须全部具备,多用于定时任务经常调整的场景中使用

调度中心使用示例:


二、XXL-JOB的日志清理:

日志分类:
1、调度日志:任务调度的时候,会告知一些比如执行器信息,调度结果等2、
2、执行日志:JOB执行过程中日志,XxlJobLogger.log("")中进行打印

日志执行过程中,可以编写一个定时任务定时清理也可以/也可以调用自动清理的API,就是点击确认清理,出发的Http请求的URL地址(服务访问地址+/joblog/clearLog),根据源码中参数。进行传参即可

三、XLL-JOB子任务介绍:

XXL-JOB中有自带的子任务编排功能,支持子任务依赖,当父任务执行结束且执行成功后将会主动出发一次子任务的执行,多个子任务使用逗号分隔
优点:

适合连续,连贯的业务场景,框架自带任务编排,使用简单,只需要通过调度中心页面配置即可实现

缺点:

连续任务的数据不能直接进行传递,不像JAVA中CompletableFuture可以将上一个任务的执行结果传递到后续使用,可以就需要将每个任务的处理数据,存储到第三方存储中,比如Mysql,Redis等

四、XLL-JOB分片广播任务:

执行器集群部署时,任务路由策略选择 【分片广播】路由策略情况下,一次任务调度将会广播触发对应集群中所有执行器都触发执行一次任务,同时系统自动传递分片参数,可根据分片参数开发分片任务。
【分片广播】:以执行器维度进行分片,支持动态扩容执行器从而动态增加分片数量,
协同进行业务处理,在进行大数据量业务操作时可显著提升任务处理能力和速度。
分片广播和普通任务开发流程一致,不同之处在于可以获取分片参数,获取分片参数进行分片任务处理

获取分片参数

    final ShardingUtil.ShardingVO shardingVo = ShardingUtil.getShardingVo();
    
    index: 当前分片的序号(从0开始)执行器集群列表中当前执行器的序号
    total: 总分片数,执行器集群的总机器数量
代码示例:
 @XxlJob("executeJobHandler")
    public ReturnT<String> executeJobHandler(String param) throws Exception {
        log.info("XXL-JOB, Hello World. time:{} ", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));

        // 分片参数
        ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
        log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardingVO.getIndex(), shardingVO.getTotal());

        // 总分片数目
        final int total = shardingVO.getTotal();

        // 当前执行器序号
        final int index = shardingVO.getIndex();

        // 1.获取执行数据
        final List<Integer> list = queryDataList();
        for (int i = 0; i < list.size(); i++) {
            Integer id = list.get(i);
            // 分片总数量取模等于当前分片
            if (id % total == index) {
                XxlJobLogger.log("=== 任务执行 ===");
            }
        }

        return ReturnT.SUCCESS;
    }
调度中心使用示例:

五、XLL-JOB阻塞处理策略类型

单机串行(默认)

调度进入单机执行器后,调度请求进入FIFO队列中并以串行方式运行

丢弃后续调度(推荐)

调度请求进入单机执行器,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败

覆盖之前调度(不推荐)

调度请求进入单机执行器后,发现执行器存在运行的调度任务,
将会终止运行中的调度任务并清空队列,然后运行本地调度

单机串行情况下:如果一个任务没有执行完毕,第二次任务执行又开始了,那么第二次会一直等待直到第一次执行完毕才会执行第二次任务调度,这样如果任务频率比较高,同时执行时间长,不建议使用这种方式,这样会导致阻塞i的任务越来越多

XXL-JOB定时任务超时注意事项:

任务超时/任务终止注意事项
JOB中不能消化InterruptedException必须往外抛出异常杨

如果异常被捕获,但是在任务日志执行页面手动点击【终止任务】
会抛出InterruptedException异常, 但是任务不会停止,需要手动处理

    @XxlJob("executeJobHandler")
    public ReturnT<String> executeJobHandler(String param) throws Exception {
        log.info("XXL-JOB, Hello World. time:{} ", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));

        /*
         * 如果异常被捕获,但是在任务日志执行页面手动点击【终止任务】,会抛出InterruptedException
         * 但是任务不会停止
         */

        try {
            for (int i = 0; i < 10; i++) {
                XxlJobLogger.log("执行中");
                TimeUnit.SECONDS.sleep(5);
            }
        } catch (Exception e) {

            /*
             * 解决方案:
             * 这里手动处理,用来避免这种情况
             */
            if (e instanceof InterruptedException) {
                throw e;
            }
            e.printStackTrace();
        }

        return ReturnT.SUCCESS;
    }

六、xxl-job执行器路由选择策略

  • 路由策略:当执行器集群部署时,提供丰富的路由策略,包括:
FIRST(第一个):固定选择第一个机器;

LAST(最后一个):固定选择最后一个机器;

ROUND(轮询):;

RANDOM(随机):随机选择在线的机器;

CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。

LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;

LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;

FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;

BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;

SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;

1、FIRST:获取地址列表中的第一个
public class ExecutorRouteFirst extends ExecutorRouter {

    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList){
        return new ReturnT<String>(addressList.get(0));
    }

}

2、LAST:获取地址列表中的最后一个
public class ExecutorRouteLast extends ExecutorRouter {

    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        return new ReturnT<String>(addressList.get(addressList.size()-1));
    }

}
3、轮询: 缓存时间是1天, 叠加次数最多为一百万,超过后进行重置,但是重置时采用随机方式,随机到一个小于100的数字,基于计数器,对地址列表取模
public class ExecutorRouteRound extends ExecutorRouter {
    private static ConcurrentMap routeCountEachJob = new ConcurrentHashMap<>();
    private static long CACHE_VALID_TIME = 0;
    private static int count(int jobId) {
        // cache clear
        if (System.currentTimeMillis() > CACHE_VALID_TIME) {
            routeCountEachJob.clear();
            CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
        }
        AtomicInteger count = routeCountEachJob.get(jobId);
        if (count == null || count.get() > 1000000) {
            // 初始化时主动Random一次,缓解首次压力
            count = new AtomicInteger(new Random().nextInt(100));
        } else {
            // count++
            count.addAndGet(1);
        }
        routeCountEachJob.put(jobId, count);
        return count.get();
    }

    @Override
    public ReturnT route(TriggerParam triggerParam, List addressList) {
        String address = addressList.get(count(triggerParam.getJobId())%addressList.size());
        return new ReturnT(address);
    }

}
4、随机,随机选择一台及其执行
public class ExecutorRouteRandom extends ExecutorRouter {

    private static Random localRandom = new Random();
    @Override

    public ReturnT route(TriggerParam triggerParam, List addressList) {
        String address = addressList.get(localRandom.nextInt(addressList.size()));
        return new ReturnT(address);
    }
}
5、一致性哈希

分组下机器地址相同,不同JOB均匀散列在不同机器上,保证分组下机器分配JOB平均;且每个JOB固定调度其中一台机器;
a、virtual node:解决不均衡问题
b、hash method replace hashCode:String的hashCode可能重复,需要进一步扩大hashCode的取值范围

public class ExecutorRouteConsistentHash extends ExecutorRouter {

    private static int VIRTUAL_NODE_NUM = 100;

    /**
     * get hash code on 2^32 ring (md5散列的方式计算hash值)
     * @param key
     * @return
     */
    private static long hash(String key) {

        // md5 byte
        MessageDigest md5;
        try {
            md5 = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("MD5 not supported", e);
        }
        md5.reset();
        byte[] keyBytes = null;
        try {
            keyBytes = key.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException("Unknown string :" + key, e);
        }

        md5.update(keyBytes);
        byte[] digest = md5.digest();

        // hash code, Truncate to 32-bits
        long hashCode = ((long) (digest[3] & 0xFF) << 24)
                | ((long) (digest[2] & 0xFF) << 16)
                | ((long) (digest[1] & 0xFF) << 8)
                | (digest[0] & 0xFF);

        long truncateHashCode = hashCode & 0xffffffffL;
        return truncateHashCode;
    }

    public String hashJob(int jobId, List<String> addressList) {

        // ------A1------A2-------A3------
        // -----------J1------------------
        TreeMap<Long, String> addressRing = new TreeMap<Long, String>();
        for (String address: addressList) {
            for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
                long addressHash = hash("SHARD-" + address + "-NODE-" + i);
                addressRing.put(addressHash, address);
            }
        }

        long jobHash = hash(String.valueOf(jobId));
        SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
        if (!lastRing.isEmpty()) {
            return lastRing.get(lastRing.firstKey());
        }
        return addressRing.firstEntry().getValue();
    }

    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        String address = hashJob(triggerParam.getJobId(), addressList);
        return new ReturnT<String>(address);
    }

}

6. LEAST_FREQUENTLY_USED(最不经常使用): 缓存时间还是一天,对地址列表进行筛选,如果新加入的地址列表或者使用次数超过一百万次的话,就会随机重置为小于地址列表地址个数的值。 最后返回的就是value值最小的地址
public class ExecutorRouteLFU extends ExecutorRouter {
    private static ConcurrentMap jobLfuMap = new ConcurrentHashMap();
    private static long CACHE_VALID_TIME = 0;
    public String route(int jobId, List addressList) {
        // cache clear
        if (System.currentTimeMillis() > CACHE_VALID_TIME) {
            jobLfuMap.clear();
            CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
        }

        // lfu item init
        HashMap lfuItemMap = jobLfuMap.get(jobId);     // Key排序可以用TreeMap+构造入参Compare;Value排序暂时只能通过ArrayList;
        if (lfuItemMap == null) {
            lfuItemMap = new HashMap();
            jobLfuMap.putIfAbsent(jobId, lfuItemMap);   // 避免重复覆盖
        }
        // put new
        for (String address: addressList) {
            if (!lfuItemMap.containsKey(address) || lfuItemMap.get(address) >1000000 ) {
                lfuItemMap.put(address, new Random().nextInt(addressList.size()));  // 初始化时主动Random一次,缓解首次压力
            }
        }
        // remove old
        List delKeys = new ArrayList<>();
        for (String existKey: lfuItemMap.keySet()) {
            if (!addressList.contains(existKey)) {
                delKeys.add(existKey);
            }
        }
        if (delKeys.size() > 0) {
            for (String delKey: delKeys) {
               lfuItemMap.remove(delKey);
            }
        }
        // load least userd count address
        List lfuItemList = new ArrayList(lfuItemMap.entrySet());
        Collections.sort(lfuItemList, new Comparator() {
            @Override
            public int compare(Map.Entry o1, Map.Entry o2) {
                return o1.getValue().compareTo(o2.getValue());
            }
        });
        Map.Entry addressItem = lfuItemList.get(0);
        String minAddress = addressItem.getKey();
        addressItem.setValue(addressItem.getValue() + 1);
        return addressItem.getKey();
    }
    @Override
    public ReturnT route(TriggerParam triggerParam, List addressList) {
        String address = route(triggerParam.getJobId(), addressList);
        return new ReturnT(address);
    }
}

7、 LEAST_RECENTLY_USED(最近最久未使用):缓存时间还是一天,对地址列表进行筛选, 采用LinkedHashMap实现LRU算法
其中LinkedHashMap的构造器中有一个参数:
//accessOrder 为true, 每次调用get或者put都会将该元素放置到链表最后,因而获取第一个元素就是当前没有使用过的元素

public class ExecutorRouteLRU extends ExecutorRouter {
    private static ConcurrentMap jobLRUMap = new ConcurrentHashMap();
    private static long CACHE_VALID_TIME = 0;
    public String route(int jobId, List addressList) {
        // cache clear
        if (System.currentTimeMillis() > CACHE_VALID_TIME) {
            jobLRUMap.clear();
            CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
        }
        // init lru
        LinkedHashMap lruItem = jobLRUMap.get(jobId);
        if (lruItem == null) {
            /**
             * LinkedHashMap
             *      a、accessOrder:true=访问顺序排序(get/put时排序);false=插入顺序排期;
             *      b、removeEldestEntry:新增元素时将会调用,返回true时会删除最老元素;可封装LinkedHashMap并重写该方法,比如定义最大容量,超出是返回true即可实现固定长度的LRU算法;
             */
            //accessOrder 为true, 每次调用get或者put都会将该元素放置到链表最后,因而获取第一个元素就是当前没有使用过的元素
            lruItem = new LinkedHashMap(16, 0.75f, true);
            jobLRUMap.putIfAbsent(jobId, lruItem);
        }
        // put new
        for (String address: addressList) {
            if (!lruItem.containsKey(address)) {
                lruItem.put(address, address);
            }
        }
        // remove old
        List delKeys = new ArrayList<>();
        for (String existKey: lruItem.keySet()) {
            if (!addressList.contains(existKey)) {
                delKeys.add(existKey);
            }
        }
        if (delKeys.size() > 0) {
            for (String delKey: delKeys) {
                lruItem.remove(delKey);
            }
        }
        // load
        String eldestKey = lruItem.entrySet().iterator().next().getKey();
        String eldestValue = lruItem.get(eldestKey);
        return eldestValue;
    }
    @Override
    public ReturnT route(TriggerParam triggerParam, List addressList) {
        String address = route(triggerParam.getJobId(), addressList);
        return new ReturnT(address);
    }
}

8、FAILOVER 会返回第一个心跳检测ok的执行器,主要是使用xxl-job的执行器 RESTful API中的 beat

按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;

public class ExecutorRouteFailover extends ExecutorRouter {

    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {

        StringBuffer beatResultSB = new StringBuffer();
        for (String address : addressList) {
            // beat
            ReturnT<String> beatResult = null;
            try {
                ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
                beatResult = executorBiz.beat();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                beatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
            }
            beatResultSB.append( (beatResultSB.length()>0)?"<br><br>":"")
                    .append(I18nUtil.getString("jobconf_beat") + ":")
                    .append("<br>address:").append(address)
                    .append("<br>code:").append(beatResult.getCode())
                    .append("<br>msg:").append(beatResult.getMsg());

            // beat success
            if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {

                beatResult.setMsg(beatResultSB.toString());
                beatResult.setContent(address);
                return beatResult;
            }
        }
        return new ReturnT<String>(ReturnT.FAIL_CODE, beatResultSB.toString());

    }
}
9、BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;

会返回空闲的第一个执行器的地址,主要是使用xxl-job的执行器 RESTful API中的 idleBeat

public class ExecutorRouteBusyover extends ExecutorRouter {

    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        StringBuffer idleBeatResultSB = new StringBuffer();
        for (String address : addressList) {
            // beat
            ReturnT<String> idleBeatResult = null;
            try {
                ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
                idleBeatResult = executorBiz.idleBeat(new IdleBeatParam(triggerParam.getJobId()));
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                idleBeatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
            }
            idleBeatResultSB.append( (idleBeatResultSB.length()>0)?"<br><br>":"")
                    .append(I18nUtil.getString("jobconf_idleBeat") + ":")
                    .append("<br>address:").append(address)
                    .append("<br>code:").append(idleBeatResult.getCode())
                    .append("<br>msg:").append(idleBeatResult.getMsg());

            // beat success
            if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) {
                idleBeatResult.setMsg(idleBeatResultSB.toString());
                idleBeatResult.setContent(address);
                return idleBeatResult;
            }
        }

        return new ReturnT<String>(ReturnT.FAIL_CODE, idleBeatResultSB.toString());
    }

}
10、SHARDING_BROADCAST

SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352

推荐阅读更多精彩内容