概述
快速上手, 以最快的方式和配置运行和了解Spring Batch批处理; 通过bean注入的方式, 可以使用spring-boot-starter-batch进行相关bean的管理和配置
但是这种方式不够灵活, 因为项目不一定在spring的环境中执行,所以需要另一种方式。结合源码不难发现Batch执行Job的核心代码和创建过程
非Bean注入方式
通过人工进行对象的创建
Maven
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>4.3.1</version>
</dependency>
Code
public class RunDemo {
static class MyBatchConfigurer extends DefaultBatchConfigurer {
@Override
public void setDataSource(DataSource dataSource) {
}
}
public static void main(String[] args) throws Exception {
File dataFile = new File("data.json");
if (!dataFile.exists()) {
dataFile.createNewFile();
}
FileOutputStream os = new FileOutputStream(dataFile);
os.write("[{\"city\":\"beijing\",\"name\":\"jaychou\"},".getBytes());
os.write("{\"city\":\"beijing\",\"name\":\"jaychou2\"}]".getBytes());
os.flush();
os.close();
MyBatchConfigurer context = new MyBatchConfigurer();
context.initialize();
JsonItemReader<Map> reader = new JsonItemReaderBuilder<Map>().name("reader")
.resource(new FileSystemResource(dataFile))
.jsonObjectReader(new JacksonJsonObjectReader<>(Map.class))
.saveState(true).build();
FlatFileItemWriter<String> writer = new FlatFileItemWriterBuilder<String>().name("writer")
.resource(new FileSystemResource("target/output.txt")).lineAggregator(new PassThroughLineAggregator<>()).build();
Step step = new StepBuilderFactory(context.getJobRepository(), context.getTransactionManager())
.get("step1").<Map, String>chunk(10).reader(reader).writer(writer)
.processor((ItemProcessor<Map, String>) (v) -> "__" + v.toString())
.build();
Job job = new JobBuilderFactory(context.getJobRepository()).get("job").start(step).build();
context.getJobLauncher().run(job, new JobParameters());
}
}
Bean注入方式
Maven
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>2.4.1</version>
</dependency>
Code
SpringBatchApplication
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class SpringBatchApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchApplication.class, args);
}
}
RunDemo
@Configuration
@EnableBatchProcessing
public class RunDemo {
@Autowired
private JobBuilderFactory jobBuilders;
@Autowired
private StepBuilderFactory stepBuilders;
@Component
static class MyBatchConfigurer extends DefaultBatchConfigurer {
@Override
public void setDataSource(DataSource dataSource) {
}
}
@Bean
public Job demoJob() throws Exception {
File dataFile = new File("data.json");
if (!dataFile.exists()) {
dataFile.createNewFile();
}
FileOutputStream os = new FileOutputStream(dataFile);
os.write("[{\"city\":\"beijing\",\"name\":\"jaychou\"},".getBytes());
os.write("{\"city\":\"beijing\",\"name\":\"jaychou2\"}]".getBytes());
os.flush();
os.close();
JsonItemReader<Map> reader = new JsonItemReaderBuilder<Map>().name("reader")
.resource(new FileSystemResource(dataFile))
.jsonObjectReader(new JacksonJsonObjectReader<>(Map.class))
.saveState(true).build();
FlatFileItemWriter<String> writer = new FlatFileItemWriterBuilder<String>().name("writer")
.resource(new FileSystemResource("target/output.txt")).lineAggregator(new PassThroughLineAggregator<>()).build();
Step step = stepBuilders.get("step1").<Map, String>chunk(10).reader(reader).writer(writer)
.processor((ItemProcessor<Map, String>) (v) -> "__" + v.toString())
.build();
return jobBuilders.get("job").start(step).build();
}
}
补充
如果想要其他的输入和输出, 如KafkaItemReader,KafkaItemWriter, 则需要引入如下的依赖包
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-infrastructure</artifactId>
<version>4.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.0.1</version>
</dependency>
demo运行结果, 会生成output.txt
__{city=beijing, name=jaychou}
__{city=beijing, name=jaychou2}