最近团队在做网络爬虫活动,于是就有了批处理+定时任务的需求。于是简单研究了以下springboot对springbatch和quartz的集成。
大致流程如下:
Spring-Batch
batch最核心的三大步骤就是
- 读数据
reader
- 处理数据
processer
- 写数据
writer
reader
public class ProductReader implements ItemReader<Object> {
@Override
public Object read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
//数据读取器,可以自定义数据源,可以是文件,也可以是数据库。
//将数据读取出来并返回到当前的batch中。
return null;
}
}
这里是最常规的读取数据,可以自定义数据源,获取数据(文件,数据库,消息队列等)。也可以使用batch提供的封装类获取数据FlatFileItemReader
,JdbcCursorItemReader
,只需要配置对应的数据源即可。
processer
public class ProductProcessor implements ItemProcessor<Object, List<Product>> {
@Override
public List<Product> process(Object obj) throws Exception {
//处理数据
return null;
}
}
将reader读取出来的数据处理成需要的业务模型的数据并返回。
writer
public class ProductWriter implements ItemWriter<List<Product>> {
@Override
public void write(List<? extends List<Product>> list) throws Exception {
//save or update
}
}
将处理后的数据做入库处理。
config
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")//创建一个step
.<Object, List<Product>> chunk(10)//每次执行的次数
.reader(productReader)//设置读处理器
.processor(productProcessor)//设置处理器
.writer(productWriter)//设置写处理器
.build();
}
@Bean
public Job etlJob(){
return jobBuilderFactory.get("etlJob")//创建一个batch-job
.incrementer(new RunIdIncrementer())
.listener(jobCompletionNotificationListener)//设置任务监听
.flow(step1())
.end()
.build();
}
Spring-Batch集成Quartz
由于SpringBatch只提供了批处理任务,但是没有任务调度,所以我们用batch+quartz完成批处理的任务调度。
config
private CronTriggerFactoryBean createTrigger(JobDetail detail, String expression){
CronTriggerFactoryBean cron = new CronTriggerFactoryBean();
cron.setJobDetail(detail);
cron.setCronExpression(expression);
return cron;
}
private JobDetailFactoryBean createJobDetail(String jobName, String filePath, Source source){
JobDetailFactoryBean factory = new JobDetailFactoryBean();
//设置quartz任务
factory.setJobClass(QuartzJobLauncher.class);
//spring-batch任务的配置
Map<String, Object> map = Maps.newHashMap();
map.put("jobName", jobName);//batch的任务名
map.put("jobLauncher", jobLauncher);//任务触发器,用来执行batchjob
map.put("jobLocator", jobLocator);//任务定位器,可以通过name获取batchjob
map.put("filePath", filePath);//etl的一些其它配置
map.put("source", source);
factory.setJobDataAsMap(map);
return factory;
}
@Bean
public JobDetailFactoryBean jdJobDetail(){
return createJobDetail("etlJob", "src/main/resources/html/jd/iphone", Source.JD);
}
@Bean
public CronTriggerFactoryBean jdTrigger(){
return createTrigger(jdJobDetail().getObject(), "*/20 * * * * ? *");
}
@Bean
public SchedulerFactoryBean schedulerFactoryBean(){
SchedulerFactoryBean scheduler = new SchedulerFactoryBean();
scheduler.setTriggers(jdTrigger().getObject());
return scheduler;
}
QuartzJobLauncher
@Data
public class QuartzJobLauncher extends QuartzJobBean {
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
//通过quartz的上下文,获取quartzjob的配置,然后转换为batchjob的配置。
JobDataMap map = jobExecutionContext.getMergedJobDataMap();
JobParameters jobParameters = new JobParametersBuilder()
.addString("filePath", map.getString("filePath"))
.addString("source", map.get("source").toString())
.addLong("time", System.currentTimeMillis())
.toJobParameters();
try {
//获取batch-job
Job job = jobLocator.getJob(jobName);
//执行job
jobLauncher.run(job, jobParameters);
} catch (JobExecutionAlreadyRunningException | JobInstanceAlreadyCompleteException |
NoSuchJobException | JobParametersInvalidException | JobRestartException e) {
e.printStackTrace();
}
}
}
通过quartz配置quartz的定时任务,然后在QuartzJobLauncher
中执行Spring-Batch
的job任务。而且可以设置config
中的factory.setJobDataAsMap(map);
,设置每个quartz-job
的参数,通过JobExecutionContext.getMergedJobDataMap()
获取。再设置batch
的JobParameters
。这样的话就可以在reader
、processer
、writer
中获取,具体获取方法如下:
@Value("#{jobParameters['filePath']}")
private String filePath;
@Value("#{jobParameters['source']}")
private String source;
总结
spring-batch非常方便的提供大量数据的批处理任务。可以通过配置JobExecutionListenerSupport
来自己实现一个简单的任务的监控。
@Override
public void afterJob(JobExecution jobExecution) {
if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
//do something after job
}
}
@Override
public void beforeJob(JobExecution jobExecution) {
//do something before job
}
spring-batch还有很多其它特性,比如事务管理、启动、终止、重启任务、跳过、任务的日志、监控等特性。后续将介绍这些特性。