《elastic-job》学习

概述

Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。
官方文档地址:http://elasticjob.io/docs/elastic-job-lite/00-overview/
提供了如下功能

  • 分布式调度协调
  • 弹性扩容缩容
  • 失效转移
  • 错过执行作业重触发
  • 作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
  • 自诊断并修复分布式不稳定造成的问题
  • 支持并行调度
  • 支持作业生命周期操作
  • 丰富的作业类型
  • Spring整合以及命名空间提供
  • 运维平台

单机定时任务

通常我们在开发过程中或多或少会运用到定时任务。常用单机定时任务我们常用的包括:

  • java自带的timer
  • ScheduledExecutorService
  • spring定时任务
  • Quartz 也可支持分布式

单机定时任务的缺点

  • 缺乏高可用性,单机式的定时任务调度只能在一台机器上运行,程序或系统异常将导致功能不可用
  • 单机处理极限,单机的分布式调度只能在一台机器上执行,收到单机CUP内存等的限制

elastic-job的使用场景

elastic-job就是一个分布式的定时任务调度,我们能用它做什么?
例如有个需求需要在每天晚上统计当天的订单订单情况,单机模式下我们会写一段程序去处理,假如有10万个订单,处理成功需要耗时很长时间,且在机器瓶颈下可能会导致内存不足等情况。若使用elastic-job,我们则可以考虑将定时任务进行分片,让其分布在不同的机器上运行,提供可用性,减少单机失败带来的功能不可用。例如当下有2台机器,将任务分成4片,则每台机器处理其中2片任务。


image.png

使用实例初探

根据上图,实现将任务分成4片,分别在2台机器上执行
任务类

//具体执行的任务类
public class MyElasticJob implements SimpleJob {
    public final static Logger logger = LoggerFactory.getLogger(MyElasticJob.class);
    @Override
    public void execute(ShardingContext shardingContext) {
        int shardingTotalCount = shardingContext.getShardingTotalCount();
        String shardingParameter = shardingContext.getShardingParameter();
        //任务的分片项,从0开始递增例如有四个分片则序号为0~4
        int shardingItem = shardingContext.getShardingItem();
        //分片带的参数例如0=A,1=B,2=C,3=D
        String jobParamter = shardingContext.getJobParameter();
        logger.info("shardingItem =" + shardingItem + " , shardingTotalCount=" + shardingTotalCount + " , shardingParameter =" + shardingParameter + " ,jobParamter =" + jobParamter);
        //System.out.println("shardingItem =" + shardingItem + " , shardingTotalCount=" + shardingTotalCount + " , shardingParameter =" + shardingParameter + " ,jobParamter =" + jobParamter);
        switch (shardingItem){
            case  0:
                doLongJob("食品订单");
                break;
            case 1:
                doSmallJob("电脑订单");
                break;
            case 2:
                doLongJob("服装订单");
                break;
            case 3:
                doSmallJob("机械订单");
                break;
        }
    }

    public void doLongJob(String msg){
        logger.info("========长时间任务=======" + msg);
        try {
            TimeUnit.SECONDS.sleep(60);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void doSmallJob(String msg){
        logger.info("========短时间任务=======" + msg);
    }
}

配置类

public class FastDemo {
    private static CoordinatorRegistryCenter createRegistryCenter(){
      //elasticjob采用zookeeper进行任务的调度,根据抢主的方式实现同一时刻只有一个任务在一台机器上执行,保证不重复,这里配置zookeeper的地址
        CoordinatorRegistryCenter registryCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("192.168.0.103:2181","elastic-job-demo"));
        registryCenter.init();
        return registryCenter;
    }

    private static LiteJobConfiguration createJobConfiguration() {
        //设置任务每15秒执行一次,一共分成4片,elastic-job会获取当前这个任务一共在多少台服务器上进行平均分配,例如我将war包分别放在了128,129机器上,任务总片为4,则按照elasticjob的默认分配策略,128将执行第0,1片任务,129执行第2,3片任务
        JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration
                .newBuilder("demoSimpleJob", "0/20 * * * * ?", 4)
                .shardingItemParameters("0=A,1=B,2=C,3=D")
                .jobParameter("xuzy")
                .failover(true) //设置失效转移,当一台机器挂了以后他的分片会让其他台服务器执行
                .build();
        // 定义SIMPLE类型配置
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MyElasticJob.class.getCanonicalName());
        // 定义Lite作业根配置
        LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
        return simpleJobRootConfig;
    }

