项目需要搞分布式,出于一些原因定时器的代码也需要部署两份,但是定时器是不需要跑两遍,所以考虑了分布式的定时任务框架Quartz。主要解决2个问题:
- 多台服务运行,保证只有一台服务的定时器在跑。这台服务不挂,另一台上的定时器永远不启动。
- 保证定时器串行调度。一个定时任务没有执行完,绝对不会触发第二次。(类似于Spring的定时器的fixedDelayString参数)
于是我就开始写demo,第一个很容易就得到了解决这里就不细说了,坑在了第二个,由于我用的是比较新的版本所以找了很多资料都不合适,结果没办法,跑去调试源码。(容易吗我,写个demo要去调源码)
网上很多人都说只要concurrent 设置为false
@Bean("testJobDetail")
public JobDetailFactoryBean testJobDetail() {
JobDetailFactoryBean bean = new JobDetailFactoryBean();
bean.setDurability(true);
bean.setRequestsRecovery(true);
bean.setJobClass(MyDetailQuartzJobBean.class);
Map<String, String> map = new HashMap<>();
//配置定时任务类
map.put("targetObject", "testScheduleTask");
map.put("targetMethod", "execute");
//是否允许任务并发执行。当值为false时,表示必须等到前一个线程处理完毕后才再启一个新的线程
// map.put("concurrent", "false");
bean.setJobDataAsMap(map);
return bean;
}
或者,使QuartzJobBean类实现org.quartz.StatefulJob接口即可
public class BackCoupon implements StatefulJob {
@Override
public void execute(JobExecutionContext context)
throws JobExecutionException {
}
}
我都试过了完全没效果StatefulJob接口已经被废弃了不推荐使用,没办法了,撸起袖子调试源码吧。我们知道quartz是可以把jobdetail存到数据表里的
我发现了一个很可疑的字段IS_NONCONCURRENT,入口点就在这里了。段点打在类的org.quartz.impl.jdbcjobstore.JobStoreSupport类storeJob方法,第610行,这里触发了jobdetail的一个更新操作,点进去发现了orcale的具体实现
public int updateJobDetail(Connection conn, JobDetail job) throws IOException, SQLException {
ByteArrayOutputStream baos = this.serializeJobData(job.getJobDataMap());
byte[] data = baos.toByteArray();
PreparedStatement ps = null;
PreparedStatement ps2 = null;
ResultSet rs = null;
int var13;
try {
ps = conn.prepareStatement(this.rtp("UPDATE {0}JOB_DETAILS SET DESCRIPTION = ?, JOB_CLASS_NAME = ?, IS_DURABLE = ?, IS_NONCONCURRENT = ?, IS_UPDATE_DATA = ?, REQUESTS_RECOVERY = ?, JOB_DATA = EMPTY_BLOB() WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"));
ps.setString(1, job.getDescription());
ps.setString(2, job.getJobClass().getName());
this.setBoolean(ps, 3, job.isDurable());
this.setBoolean(ps, 4, job.isConcurrentExectionDisallowed());
this.setBoolean(ps, 5, job.isPersistJobDataAfterExecution());
this.setBoolean(ps, 6, job.requestsRecovery());
ps.setString(7, job.getKey().getName());
ps.setString(8, job.getKey().getGroup());
ps.executeUpdate();
ps.close();
ps = conn.prepareStatement(this.rtp("SELECT JOB_DATA FROM {0}JOB_DETAILS WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ? FOR UPDATE"));
ps.setString(1, job.getKey().getName());
ps.setString(2, job.getKey().getGroup());
rs = ps.executeQuery();
int res = 0;
if (rs.next()) {
Blob dbBlob = this.writeDataToBlob(rs, 1, data);
ps2 = conn.prepareStatement(this.rtp("UPDATE {0}JOB_DETAILS SET JOB_DATA = ? WHERE SCHED_NAME = {1} AND JOB_NAME = ? AND JOB_GROUP = ?"));
ps2.setBlob(1, dbBlob);
ps2.setString(2, job.getKey().getName());
ps2.setString(3, job.getKey().getGroup());
res = ps2.executeUpdate();
}
var13 = res;
} finally {
closeResultSet(rs);
closeStatement(ps);
closeStatement(ps2);
}
return var13;
}
这个时候发现IS_NONCONCURRENT 字段是通过JOB的isConcurrentExectionDisallowed属性确定的。而且下方有个循环,这个循环就是把之前JobDetailFactoryBean 配置的map里的值取出来。看一下SQL,根本就不能配置IS_NONCONCURRENT 嘛,那么IS_NONCONCURRENT 怎么办呢?
点进去JobDetail的实现类查看相关的getter and setter发现如下东西:
public boolean isConcurrentExectionDisallowed() {
return ClassUtils.isAnnotationPresent(this.jobClass, DisallowConcurrentExecution.class);
}
是否有出现标签?哦,原来是通过扫描标签啊,DisallowConcurrentExecution这个标签是个空标签。找到定义的job,在类名上加上标签。问题就得到了解决。
比如我这里绑定的job是MyDetailQuartzJobBean方法,用的是网上给的绑定方法map里面定义targetObject,targetMethod。然后通过反射调用自己写的定时任务,那么就在类名上加上注解即可。
package com.quartz.demo.service;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.quartz.QuartzJobBean;
import java.lang.reflect.Method;
@DisallowConcurrentExecution
public class MyDetailQuartzJobBean extends QuartzJobBean {
// 计划任务所在类
private String targetObject;
// 具体需要执行的计划任务
private String targetMethod;
private ApplicationContext ctx;
@Override
protected void executeInternal(JobExecutionContext context)
throws JobExecutionException {
try {
Object otargetObject = ctx.getBean(targetObject);
Method m = null;
try {
m = otargetObject.getClass().getMethod(targetMethod);
m.invoke(otargetObject);
} catch (SecurityException e) {
e.printStackTrace();
} catch (NoSuchMethodException e) {
e.printStackTrace();
}
} catch (Exception e) {
throw new JobExecutionException(e);
}
}
public void setApplicationContext(ApplicationContext applicationContext) {
this.ctx = applicationContext;
}
public void setTargetObject(String targetObject) {
this.targetObject = targetObject;
}
public void setTargetMethod(String targetMethod) {
this.targetMethod = targetMethod;
}
}
或者绑定JOB的时候就直接绑定到实际的定时任务类,不要通过反射(网友们说可能会出现序列化问题,我并没有尝试过)
环境:2.0.3的spring-boot-starter-quartz。orcale数据库。
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
<dependency>
<groupId>com.github.noraui</groupId>
<artifactId>ojdbc7</artifactId>
<version>12.1.0.2</version>
</dependency>
<dependency>
<groupId>com.mchange</groupId>
<artifactId>c3p0</artifactId>
<version>0.9.2.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
参考: