我们知道parquet文件格式是不能进行update操作的。但是是否可以对其进行添加一列数据呢?
先看看parquet文件长什么样
Parquet文件是以二进制方式存储的,是不可以直接读取和修改的,Parquet文件是自解析的,文件中包括该文件的数据和元数据。在HDFS文件系统和Parquet文件中存在如下几个概念:
- HDFS块(Block):它是HDFS上的最小的副本单位,HDFS会把一个Block存储在本地的一个文件并且维护分散在不同的机器上的多个副本,通常情况下一个Block的大小为256M、512M等。
- HDFS文件(File):一个HDFS的文件,包括数据和元数据,数据分散存储在多个Block中。
- 行组(Row Group):按照行将数据物理上划分为多个单元,每一个行组包含一定的行数,在一个HDFS文件中至少存储一个行组,Parquet读写的时候会将整个行组缓存在内存中。
- 列块(Column Chunk):在一个行组中每一列保存在一个列块中,行组中的所有列连续的存储在这个行组文件中。不同的列块可能使用不同的算法进行压缩。
- 页(Page):每一个列块划分为多个页,一个页是最小的编码的单位,在同一个列块的不同页可能使用不同的编码方式。
通常情况下,在存储Parquet数据的时候会按照HDFS的Block大小设置行组的大小,由于一般情况下每一个Mapper任务处理数据的最小单位是一个Block,这样可以把每一个行组由一个Mapper任务处理,增大任务执行并行度。
上图展示了一个Parquet文件的结构,一个文件中可以存储多个行组,文件的首位都是该文件的Magic Code,用于校验它是否是一个Parquet文件,Footer length存储了文件元数据的大小,通过该值和文件长度可以计算出元数据的偏移量,文件的元数据中包括每一个行组的元数据信息和当前文件的Schema信息。除了文件中每一个行组的元数据,每一页的开始都会存储该页的元数据,在Parquet中,有三种类型的页:数据页、字典页和索引页。数据页用于存储当前行组中该列的值,字典页存储该列值的编码字典,每一个列块中最多包含一个字典页,索引页用来存储当前行组下该列的索引,目前Parquet中还不支持索引页,但是在后面的版本中增加。
从parquet文件格式中可以看出,如果我们需要为文件添加一列数据的话,需要对每个row group 添加列,这样势必会打破原有的数据偏移量,也有可能因为多了一列造成后续的row group全部变化,字典页和索引页也可能造成变化;另外在Footer中也需要根据新的数据列重新对每个偏移量进行计算。
所以基于此,并未看到parquet在加列操作中,是在原文件中添加(甚至在使用impala对table加列后,原parquet文件schema都未曾变化),而往往是新生成一个parquet文件。
示例
原表 entry, uuid, age, name 三个字段,建表指定parquet存储,并插入两条数据,在hdfs上生成一个parquet文件。
然后在impala执行add column操作,添加tel字段,并向其插入一些数据,生成一个新的parquet文件。通过parquet-Hadoop API直接读取parquet文件信息,获得
uuid->age->name->
row count: 2
{"uuid":"1",age":"20",name":"bob"}
{"uuid":"2",age":"10",name":"tom"}
uuid->age->name->tel->
row count: 6
{"uuid":"1",age":"20",name":"bob",tel":"186152372"}
{"uuid":"2",age":"30",name":"tom",tel":"186152372"}
{"uuid":"3",age":"40",name":"laiwb2",tel":"186152372"}
{"uuid":"4",age":"22",name":"bingo1",tel":"186152372"}
{"uuid":"6",age":"23",name":"feng",tel":"186152372"}
{"uuid":"7",age":"24",name":"bixians",tel":"186152372"}
从打印的信息来看,parquet文件1 schema只有三个字段uuid,age,name,这个文件就是建表时插入的,后面在add column后又插入了6条数据,这时,新生成的parquet文件的metadata多了一个tel,数据页中数据也多了tel列。
读取parquet文件主要的代码片段:
ParquetMetadata readFooter = ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER);
// parquet 文件的 schema信息
MessageType schema = readFooter.getFileMetaData().getSchema();
List<Type> columnInfos = schema.getFields();
for (Type type : columnInfos) {
System.out.print(type.getName() + "->");
}
// 每个 row group 的数量
List<BlockMetaData> blockMeta = readFooter.getBlocks();
for (BlockMetaData bl : blockMeta) {
System.out.println("row count: " + bl.getRowCount());
}
ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path).withConf(conf).build();
int count = 0;
Group recordData = reader.read();
while (count < 10 && recordData != null) {
StringBuilder builder = new StringBuilder();
builder.append("{\"");
for (int j = 0; j < columnInfos.size(); j++) {
if (j < columnInfos.size() - 1) {
String columnName = columnInfos.get(j).getName();
String value = recordData.getValueToString(j, 0);
builder.append(columnName + "\":\"" + value + "\",");
} else {
String columnName = columnInfos.get(j).getName();
String value = recordData.getValueToString(j, 0);
builder.append(columnName + "\":\"" + value + "\"}");
}
}
System.out.println(builder.toString());
count++;
recordData = reader.read();
}
然后drop name字段(parquet文件不会有什么变动,只是元数据信息变化),在hive元数据中只有uuid,age,tel三个字段,使用impala查询和hive查询出现了不同数据。hive能够查询匹配出这三个字段的数据来,但impala却查询出的是uuid, age, name数据,看样子impala是根据元数据信息逐个取值的,并不是根据hive元数据和parquet元数据对应取值。
综上,parquet 理论上可以追加一列,但是代价比较高,需要对row group 进行修改,并且需要同步parquet metadata信息,修改数据的offerset和schema等,并且parquet-Hadoop也未提供相应的方法。
后续~~~
列式存储的另外一种存储格式ORC,与parquet的对比图。
从parquet与orc对比图中可以知道,orc支持ACID和update操作,接下来说说ORC。
和Parquet类似,它并不是一个单纯的列式存储格式,仍然是首先根据行组分割整个表,在每一个行组内进行按列存储。ORC文件是自描述的,它的元数据使用Protocol Buffers序列化,并且文件中的数据尽可能的压缩以降低存储空间的消耗,目前也被Spark SQL、Presto等查询引擎支持,但是Impala对于ORC目前没有支持,仍然使用Parquet作为主要的列式存储格式(可能parquet是cloudera自家开发的吧,竞品ORC架构和parquet又相似度很高)。
ORC文件结构
和Parquet类似,ORC文件也是以二进制方式存储的,所以是不可以直接读取,ORC文件也是自解析的,它包含许多的元数据,这些元数据都是同构ProtoBuffer进行序列化的。ORC的文件结构见下图,其中涉及到如下的概念:
- ORC文件:保存在文件系统上的普通二进制文件,一个ORC文件中可以包含多个stripe,每一个stripe包含多条记录,这些记录按照列进行独立存储,对应到Parquet中的row group的概念。
- 文件级元数据:包括文件的描述信息PostScript、文件meta信息(包括整个文件的统计信息)、所有stripe的信息和文件schema信息。
- stripe:一组行形成一个stripe,每次读取文件是以行组为单位的,一般为HDFS的块大小,保存了每一列的索引和数据。
- stripe元数据:保存stripe的位置、每一个列的在该stripe的统计信息以及所有的stream类型和位置。
- row group:索引的最小单位,一个stripe中包含多个row group,默认为10000个值组成。
- stream:一个stream表示文件中一段有效的数据,包括索引和数据两类。索引stream保存每一个row group的位置和统计信息,数据stream包括多种类型的数据,具体需要哪几种是由该列类型和编码方式决定。
从ORC数据格式来看,发现其结构和parquet很类似,也是先分行,每个row group里面,对数据进行列式存储,然后提供元数据(包括每个column的offset,字典表,压缩算法等等)。在parquet里面我谈到,如果想往parquet追加一列数据的话,需要往每个row group里面添加列,需要打破原有的column chunk,并且重新更新footer元数据信息。而ORC也类似,需要增添的列打摊到每个stripe,在row data里面添加一列,更新index和stream。
因为HDFS是一次写的文件系统,ORC是一次写的文件格式,因此ORC为了支持ACID操作,采用基础文件和增量文件来实现insert,update,delete。大致思想是transaction会被存储在增量文件中,并且当delte变多会自动合并,当查询数据时,将原数据与delta数据排序,然后取最近的更改。详情见ACID。