spring batch 纯注解学习笔记(五)--文件读写

Spring batch由上至下的结构JobStep都是属于框架级别的的功能,大部分时候都是提供一些配置选项给开发人员使用,而Item中的ReaderProcessorWriter是属于业务级别的,它开放了一些业务切入的接口。 但是文件的读写过程中有很多通用一致的功能Spring Batch为这些相同的功能提供了一致性实现类。

1.扁平结构文件

扁平结构文件(也称为矩阵结构文件,后文简称为文件)是最常见的一种文件类型。他通常以一行表示一条记录,字段数据之间用某种方式分割。与标准的格式数据(xml、json等)主要差别在于他没有结构性描述方案(SXD、JSON-SCHEME),进而没有结构性分割规范。因此在读写此类文件之前需要先设定好字段的分割方法。

文件的字段数据分割方式通常有两种:使用分隔符或固定字段长度。前者通常使用逗号(,)之类的符号对字段数据进行划分,后者的每一列字段数据长度是固定的。 框架为文件的读取提供了FieldSet用于将文件结构中的信息映射到一个对象。FieldSet的作用是将文件的数据与类的field进行绑定(field是Java中常见的概念,不清楚的可以了解Java反射)。

2.数据读取

pring Batch为文件读取提供了FlatFileItemReader类,它为文件中的数据的读取和转换提供了基本功能。在FlatFileItemReader中有2个主要的功能接口,一是Resource、二是LineMapperResource用于外部文件获取,下面是一个例子:

Resource resource = new FileSystemResource("resources/trades.csv"); 

在复杂的生产环境中,文件通常由中心化、或者流程式的基础框架来管理(比如EAI)。因此文件往往需要使用FTP等方式从其他位置获取。如何迁移文件已经超出了Spring Batch框架的范围,在Spring的体系中可以参考Spring Integration项目。

下面是FlatFileItemReader的属性,每一个属性都提供了Setter方法。


属性

每个属性都为文件的解析提供了某方面的功能,下面是结构的说明。

2.1.LineMapper

这个接口的作用是将字符串转换为对象:

public interface LineMapper { T mapLine(String line, int lineNumber) throws Exception; }

接口的基本处理逻辑是聚合类(FlatFileItemReader)传递一行字符串以及行号给LineMapper::mapLine,方法处理后返回一个映射的对象。

2.1.LineTokenizer

这个接口的作用是将一行数据转换为一个FieldSet结构。对于Spring Batch而言,扁平结构文件的到Java实体的映射都通过FieldSet来控制,因此读写文件的过程需要完成字符串到FieldSet的转换:

public interface LineTokenizer { FieldSet tokenize(String line); }

这个接口的含义是:传递一行字符串数据,然后获取一个FieldSet。

框架为LineTokenizer提供三个实现类:

DelimitedLineTokenizer:利用分隔符将数据转换为FieldSet。最常见的分隔符是逗号,,类提供了分隔符的配置和解析方法。
FixedLengthTokenizer:根据字段的长度来解析出FieldSet结构。必须为记录定义字段宽度。
PatternMatchingCompositeLineTokenizer:使用一个匹配机制来动态决定使用哪个LineTokenizer。

2.3.FieldSetMapper

该接口是将FieldSet转换为对象:

public interface FieldSetMapper { T mapFieldSet(FieldSet fieldSet) throws BindException; }

FieldSetMapper通常和LineTokenizer联合在一起使用:String->FieldSet->Object。

2.4.DefaultLineMapper

DefaultLineMapper是LineMapper的实现,他实现了从文件到Java实体的映射:

public class DefaultLineMapper implements LineMapper<>, InitializingBean {
    private LineTokenizer tokenizer;
    private FieldSetMapper fieldSetMapper;
    public T mapLine(String line, int lineNumber) throws Exception {
        return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
    }
    public void setLineTokenizer(LineTokenizer tokenizer) {
        this.tokenizer = tokenizer;
    }
    public void setFieldSetMapper(FieldSetMapper fieldSetMapper) {
        this.fieldSetMapper = fieldSetMapper;
    }
}

在解析文件时数据是按行解析的:

传入一行字符串。
LineTokenizer将字符串解析为FieldSet结构。
FieldSetMapper继续解析为一个Java实体对象返回给调用者。

DefaultLineMapper是框架提供的默认实现类,看似非常简单,但是利用组合模式可以扩展出很多功能。

2.5.数据自动映射

在转换过程中如果将FieldSet的names属性与目标类的field绑定在一起,那么可以直接使用反射实现数据转换,为此框架提供了BeanWrapperFieldSetMapper来实现。

DefaultLineMapper<WeatherEntity> lineMapper = new DefaultLineMapper<>(); //创建LineMapper

DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); //创建LineTokenizer
tokenizer.setNames(new String[] { "siteId", "month", "type", "value", "ext" }); //设置Field名称

BeanWrapperFieldSetMapper<WeatherEntity> wrapperMapper 
    = new BeanWrapperFieldSetMapper<>(); //创建FieldSetMapper
wrapperMapper.setTargetType(WeatherEntity.class); //设置实体,实体的field名称必须和tokenizer.names一致。

// 组合lineMapper
lineMapper.setLineTokenizer(tokenizer);
lineMapper.setFieldSetMapper(wrapperMapper);

2.6.文件读取总结

上面提到了各种接口和实现,实际上都是围绕着FlatFileItemReader的属性在介绍,虽然内容很多但是实际上就以下几点:

首先要定位文件,Spring Batch提供了Resource相关的定位方法。
其次是将文件中的行字符串数据转换为对象,LineMapper的功能就是完成这个功能。
框架为LineMapper提供了DefaultLineMapper作为默认实现方法,在DefaultLineMapper中需要组合使用LineTokenizer和FieldSetMapper。前者将字符串转为为一个Field,后者将Field转换为目标对象。
LineTokenizer有3个实现类可供使用、FieldSetMapper有一个默认实现类BeanWrapperFieldSetMapper。

2.7.文件读写可执行源码

运行之前需要配置数据库链接,参看源码库中的README.md。

文件读取的主要逻辑在org.chenkui.spring.batch.sample.items.FlatFileReader类:

public class FlatFileReader {
    // FeildSet的字段名,设置字段名之后可以直接使用名字作为索引获取数据。也可以使用索引位置来获取数据
    public final static String[] Tokenizer = new String[] { "siteId", "month", "type", "value", "ext" };
    private boolean userWrapper = false;

    @Bean
    //定义FieldSetMapper用于FieldSet->WeatherEntity
    public FieldSetMapper<WeatherEntity> fieldSetMapper() {
        return new FieldSetMapper<WeatherEntity>() {
            @Override
            public WeatherEntity mapFieldSet(FieldSet fieldSet) throws BindException {
                if (null == fieldSet) {
                    return null; // fieldSet不存在则跳过该行处理
                } else {
                    WeatherEntity observe = new WeatherEntity();
                    observe.setSiteId(fieldSet.readRawString("siteId"));
                    //Setter
                    return observe;
                }
            }
        };
    }

    @Bean
    // 配置 Reader
    public ItemReader<WeatherEntity> flatFileReader(
                           @Qualifier("fieldSetMapper") FieldSetMapper<WeatherEntity> fieldSetMapper) {
        FlatFileItemReader<WeatherEntity> reader = new FlatFileItemReader<>();
        reader.setResource(new FileSystemResource("src/main/resources/data.csv")); // 读取资源文件
        DefaultLineMapper<WeatherEntity> lineMapper = new DefaultLineMapper<>(); // 初始化 LineMapper实现类
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); // 创建LineTokenizer接口实现

        tokenizer.setNames(Tokenizer); // 设定每个字段的名称,如果不设置需要使用索引获取值
        lineMapper.setLineTokenizer(tokenizer); // 设置tokenizer工具

        if (userWrapper) { //使用 BeanWrapperFieldSetMapper 使用反射直接转换
            BeanWrapperFieldSetMapper<WeatherEntity> wrapperMapper = new BeanWrapperFieldSetMapper<>();
            wrapperMapper.setTargetType(WeatherEntity.class);
            fieldSetMapper = wrapperMapper;
        }

        lineMapper.setFieldSetMapper(fieldSetMapper);
        reader.setLineMapper(lineMapper);
        reader.setLinesToSkip(1); // 跳过的初始行,用于过滤字段行
        reader.open(new ExecutionContext());
        return reader;
    }
}

2.8.按字段长度格读取文件

除了按照分隔符,有些文件可以字段数据的占位长度来提取数据。按照前面介绍的过程,实际上只要修改LineTokenizer接口即可,框架提供了FixedLengthTokenizer类:

@Bean
public FixedLengthTokenizer fixedLengthTokenizer() {
    FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();

    tokenizer.setNames("ISIN", "Quantity", "Price", "Customer");
    //Range用于设定数据的长度。
    tokenizer.setColumns(new Range(1-12),
                        new Range(13-15),
                        new Range(16-20),
                        new Range(21-29));
    return tokenizer;
}

3.0写入扁平结构文件

将数据写入到文件与读取的过程正好相反:将对象转换为字符串。

3.1.LineAggregator

与LineMapper相对应的是LineAggregator,他的功能是将实体转换为字符串:

public interface LineAggregator<T> {
    public String aggregate(T item);
}

3.2.PassThroughLineAggregator

框架为LineAggregator接口提供了一个非常简单的实现类——PassThroughLineAggregator,其唯一实现就是使用对象的toString方法:

3.3.DelimitedLineAggregator

ineAggregator的另外一个实现类是DelimitedLineAggregator。与PassThroughLineAggregator简单直接使用toString方法不同的是,DelimitedLineAggregator需要一个转换接口FieldExtractor:

DelimitedLineAggregator<CustomerCredit> lineAggregator = new DelimitedLineAggregator<>();
lineAggregator.setDelimiter(",");
lineAggregator.setFieldExtractor(fieldExtractor);

3.4.FieldExtractor

FieldExtractor用于实体类到collection结构的转换。它可以和LineTokenizer进行类比,前者是将实体类转换为扁平结构的数据,后者是将String转换为一个FieldSet结构。

public interface FieldExtractor<T> {
    Object[] extract(T item);
}

框架为FieldExtractor接口提供了一个基于反射的实现类BeanWrapperFieldExtractor,其过程就是将实体对象转换为列表:

BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
fieldExtractor.setNames(new String[] {"field1", "field2"});

setName方法用于指定要转换的field列表。

3.5.输出文件处理

文件读取的逻辑非常简单:文件存在打开文件并写入数据,当文件不存在抛出异常。但是写入文件明显不能这么简单粗暴。新建一个JobInstance时最直观的操作是:存在同名文件就抛出异常,不存在则创建文件并写入数据。但是这样做显然有很大的问题,当批处理过程中出现问题需要restart,此时并不会从头开始处理所有的数据,而是要求文件存在并接着继续写入。为了确保这个过程FlatFileItemWriter默认会在新JobInstance运行时删除已有文件,而运行重启时继续在文件末尾写入。FlatFileItemWriter可以使用shouldDeleteIfExists、appendAllowed、shouldDeleteIfEmpty来有针对性的控制文件。

3.6.文件写入可执源码

文件写入主要代码在org.chenkui.spring.batch.sample.items.FlatFileWriter:

public class FlatFileWriter {

    private boolean useBuilder = true;

    @Bean
    public ItemWriter<MaxTemperatureEntiry> flatFileWriter() {
        BeanWrapperFieldExtractor<MaxTemperatureEntiry> fieldExtractor = new BeanWrapperFieldExtractor<>();
        fieldExtractor.setNames(new String[] { "siteId", "date", "temperature" }); //设置映射field
        fieldExtractor.afterPropertiesSet(); //参数检查

        DelimitedLineAggregator<MaxTemperatureEntiry> lineAggregator = new DelimitedLineAggregator<>();
        lineAggregator.setDelimiter(","); //设置输出分隔符
        lineAggregator.setFieldExtractor(fieldExtractor); //设置FieldExtractor处理器

        FlatFileItemWriter<MaxTemperatureEntiry> fileWriter = new FlatFileItemWriter<>();
        fileWriter.setLineAggregator(lineAggregator);
        fileWriter.setResource(new FileSystemResource("src/main/resources/out-data.csv")); //设置输出文件位置
        fileWriter.setName("outpufData");

        if (useBuilder) {//使用builder方式创建
            fileWriter = new FlatFileItemWriterBuilder<MaxTemperatureEntiry>().name("outpufData")
                .resource(new FileSystemResource("src/main/resources/out-data.csv")).lineAggregator(lineAggregator)
                .build();
        }
        return fileWriter;
    }
}

文件的写入过程与读取过程完全对称相反:先用FieldExtractor将对象转换为一个collection结构(列表),然后用lineAggregator将collection转化为带分隔符的字符串。

4.文件说明

  • 代码中的测试数据来自数据分析交流项目bi-process-example,是NOAA的2015年全球天气监控数据。为了便于源码存储进行了大量的删减,原始数据有百万条,如有需要使用下列方式下载: curl -O ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/2015.csv.gz #数据文件 curl -O ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/ghcnd-stations.txt # 文件结构及类型说明
  • 代码实现了读取文件、处理文件、写入文件的整个过程。处理文件的过程是只获取监控的最高温度信息(Type=TMAX),其他都过滤。
  • 本案例的代码使用org.chenkui.spring.batch.sample.flatfile.FlatFileItemApplication::main方法运行,使用的是Command Runner的方式执行(运行方式的说明见Item概念及使用代码命令行方式运行Java内嵌运行)。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,686评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,668评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,160评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,736评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,847评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,043评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,129评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,872评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,318评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,645评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,777评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,861评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,589评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,687评论 2 351

推荐阅读更多精彩内容