    public static void initJob(){
        new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
    }
}

ServletContextLTest类

public class ServletContextLTest implements ServletContextListener {
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        FastDemo.initJob();
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
    }
}

web.xml

    <listener>
        <listener-class>com.xzy.elasticjob.servletListener.ServletContextLTest</listener-class>
    </listener>

将项目打包成war分别放在128,129上面分别观察日志
128日志

[root@server-1 bin]# taif -f /var/logs/elasticjob.log 
-bash: taif: command not found
[root@server-1 bin]# tail -f /var/logs/elasticjob.log 
  Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
  NOT STARTED.
  Currently in standby mode.
  Number of jobs executed: 0
  Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 1 threads.
  Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.

2019-04-27 02:12:03 [ INFO] - org.quartz.impl.StdSchedulerFactory -StdSchedulerFactory.java(1339) -Quartz scheduler 'demoSimpleJob' initialized from an externally provided properties instance.
2019-04-27 02:12:03 [ INFO] - org.quartz.impl.StdSchedulerFactory -StdSchedulerFactory.java(1343) -Quartz scheduler version: 2.2.1
2019-04-27 02:12:04 [ INFO] - org.quartz.core.QuartzScheduler -QuartzScheduler.java(575) -Scheduler demoSimpleJob_$_NON_CLUSTERED started.
2019-04-27 02:12:15 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =2 , shardingTotalCount=4 , shardingParameter =C ,jobParamter =xuzy
2019-04-27 02:12:15 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(42) -========长时间任务=======服装订单
2019-04-27 02:12:15 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =3 , shardingTotalCount=4 , shardingParameter =D ,jobParamter =xuzy
2019-04-27 02:12:15 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(51) -========短时间任务=======机械订单
2019-04-27 02:13:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =2 , shardingTotalCount=4 , shardingParameter =C ,jobParamter =xuzy
2019-04-27 02:13:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(42) -========长时间任务=======服装订单
2019-04-27 02:13:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =3 , shardingTotalCount=4 , shardingParameter =D ,jobParamter =xuzy
2019-04-27 02:13:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(51) -========短时间任务=======机械订单
2019-04-27 02:14:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =2 , shardingTotalCount=4 , shardingParameter =C ,jobParamter =xuzy
2019-04-27 02:14:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(42) -========长时间任务=======服装订单
2019-04-27 02:14:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =3 , shardingTotalCount=4 , shardingParameter =D ,jobParamter =xuzy
2019-04-27 02:14:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(51) -========短时间任务=======机械订单

129日志

[root@server-2 bin]# tail -f /var/logs/elasticjob.log 
2019-04-27 02:12:15 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =1 , shardingTotalCount=4 , shardingParameter =B ,jobParamter =xuzy
2019-04-27 02:12:15 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(51) -========短时间任务=======电脑订单
2019-04-27 02:13:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =0 , shardingTotalCount=4 , shardingParameter =A ,jobParamter =xuzy
2019-04-27 02:13:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(42) -========长时间任务=======食品订单
2019-04-27 02:13:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =1 , shardingTotalCount=4 , shardingParameter =B ,jobParamter =xuzy
2019-04-27 02:13:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(51) -========短时间任务=======电脑订单
2019-04-27 02:14:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =0 , shardingTotalCount=4 , shardingParameter =A ,jobParamter =xuzy
2019-04-27 02:14:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(42) -========长时间任务=======食品订单
2019-04-27 02:14:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =1 , shardingTotalCount=4 , shardingParameter =B ,jobParamter =xuzy
2019-04-27 02:14:30 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(51) -========短时间任务=======电脑订单
2019-04-27 02:15:45 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =0 , shardingTotalCount=4 , shardingParameter =A ,jobParamter =xuzy
2019-04-27 02:15:45 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(42) -========长时间任务=======食品订单
2019-04-27 02:15:45 [ INFO] - com.xzy.elasticjob.MyElasticJob -MyElasticJob.java(23) -shardingItem =1 , shardingTotalCount=4 , shardingParameter =B ,jobParamter =xuzy

