生成一个parquet文件
下载
wget [https://github.com/apache/parquet-mr/archive/apache-parquet-1.10.0.tar.gz](https://github.com/apache/parquet-mr/archive/apache-parquet-1.10.0.tar.gz)
tar xvf parquet-mr-apache-parquet-1.10.0.tar.gz
编译
cd parquet-cli
mvn clean install -DskipTests
运行
mvn dependency:copy-dependencies
java -cp 'target/*:target/dependency/*' org.apache.parquet.cli.Main
准备csv文件
a,b,c
0,a,0.0
1,b,1.1
2,c,2.2
3,d,
4,,4.4
,f,5.5
,,
7,h,7.7
8,i,8.8
9,j,9.9
生成parquet文件
java -cp target/parquet-cli-1.10.0.jar:target/dependency/parquet-avro-1.10.0.jar:target/dependency/* org.apache.parquet.cli.Main convert-csv sample.csv -o sample.parquet
ls
README.md parquet-cli.iml pom.xml sample.csv sample.parquet src target
可见目录下多了一个sample.parquet文件。这里没有指定avsc格式的schema文件,cli会生成默认的schema文件。
schema文件长什么样子呢?我们通过cli来生成单独的schema文件:
java -cp 'target/:target/dependency/' org.apache.parquet.cli.Main csv-schema sample.csv --record-name sample -o sample.avsc
结果保存在sample.avsc:
cat sample.avsc
{
"type" : "record",
"name" : "sample",
"fields" : [ {
"name" : "a",
"type" : [ "null", "long" ],
"doc" : "Type inferred from '0'",
"default" : null
}, {
"name" : "b",
"type" : [ "null", "string" ],
"doc" : "Type inferred from 'a'",
"default" : null
}, {
"name" : "c",
"type" : [ "null", "double" ],
"doc" : "Type inferred from '0.0'",
"default" : null
} ]
}
mac下工具集安装
brew install parquet-tools
工具集包含两个文件:
/usr/local/Cellar/parquet-tools/1.10.0/bin/parquet-tools
/usr/local/Cellar/parquet-tools/1.10.0/libexec/parquet-tools-1.10.0.jar
这两个工具的效果是相同的,下面以parquet-tools来举例。
cat
parquet-tools cat sample.parquet
a = 0
b = a
c = 0.0
a = 1
b = b
c = 1.1
a = 2
b = c
c = 2.2
a = 3
b = d
a = 4
b =
c = 4.4
b = f
c = 5.5
b =
a = 7
b = h
c = 7.7
a = 8
b = i
c = 8.8
a = 9
b = j
c = 9.9
schema
parquet-tools schema sample.parquet
message sample {
optional int64 a;
optional binary b (UTF8);
optional double c;
}
parquet-tools meta sample.parquet
file: file:/Users/xxx/tools/parquet-mr-apache-parquet-1.10.0/parquet-cli/sample.parquet
creator: parquet-mr version 1.10.0 (build ${buildNumber})
extra: parquet.avro.schema = {"type":"record","name":"sample","fields":[{"name":"a","type":["null","long"],"doc":"Type inferred from '0'","default":null},{"name":"b","type":["null","string"],"doc":"Type inferred from 'a'","default":null},{"name":"c","type":["null","double"],"doc":"Type inferred from '0.0'","default":null}]}
extra: writer.model.name = avro
file schema: sample
--------------------------------------------------------------------------------
a: OPTIONAL INT64 R:0 D:1
b: OPTIONAL BINARY O:UTF8 R:0 D:1
c: OPTIONAL DOUBLE R:0 D:1
row group 1: RC:10 TS:346 OFFSET:4
--------------------------------------------------------------------------------
a: INT64 GZIP DO:0 FPO:4 SZ:109/133/1.22 VC:10 ENC:PLAIN,RLE,BIT_PACKED ST:[min: 0, max: 9, num_nulls: 2]
b: BINARY GZIP DO:0 FPO:113 SZ:76/80/1.05 VC:10 ENC:PLAIN,RLE,BIT_PACKED ST:[min: , max: j, num_nulls: 0]
c: DOUBLE GZIP DO:0 FPO:189 SZ:120/133/1.11 VC:10 ENC:PLAIN,RLE,BIT_PACKED ST:[min: -0.0, max: 9.9, num_nulls: 2]
使用java读写parquet文件
一般不会直接使用Java写parquet文件,而是直接建一张hive表,表格式设置为parquet,通过hive写入数据保存成parquet格式。
## pom文件
<dependencies>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>0.23.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.1</version>
</dependency>
</dependencies>
## 写文件
WriteExample
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import [org.apache.parquet.hadoop.metadata.CompressionCodecName;](http://org.apache.parquet.hadoop.metadata.compressioncodecname;/)
public class WriteExample {
public static void main(String[] args) throws IllegalArgumentException, IOException {
List<Field> fields = new ArrayList<Field>();
Object defaultValue = null;
fields.add(new Field("x", Schema.create(Type.INT), "x", defaultValue));
fields.add(new Field("y", Schema.create(Type.INT), "y", defaultValue));
Schema schema = Schema.createRecord("name", "doc", "namespace", false, fields);
try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(
new Path("my-file.parquet")).withSchema(schema).withCompressionCodec(CompressionCodecName.SNAPPY)
.build()) {
// 模拟10000行数据
for (int r = 0; r < 10000; ++r) {
Record record = new Record(schema);
record.put(0, r);
record.put(1, r * 3);
writer.write(record);
}
}
}
}
## 读文件
import java.io.IOException;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
public class ReadExample {
public static void main(String[] args) throws IllegalArgumentException, IOException {
ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(new Path("my-file.parquet"))
.build();
GenericRecord record;
while ((record = reader.read()) != null) {
System.out.println(record);
}
}
}
部分结果
{"x": 0, "y": 0}
{"x": 1, "y": 3}
{"x": 2, "y": 6}
{"x": 3, "y": 9}
{"x": 4, "y": 12}
{"x": 5, "y": 15}
parquet基本原理
在互联网大数据应用场景下,通常数据量很大且很多,而每次查询数据只针对其中的少数几个字段,这时候列式存储是极好的选择。列式存储要 解决的问题:
- 把IO只给查询需要用到的数据: 只加载需要被计算的列
- 空间节省: 1. 列式的压缩效果更好 2. 可以针对数据类型进行编码,由于每列存的数据类型是相同的。
Parquet是一种列式存储:
Parquet只是一种存储格式,它与上层平台、语言无关,不需要与任何一种数据处理框架绑定,能够与Parquet配合的组件有:
查询引擎: Hive, Impala, Pig, Presto, Drill, Tajo, HAWQ, IBM Big SQL
计算框架: MapReduce, Spark, Cascading, Crunch, Scalding, Kite
数据模型: Avro, Thrift, Protocol Buffers, POJOs
那么Parquet是如何与这些组件协作的呢?数据从内存到Parquet文件或者反过来的过程主要由以下三个部分组成:
- 存储格式(storage format)
parquet-format项目定义了Parquet内部的数据类型、存储格式等。
- 对象模型转换器(object model converters)
这部分功能由parquet-mr项目来实现,主要完成外部对象模型与Parquet内部数据类型的映射。
- 对象模型(object models)
对象模型可以简单理解为内存中的数据表示,Avro, Thrift, Protocol Buffers, Hive SerDe, Pig Tuple, Spark SQL InternalRow等这些都是对象模型
例如parquet-mr项目里的parquet-pig项目就是负责把内存中的Pig Tuple序列化并按列存储成Parquet格式,以及反过来把Parquet文件的数据反序列化成Pig Tuple。
这里需要注意的是Avro, Thrift, Protocol Buffers都有他们自己的存储格式,但是Parquet并没有使用他们,而是使用了自己在parquet-format项目里定义的存储格式。所以如果你的应用使用了Avro等对象模型,这些数据序列化到磁盘还是使用的parquet-mr定义的转换器把他们转换成Parquet自己的存储格式。
什么是列式存储?
列式存储,顾名思义就是按照列进行存储数据,把某一列的数据连续的存储,每一行中的不同列的值离散分布。传统的思维中是按照一条记录一条记录的组织存储,列式存储是竖过来,按照一列一列的方式组织存储。对于一个查询,尽量只读取对这个查询有用的数据,以此来让磁盘IO 最小. 通过使用 Parquet这种列式存储,可以做到把 Twitter 的大数据集上的 IO缩减到原来的 1/3. 做到了『指哪打哪』,也就是遍历(scan)一个数据集的时候,如果只读取部分列,那么读取时间也相应会缩短,时间缩短的比例就是那几列的数据量占全部列数据量的比例. 原理很简单,就是不采用传统的按行存储,而是连续存储一列的数据.如果数据是扁平的(比如二维表形式),那列改成按列存储毫无难度,处理嵌套的数据结构才是真正的挑战.开源项目 Parquet 是 Hadoop 上的一种支持列式存储文件格式,它把嵌套结构存储成扁平格式。
列式存储可以大大提升这类查询的性能,较之于行是存储,列式存储能够带来这些优化:
- 查询的时候不需要扫描全部的数据,而只需要读取每次查询涉及的列,这样可以将I/O消耗降低N倍,另外可以保存每一列的统计信息(min、max、sum等),实现部分的谓词下推。
- 由于每一列的成员都是同构的,可以针对不同的数据类型使用更高效的数据压缩算法,进一步减小I/O。
- 由于每一列的成员的同构性,可以使用更加适合CPU pipeline的编码方式,减小CPU的缓存失效。
Parquet详解
数据模型
理解Parquet首先要理解这个列存储格式的数据模型。我们以一个下面这样的schema和数据为例来说明这个问题。
message AddressBook {
required string owner;
repeated string ownerPhoneNumbers;
repeated group contacts {
required string name;
optional string phoneNumber;
}
}
这个schema中每条记录表示一个人的AddressBook。有且只有一个owner,owner可以有0个或者多个ownerPhoneNumbers,owner可以有0个或者多个contacts。每个contact有且只有一个name,这个contact的phoneNumber可有可无。
每个schema的结构是这样的:根叫做message,message包含多个fields。每个field包含三个属性:repetition, type, name。repetition可以是以下三种:required(出现1次),optional(出现0次或者1次),repeated(出现0次或者多次)。type可以是一个group或者一个primitive类型。
Parquet格式的数据类型不需要复杂的Map, List, Set等,而是使用repeated fields 和 groups来表示。例如List和Set可以被表示成一个repeated field,Map可以表示成一个包含有key-value 对的repeated group
,而且key是required的。
嵌套结构的模型
嵌套结构的模型,多个 field 可以形成一个 group,一个 field 可以重复出现(叫做repeated field),这样就简单地描述了嵌套和重复,没有必要用更复杂的结构如Map / List / Sets,因为这些都能用 group 和 repeated field的各种组合来描述。整个结构是从最外层一个message 开始的. 每个 field 有三个属性:repetition、type、name. 一个field 的 type 属性,要么是 group,要么是基本类型(int, float, boolean,string),repetition 属性,有以下三种:
- required:出现,且只能出现 1 次.
- optional 出现 1 或 0 次.
- repeated:0 到 任意多次
example
message AddressBook {
required string owner;
repeated string ownerPhoneNumbers;
repeated group contacts {
required string name;
optional string phoneNumber;
}
}
Lists(或者 Sets)可以用 repeated field 表示.
Maps,首先有一个repeated field 在外面,里面每个 field,是一个 group,group 里面是key-value 对,其中key 是 required 的.
列存储格式
列存储通过将相同基本类型(primitive type)的值存储在一起来提供高效的编码和解码。为了用列存储来存储如上嵌套的数据结构,我们需要将该schema用某种方式映射到一系列的列使我们能够将记录写到列中并且能读取成原来的嵌套的数据结构。
在Parquet格式的存储中,一个schema的树结构有几个叶子节点(叶子节点都是primitive type),实际的存储中就会有多少column。
上面的schema的树结构如图所示:
上面这个schema的数据存储实际上有四个column,如下图所示:
只有字段值不能表达清楚记录的结构。给出一个repeated field的两个值,我们不知道此值是按什么‘深度’被重复的(比如,这些值是来自两个不同的记录,还是相同的记录中两个重复的值)。同样的,给出一个缺失的可选字段,我们不知道整个路径有多少字段被显示定义了。因此我们将介绍repetition level 和 definition level的概念。
example:
两条嵌套的记录和它们的schema:
将上图的两条记录用列存储表示:
上面的例子主要是想让大家对嵌套结构的列式存储有个直观的印象,包括repetition level 和 definition level的应用,接下来详细介绍repetition level 和 definition level。
Definition levels
Definition level指明该列的路径上多少个可选field被定义了。
嵌套数据类型的特点是有些field(optional field 和 repeated field)可以是空的,也就是没有定义。如果一个field是定义的,那么它的所有的父节点都是被定义的。从根节点开始遍历,当某一个field的路径上的节点开始是空的时候我们记录下当前的深度作为这个field的Definition Level。如果一个field的definition Level等于这个field的最大definition Level就说明这个field是有数据的。对于required类型的field必须是有定义的,所以这个Definition Level是不需要的.
注:definition Level是该路径上有定义的repeated field 和 optional field的个数,不包括required field,因为required field是必须有定义的。
再举个简单的例子:
message ExampleDefinitionLevel {
optional group a {
required group b {
optional string c;
}
}
}
因为b是required field,所以第3行c的definition level为1而不是2(因为b是required field,所有不需计算在内);第4行c的definition level为2而不是3(理由同上).
Repetition levels
Repetition level指明该值在路径中哪个repeated field重复。
Repetition level是针对repeted field的。注意在图2中的Code字段。可以看到它在r1出现了3次。‘en-us’、‘en’在第一个Name中,而‘en-gb’在第三个Name中。结合了图2你肯定能理解我上一句话并知道‘en-us’、‘en’、‘en-gb’出现在r1中的具体位置,但是不看图的话呢?怎么用文字,或者说是一种定义、一种属性、一个数值,诠释清楚它们出现的位置?这就是重复深度这个概念的作用,它能用一个数字告诉我们在路径中的什么重复字段,此值重复了,以此来确定此值的位置(注意,这里的重复,特指在某个repeated类型的字段下“重复”出现的“重复”)。我们用深度0表示一个纪录的开头(虚拟的根节点),深度的计算忽略非重复字段(标签不是repeated的字段都不算在深度里)。所以在Name.Language.Code这个路径中,包含两个重复字段,Name和Language,如果在Name处重复,重复深度为1(虚拟的根节点是0,下一级就是1),在Language处重复就是2,不可能在Code处重复,它是required类型,表示有且仅有一个;同样的,在路径Links.Forward中,Links是optional的,不参与深度计算(不可能重复),Forward是repeated的,因此只有在Forward处重复时重复深度为1。现在我们从上至下扫描纪录r1。当我们遇到’en-us’,我们没看到任何重复字段,也就是说,重复深度是0。当我们遇到‘en’,字段Language重复了(在‘en-us’的路径里已经出现过一个Language),所以重复深度是2.最终,当我们遇到’en-gb‘,Name重复了(Name在前面‘en-us’和‘en’的路径里已经出现过一次,而此Name后Language只出现过一次,没有重复),所以重复深度是1。因此,r1中Code的值的重复深度是0、2、1.
要注意第二个Name在r1中没有包含任何Code值。为了确定‘en-gb’出现在第三个Name而不是第二个,我们添加一个NULL值在‘en’和‘en-gb’之间(如图3所示)。
Striping and assembly
下面用AddressBook的例子来说明Striping和assembly的过程。
对于每个column的最大的Repetion Level和 Definition Level下图所示。
下面这样两条record:
AddressBook {
owner: "Julien Le Dem",
ownerPhoneNumbers: "555 123 4567",
ownerPhoneNumbers: "555 666 1337",
contacts: {
name: "Dmitriy Ryaboy",
phoneNumber: "555 987 6543",
},
contacts: {
name: "Chris Aniszczyk"
}
}
AddressBook {
owner: "A. Nonymous"
}
以contacts.phoneNumber这一列为例,"555 987 6543"这个contacts.phoneNumber的Definition Level是最大Definition Level=2。而如果一个contact没有phoneNumber,那么它的Definition Level就是1。如果连contact都没有,那么它的Definition Level就是0。
下面我们拿掉其他三个column只看contacts.phoneNumber这个column,把上面的两条record简化成下面的样子:
AddressBook {
contacts: {
phoneNumber: "555 987 6543"
}
contacts: {
}
}
AddressBook {
}
这两条记录的序列化过程如下图所示:
如果我们要把这个column写到磁盘上,磁盘上会写入这样的数据:
注意:NULL实际上不会被存储,如果一个column value的Definition Level小于该column最大Definition Level的话,那么就表示这是一个空值。
下面是从磁盘上读取数据并反序列化成AddressBook对象的过程:
-
读取第一个三元组R=0, D=2, Value=”555 987 6543”
R=0 表示是一个新的record,要根据schema创建一个新的nested record直到Definition Level=2。
D=2 说明Definition Level=Max Definition Level,那么这个Value就是contacts.phoneNumber这一列的值,赋值操作contacts.phoneNumber=”555 987 6543”。
-
读取第二个三元组 R=1, D=1
R=1 表示不是一个新的record,是上一个record中一个新的contacts。
D=1 表示contacts定义了,但是contacts的下一个级别也就是phoneNumber没有被定义,所以创建一个空的contacts。
-
读取第三个三元组 R=0, D=0
R=0 表示一个新的record,根据schema创建一个新的nested record直到Definition Level=0,也就是创建一个AddressBook根节点。
可以看出在Parquet列式存储中,对于一个schema的所有叶子节点会被当成column存储,而且叶子节点一定是primitive类型的数据。对于这样一个primitive类型的数据会衍生出三个sub columns (R, D, Value),也就是从逻辑上看除了数据本身以外会存储大量的Definition Level和Repetition Level。那么这些Definition Level和Repetition Level是否会带来额外的存储开销呢?实际上这部分额外的存储开销是可以忽略的。因为对于一个schema来说level都是有上限的,而且非repeated类型的field不需要Repetition Level,required类型的field不需要Definition Level,也可以缩短这个上限。例如对于Twitter的7层嵌套的schema来说,只需要3个bits就可以表示这两个Level了。
对于存储关系型的record,record中的元素都是非空的(NOT NULL in SQL)。Repetion Level和Definition Level都是0,所以这两个sub column就完全不需要存储了。所以在存储非嵌套类型的时候,Parquet格式也是一样高效的。
文件格式
行组(Row Group):按照行将数据物理上划分为多个单元,每一个行组包含一定的行数。一个行组包含这个行组对应的区间内的所有列的列块。
官方建议:
更大的行组意味着更大的列块,使得能够做更大的序列IO。我们建议设置更大的行组(512MB-1GB)。因为一次可能需要读取整个行组,所以我们想让一个行组刚好在一个HDFS块中。因此,HDFS块的大小也需要被设得更大。一个最优的读设置是:1GB的行组,1GB的HDFS块,1个HDFS块放一个HDFS文件。
列块(Column Chunk):在一个行组中每一列保存在一个列块中,行组中的所有列连续的存储在这个行组文件中。不同的列块可能使用不同的算法进行压缩。一个列块由多个页组成。
页(Page):每一个列块划分为多个页,页是压缩和编码的单元,对数据模型来说页是透明的。在同一个列块的不同页可能使用不同的编码方式。官方建议一个页为8KB。
上图展示了一个Parquet文件的结构,一个文件中可以存储多个行组,文件的首位都是该文件的Magic Code,用于校验它是否是一个Parquet文件,Footer length存储了文件元数据的大小,通过该值和文件长度可以计算出元数据的偏移量,文件的元数据中包括每一个行组的元数据信息和当前文件的Schema信息。除了文件中每一个行组的元数据,每一页的开始都会存储该页的元数据,在Parquet中,有三种类型的页:数据页、字典页和索引页。数据页用于存储当前行组中该列的值,字典页存储该列值的编码字典,每一个列块中最多包含一个字典页,索引页用来存储当前行组下该列的索引,目前Parquet中还不支持索引页,但是在后面的版本中增加。
之前的理解
列式存储,简单来说就是:
- 把一个嵌套的结构,映射为若干列
- 把一条嵌套的数据,写入这些列
- 根据这些列,把原来的嵌套结构拼出来
Parquet是怎么做的呢?
为嵌套结构的schema中的每个基本类型的field,建立一个列,若用一棵树描述schema,基本类型的field就是树的叶子节点.
上面的address book结构用树表示:
从上图可以看出,其实最终的值都是在基本类型的field中的, group类型的field本身不含有值,上图中蓝色叶子节点,每个对应一个列:
为了支持嵌套结构,我们需要知道一个field,到哪一层,变成null了(field没有定义),这个是definitionlevel的功能,试想, 如果一个field有定义,则它的parent也肯定有定义,如果一个field是没有定义的,那有可能它的上级和上上级都没定义,也可能上级没有定义,但上上级有定义,所以需要知道到底是从哪一级开始没定义的,这是还原整条记录所必须知道的。
对于扁平结构,也即没有任何嵌套,optional field可以用一个bit来表示是否有定义, 有: 1, 无: 0
对于嵌套结构,可以给每一级的optional field都加一个bit来记录是否有定义,但其实没有必要,因为嵌套的特性,上层没定义,那下层当然也是没定义的,所以只需要知道从哪一级开始没定义就可以。required field因为总是有定义的,所以不需要definition level。
message ExampleDefinitionLevel {
optional group a {
optional group b {
optional string c;
}
}
}
转换成列式,它只有一列a.b.c, 所有field都是optional,都可能是null,如果c有定义,那么a b作为作为它的上层,也是有定义的,当c是null时候,可能是因为它的某一级parent为null才导致c是null的,这时为了记录嵌套结构的状况,就需要保存最先出现null的那一层的深度了。 一共三个嵌套的optional field,所以最大的definition level是3。
这里的definition level不会大于3, 等于3表示c有定义,等于0,1,2的时候,表示null出现的层级。
required 总是有定义的,所以不需要definition level,看看下面情况:
message ExampleDefinitionLevel {
optional group a {
required group b {
optional string c;
}
}
}
现在最大的 definition level 是 2,因为 b 不需要 definition level.下面是各种情形下,a.b.c 的 definition level:
Repetition level
对于一个带repeated field的结构,转成列式表示后,一列可能有多个值,这些值得一部分是一坨里的,另一部分可能是另一坨里的,但一条记录的全部列都放在一列里,就区分不清楚了,所以需要一个值来标注怎么分成不同的坨,这个值就是repetition level,对于列中的一个值,它告诉我,是在哪个层级上,发生重复的
这个结构转成列式的,实际也只有一列:level1.level2,这一列的各个值,对应的 repeatiton level 如下:
为了表述方便,在一个嵌套结构里,一个repeated field连续出现的一组值为一个List,比如a,b,c 是一个level2 List, d,e,f,g 是一个level2 List,h 是一个level2List,i,j 是一个level2 List。a,b,c,d,e,f,g 所在的两个 level2 list是同一个 level1 List 里的,h,i,j 所在的两个 level2 List 是同一个 level1List里的。那么repetition level标识着新List出现的层级:
- 0 表示整条记录的开始,此时应该创建新的 level1 List 和 level2 List
- 1 表示 level1 List 的开始,此时应该创建一个 level2 List
- 2 表示 level2 List中新的值产生,此时不新建 List,只在 List 里插入新值.
repetition=0 标志着一整条新record的开始,在扁平化结构里,没有repetition,所以repetition level总是0, optional和required永远也不会重复,所以其对应的repetition level都是0。
AddressBook {
contacts: {
phoneNumber: "555 987 6543"
}
contacts: {
}
}
AddressBook {
}
转成列式之后,列中存储的东西应该是这样的(R = Repetiton Level, D =Definition Level):
为了将这条嵌套结构的record 转换成列式,我们把这个 record 整个遍历一次,
contacts.phoneNumber: “555 987 6543”
new record: R = 0
value is defined: D = maximum (2)
contacts.phoneNumber: null
repeated contacts: R = 1
only defined up to contacts: D = 1
contacts: null
new record: R = 0
only defined up to AddressBook: D = 0
最后列中存储的东西是:
注意,NULL 值在这里列出来,是为了表述清晰,但是实际上是不会存储的. 列中小于最大 definition 值的(这个例子里最大值是2),都应该是 NULL.
现在还原这条嵌套结构的记录:
R=0, D=2, Value = “555 987 6543”:
R = 0 这是一个新的 record. 从根开始按照schema 重建结构,直到 repetition level 达到 2
D = 2 是最大值,值是有定义的,所以此时将值插入.
R=1, D=1:
R = 1 level1 的 contact list 中一条新记录
D = 1 contacts 有定义,但 phoneNumber 没定义,所建一个空的 contacts 即可.
R=0, D=0:
R = 0 一条新 record. 可以重建嵌套结构,直到达到 definition level 的值.
D = 0 => contacts 是 null,所以最后拼装出来的是一个空的 Address Book
再举个完整的例子
在存储方面,问题很容易归结为:每一个基本类型的列,都要创建三个子列(R,D, Value).然而,得益于我们所采用的这种列式的格式,三个子列的总开销其实并不大.因为两种 Levels的最大值,是由 schema 的深度决定的,并且通常只用几个 bit就够用了.对于上面的 AddressBook 实例,owner这一列,深度为1,contacts.name深度为2,而这个表达能力已经很强了. R level 和 D level 的下限总是0,上限总是列的深度. 如果一个 field 不是 repeated的,就更好了,可以不需要 repetition level,而 required field 则不需要definition level,这降低了两种 level 的上限。 考虑特殊情况,所有field 全是 required(相当于SQL 中的NOT NULL),repetition level 和definition level就完全不需要了(总是0,所以不需要存储),直接存值就ok了.如果我们要同时支持存储扁平结构,那么两种level也是一样不需要存储空间的.
Parquet文件格式
Parquet文件是二进制方式存储的,文件中包含数据和元数据,可以直接进行解析。 先了解下parquet文件的几个概念:
- 行组(Row Group):每一个行组包含一定的行数,一般对应一个HDFS文件块,Parquet读写的时候会将整个行组缓存在内存中。
- 列块(Column Chunk):在一个行组中每一列保存在一个列块中,一个列块中的值都是相同类型的,不同的列块可能使用不同的算法进行压缩。
- 页(Page):每一个列块划分为多个页,一个页是最小的编码的单位,在同一个列块的不同页可能使用不同的编码方式。
Parquet文件组成
- 文件开始和结束的4个字节都是Magic Code,用于校验它是否是一个Parquet文件
- 结束MagicCode前的Footer length是文件元数据的大小,通过该值和文件长度可以计算出元数据Footer的偏移量
- 再往前推是Footer文件的元数据,里面包含:
文件级别的信息:版本,Schema,Extra key/value对等
每个行组的元信息,每个行组是由多个列块组成的:
每个列块的元信息:类型,路径,编码方式,第1个数据页的位置,第1个索引页的位置,压缩的、未压缩的尺寸,额外的KV
- 文件中大部分内容是各个行组信息:
一个行组由多个列块组成
一个列块由多个页组成,在Parquet中有三种页:
数据页
一个页由页头、repetition levels\definition levles\valus组成
字典页
存储该列值的编码字典,每一个列块中最多包含一个字典页