spring batch 纯注解学习笔记(四)--Item概念及使用代码

批处理概念 中介绍一个标准的批处理分为 JobStep。本文将结合代码介绍在StepReaderProcessorWriter的实际使用。

1.Reader

Reader是指从各种各样的外部输入中获取数据,框架为获取各种类型的文件已经预定义了常规的Reader实现类。Reader通过ItemReader接口实现:

public interface ItemReader<T> {
    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}

read方法的作用就是读取一条数据,数据以泛型T的实体结构返回,当read返回null时表示所有数据读取完毕。返回的数据可以是任何结构,比如文件中的一行字符串,数据库的一行数据,或者xml文件中的一系列元素,只要是一个Java对象即可。

2.Writer

Writer通过ItemWriter接口实现:

public interface ItemWriter<T> {
    void write(List<? extends T> items) throws Exception;
}

WriterReader的反向操作,是将数据写入到特定的数据源中。在Step控制一文已经介绍Writer是根据chunk属性设定的值按列表进行操作的,所以传入的是一个List结构。chunk用于表示批处理的事物分片,因此需要注意的是,在writer方法中进行完整数据写入事物操作。例如向数据库写入List中的数据,在写入完成之后再提交事物。

3.Processor

除了使用组合模式,直接使用Processor是一种更优雅的方法。Processor是Step中的可选项,但是批处理大部分时候都需要对数据进行处理,因此框架提供了ItemProcessor接口来满足Processor过程:

public interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

Processor的结构非常简单也是否易于理解。传入一个类型I,然后由Processor处理成为O。

4.1.Processor链

在一个Step中可以使用多个Processor来按照顺序处理业务,此时同样可以使用CompositeItem模式来实现:

@Bean
public CompositeItemProcessor compositeProcessor() {
    //创建 CompositeItemProcessor
    CompositeItemProcessor<Foo,Foobar> compositeProcessor = new CompositeItemProcessor<Foo,Foobar>();
    List itemProcessors = new ArrayList();
    //添加第一个 Processor
    itemProcessors.add(new FooTransformer());
    //添加第二个 Processor
    itemProcessors.add(new BarTransformer());
    //添加链表
    compositeProcessor.setDelegates(itemProcessors);
    return processor;
}

4.1.过滤记录

在Reader读取数据的过程中,并不是所有的数据都可以使用,此时Processor还可以用于过滤非必要的数据,同时不会影响Step的处理过程。只要ItemProcesspr的实现类在procss方法中返回null即表示改行数据被过滤掉了。

5.ItemStream

Step控制一文中已经提到了ItemStream。在数据批处理概念中提到过,Spring Batch的每一步都是无状态的,进而ReaderWriter也是无状态的,这种方式能够很好的隔离每行数据的处理,也能将容错的范围收窄到可以空子的范围。但是这并不意味着整个批处理的过程中并不需要控制状态。例如从数据库持续读入或写入数据,每次ReaderWriter都单独去申请数据源的链接、维护数据源的状态(打开、关闭等)。因此框架提供了ItemStream接口来完善这些操作:

public interface ItemStream {
    void open(ExecutionContext executionContext) throws ItemStreamException;
    void update(ExecutionContext executionContext) throws ItemStreamException;
    void close() throws ItemStreamException;
}

6.持久化数据

在使用Spring Batch之前需要初始化他的元数据存储(Meta-Data Schema),也就是要将需要用到的表导入到对应的数据库中。当然,Spring Batch支持不使用任何持久化数据库,仅仅将数据放到内存中,不设置DataSource即可。

6.1.初四化数据

Spring Batch相关的工作需要使用序列SEQUENCE:

CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_SEQ;

有些数据库不支持SEQUENCE,可以通过表代理,比如在MySql(InnoDB数据库)中:

CREATE TABLE BATCH_STEP_EXECUTION_SEQ (ID BIGINT NOT NULL);
INSERT INTO BATCH_STEP_EXECUTION_SEQ values(0);
CREATE TABLE BATCH_JOB_EXECUTION_SEQ (ID BIGINT NOT NULL);
INSERT INTO BATCH_JOB_EXECUTION_SEQ values(0);
CREATE TABLE BATCH_JOB_SEQ (ID BIGINT NOT NULL);
INSERT INTO BATCH_JOB_SEQ values(0);

