DeltaLake支持大部分由Spark DataFrame提供的Opition选项
SparkSession的配置
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("demo")
.master("demo")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate()
创建表
DeltaLake支持通过路径或者元数据查询表,如下:
spark.table("events") // 通过元数据查询
spark.read.format("delta").load("/delta/events") // 通过路径查询
import io.delta.implicits._
spark.read.delta("/delta/events")
DeltaLake读取分区表,如下:
df.write.format("delta").partitionBy("date").saveAsTable("events") // create table in the metastore
df.write.format("delta").partitionBy("date").save("/delta/events") // create table by path
DeltaLake也支持读取比较旧的数据快照,如下:
spark.read.format("delta").option("versionAsOf","0") // _delta_log下的0,1,2是版本号
spark.read.format("delta").load("/path/to/my/table@v1") # @v1就是第一个版本
DeltaLake目录下为:
image.png