可以看到128,129分别处理了不同的数据分片,期间假如128服务器挂了,由于配置了失效转移,原来128的服装订单、机械订单任务将转移给129继续执行

elastic-job作业调度-zookeeper

elastic-job是通过zookeeper进行任务协调和故障转移。
当启动项目后会发现任务在zookeeper注册了节点,如下,首先在根据我们的配置创建了/elastic-job-demo/demoSimpleJob节点,对应的config,instances, leader, servers和sharding


image.png
  • config节点
    config节点记录了任务的配置信息,包含执行类,cron表达式,分片算法类,分片数量,分片参数。默认状态下,如果你修改了Job的配置比如cron表达式,分片数量等是不会更新到zookeeper上去的,除非你把参数overwrite修改成true或者使用rmr /elastic-job-demo/demoSimpleJob命令删除节点并重新启动创建
{
  "jobName": "demoSimpleJob", //任务名称
  "jobClass": "com.xzy.elasticjob.MyElasticJob", //具体执行类
  "jobType": "SIMPLE", //任务类型
  "cron": "0/20 * * * * ?", //任务实行时间corn
  "shardingTotalCount": 4, //总分片数
  "shardingItemParameters":  "0\u003dA,1\u003dB,2\u003dC,3\u003dD", //分片参数
  "jobParameter": "xuzy", //任务参数
  "failover": true, //是否失效转移
  "misfire": true,
  "description": "",
  "jobProperties": {
    "job_exception_handler": "com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler",//默认的异常处理类
    "executor_service_handler": "com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler" //默认的作业处理线程池类
  },
  "monitorExecution": true,
  "maxTimeDiffSeconds": -1,
  "monitorPort": -1,
  "jobShardingStrategyClass": "",
  "reconcileIntervalMinutes": 10,
  "disabled": false, //作业是否禁止启动
  "overwrite": false //本地配置是否可覆盖注册中心配置
}
  • instances节点
    同一个Job下的elastic-job的部署实例。一台机器上可以启动多个Job实例,也就是Jar包。instances的命名是IP+@-@+PID
    如图,demoSimpleJob有两个实例,地址如下

    image.png

  • leader节点
    任务实例的主节点信息,通过zookeeper的主节点选举,选出来的主节点信息。下面的子节点分为election,sharding和failover三个子节点。分别用于主节点选举,分片和失效转移处理。election下面的instance节点显式了当前主节点的实例ID:jobInstanceId。latch节点也是一个永久节点用于选举时候的实现分布式锁。sharding节点下面有一个临时节点,necessary,是否需要重新分片的标记。如果分片总数变化,或任务实例节点上下线或启用/禁用,以及主节点选举,都会触发设置重分片标记,主节点会进行分片计算


    image.png
  • servers节点
    记录了任务实例的信息


    image.png
  • sharding节点
    任务的分片信息,子节点是分片项序号,从零开始,至分片总数减一。从这个节点可以看出哪个分片在哪个实例上运行


    image.png

    image.png

elastic-job-lite-console

elastic-job-lite-console是elasticjob提供的管理工具
安装方法

  • https://github.com/miguangying/elastic-job-lite-console下载zip文件并压缩得到tar.gz。如果是linux则将tar.gz放到linux解压,可以得到start.sh,点击执行。如果是window则再次解压tar.gz得到start.bat直接点击执行
  • 登录用户名密码root/root,默认端口8899
  • 进入首页后添加注册中心配置命名空间为代码上的命名空间,页面上可以查看每个作业的运行情况和进行手动触发


    image.png

    image.png

    image.png

    image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,922评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,591评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,546评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,467评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,553评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,580评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,588评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,334评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,780评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,092评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,270评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,925评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,573评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,194评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,437评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,154评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容