6.2.关于Version字段

某些表中都有Version字段。因为Spring的更新策略是乐观锁,因此在进行数据更新之后都会对表的Version字段进行+1处理。在内存与数据库交互的过程中,会使用采用getVersion、increaseVersion(+1)、updateDataAndVersion的过程,如果在update的时候发现Version不是预计的数值(+1),则会抛出OptimisticLockingFailureException的异常。当同一个Job在进群中不同服务上执行时,需要注意这个问题。

6.3.BATCH_JOB_INSTANCE

BATCH_JOB_INSTANCE用于记录JobInstance,在数据批处理概念中介绍了他的工作方式,其结构为:

CREATE TABLE BATCH_JOB_INSTANCE  (
  JOB_INSTANCE_ID BIGINT  PRIMARY KEY ,
  VERSION BIGINT,
  JOB_NAME VARCHAR(100) NOT NULL ,
  JOB_KEY VARCHAR(2500)
);

表结构

6.4.BATCH_JOB_EXECUTION_PARAMS

BATCH_JOB_EXECUTION_PARAMS对应的是JobParameters对象。其核心功能是存储Key-Value结构的各种状态数值。字段中IDENTIFYING=true用于标记那些运行过程中必须的数据(可以理解是框架需要用到的数据),为了存储key-value结构该表一个列数据格式:

CREATE TABLE BATCH_JOB_EXECUTION_PARAMS  (
    JOB_EXECUTION_ID BIGINT NOT NULL ,
    TYPE_CD VARCHAR(6) NOT NULL ,
    KEY_NAME VARCHAR(100) NOT NULL ,
    STRING_VAL VARCHAR(250) ,
    DATE_VAL DATETIME DEFAULT NULL ,
    LONG_VAL BIGINT ,
    DOUBLE_VAL DOUBLE PRECISION ,
    IDENTIFYING CHAR(1) NOT NULL ,
    constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
    references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
);

表结构

6.5.BATCH_JOB_EXECUTION

关联JobExecution,每当运行一个Job都会产生一个新的JobExecution,对应的在表中都会新增一行数据。

CREATE TABLE BATCH_JOB_EXECUTION  (
  JOB_EXECUTION_ID BIGINT  PRIMARY KEY ,
  VERSION BIGINT,
  JOB_INSTANCE_ID BIGINT NOT NULL,
  CREATE_TIME TIMESTAMP NOT NULL,
  START_TIME TIMESTAMP DEFAULT NULL,
  END_TIME TIMESTAMP DEFAULT NULL,
  STATUS VARCHAR(10),
  EXIT_CODE VARCHAR(20),
  EXIT_MESSAGE VARCHAR(2500),
  LAST_UPDATED TIMESTAMP,
  JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
  constraint JOB_INSTANCE_EXECUTION_FK foreign key (JOB_INSTANCE_ID)
  references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
) ;

表结构

6.6.BATCH_STEP_EXECUTION

该表对应的是StepExecution,其结构和BATCH_JOB_EXECUTION基本相似,只是对应的对象是Step,增加了与之相对的一些字段数值:

CREATE TABLE BATCH_STEP_EXECUTION  (
  STEP_EXECUTION_ID BIGINT  PRIMARY KEY ,
  VERSION BIGINT NOT NULL,
  STEP_NAME VARCHAR(100) NOT NULL,
  JOB_EXECUTION_ID BIGINT NOT NULL,
  START_TIME TIMESTAMP NOT NULL ,
  END_TIME TIMESTAMP DEFAULT NULL,
  STATUS VARCHAR(10),
  COMMIT_COUNT BIGINT ,
  READ_COUNT BIGINT ,
  FILTER_COUNT BIGINT ,
  WRITE_COUNT BIGINT ,
  READ_SKIP_COUNT BIGINT ,
  WRITE_SKIP_COUNT BIGINT ,
  PROCESS_SKIP_COUNT BIGINT ,
  ROLLBACK_COUNT BIGINT ,
  EXIT_CODE VARCHAR(20) ,
  EXIT_MESSAGE VARCHAR(2500) ,
  LAST_UPDATED TIMESTAMP,
  constraint JOB_EXECUTION_STEP_FK foreign key (JOB_EXECUTION_ID)
  references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;

未填入内容部分见BATCH_JOB_EXECUTION说明。


表结构

6.7.BATCH_JOB_EXECUTION_CONTEXT

该表会记录所有与Job相关的ExecutionContext信息。每个ExecutionContext都对应一个JobExecution,在运行的过程中它包含了所有Job范畴的状态数据,这些数据在执行失败后对于后续处理有中重大意义。

CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT  (
  JOB_EXECUTION_ID BIGINT PRIMARY KEY,
  SHORT_CONTEXT VARCHAR(2500) NOT NULL,
  SERIALIZED_CONTEXT CLOB,
  constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
  references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
) ;


表结构

6.8.BATCH_STEP_EXECUTION_CONTEXT

Step中ExecutionContext相关的数据表,结构与BATCH_JOB_EXECUTION_CONTEXT完全一样。

6.9.表索引建议

上面的所有建表语句都没有提供索引,但是并不代表索引没有价值。当感觉到SQL语句的执行有效率问题时候,可以增加索引。

索引带来的价值取决于SQL查询的频率以及关联关系,下面是Spring Batch框架在运行过程中会用到的一些查询条件语句,用于参考优化索引:


表结构

7.1.使用案例

Spring Batch提供了2种执行方式:命令行方式或Java内嵌方式。命令行方式是直到需要执行批处理任务的时候才启动程序,内嵌方式是结合Web工程或其他外部化框架来使用。2者最大的差别就是是否直接向IoCs注入一个Job实例。

7.2.通用基本配置

两种方式的基本配置都是一样的,通过Reader、Processor、Writer来组装一个Step。代码中Item并不涉及文件或数据库的操作,只是简单的模拟数据读取、处理、写入的过程。实体Record和Msg用于模拟数据转换,基本配置如下:

public class BatchDefaultConfig {
    @Bean
    //配置Step
    public Step simpleStep(StepBuilderFactory builder, ItemReader<Record> reader, ItemProcessor<Record, Msg> processor,
            ItemWriter<Msg> writer) {
        return builder.get("SimpleStep").<Record, Msg>chunk(10).reader(reader).processor(processor).writer(writer)
                .build();
    }

    @Bean
    //配置 Reader
    public ItemReader<Record> reader() {
        return new ItemReader<Record>() {
            private int count = 0;
            public Record read()
                    throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                return ++this.count < 100 ? new Record().setId(this.count).setMsg("Read Number:" + this.count) : null;
            }
        };
    }

    @Bean
    //配置 Processor
    public ItemProcessor<Record, Msg> processor() {
        return new ItemProcessor<Record, Msg>() {
            public Msg process(Record item) throws Exception {
                return new Msg("MSG GET INFO = " + item.getMsg());
            }
        };
    }

    @Bean
    //配置 Writer
    public ItemWriter<Msg> writer() {
        return new ItemWriter<Msg>() {
            private int batchCount = 0;
            public void write(List<? extends Msg> items) throws Exception {
                System.out.println("Batch Count : " + ++batchCount + ". Data:");
                for (Msg msg : items) {
                    System.out.println(msg.getInfo());
                }
            }
        };
    }
}

7.3.命令行方式运行

有了基本配置之后,命令行运行的方式仅仅是向容器添加一个Job:

@Configuration
//导入依赖配置
@Import({ BatchDefaultConfig.class })
public class BatchCommondConfig {
    @Bean
    public Job simpleJob(Step step, JobBuilderFactory builder) {
        return builder.get("SimpleJob").start(step).build(); //向容器返回一个Job的Bean
    }
}

然后启动Spring Framework则会自动启用Command Runner运行方式运行——先调用SpringApplication::callRunner方法,然后使用JobLauncherCommandLineRunner::execute运行:

public class CommondSample {
    public static void main(String[] args) throws DuplicateJobException {
        //模拟测试参数, 这些参数值在执行Java时从外部传入的,比如-Dkey=value
        String[] argsExt = new String[2];
        argsExt[0] = "BuilderParam1=Value1";
        argsExt[1] = "BuilderParam2=Value2";
        //运行Spring Framework
        SpringApplication.run(CommondSample.class, argsExt);
    }
}

启用之后观察数据库已经发生了变更。使用命令行需要通过 Java运行参数(-Dkey=value)传递JobParameters的数据,上面的代码模拟实现了相关的过程。

7.4.Java内嵌运行

Java内嵌的方式主要是用于搭配外部工程化使用,比如使用Web框架或则统一调度平台管之类的结构化框架来统一管理批处理任务。与命令行执行最大的区别就是不向容器注入Job:

@Configuration
//导入进出配置 
@Import({BatchDefaultConfig.class})
public class BatchOperatoConfig {
    @Bean
    //返回JobFactory
    public JobFactory simpleJob(Step step, JobBuilderFactory builder) {
        SimpleJobFactory sampleJobFactory = new SimpleJobFactory();
        sampleJobFactory.setJob(builder.get("SimpleJob").start(step).build());
        return sampleJobFactory;
    }
}

配置代码向容器添加了一个JobFactory的实现类,JobFactory的两个接口一个是获取Job一个是获取Job的名称,SimpleJobFactory实现了JobFactory:

public class SimpleJobFactory implements JobFactory {
    private Job job;
    public void setJob(Job job) {
        this.job = job;
    }
    @Override
    public Job createJob() {
        return job;
    }
    @Override
    public String getJobName() {
        return job.getName();
    }
}

最后通过SimpleJobFactory来启动一个Job:

@EnableBatchProcessing
@EnableScheduling
public class OperatorSample {
    public static void main(String[] args) throws DuplicateJobException {
        new SuspendThread().run(); //挂起系统一直运行
        ConfigurableApplicationContext ctx = SpringApplication.run(OperatorSample.class);
        Cron cron = ctx.getBean(Cron.class);
        cron.register(); //注册JobFactory
        cron.runJobLaunch();
    }
}

@Service
class Cron {
    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    private JobOperator jobOperator;

    @Autowired
    private JobRegistry jobRegistry;

    @Autowired
    private JobFactory jobFactory;

    //注册JobFactory
    void register() throws DuplicateJobException {
        jobRegistry.register(jobFactory);
    }

    //使用JobLaunch执行
    void runJobLaunch() {
        Map<String, JobParameter> map = new HashMap<>();
        map.put("Builder", new JobParameter("1"));
        map.put("Timer", new JobParameter("2"));
        jobLauncher.run(jobFactory.createJob(), new JobParameters(map));
    }

    @Scheduled(cron = "30 * * * * ? ")
    void task1() {
        System.out.println("1");
        runOperator();
    }

    //定时任务使用 JobOperator执行
    private void runOperator() {
        jobOperator.start("SimpleJob", "Builder=1,Timer=2");
    }
}

这里使用了2种执行方式:JobLauncherJobOperatorJobLauncher简单明了的启动一个批处理任务。而JobOperator扩展了一些用于Job管理的接口方法,观察JobOperator的源码可以发现它提供了获取ExecuteContext、检查JobInstance等功能,如果需要定制开发一个基于Web或者JMX管理批处理任务的系统,JobOperator更合适。JobOperator的第二个参数用于传递JobParameters,等号两端分别是keyvalue,逗号用于分割多行数据。

Job配置与运行提及过一个JobInstance相当于Job+JobParameters,因此虽然上面的代码使用了两种不同的运行方式,但是JobJobParameters是一样的。在运行被定时任务包裹的runOperator方法时,会一直抛出JobInstanceAlreadyExistsException异常,因为同一个实例不能运行2次。如果运行失败可以使用对应的restart方法。

后续会介绍各种ReaderWriter的使用。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容