[TOC]
Generic Load/Save Functions(通用加载/保存函数)
最简单的情况, 默认数据源(parquet
, 除非使用spark.sql.sources.default
修改了配置) 将会应用到所有操作.
Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
Manually Specifying Options
你可以手动指定数据源和传给此数据源的额外选项.数据源由其完全限定的名称(例如org.apache.spark.sql.parquet
)指定,但对于内置源,你也可以使用其短名称(json
,parquet
,jdbc
,orc
,libsvm
,csv
,text
).从任何类型数据源加载的数据帧, 都能转换到其他类型数据源.
加载json文件你可以使用:
Dataset<Row> peopleDF =
spark.read().format("json").load("examples/src/main/resources/people.json");
peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
加载csv文件你可以使用:
Dataset<Row> peopleDFCsv = spark.read().format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("examples/src/main/resources/people.csv");
额外的选项也可以用在写操作中.例如, 你可以控制ORC数据源的布隆过滤器和字典编码.下面的ORC示例会在favorite_color
上创建布隆过滤器, 并在name
和favorite_color
上使用字典编码.对于Parquet, 这里也有parquet.enable.dictionary
可以用.查看更多关于ORC/Parquet选项, 可以访问 Apache ORC/Parquet 官网.
usersDF.write().format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.save("users_with_options.orc");
Run SQL on files directly(直接在文件上执行SQL)
相比使用API加载一个文件到数据帧然后查询它, 你也可以选择直接使用SQL去查询那个文件.
完整示例在Spark仓库 中"examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java"
Save Modes(保存模式)
保存操作可以选择性带一个"保存模式", 它决定如何去处理已经存在的数据.要意识到, 保存模式并不使用任何锁也不是原子性的.另外, 使用"Overwrite"模式, 在写入新数据之前会删除原有数据.
Scala/Java | Any Language | Meaning |
---|---|---|
SaveMode.ErrorIfExists (default) |
"error" or "errorifexists" (default) |
保存数据帧到一个数据源, 如果数据已经存在, 那么会抛出一个异常 |
SaveMode.Append |
"append" |
保存数据帧到一个数据源, 如果有原数据, 新的数据会追加到原数据之后 |
SaveMode.Overwrite |
"overwrite" |
覆盖原数据 |
SaveMode.Ignore |
"ignore" |
写入数据, 如果有原数据, 那么不写入任何内容 , 类似于SQL中的CREATE TABLE IF NOT EXISTS
|
Saving to Persistent Tables(保存到持久表)
DataFrames
也可以使用 saveAsTable
命令将自己作为持久表保存到Hive metastore.注意使用此功能并不需要布署Hive.Spark会为你创建一个默认的本地Hive metastore(使用Derby).不像createOrReplaceTempView
命令, saveAsTable
会保存数据帧的内容, 并创建一个指针向向Hive metastore中的数据.只要保持与metastore的连接, 即使重启spark程序, 持久表也不会丢失.一个持久副县长的数据帧可以通过调用SparkSession.table(table_name)
来创建.
对于基于文件的数据源, 比如text,parquet,json等等, 你可以通过path
选项来自定义表的路径, 比如df.write.option("path","/some/path").saveAsTable("t")
.当表被删除时, 自定义的表路径和数据都还在那里.如果不自定义表路径, spark会将数据放到默认的仓库路径下面, 并且删除表时, 也会删除对应的表路径.
从spark2.1开始, 持久化数据源的表在Hive metastore中拥有每分区的元数据.这带来了若干好处:
- 因为对于一个查询, 元数据可以让结果只返回需要的分区, 所以对于表的第一次查询时搜索所有分区就不需要了.
- Hive DDL(像
ALTER TABLE PARTITION ... SET LOCATION
)可以对Datasource API创建的表生效了.
请注意,在创建外部数据源表(具有path
选项的表)时,默认情况下不会收集分区信息。要分区信息到metastore,可以调用MSCK REPAIR TABLE
。
Bucketing, Sorting and Partitioning(分桶,排序和分区)
对于基于文件的数据源, 对输出进行分桶/排序和分区是可以的.分桶和排序仅对持久表适用.
peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");
而分区对于save
和saveAsTable
都是有效的
usersDF
.write()
.partitionBy("favorite_color")
.format("parquet")
.save("namesPartByColor.parquet");
可以对单个表同时使用分区和分桶:
peopleDF
.write()
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("people_partitioned_bucketed");
partitionBy
创建了一个目录结构, 在Partition Discovery节有描述.因此, 它对于高基数的列来说, 不是那么有效果.相反, bucketBy
将数据分布在固定数量的桶中, 并且可以在许多唯一值没有边界的时候使用.