第四章 结构化API概述
结构化API主要指三种核心分布式集合类型API:Dataset、DataFrame、SQL表和视图
DataFrame和Dataset类型
- DataFrame和Dataset是具有行和列的类似于数据表的集合类型
- Spark中的DataFrame和Dataset代表不可变的数据集合
Schema
定义了DataFrame的列名和类型
两者比较
- 非类型化的DataFrame和类型化的Dataset,Spark负责维护DataFrame的类型(Schema),Dataset在编译时就会检查类型是否符合规范
- 在Scala版本Spark中,DataFrame就是Row类型的Dataset的集合
- Row类型是Spark用于支持内存计算而优化的数据格式,避免JVM类型的垃圾回收开销和对象实例化开销
结构化API执行概述
- 编写代码
- Spark生成逻辑计划
- Spark将逻辑计划转换成物理计划
- 在集群上执行物理计划
逻辑计划
- 最开始是尚未被解析的逻辑计划
- 加入了元数据Catalog,计划变得可理解了
- 经过逻辑优化生成优化后的逻辑计划
物理计划
- 获取到很多个物理执行计划
- 代价模型会分析得到最优的物理执行计划,比如链接操作、物理表的大小
- 交给集群执行 运行在底层编程接口RDD上
第五章 基本的结构化操作
主要是针对DataFrame的操作
模式
schema 文件的结构可以通过schema指定,也可查看schema
val mySchema = StructType(Array(
StructFiled(名称,类型,是否为空),
StructFiled(名称,类型,是否为空),
))
val df = spark.read.format("json").schema(mySchema).load("xxx")
列和表达式
列
df.col("count")
col("xxx")
column("xxx")
表达式
- 表达式是对DataFrame中某一个或多个值的一组转换操作
expr("somecol-5")
记录和行
- DataFrame每一行都是一个记录,而记录是Row类型的对象
- Spark使用列表达式操作Row类型对象,Row对象内部是字节数组
DataFrame转换操作
- 创建DataFrame
df.createOrReplaceTempView
- 操作DataFrame的列 select selectExpr
df.select(col(xxx)) df.selectExpr(xxx)
- 添加列 withColumn
df.withColumn("列名", 表达式)
- 重命名列 withColumnRenames("原名","新名")
- 删除列 drop
- 过滤行 where filter
df.filter(col("count") < 2)
- 去重 distinct()
df.select("xxx").distinct().count()
- 连接 union
df.union(otherdf)
- 行排序 sort orderBy
df.sort("xxx")
df.orderBy(expr("count desc"))
- 重新分区 repartition
df.repartition(4)
第六章 处理不同类型的数据
布尔
val priceFilter = col("unitPrice") > 600
df.withColum("isExpensive",priceFilter) // 创建一个列判断价格是否大于600
字符串
- 正则匹配
import org.apache.spark.sql.functions.regexp_replace
val simpleColors = Seq("black", "white", "red", "green", "blue")
val regexString = simpleColors.map(_.toUpperCase).mkString("|")
// the | signifies `OR` in regular expression syntax
df.select(
regexp_replace(col("Description"), regexString, "COLOR").alias("color_clean"),
col("Description")).show(2)
空值
df.na.drop()
df.na.fill("填的值", Seq("列名1", "列名2"))
用户自定义函数
- 一个例子
val udfExampleDF = spark.range(5).toDF("num")
def power3(number:Double):Double = number * number * number
power3(2.0)
- UDF如何运作
- 当使用Scala Java的UDF时,JVM中运行,可能导致性能下降(GC)
- 使用Python写的UDF,Spark在worker上启动一个Python进程,将所有数据序列化为Python可解释的格式,在Python进程中对该数据进行执行函数,最终将结果返回JVM和Spark。这会有两个问题:计算安规,进入Python进程后,Spark无法管理workjer内存
第七章 聚合操作
每个分组操作都会返回RelationGroupedDataset,基于它来进行聚合操作
聚合操作
- count
df.select(count("xxx"))
- countDistinct
df.select(countDistinct("xxx"))
- approx_count_distinct
df.select(approx_count_distinct("xxx",0.1))
- first last
df.select(first("xxx"))
- min max sum sumDistinct avg都是一样的操作
分组
- groupBy
df.groupBy("xxx")
- 使用表达式分组
df.groupBy("xxxx").agg(count("xxx"))
- 使用map分组
df.groupBy("xxx").agg("colname"->"avg","colname2"->"聚合操作")
window函数
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col
// 每个用户id是一个分区,内部的quantity降序排序
val windowSpec = Window
.partitionBy("CustomerId", "date")
.orderBy(col("Quantity").desc)
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
自定义聚合函数UDAF
类似于hive的udaf
- inputSchema指定输入参数
- bufferSchema指定UDAF中间结果
- dataType用于指定返回结果
- deterministic 指定UDAF对于某个输入是否返回相同结果
- initialize初始化聚合缓冲区
- update进行缓冲更新
- merge合并两个缓冲区
- evaluate生成聚合最终结果
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// 返回是不是全是ture
class BoolAnd extends UserDefinedAggregateFunction {
def inputSchema: org.apache.spark.sql.types.StructType =
StructType(StructField("value", BooleanType) :: Nil)
def bufferSchema: StructType = StructType(
StructField("result", BooleanType) :: Nil
)
def dataType: DataType = BooleanType
def deterministic: Boolean = true
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = true
}
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getAs[Boolean](0) && input.getAs[Boolean](0)
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getAs[Boolean](0) && buffer2.getAs[Boolean](0)
}
def evaluate(buffer: Row): Any = {
buffer(0)
}
}
第八章 连接操作
基本的链接操作
内、外、左外、右外、左半(left semi)、左反(left anti)、笛卡尔
- 定义连接的表达式
val joinExpression = person.col("xxx") = otherdf.col("xxx")
- 选一个连接类型
joinType = "outer|left_outer|right_outer|left_sermi|left_anti|cross"
- 执行join操作
person.join(otherdf, joinExpression, joinType).show()
常见问题
复杂类型连接
可以自定返回Boolean的joinExpression来完成复杂类型连接
import org.apache.spark.sql.functions.expr
person.withColumnRenamed("id", "personId")
.join(sparkStatus, expr("array_contains(spark_status, id)")).show()
重复列名
- 用不同的连接表达式
- 连接后删除某一列
- 连接前就重命名
Spark如何进行连接
spark以两种不同的方式处理集群通信问题,要么执行all-to-all通信的shuffle join,要么执行广播join
shuffle join
每个节点都与其他所有节点进行通信,对网络传输有一定要求
broadcast join
当表的大小足够下能够放入当个节点内存还有空余的时候,可以用。
在开始之前会有一次大规模通信,分发这个表,通信结束之后节点间不再有通信。
需要注意这个会先给driver,driver的空间也有够,还要注意第一次大规模通信的timeout
第九章 数据源
数据源API结构
Read API
spark.read.format("格式")
.option("配置key", "配置value")
.schema(xxx)
.load()
Write API
spark.writer.format("格式").
.option("配置key", "配置value")
//.partuitionBy()
//.sortBy()
//.bucketBy()
.save()
Parquet & ORC
都是压缩格式
但书里提到Parquet针对Spark进行优化 ORC针对hive进行优化
写入SQL数据库
val driver = "org.sqlite.JDBC"
val path = "/data/flight-data/jdbc/my-sqlite.db"
val url = s"jdbc:sqlite:/${path}"
val tablename = "flight_info"
val dbDataFrame = spark.read.format("jdbc").option("url", url)
.option("dbtable", tablename).option("driver", driver).load()
高级IO概念
- 可分割的文件类型和压缩
由于分区读的都是单个文件,这个文件分不开就影响效率 - 并行读
可以并行读取不同的文件 - 并行写
可以往同一个分区不同的文件下写入 - 分桶
具有相同桶ID的数据放在一个物理分区,可以避免在读取数据的时候shuffle
val numberBuckets = 10
val columnToBucketBy = "count"
csvFile.write.format("parquet").mode("overwrite")
.bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")
- 管理文件大小
由于每个task都是处理单个文件,文件太小起的task就太多
maxRecordPerFile来制定每个文件的最大记录数
第十章 SparkSQL
Spark和Hive的关系
Spark SQL可以与Hive metastores链接。 Hive metastore维护了Hive跨回话数据表的信息
如何运行Spark SQL查询
Spark可编程SQL接口
spark.sql("select 1+1").show()
这会返回一个DataFrame
Catalog
Spark SQL中最高级别的抽象是Catalog,用于存储用户数据中的元数据以及数据库、数据表、函数、视图等有用的东西
托管表和非托管表
- 非托管表:定义磁盘上若干文件作为数据表
- 托管表:DataFrame上使用saceAsTable创建一个数据表,这个函数会把表写入一个新的位置
第十一章 Dataset
- Dataset是结构化API的基本类型
- Dataset具有严格的Java虚拟机语言特性,仅与Scala和Java一起使用
- 可以在Scala指定case类,Java创建JavaBean然后通过Spark以分布式的方式操作此非Row类型的对象
何时使用Dataset
- 无法使用DataFrame操作表示
- 需要类型安全,并愿意牺牲一定性能
创建Dataset
case class Flight(DEST_COUNTRY_NAME: String,
ORIGIN_COUNTRY_NAME: String, count: BigInt)
val flightsDF = spark.read.parquet("F:/spark-3.0.1-bin-hadoop2.7/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightsDF.as[Flight]
- 动作、转化操作都和DataFrame上类似,可以自定义一些函数进行转化操作
- 连接提供了joinWith方法,类似co-group(可以对多个RDD同key的进行连结)
- 分组和聚合最终返回的是DataFrame,会丢失类型信息。如果groupByKey可以返回Dataset,这个方法接收的是一个函数而非特定列名