快速了解组件-spring batch(2)之helloworld

tags: springbatch


1.引言

前面《数据批处理神器-Spring Batch(1)简介及使用场景》已经介绍了Spring Batch是一个轻量级,完善的批处理框架,它使用起来简单,方便,比较适合有点编程基础(特别是使用Spring及SpringBoot框架)的开发人员,针对业务编程,只需要关心具体的业务实现即可,把流程以及流程的控制交给Spring Batch就好。常言道"talk is cheap, show me the code",下面我们就通过一个简单的hello world,进入Spring Batch的世界,通过这个示例,可以快速了解开发批处理的流程和Spring Batch开发用到的组件,为后续的操作打下基础。

2.开发环境

  • JDK: jdk1.8
  • Spring Boot: 2.1.4.RELEASE
  • Spring Batch:4.1.2.RELEASE
  • 开发IDE: IDEA
  • 构建工具Maven: 3.3.9
  • 日志组件logback:1.2.3
  • lombok:1.18.6

3.helloworld开发

3.1 helloworld说明

本helloworld实现一个非常简单的功能,就是从数据组中读取字符串,把字符串转为大写,然后输出到控制台。如图:

字符串读写

整个过程就是一个批重任务(Job),它只有一个步骤(Job Step),步骤里分为三个阶段,读数据(ItemReader)、处理数据(ItemProcessor)、写数据(ItemWriter)。

3.2 开发流程

开发的主要代码如下:

主要代码

总体来说就是,通过ReaderProcessorWriter完成任务,结束后通过Listener进行监听,整个任务通过配置(BatchConfig)进行配置。

3.2.1 创建Spring Boot工程

直接使用Idea生成或在使用Spring Initializr生成即可,此处不详细说明。也可以直接使用我的代码示例。当前使用的Spring Boot版本是2.1.4.RELEASE

3.2.2 添加相关依赖

  • Spring Batch依赖
    在使用spring-boot-starter-parent的情况下,直接添加以下依赖即可:
<!-- 批处理框架-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>

引用后,会引用两个jar包,一个是spring-batch-infrastructure,一个是spring-batch-core,版本是4.1.2.RELEASE。分别对应的是基础框架层和核心层。

  • 添加内存数据库H2
    Spring Batch是需要数据库来存储任务的基本信息以及运行状态的,本例中不需要操作数据库逻辑,直接使用内存数据库H2即可。添加以下依赖:
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
</dependency>
  • 添加测试及工具类依赖
    为了简化开发,使用lombok进行处理。使用Spring Boot进行单元测试,添加依赖如下:
<!-- 工具包:lombok -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.6</version>
</dependency>
<!-- 测试框架 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

3.2.3 开发读数据组件ItemReader

添加完依赖后,就可以进入业务逻辑编程了。按Spring Batch的批处理流程,读数据ItemReader是第一步,当前示例中,我们的任务是从数组中读取数据。ItemReader是一个接口,开发人员直接实现此接口即可。此接口定义了核心方法read(),负责从给定的资源中读取可用的数据。具体实现如下:

@Slf4j
public class StringReader implements ItemReader<String> {
    private String[] messages = {"aaa1","aaa2","aaa3","aaa4"};
    private int count = 0;
    @Override
    public String read() throws UnexpectedInputException, ParseException, NonTransientResourceException {
        if(count < messages.length){
            String message = messages[count++];
            log.debug(LogConstants.LOG_TAG + "read data:"+message);
            return message;
        }else{
            log.debug(LogConstants.LOG_TAG + "read data end.");
            count = 0;
        }
        return null;
    }
}

说明:

  • (1)StringReader实现ItemReader接口;
  • (2)messages是数据源;
  • (3)count表示读取数据的下标,每读一次,下标自增,读取完后返回null表示结束。同时把count置为0,以方便下次读取。
  • (4)日志输出使用的是logback,结合lombok的@Slf4j注解,直接可使用log进行输出,简化操作。

3.2.4 开发处理数据组件ItemProcessor

读取数据后,返回的数据会流到ItemProcessor进行处理。同样,ItemProcessor是一个接口,要实现自己的处理逻辑,实现此接口即可。当然,如果没有ItemProcessor,读到的数据直接就到ItemWriter流程也是可以的。此处,Spring Batch有一个Chunk的概念,用于多次读,直到chunk指定的数量后,再统一给到processor和writer,以提高效率。本示例对于ItemProcessor的实现很简单,即把字符串转为大写。如下:

@Slf4j
public class ConvertProcessor implements ItemProcessor<String,String> {
    @Autowired
    private ConsoleService consoleService;
    @Override
    public String process(String data) {
        String dataProcessed = consoleService.convert2UpperCase(data);
        log.debug(LogConstants.LOG_TAG + data +" process data --> " + dataProcessed);
        return dataProcessed;
    }
}

说明:

  • 实现ItemProcessor接口,它有两个泛型,分别是I和O,I是读阶段获取的数据,O是提交给写阶段的数据。
  • 使用ConsoleService服务,对数据进行大写转换,里面的实现直接使用字符串的toUpperCase()方法

3.2.5 开发写数据组件ItemWriter

数据处理完后,会统一交给写组件(ItemWriter)进行写入。ItemWriter也是一个接口,核心方法是write方法,参数是数组。要实现自己的逻辑,实现此接口即可。本示例中,直接把数据输出到日志中即可。如下:

@Slf4j
public class ConsoleWriter implements ItemWriter<String> {
    @Override
    public void write(List<? extends String> list) {
        for (String msg :list) {
            log.debug(LogConstants.LOG_TAG + "write data: "+msg);
        }
    }
}

3.2.6 开发任务完成后的监听器JobExecutionListener

数据写入到目标后,任务即结束,但有时候我们还需要在任务结束时去做一些其它工作,如清理数据,更新时间等,则需要在任务完成后进行逻辑处理。Spring Batch对于任务或步骤开始和结束都会提供监听,以便于开发人员实现监听逻辑。如通过继承JobExecutionListenerSupport,可以实现beforeJobafterJob的监听,以实现开始任务前和结束任务后的处理。当前示例中,仅输出任务完成的日志。如下:

@Slf4j
public class ConsoleJobEndListener extends JobExecutionListenerSupport {
    @Override
    public void afterJob(JobExecution jobExecution) {
        if(jobExecution.getStatus() == BatchStatus.COMPLETED){
            log.debug("console batch job complete!");
        }
    }
}

3.2.7 配置完整任务

经过上面的读、处理、写、任务完成后监听的操作,现在需要把它们组装在一起,形成一个完成的任务,使用Spring Boot,简单的使用几个配置即可完成任务的组装。任务及其相关组件的关系如下:

组件关系

创建配置文件ConsoleBatchConfig.java,具体代码如下:

@Configuration
@EnableBatchProcessing
public class ConsoleBatchConfig {
    @Autowired
    public JobBuilderFactory jobBuilderFactory;
    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job consoleJob(Step consoleStep,JobExecutionListener consoleListener){
        String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
        return jobBuilderFactory.get(funcName).listener(consoleListener).flow(consoleStep)
                .end().build();
    }

    @Bean
    public Step consoleStep(ItemReader stringReader,ItemProcessor convertProcessor
            ,ItemWriter consoleWriter, CommonStepListener commonStepListener){
        String funcName = Thread.currentThread().getStackTrace()[1].getMethodName();
        return stepBuilderFactory.get(funcName).listener(commonStepListener)
                .<String,String>chunk(3).reader(stringReader).processor(convertProcessor)
                .writer(consoleWriter).build();
    }

    @Bean
    public ItemReader stringReader(){return new StringReader();}

    @Bean
    public ItemWriter consoleWriter(){return new ConsoleWriter();}

    @Bean
    public ItemProcessor convertProcessor(){return new ConvertProcessor();}

    @Bean
    public JobExecutionListener consoleListener(){return new ConsoleJobEndListener();}
}

说明:

  • 添加注解@Configuration及@EnableBatchProcessing,标识为配置及启用Spring Batch的配置(可以直接使用JobBuilderFactoryStepBuilderFactory分别用于创建Job和Step)。
  • 创建ItemReaderItemWriterItemProcessorListener对应的Bean,以供Step及Job的注入。
  • 使用stepBuilderFactory创建作业Step,其中chunk进行面向块的处理,即多次读取后再写入,提高效率。当前配置是3个为一个chunk。
  • 使用jobBuilderFactory添加step,创建任务。
  • 注意step和Job都需要有对应的名称(get方法确定),此处直接使用方法名作为Job和Step的名称。

3.2.8 测试批处理

经过上面的步骤,已经完成Job的开发,测试则可使用两种方式,一个是编写Controller,以接口调用的方式运行job,一种编写单元测试。

  • Job的运行
    通过JobLauncherrun方法来运行任务,run方法参数分别是JobjobParameters,即已配置的Job及job运行的参数。每个任务的区分是通过任务名(jobName)和任务参数(jobParameters)作为区别的,即如果jobNamejobParameters相同,Spring Batch会认为是同一任务,若任务已运行成功,同一任务不会再运行。因此,一般来说,不同的任务,我们的jobParameters可以直接以时间作为参数,以便于区别。生成jobParameters。代码如下:
JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time",System.currentTimeMillis())
                .toJobParameters();
  • 编写单元测试
    编写ConsoleJobTest,加载job,运行测试,如下所示:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MainBootApplication.class,ConsoleBatchConfig.class})
@Slf4j
public class ConsoleJobTest {
    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job consoleJob;

    public void testConsoleJob2() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        //构建参数
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time",System.currentTimeMillis())
                .toJobParameters();
        //执行任务
        JobExecution run = jobLauncher.run(consoleJob, jobParameters);
        ExitStatus exitStatus = run.getExitStatus();
        log.debug(exitStatus.toString());
    }
}

说明:引入SpringBootTest注解时,需要把Spring Batch任务也引入进来。

  • 执行结果输出
    执行结果如下图所示:

    执行结果

    从输出可知,由于设置的chunk是3,读取3个数据后,就统一给ItemProcessor进行大写转换处理,然后统一交给ItemWriter进行写入。执行完成后,Job的exitCode表示任务执行的状态,如果正常则为COMPLETED,失败则为FAILED

4.总结

经过以上的操作步骤,即可完成批处理操作。关于任务的状态,流程的步骤(读、处理、写)均交给Spring Batch来完成,开发人员所做的工作是根据自己的业务逻辑编写具体的读数据、处理数据和写数据即可。希望通过本文,大家可以对Spring Batch的组件有清晰的了解。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,755评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,369评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,799评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,910评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,096评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,159评论 3 411
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,917评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,360评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,673评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,814评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,509评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,156评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,123评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,641评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,728评论 2 351

推荐阅读更多精彩内容