(1)Introduction
elastic-job is a distributed scheduling solution open-sourced by Dangdang, built on ZooKeeper and Quartz to overcome Quartz's lack of native distributed support. It consists of two relatively independent subprojects: Elastic-Job-Lite and Elastic-Job-Cloud.
The core design idea of elastic-job is a decentralized distributed scheduling framework, inspired by Quartz's database-based high-availability scheme. Because a database provides no distributed coordination, elastic-job adds elastic scaling and data sharding on top of that high-availability approach, making fuller use of the resources of a distributed server cluster.
Elastic-Job-Lite is positioned as the lightweight, decentralized solution: it provides distributed task coordination as a plain jar, with ZooKeeper as its only external dependency.
(2)Thread
package com.zhaoyang.demo;

import java.io.IOException;
import java.time.LocalDateTime;

public class A {

    // Runs every 3 seconds.
    public static void main(String[] args) throws IOException {
        new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Executing task " + LocalDateTime.now());
            }
        }).start();
        // Block the main thread to keep the JVM alive.
        System.in.read();
    }
}
(3)Timer
package com.zhaoyang.demo;

import java.time.LocalDateTime;
import java.util.Timer;
import java.util.TimerTask;

public class B {

    // Starts after 5 seconds, then repeats every 3 seconds.
    public static void main(String[] args) {
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                System.out.println("Executing task " + LocalDateTime.now());
            }
        }, 5000, 3000);
    }
}
(4)ScheduledThreadPoolExecutor
package com.zhaoyang.demo;

import java.time.LocalDateTime;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class C {

    // Starts after 5 seconds, then runs again 3 seconds after each execution completes (fixed delay).
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);
        executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                System.out.println("Executing task " + LocalDateTime.now());
            }
        }, 5, 3, TimeUnit.SECONDS);
    }
}
(5)Spring Task
package com.zhaoyang.demo;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * All of the approaches above share several drawbacks:
 *
 * - Single-threaded execution: if the previous task runs long, the next run is starved and blocked.
 * - No distributed coordination: a single node is a single point of failure, while deploying multiple nodes leads to concurrent duplicate execution.
 * - As the number of tasks grows, there is no unified view for tracking and managing task progress.
 * - The feature set is basic, with no advanced capabilities such as timeouts or retries.
 */
@Component
public class MySpringTask {

    @Scheduled(cron = "0/5 * * * * ?")
    public void test() {
        System.out.println("Executing Spring Task");
    }
}
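One prerequisite worth noting: @Scheduled does nothing unless scheduling is enabled somewhere in the application. A minimal sketch, assuming a standard Spring Boot setup (the application class name is illustrative):

package com.zhaoyang.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

// Without @EnableScheduling, the @Scheduled method above is never invoked.
@SpringBootApplication
@EnableScheduling
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}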
(6)SimpleJob - Basics
package com.zhaoyang.elastic;

import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;

import java.time.LocalDateTime;

// (1) SimpleJob: a simple job
public class MySimpleJob implements SimpleJob {

    @Override
    public void execute(ShardingContext context) {
        System.out.println("Executing task " + LocalDateTime.now());
    }
}
package com.zhaoyang.elastic;

import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

// (2) SimpleJob: Spring Boot integration
@Component
public class MySimpleJob2 implements SimpleJob {

    @Override
    public void execute(ShardingContext context) {
        System.out.println("Executing task 2 " + LocalDateTime.now());
    }
}
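Marking the job as a @Component only creates the bean; it still has to be bound to a registry center and a job configuration before it will run. A minimal sketch of one way to wire it up by hand, reusing the same bootstrap APIs shown in the standalone examples below (the ZooKeeper address, namespace, and cron value are assumptions):

package com.zhaoyang.elastic;

import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MySimpleJob2Config {

    @Bean
    public CoordinatorRegistryCenter regCenter() {
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "my-job"));
        regCenter.init();
        return regCenter;
    }

    // initMethod = "schedule" starts the job once the Spring context is ready.
    @Bean(initMethod = "schedule")
    public ScheduleJobBootstrap mySimpleJob2Bootstrap(CoordinatorRegistryCenter regCenter, MySimpleJob2 job) {
        return new ScheduleJobBootstrap(regCenter, job,
                JobConfiguration.newBuilder("MySimpleJob2", 1).cron("0/5 * * * * ?").build());
    }
}

Alternatively, the elasticjob-lite-spring-boot-starter can register jobs from configuration properties instead of explicit bean wiring.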
(7)DataflowJob - Basics
package com.zhaoyang.elastic;

import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

// (2) DataflowJob: a dataflow job
public class MyDataflowJob implements DataflowJob<String> {

    @Override
    public List<String> fetchData(ShardingContext shardingContext) {
        List<String> data = new ArrayList<>();
        data.add("data1");
        data.add("data2");
        data.add("data3");
        data.add("data4");
        return data;
    }

    @Override
    public void processData(ShardingContext shardingContext, List<String> list) {
        System.out.println(LocalDateTime.now() + " processing data: " + list);
    }
}
(8)SimpleJob - Advanced
package com.zhaoyang;

import com.zhaoyang.elastic.MySimpleJob;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;

// (1) SimpleJob
public class TestMySimpleJob {

    public static void main(String[] args) {
        new ScheduleJobBootstrap(createRegistryCenter(), new MySimpleJob(), createJobConfiguration()).schedule();
    }

    // Connect to ZooKeeper
    private static CoordinatorRegistryCenter createRegistryCenter() {
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "my-job"));
        regCenter.init();
        return regCenter;
    }

    // Create the job configuration
    private static JobConfiguration createJobConfiguration() {
        return JobConfiguration.newBuilder("MySimpleJob", 1)
                .cron("0/3 * * * * ?")
                .build();
    }
}
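Here the second argument of JobConfiguration.newBuilder is the total number of sharding items, and with 1 there is only a single shard. A minimal sketch of a sharded variant (the partition names are illustrative), where each job instance receives its assigned item through the ShardingContext:

// Hypothetical variant: three sharding items, each mapped to a custom parameter.
private static JobConfiguration createShardedJobConfiguration() {
    return JobConfiguration.newBuilder("MySimpleJob", 3)
            .cron("0/3 * * * * ?")
            // Inside execute(), the assigned shard is available via
            // context.getShardingItem() and context.getShardingParameter().
            .shardingItemParameters("0=partitionA,1=partitionB,2=partitionC")
            .build();
}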
(9)DataflowJob - Advanced
package com.zhaoyang;

import com.zhaoyang.elastic.MyDataflowJob;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;

/**
 * (2) DataflowJob
 *
 * streaming.process=true enables streaming; the default is false.
 * overwrite=true overwrites the job configuration stored in the registry center; without it, modified or newly added settings do not take effect.
 * Once streaming is enabled, the code below executes the task continuously instead of once every 3 seconds.
 *
 * The reason: with streaming enabled, the job stops fetching only when fetchData returns null or an empty collection; otherwise it keeps running. With streaming disabled, each job run executes fetchData and processData exactly once and then completes.
 *
 * Since fetchData above always returns data, the streaming job would run forever.
 *
 * So with streaming, the business code itself must control when fetchData returns no data, in order to end the current run.
 */
public class TestMyDataflowJob {

    public static void main(String[] args) {
        new ScheduleJobBootstrap(createRegistryCenter(), new MyDataflowJob(), createJobConfiguration()).schedule();
    }

    // Connect to ZooKeeper
    private static CoordinatorRegistryCenter createRegistryCenter() {
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "my-job"));
        regCenter.init();
        return regCenter;
    }

    // Create the job configuration
    private static JobConfiguration createJobConfiguration() {
        return JobConfiguration.newBuilder("MyDataflowJob", 1)
                .cron("0/3 * * * * ?")
                .build();
    }
}
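To actually turn on streaming as described in the comment, the property can be set on the job configuration. A minimal sketch of a replacement for createJobConfiguration above (assuming the DataflowJobProperties constant from the dataflow executor module; whether overwrite(true) is appropriate in production is a separate decision):

// import org.apache.shardingsphere.elasticjob.dataflow.props.DataflowJobProperties;
private static JobConfiguration createStreamingJobConfiguration() {
    return JobConfiguration.newBuilder("MyDataflowJob", 1)
            .cron("0/3 * * * * ?")
            // streaming.process=true: keep fetching until fetchData returns null or an empty list
            .setProperty(DataflowJobProperties.STREAM_PROCESS_KEY, "true")
            // overwrite the configuration already stored in the registry center
            .overwrite(true)
            .build();
}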
(10)Script Job
package com.zhaoyang;

import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
import org.apache.shardingsphere.elasticjob.script.props.ScriptJobProperties;

// (3) Script job
/**
 * Note that the second argument of ScheduleJobBootstrap is the string "SCRIPT", and the command to run is configured via script.command.line.
 *
 * Under the hood the command is executed with CommandLine, so anything that can run on your machine can be configured and executed here.
 */
public class TestScriptJob {

    public static void main(String[] args) {
        new ScheduleJobBootstrap(createRegistryCenter(), "SCRIPT", createJobConfiguration()).schedule();
    }

    private static CoordinatorRegistryCenter createRegistryCenter() {
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "my-job"));
        regCenter.init();
        return regCenter;
    }

    // Create the job configuration
    private static JobConfiguration createJobConfiguration() {
        return JobConfiguration.newBuilder("MyScriptJob", 1)
                .cron("0/5 * * * * ?")
                .setProperty(ScriptJobProperties.SCRIPT_KEY, "java -version")
                .overwrite(true)
                .build();
    }
}
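A shell script works the same way; only the command changes. A sketch with an assumed script path (the path is hypothetical); per the ElasticJob documentation, the script also receives the sharding context as a JSON string in its final argument, so it can tell which shard it is handling:

// The path below is a placeholder for a real executable script on your machine.
private static JobConfiguration createShellJobConfiguration() {
    return JobConfiguration.newBuilder("MyScriptJob", 1)
            .cron("0/5 * * * * ?")
            .setProperty(ScriptJobProperties.SCRIPT_KEY, "/path/to/myjob.sh")
            .overwrite(true)
            .build();
}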
(11)HTTP Job
package com.zhaoyang;

import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.http.props.HttpJobProperties;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;

// (4) HTTP job (available since 3.0.0-beta)
/**
 * Note that the second argument of ScheduleJobBootstrap is the string "HTTP", and the request is configured via properties such as http.uri and http.method.
 *
 * Under the hood the request is made with HttpURLConnection.
 *
 * To see the result of the call, set the log level to debug, because HttpJobExecutor logs the response at the debug level.
 */
public class TestHttpJob {

    public static void main(String[] args) {
        new ScheduleJobBootstrap(createRegistryCenter(), "HTTP", createJobConfiguration()).schedule();
    }

    private static CoordinatorRegistryCenter createRegistryCenter() {
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "my-job"));
        regCenter.init();
        return regCenter;
    }

    // Create the job configuration
    private static JobConfiguration createJobConfiguration() {
        return JobConfiguration.newBuilder("MyHttpJob", 1)
                .cron("0/5 * * * * ?")
                .setProperty(HttpJobProperties.URI_KEY, "http://www.baidu.com")
                .setProperty(HttpJobProperties.METHOD_KEY, "GET")
                .setProperty(HttpJobProperties.DATA_KEY, "source=ejob") // request body
                .overwrite(true)
                .build();
    }
}