【需求】
SpringBoot项目,接入定时任务平台xxl-job。
支持集群环境下定时任务/脚本任务/手动触发任务的统一注册和管理,并且支持数据的分片处理的功能。
【介绍】
大众点评员工徐雪里于2015年发布的分布式任务调度平台,是一个轻量级分布式任务调度框架,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。
● 有web界面,通过网页进行任务注册,并可以动态编辑、启动、终止
● 调度中心是中心化部署,但支持集群部署来保证高可用
● 执行器的HA
执行器是分布式任务执行的基本单元,支持集群部署。可以理解成一个微服务是一个执行器,executor.appname相同的执行器组成了一个执行器集群,执行器在系统启动时自动注册到调度中心。
调度中心根据配置好的定时参数,使用合适的路由策略从执行器集群中选择指定的执行器进行任务的执行,集群部署保证定时任务的HA
● 执行器的动态分片
执行器集群部署时,任务路由策略选择”分片广播”情况下,一次任务调度将会广播触发对应集群中所有执行器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;
“分片广播” 以执行器为维度进行分片,支持动态扩容“执行器集群”从而动态增加分片数量,协同进行业务处理;在进行大数据量业务操作时可显著提升任务处理能力和速度。
“分片广播” 和普通任务开发流程一致,不同之处在于可以获取分片参数进行分片业务处理。
适用场景
○ 分片任务场景:10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍;
○ 广播任务场景:广播执行器机器运行shell脚本、广播集群节点进行缓存更新等
【BEAN模式开发】
||▶ 下载源码并解压
||▶ 在指定的数据库中,执行数据库初始化SQL脚本,脚本位置为项目根目录文件夹的doc/db/tables_xxl_job.sql
||▶ 按maven格式将源码导入IDE, 修改调度中心的配置文件,指定DB配置信息,配置文件的地址是xxl-job-admin\src\main\resources\application.properties。 在配置文件中还可以配置报警的发件邮箱信息等参数。
||▶ 项目编译,打包,部署在docker容器中,并配置访问
||▶ 浏览器打开http://{ip}:{port}/xxl-job-admin访问调度中心,默认登录账号为admin/123456,可登录后修改密码
||▶ SpringBoot项目导入依赖
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.2.0</version>
</dependency>
||▶ SpringBoot项目核心模块(common-core)配置xxl-job
@Slf4j
@Configuration
public class XxlJobConfig {
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
/**
* 将执行器组件交给spring管理
*/
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
log.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
/**
* 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
*
* 1、引入依赖:
* <dependency>
* <groupId>org.springframework.cloud</groupId>
* <artifactId>spring-cloud-commons</artifactId>
* <version>${version}</version>
* </dependency>
*
* 2、配置文件,或者容器启动变量
* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
*
* 3、获取IP
* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
*/
}
||▶ 启动类导入配置文件
@SpringBootApplication
@Import({XxlJobConfig.class})
public class UserApplication {
……
}
||▶ 编写JobHandler
@Slf4j
@Component
public class MyXxlJobHandler {
/**
* 执行器的第一个任务对应的执行方法
*/
@XxlJob("FirstTaskHandler")
public ReturnT<String> teskExecute1(String param) {
……
return ReturnT.SUCCESS;
}
/**
* 执行器的第二个任务对应的执行方法
*/
@XxlJob("SecondTaskHandler")
public ReturnT<String> teskExecute2(String param) {
……
return ReturnT.SUCCESS;
}
}
||▶ application.yaml配置xxl-job
xxl:
job:
admin:
addresses: http://127.0.0.1:8080/xxl-job-admin # [选填]调度中心地址(以本地为例),执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";若调度中心集群部署存在多个地址则用逗号分隔,为空则关闭自动注册
executor:
# [选填] 执行器注册优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
address:
# [选填] 执行器名称;为空则关闭自动注册
appname: my-handler
# [选填] 执行器IP,默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯使用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务"
ip:
# [选填] 执行器端口,≤0则自动获取,默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
port: 9999
# [选填] 执行器运行日志文件存储磁盘路径:需对该路径拥有读写权限;为空则使用默认路径
logpath: /data/applogs/xxl-job/jobhandler
# [选填] 执行器日志文件保存天数:过期日志自动清理, 限制值≥3时生效; 否则(如-1), 关闭自动清理功能
logretentiondays: 30
# [选填] 执行器腾讯TOKEN,非空时启用
accessToken:
||▶ (可选)执行器集群配置:执行器支持集群部署,提升调度系统可用性,同时提升任务处理能力。要求如下:
1.执行器回调地址(xxl.job.admin.addresses)需要保持一致;执行器根据该配置进行执行器自动注册等操作。
2.同一个执行器集群内AppName(xxl.job.executor.appname)需保持一致;调度中心根据该配置动态发现不同集群的在线执行器列表。
||▶ 启动SpringBoot项目
||▶ 登录xxl-job控制台: http://IP:端口/xxl-job-admin/(以本地为例可以登录服务器),账号admin,密码123456
||▶ 执行器管理>新增执行器
▪ AppName:和xxl.job.executor.appname(my-handler)保持一致
▪ 名称:自定义,简述执行器功能即可
▪ 注册方式:选自动注册,注册完成后,看到执行器列表,自动多了一条在线机器地址。
||▶ 任务管理>新建任务
▪ 执行器:自动对应刚刚新建执行器时的功能简述
▪ 任务描述:详述该执行器所执行的任务
▪ Cron:定时设置
▪ JobHandler:手动输入和@XxlJob注解对应的value值(FirstTaskHandler、SecondTaskHandler)
▪ 报警邮件可填开发/运维人员的邮箱地址。
▪ 任务参数(可选):Handler方法可通过param接收
||▶ 新增任务成功,操作栏将按钮选择为“启动”
||▶ 手动执行一次任务,看到控制台打印成功:
备注:"NJU,Happy 120th Birthday~"是我们在任务中传过去的参数
【GLUE模式开发】
介紹
刚才选择的是Bean模式,利用Spring环境中写好的Bean执行任务;
还有一种是GLUE模式,将任务以源码形式,在调度中心实时维护:
||▶ 新增任务,选择我们刚刚的执行器,新增任务
“运行模式”选择“GLUE(Java)”,
“阻塞处理策略”选择单机串行,
“JobHandler”可以不选择
然后选择右边的“GLUDE IDE”,编辑可在单独中心实时维护的任务:
||▶ 手动执行一次任务,并传入参数:“My name is Harry~”
||▶ 执行成功:
【分片广播&动态分片】
介绍
执行器集群部署时,任务路由策略选择”分片广播”情况下,一次任务调度将会广播触发集群中所有执行器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务。
“分片广播” 以执行器为维度进行分片,支持动态扩容“执行器集群”,从而动态增加分片数,协同处理业务;在操作大数据量业务时可显著提升处理能力和速度。
“分片广播” 和普通任务开发流程一致,不同之处在于可以获取分片参数,获取分片参数进行分片业务处理。
▶ Java语言任务获取分片参数方式:BEAN、GLUE模式(Java)
// 可参考Sample示例执行器中的示例任务"ShardingJobHandler"了解试用
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
▶ 脚本语言任务获取分片参数方式:GLUE模式(Shell)、GLUE模式(Python)、GLUE模式(Nodejs)
// 脚本任务入参固定为三个,依次为:任务传参、分片序号、分片总数。以
Shell模式任务为例,获取分片参数代码如下
echo "分片序号 index = $2"
echo "分片总数 total = $3"
分片参数属性说明:
▪ index:当前分片序号(从0开始),执行器集群列表中当前执行器的序号;
▪ total:总分片数,执行器集群的总机器数量;