结构化API

概述

结构化API可以用来处理各种数据类型,包括非结构化的日志文件、半结构化的csv文件以及结构化的parquet文件。Spark中的结构化API主要是指以下三种核心分布式集合类型的API:

  • DataSet
  • DataFrame
  • SQL表和视图
    这些结构化API基本都适用于流处理和批处理,这意味着只需要修改较少的代码即可实现二者的转化。
  1. DataFrame和Dataset
    DF和DS是具有行和列的类似于数据表的集合。所有列的行数相同,缺省值用null补充,并且某一列的类型在所有行中必须保持一致。

在实际应用中通常使用SQL来操作DF,而不是执行DF专用代码。

  1. Spark内部有一个名为Catalyst的引擎,所有其他语言API都会在内部转化为Catalyst执行。

  2. DF与DS比较

类型不同:DF是非类型化(类型由Spark维护,为Row);DS是类型化的,类型需要我们自行维护,且只适用于使用JVM的语言(Scala、Java),通过case或java beans指定类型。

  1. DF的行和列

列:是一个简单类型(如:int或String等)或复杂类型(map或array等)或null。spark记录所有这些类型并提供多种转换方法。
行:一行对应一条记录。通过对DF调用collect方法可以展示,DF中每条记录都是Row类型。

  1. Spark类型与其他语言类型对应关系(见官网)

  2. 结构化API执行

  • 编写DF/DS/SQL代码;
  • 如果代码能够有效执行,Spark将其转换为一个逻辑执行计划(logical plan)
  • Spark将这个逻辑执行计划转换为一个物理执行计划(Physical Paln),检查可行的优化策略,并在此过程中检查优化
  • Spark在集群上执行该物理计划(RDD操作)

基本的结构化操作

介绍DF的基本操作。
DF由多条record构成,record是Row类型(类比表中的一条数据)。每条record由说个列组成,一系列的操作都是对列进行的。

数据模式(Schema)定义了DF列名及数据类型;DF的分区定义了DF在集群上的物理分布;划分模式定义了partition的分配方式,可以自定义也可以选择随机分配。

DF创建示例:

val df = spark.read.format("json").load("/usr/root/input/2015-summary.json") //创建DF

df.printSchema() //查看DF的schema信息,即名字与类型

1 数据模式(Schema)

schema定义了DF列的名字及数据类型,它可以由数据源来定义(称为读时模式,schema-on-read),也可以显式指定。
根据实际应用场景决定定义schema的方式。

查看DF的schema:

df.schema

//运行结果:
res1: org.apache.spark.sql.types.StructType = StructType(StructField(DEST_COUNTRY_NAME,StringType,true),
StructField(ORIGIN_COUNTRY_NAME,StringType,true), 
StructField(count,LongType,true))

由上述运行结果可知,一个schema是由许多字段构成的StructType。这些字段就是StructField,每个StructField包含了名称、类型、和一个布尔值(指定该列是否可以包含缺失值或空值),用户还可以在此指定与该列关联的元数据(metadata)。

自定义schema示例:

//自定义schema
val myManualSchema = StructType(
    Array(StructField("DEST_COUNTRY_NAME",StringType,true),
        StructField("ORIGIN_COUNTRY_NAME",StringType,true),
        StructField("count",LongType,false,
                Metadata.fromJson("{\"hello\":\"world\"}"))
 ))

//创建DF时指定自定义的schema
val df = spark.read.format("json").scheme(myManualSchema).load("/usr/root/input/2015-summary.json")

2、列和表达式

对DF中的列进行选择、transform操作和删除等操作的表示称为表达式。在Spark中,列是逻辑结构,要操作一个列必须有一个行,有行则必须有一个DF,因此对列的操作都在DF上进行。

构造和引用列的方法col函数和column函数。使用时要传入列名:

col("someColumnName")  
column("someColumnName")
  • 显式列引用
    直接在DF上使用col方法:
df.col("count")
表达式(expression)

表达式是对一个DF中的某一个或多个值的一组转换操作。(可以理解为一个函数,将一个或多个列名作为输入,解析它们,然后对每条记录应用该表达式得到一个单值。这个单值可以是一个复杂类型,如array或map等)

  • expr:创建表达式函数
expr("someCol") //等同于col("someCol")

//几个相同的转换操作表达式
expr("someCol - 5")
col("someCol") -5
expr("someCol") – 5

//相同的转换操作,代码不同但转换为底层逻辑树都是一样的
(((col("someCol") + 5) * 200) - 6) < col("otherCol")
expr("(((someCol + 5) * 200) - 6) < otherCol")
  • printSchema:查询DF的所有列
  • columns:访问DF的所有列
df.columns

3、记录和行

DF中的每一行就是一个记录,都是Row类型的对象。表达式操作的就是Row对象,Row对象内部是字节数组,Spark无法直接访问这些数组,只能使用表达式去操纵。

  • first:查看DF中的一行数据
df.first()
  • 创建行和使用行
import org.apache.spark.sql.Row
val myRow = Row("Hello",null,1,false) //创建行
//指定想要的位置,使用java或scala时,必须用辅助方法或者显示指定值的类型
myRow(0) // 任意类型
myRow(0).asInstanceOf[String] // 字符串
myRow.getString(0) // 字符串
myRow.getInt(2) // 整型

4、DataFrame转换操作

几种主要操作:

  1. 添加行或列
  2. 删除行或列
  3. 行列互转
  4. 根据列值排列行的顺序
  • read:从原始数据源中创建DF
  • createOrReplaceTempView:注册临时表
val df = spark.read.format("json").load("/usr/root/input/2015-summary.json")
df.createOrReplaceTempView("dfTable") //创建临时表
  • createDataFrame:用RDD创建DF
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
val myManualSchema = new StructType(Array(
 new StructField("some", StringType, true),
 new StructField("col", StringType, true),
 new StructField("names", LongType, false))) //创建自定义schema
val myRows = Seq(Row("Hello", null, 1L)) //构造Row对象合集
val myRDD = spark.sparkContext.parallelize(myRows) //构造RDD
val myDf = spark.createDataFrame(myRDD, myManualSchema)
myDf.show() //创建DF并打印出来

还可以使用toDF函数实现Sep到DF的隐式转换,该方法对null的支持不稳定,需酌情使用。

val myDF = Seq(("Hello", 2, 1L)).toDF("col1", "col2", "col3")
  • select函数和selectExpr函数
    处理DF三个常用的工具:处理列和表达式的select方法;处理字符串表达式的selectExpr方法;org.apache.spark.sql.functions包内的其他方法。

1)select函数实现对DF的SQL查询

df.select("DEST_COUNTRY_NAME").show(2)

//等同于SQL
SELECT DEST_COUNTRY_NAME FROM dfTable LIMIT 2

//使用多种方式引用列
import org.apache.spark.sql.functions.{expr, col, column}
df.select(
 df.col("DEST_COUNTRY_NAME"),
 col("DEST_COUNTRY_NAME"),
 column("DEST_COUNTRY_NAME"),
 'DEST_COUNTRY_NAME,
 $"DEST_COUNTRY_NAME",
 expr("DEST_COUNTRY_NAME"))
 .show(2)
 

expr是目前最灵活的方式,可以引用列也可以引用SQL字符串。

df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)

//相当于sql
SELECT DEST_COUNTRY_NAME as destination FROM dfTable LIMIT 2

//使用alias方法再将列名改回去
df.select(expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME"))
.show(2)

2)selectExpr表达式
等同于select表达式加expr的写法。可以使用selectExpr构建复杂表达式来创建DF。

//判断两个列的值是否相等,用比较结果构建新的列
df.selectExpr(
 "*", //所有列
  "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")
  .show(2)

//等同于SQL

SELECT *, (DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry FROM dfTable LIMIT 2

还可以使用系统预定义的聚合函数操作DF。

df.selectExpr("avg(count)","count(distinct(DEST_COUNTRY_NAME))").show(2)

//相当于SQL
SELECT avg(count), count(distinct(DEST_COUNTRY_NAME)) FROM dfTable LIMIT 2
  • lit:传递字面量(literal)
import org.apache.spark.sql.functions.lit
df.select(expr("*"), lit(1).as("One")).show(2)

//相当于SQL
SELECT *, 1 as One FROM dfTable LIMIT 2
  • withColumn:添加列,两个参数分别为列名和赋值过程
//添加一个值为1的列
df.withColumn("numberOne",lit(1)).show(2)
//相当于SQL
SELECT *,1 AS numberOne FROM dfTable LIMIT 2
//使用测试数据,判断出发地与目的地在相同国家的数据
df.withColumn("withinCountry",expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME")).show(2)
//还可以用来重命名列
df.withColumn("Destination",expr("DEST_COUNTRY_NAME"))
  • withColumnRenamed重命名列。两个参数,第一个是待修改的列名,第二个为修改后的列名。
df.withColumnRenamed("DEST_COUNTRY_NAME","dest").columns
  • 保留字与关键字:使用反引号(`)实现
import org.apache.spark.sql.functions.expr
val dfWithColName = ddf.withColumn("This Long Column-name",expr("DEST_COUNTRY_NAME")) //修改列名
dfWithColName。selectExpr("`This Long Column-name`","`This Long Column-name` as `newcol`") //引用列名时需要进行转义

//相当于SQL
SELECT `This Long Column-name`,`This Long Column-name` as `newcol` FROM dfTableLong LIMIT 2
  • 区分大小写: set spark.sql.caseSensitive true

  • drop:删除列
    当然可以通过select实现(只选出部分列后生成新的RDD)。drop后可以跟多个参数列。

df.drop("ORIGIN_COUNTRY_NAME").columns
  • withColumn+cast:强制转换列的数据类型
df.withColumn("count2",col("count").cast("long"))
//相当于SQL
SELECT *, cast(count as long) AS count2 FROM dfTable
  • where或filter:过滤行
    构建表达式来判断该表达式是true还是false,过滤掉false的行。相当于SQL中的where子句。
df.filter(col("count") < 2).show(2)
//相当于SQL
SELECT * FROM dfTable WHERE count < 2 LIMIT 2
//多过滤条件关联
df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia").show(2)
//相当于SQL
SELECT * FROM dfTable WHERE count < 2 AND RIGIN_COUNTRY_NAME != "Croatia"
LIMIT 2

注意在使用多条件过滤时,Spark会同时执行所有操作,不管过滤条件的先后顺序,所以使用的时候并不总是能有效实现我们的需求。使用时只需将所有条件串在一起即可,不需考虑先后顺序。

  • distinct:去重
df.select("ORIGIN_COUNTRY_NAME","DEST_COUNTRY_NAME").distinct().count()
//相当于SQL
SELECT COUNT(DISTINCT(ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME)) FROM dfTable

  • sample:随机抽样,其中参数withReplacement的值决定抽样是否放回,true为有放回抽样
val seed = 5
val withReplacement = false
var fraction = 0.5

df.sample(withReplacement,fraction,seed).count
  • randomSplit:当需要将原始DataFrame随机分割成多个分片时,可以使用随机分割。
    由于随机分割是一种随机方法,所以我们还需要指定一个随机种子(只需在代码中用特定数字来替换seed)。需要注意的是,如果一个DataFrame的分
    割比例的和不为1,则比例参数会被自动归一化。
val dataFrames = df.randomSplit(Array(0.25,0.75),seed)
dataFrames(0).count() > dataFrames(1).count()
  • union:联合操作
    由于DF是不可变的,这意味着我们不能向DF中追加行。可以通过union的方式将两个DF联合起来实现。这种操作必须保证两个DF具有相同的列数和schema。
import org.apache.spark.sql.Row
//创建新的DF
val schema = df.schema
val newRows = Sep(Row("New Country","Other Country",5L),Row("New Country 2","Other Country 2",1L))

val parallelizedRows = spark.sparkContext.parallelize(newRows)
val newDF = spark.createDataFrame(parallelizedRows,schema)
//联合两个DF
df.union(newDF).where("count = 1").where($"ORIGIN_COUNTRY_NAME" =!= "United States").show()

当DataFrame追加了记录后,需要对产生的新DataFrame进行引用。一个常见的方式是将这个新DataFrame变成视图(View)或者注册成一个数据表,以便在代码中使用。

  • sort和orderBy:行排序(二者等价)
df.sort("count").show(5)
//指定升降序
import org.apache.spark.sql.functions.{desc, asc}
df.orderBy(expr("count desc")).show(2)
df.orderBy(desc("count"),asc("DEST_COUNTRY_NAME")).show(2)

//相当于SQL
SELECT * FROM dfTable ORDER BY count DESC, DEST_COUNTRY_NAME ASC LIMIT 2

你可以指定空值在排序列表中的位置,asc_nulls_first、esc_nulls_first、asc_nulls_last、desc_nulls_last

  • sortWithinPartitions:对每个分区进行内部排序
spark.read.format("json").load("/data/flight-data/json/*-summary.json")
 .sortWithinPartitions("count")
  • limit:只取几条
df.limit(5).show()
df.orderBy(expr("count desc")).limit(5).show()
//相当于SQL
SELECT * FROM dfTable ORDER BY count desc LIMIT 5
  • repartitions和coalesce:重划分和合并
    另一个重要的优化是根据一些经常过滤的列对数据进行分区,控制跨群集数据的物理布局,包括分区方案和分区数。
    不管是否有必要,重新分区都会导致数据的全面洗牌。如果将来的分区数大于当前的分区数,或者当你想要基于某一组特定列来进行分区时,通常只能重新分区。
//获取分区数
df.rdd.getNumPartitions
//设置重分区数
df.repartitions(5)
//如果你知道你经常按某一列执行过滤操作,则根据该列进行重新分区
df.repartitions(col("DEST_COUNTRY_NAME"))
//合并操作(coalesce)不会导致数据的全面洗牌,但会尝试合并分区
df.repartitions(5,col("DEST_COUNTRY_NAME")).coalesce(2) //先分为5个分区再合并它们
  • toLocalIterator:驱动器获取行
    为了遍历整个数据集,还有一种让驱动器获取行的方法,即toLocalIterator函数。toLocalIterator函数式一个迭代器,将每个分区的数据返回给驱动器。这个函数允许你以串行的方式一个一个分区地迭代整个数据集。
val collectDF = df.limit(10)
collectDF.take(5) // 获取整数行
collectDF.show() // 更友好的打印
collectDF.show(5, false)
collectDF.collect()
collectDF.toLocalIterator()

【注意】:将数据集合传递给驱动器的代价很高。当数据集很大时调用collect函数,可能会导致驱动器崩溃。如果使用toLocalIterator,并且分区很大,则容易使驱动器节点崩溃并丢失应用程序的状态,代价也是巨大的。因此我们可以一个一个分区进行操作,而不是并行运行。

5、处理不同类型的数据

  • 处理布尔型数据
    布尔型数据通常用于条件过滤,由and、or、true、false四要素构成。
import org.apache.spark.sql.functions.col
df.where(col("InvoiceNo").equalTo(536365)) //等同于col("InvoiceNo")===536365
 .select("InvoiceNo","Description")
 .show(5,false)

【关于spark中的相等判断】:使用Scala进行相等判断时,应该使用===(等于)或者=!=(不等于)符号,另外还可以使用not或equalTo函数实现。python中则使用传统的==和!=。

还可以直接使用字符串表达式进行比较,这种方式比较简洁:

df.where("InvoiceNo = 536365")
 .show(5,false)
 
df.where("InvoiceNo <> 536365")
 .show(5,false)

多个表达式的连接
虽然可以通过and或or将布尔表达式连接在一起。但在spark中,最好以链式连接的方式进行组合,形成顺序执行的过滤器。spark会将所有的过滤器合并为一条语句同时执行,隐式创建and。or语句需要在同一个语句中指定。

val priceFilter = col("UnitPrice") > 600
val descripFilter = col("Description").contains("POSTAGE")
df.where(col("StockCode").isin("DOT")).where(priceFilter.or(descripFilter)).show() 
//串行连接时直接使用“.”,or在同一语句中指定

//相当于SQL
SELECT * FROM dfTable WHERE StockCode in ("DOT") AND(UnitPrice > 600 OR
instr(Description, "POSTAGE") >= 1)

其他过滤器的实现
实现过滤并不一定要使用布尔表达式。例如:

val DOTCodeFilter = col("StockCode") === "DOT"
val priceFilter = col("UnitPrice") > 600
val descripFilter = col("Description").contains("POSTAGE")
df.withColumn("isExpensive",DOTCodeFilter.and(priceFilter.or(descripFilter)))
 .where("isExpensive")
 .select("unitPrice","isExpensive").show(5) //构建isExpensive列,保留值为true的行
 
 //相当于SQL
 SELECT UnitPrice, (StockCode = 'DOT' AND
 (UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)) as isExpensive
FROM dfTable
WHERE (StockCode = 'DOT' AND
 (UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1))

使用SQL表达式和使用DF接口的效果是等价的,不会有性能差异。例如下列两种方式实现的效率是相同的。

import org.apache.spark.sql.functions.{expr, not, col}
df.withColumn("isExpensive", not(col("UnitPrice").leq(250)))
 .filter("isExpensive")
 .select("Description", "UnitPrice").show(5)
df.withColumn("isExpensive", expr("NOT UnitPrice <= 250"))
 .filter("isExpensive")
 .select("Description", "UnitPrice").show(5)

【注意】在执行布尔表达式时,如果数据出现空值就会有问题,需要使用不同方式处理。例如:

df.where(col("Description").eqNullSafe("hello")).show()
  • 处理数值类型
    1)DF操作进行数值计算
//求(Quantity*UnitPrice)^2+5
import org.apache.spark.sql.functions.{expr,pow}
val fabricatedQuantity = pow(col("Quantity")*col("UnitPrice"),2)+5 //自定义计算函数
df.select(expr("CustomerId"),fabricatedQuantity.alias("realQuantity")).show(2)

2)使用SQL表达式进行计算

df.selectExpr(
 "CustomerId",
 "(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity").show(2)

取整操作:round(向上取整)、bround(向下取整)。可以指定精度。

import org.apache.spark.sql.functions.{round, bround,lit}
df.select(round(col("UnitPrice"), 1).alias("rounded"), col("UnitPrice")).show(5)
df.select(round(lit("2.5")), bround(lit("2.5"))).show(2)

求列的Pearson相关系数corr

import org.apache.spark.sql.functions.{corr}
df.stat.corr("Quantity", "UnitPrice")
df.select(corr("Quantity", "UnitPrice")).show()

计算列的统计信息(计数、均值、标准差、最值):describe

df.describe().show() 

也可以通过导入函数实现上述聚合操作需求并得到更精确的值

import org.apache.spark.sql.functions.{count, mean, stddev_pop, min, max}

statFunction包中封装了许多统计函数。如approxQuantile用来计算数据的精确分位数或近似分位数、crosstab来查看交叉列表或频繁项对、monotonically_increasing_id函数为每行添加一个唯一的ID,它会从0开始,为每行生成一个唯一值。

//approxQuantile用法
val colName = "UnitPrice"
val quantileProbs = Array(0.5)
val relError = 0.05
df.stat.approxQuantile("UnitPrice", quantileProbs, relError) // 2.51
//crosstab用法
df.stat.crosstab("StockCode", "Quantity").show()
df.stat.freqItems(Seq("StockCode", "Quantity")).show()
//monotonically_increasing_id用法
import org.apache.spark.sql.functions.monotonically_increasing_i
df.select(monotonically_increasing_id()).show(2)
  • 处理字符串类型
    1)大小写转换
    initcap:将字符串在每个单词的首字母转大写
    lower:字符串转小写
    upper:字符串转大写
    ltrim:清除字符串左边空字符
    rtrim:清除字符串右边空字符
    trim:清除字符串两边边空字符
    lpad:(待确认)
    rpad:(待确认)
import org.apache.spark.sql.functions.{lit, ltrim, rtrim, rpad, lpad, trim}
df.select(
 ltrim(lit(" HELLO ")).as("ltrim"),
 rtrim(lit(" HELLO ")).as("rtrim"),
 trim(lit(" HELLO ")).as("trim"),
 lpad(lit("HELLO"), 3, " ").as("lp"),
rpad(lit("HELLO"), 10, " ").as("rp")).show(2)

2)正则
regexp_extract和regexp_replace:分别用于提取和替换值

//将列中颜色都替换为COLOR
import org.apache.spark.sql.functions.regexp_replace
val simpleColors = Seq("black", "white", "red", "green", "blue")
val regexString = simpleColors.map(_.toUpperCase).mkString("|")
// "|"在正则表达式中是"或"的意思,构建的正则表达式为'BLACK|WHITE|RED|GREEN|BLUE'
df.select(
 regexp_replace(col("Description"), regexString, "COLOR").alias("color_clean"),
 col("Description")).show(2)
//提取颜色数据
import org.apache.spark.sql.functions.regexp_extract
val regexString = simpleColors.map(_.toUpperCase).mkString("(", "|", ")")
// "|"是正则表达式中的"或"的意思,构建的正则表达式'(BLACK|WHITE|RED|GREEN|BLUE)'
df.select(
 regexp_extract(col("Description"), regexString, 1).alias("color_clean"),
 col("Description")).show(2)

translate:直接替换

import org.apache.spark.sql.functions.translate
df.select(translate(col("Description"), "LEET", "1337"), col("Description"))
 .show(2)

contains:检查字符串是否存在

val containsBlack = col("Description").contains("BLACK")
val containsWhite = col("DESCRIPTION").contains("WHITE")
df.withColumn("hasSimpleColor", containsBlack.or(containsWhite))
 .where("hasSimpleColor")
 .select("Description").show(3, false)

当需要匹配查找的值很多时,可以使用以下方式(暂不理解

val simpleColors = Seq("black", "white", "red", "green", "blue")
val selectedColumns = simpleColors.map(color => {
 col("Description").contains(color.toUpperCase).alias(s"is_$color")
}):+expr("*") // could also append this value
df.select(selectedColumns:_*).where(col("is_white").or(col("is_red")))
 .select("Description").show(3, false)
 //在这种情况下,用Python来实现就很容易。将使用locate函数,它返回整数位置 (从1开始),然后将返回值转换为布尔值之后再使用它
  • 处理日期和时间戳
    current_date,current_timestamp:构建日期和时间戳
    date_add,date_sub:日期的增减
    datediff:日期之间相隔的天数
    months_between:日期之间相隔的月数
    to_date:字符串转换为日期
    to_timestamp:字符串转换为时间戳
import org.apache.spark.sql.functions.{date_add, date_sub}
dateDF.select(date_sub(col("today"), 5), date_add(col("today"), 5)).show(1)

import org.apache.spark.sql.functions.{datediff, months_between, to_date}
dateDF.withColumn("week_ago", date_sub(col("today"), 7))
 .select(datediff(col("week_ago"), col("today"))).show(1)
dateDF.select(
 to_date(lit("2016-01-01")).alias("start"),
 to_date(lit("2017-05-22")).alias("end"))
 .select(months_between(col("start"), col("end"))).show(1)

import org.apache.spark.sql.functions.{to_date, lit}
spark.range(5).withColumn("date", lit("2017-01-01"))
 .select(to_date(col("date"))).show(1)

import org.apache.spark.sql.functions.to_date
val dateFormat = "yyyy-dd-MM"
val cleanDateDF = spark.range(1).select(
 to_date(lit("2017-12-11"), dateFormat).alias("date"),
 to_date(lit("2017-20-12"), dateFormat).alias("date2"))
cleanDateDF.createOrReplaceTempView("dateTable2")

import org.apache.spark.sql.functions.to_timestamp
cleanDateDF.select(to_timestamp(col("date”), dateFormat)).show()

【注意】在字符串转时间格式时,如果不指定格式直接进行隐式转换,当字符串格式不符时程序不会报错,只会处理成null,因此不建议在生产中使用隐式转换。

日期的比较:直接使用lit引用的字符串会字符串文本进行比较

cleanDateDF.filter(col("date2") > lit("2017-12-12")).show()
cleanDateDF.filter(col("date2") > "'2017-12-12'").show()
  • 处理数据中的空值
    在spark中建议使用null表示空值。
    coalesce:返回一组列中第一个非空值
    ifnull:如果第一个值为空, 则允许选择第二个值, 并将其默认为第一个
    nullif:如果两个值相等, 则返回null, 否则返回第二个值
    nvl:如果第一个值为null,则返回第二个值,否则返回第一个
    nvl2:如果第一个不为null,返回第二个值;否则,它将返回最后一个指定值
SELECT
 ifnull(null, 'return_value'),
 nullif('value', 'value'),
 nvl(null, 'return_value'),
 nvl2('not_null', 'return_value', "else_value")
FROM dfTable LIMIT 1
--DataFrame上的select表达式中使用它们同理

drop:用于删除包含null的行,默认删除包含null值的行

df.na.drop()
//指定“any”作为参数,当存在一个值是null时,就删除改行
df.na.drop("any")
//指定“all”作为参数,当所有的值为null或者NaN时才能删除该行
df.na.drop("all")
//指定某几个字段同时为空时删除
df.na.drop("all", Seq("StockCode", "InvoiceNo"))

fill:使用一组值填充一列或多列

//将某字符串类型列中null值替换为指定字符串
df.na.fill("All Null values become this string")
//填充int类型的列中的null,其他类型同理
df.na.fill(5:Integer)
df.na.fill(5:Double)
//填充多列
df.na.fifill(5, Seq("StockCode", "InvoiceNo"))
//使用map填充多列,key为列名,value为填充值
val fifillColValues = Map("StockCode" -> 5, "Description" -> "No Value")
df.na.fifill(fifillColValues)

repalce:替换操作

df.na.replace("Description", Map("" -> "UNKNOWN"))

asc_nulls_first、desc_nulls_first、asc_nulls_last、desc_nulls_last:对null值排序

  • 处理复杂类型
    1)结构体
    可以将结构体STRUCT视为DF中的DF。
//从DF中构建STRUCT
df.selectExpr("(Description, InvoiceNo) as complex", "*")
df.selectExpr("struct(Description, InvoiceNo) as complex", "*")

//操作STRUCT
//创建
import org.apache.spark.sql.functions.struct
val complexDF = df.select(struct("Description", "InvoiceNo").alias("complex"))
complexDF.createOrReplaceTempView("complexDF")
//访问
complexDF.select("complex.Description")
complexDF.select(col("complex").getField("Description"))
//访问结构体中的所有列
complexDF.select("complex.*")

2) 数组
主要操作是将DF中的列转换为数组类型和使用这些数组。

//构造数组
import org.apache.spark.sql.functions.split
df.select(split(col("Description"), " ")).show(2)
//通过下标访问数组元素
df.select(split(col("Description"), " ").alias("array_col"))
 .selectExpr("array_col[0]").show(2)
//获取数组大小
import org.apache.spark.sql.functions.size
df.select(size(split(col("Description"), " "))).show(2)
//判断数组是否包含某些元素
import org.apache.spark.sql.functions.array_contains
df.select(array_contains(split(col("Description"), " "), "WHITE")).show(2)
//展开数组构造列
import org.apache.spark.sql.functions.{split, explode}
df.withColumn("splitted", split(col("Description"), " "))
 .withColumn("exploded", explode(col("splitted")))
 .select("Description", "InvoiceNo", "exploded").show(2)

3)map

//构建map类型的列
import org.apache.spark.sql.functions.map
df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map")).show(2)
//使用map类型的列
df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map"))
 .selectExpr("complex_map['WHITE METAL LANTERN']").show(2)
//展开map类型构造为列
df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map"))
 .selectExpr("explode(complex_map)").show(2)
  • 处理JSON类型

Spark中对json数据有独特的支持。

//构建json字符串的列
val jsonDF = spark.range(1).selectExpr("""
 '{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString""")
 //查询json对象,仅有一层嵌套时可以使用json_tuple
import org.apache.spark.sql.functions.{get_json_object, json_tuple}
jsonDF.select(get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]") as "column",
json_tuple(col("jsonString"), "myJSONKey")).show(2)
//StructType转为json
import org.apache.spark.sql.functions.to_json
df.selectExpr("(InvoiceNo, Description) as myStruct")
 .select(to_json(col("myStruct")))
//解析json数据
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val parseSchema = new StructType(Array(
 new StructField("InvoiceNo",StringType,true),
 new StructField("Description",StringType,true)))
df.selectExpr("(InvoiceNo, Description) as myStruct")
 .select(to_json(col("myStruct")).alias("newJSON"))
 .select(from_json(col("newJSON"), parseSchema), col("newJSON")).show(2)

  • 用户自定义函数(UDF)
//第1步:设计一个程序实现接受一个数字并返回它的三次幂。
val udfExampleDF = spark.range(5).toDF("num")
def power3(number:Double):Double = number * number * number
power3(2.0)
//第2步:注册UDF,只能在DF操作中使用
import org.apache.spark.sql.functions.udf
val power3udf = udf(power3(_:Double):Double)
//第3步:使用udf
udfExampleDF.select(power3udf(col("num"))).show()
//还可以将函数注册为sparkSQL函数,以便于在sparksql中使用
spark.udf.register("power3", power3(_:Double):Double)
udfExampleDF.selectExpr("power3(num)").show(2)

【注意】所以建议使用 Scala 或 Java编写UDF,不仅编写程序的时间少,还能提高性能。使用Python做UDF的代价较高。

为了确保我们的函数正常工作,还要做的一件事是指定返回类型。最后,还可以使用Hive语法来创建UDF/UDAF。为了实现这一点, 首先必须在创建SparkSession 时启用Hive支持(SparkSession.builder().enableHiveSupport()来启用)。然后, 你可以在SQL中注册UDF。这仅支持预编译的Scala和Java包, 因此你需要将它们指定为依赖项。

CREATE TEMPORARY FUNCTION myFunc AS 'com.organization.hive.udf.FunctionName'

此外, 还能通过删除TEMPORARY将其注册为Hive Metastore中的永久函数。

6、聚合操作

聚合操作是数据分析中的常用基本操作。一般情况下用户会对数据进行分组后的各组内数据的行汇总,汇总运算可能是求和、累乘、计数等。spark中可以将任何类型的值聚合称为array、list、map等。

测试数据准备:

val df = spark.read.format("csv")
 .option("header", "true")
 .option("inferSchema", "true")
 .load("/data/retail-data/all/*.csv")
 .coalesce(5) //重新分区减少分区数,防止小文件过多
df.cache() //缓存起来以便于快速访问
df.createOrReplaceTempView("dfTable")
//执行最简单的聚合操作
df.count() //count()是一个action操作,还有个作用是将整个DF缓存到内存
  • 聚合函数

大多数聚合函数可以在org.apache.spark.sql.functions包中找到。Scala和python中导入的函数与SQL中的函数有些不一样。

1)count
在用作聚合函数时,count是一个transformation操作。可以指定列计数,也可以使用*或1整行计数.

import org.apache.spark.sql.functions.count
df.select(count("StockCode")).show()

【注意】执行count(*)时,spark会对null进行计数,指定某列计数时不会对null进行计数。

2)countDistinct
去重计数,只在正对某列计数时使用才有意义。

import org.apache.spark.sql.functions.countDistinct
df.select(countDistinct("StockCode")).show()

3)approx_count_distinct
处理超大型数据时,近似地统计数据量,可以指定参数调节近似度。

import org.apache.spark.sql.functions.approx_count_distinct
df.select(approx_count_distinct("StockCode", 0.1)).show()

4)first和last
基于DF中行的顺序获取首尾位置的值。

5)min和max
提取DF中的最值。

6)sum 累加值、sumDistinct 去重累加、avg 求均值、mean 求均值

import org.apache.spark.sql.functions.{first, last}
df.select(first("StockCode"), last("StockCode")).show()

import org.apache.spark.sql.functions.{min, max}
df.select(min("Quantity"), max("Quantity")).show()

import org.apache.spark.sql.functions.sum
df.select(sum("Quantity")).show()

import org.apache.spark.sql.functions.sumDistinct
df.select(sumDistinct("Quantity")).show()

import org.apache.spark.sql.functions.{sum, count, avg, expr}
df.select(
 count("Quantity").alias("total_transactions"),
 sum("Quantity").alias("total_purchases"),
 avg("Quantity").alias("avg_purchases"),
 expr("mean(Quantity)").alias("mean_purchases"))
 .selectExpr(
 "total_purchases/total_transactions",
 "avg_purchases",
 "mean_purchases").show()

7)方差和标准差
spark中支持样本的标准差方差计算,也支持总体的方差标准差计算。
variance函数和stddev函数默认是计算样本方差和样本标准差。

import org.apache.spark.sql.functions.{var_pop, stddev_pop}
import org.apache.spark.sql.functions.{var_samp, stddev_samp}
df.select(var_pop("Quantity"), var_samp("Quantity"),
stddev_pop("Quantity"), stddev_samp("Quantity")).show()

8)偏度系数和峰度系数
偏度系数(skewness)和峰度系数(kurtosis)都是对数据集中的极端数据点的衡量指标。偏度系数衡量数据相对于平均值的不对称程度,而峰度系数衡量数据分布形态陡缓程度。在将数据建模为随机变量的概率分布时,它们都很重要。

import org.apache.spark.sql.functions.{skewness, kurtosis}
df.select(skewness("Quantity"), kurtosis("Quantity")).show()

9)协方差和相关性
比较两个不同列的值之间的相互关系。相关性采用Pearson相关系数来衡量,范围是-1~+1。协方差的范围由数据中的输入决定。协方差分样本协方差和整体协方差,需要在使用时指定。

import org.apache.spark.sql.functions.{corr, covar_pop, covar_samp}
df.select(corr("InvoiceNo", "Quantity"), covar_samp("InvoiceNo", "Quantity"),
 covar_pop("InvoiceNo", "Quantity")).show()

10)聚合输出复杂类型
用户可以在流水线处理的后续操作中再访问该集合,或者将整个集合传递给用户自定义函数(UDF)

import org.apache.spark.sql.functions.{collect_set, collect_list}
df.agg(collect_set("Country"), collect_list("Country")).show()
  • 分组
    分两个阶段进行分组:首先指定要对其进行分组的一列或多列,然后指定一个或多个聚合操作。
df.groupBy("InvoiceNo", "CustomerId").count().show()

1)使用表达式分组
如前所述,计数有点特殊,因为它是作为一种方法存在的。为此,通常我们更喜欢使用count函数。我们不是将该函数作为表达式传递到select语句中,而是在agg中指定它。这使得仅需指定一些聚合操作,即可传入任意表达式。甚至可以在转换某列之后给它取别名,以便在之后的数据流处理中使用。

import org.apache.spark.sql.functions.count
df.groupBy("InvoiceNo").agg(
 count("Quantity").alias("quan"),
 expr("count(Quantity)")).show()

2)使用map进行分组
有时将转换操作指定为一系列Map会更方便,其中键为列,值为要执行的字符串形式的聚合函数。如果以inline方式指定也可以重用多个列名。

df.groupBy("InvoiceNo").agg("Quantity"->"avg", "Quantity"->"stddev_pop").show()
  • window函数
    spark支持三种串口函数:排名函数、解析函数、聚合函数。
//在原DF中添加date列,该列只包含日期信息
import org.apache.spark.sql.functions.{col, to_date}
val dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"),
 "MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")
//第一步:创建窗口规范
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col
val windowSpec = Window
 .partitionBy("CustomerId", "date") //根据指定列进行分组,与之前认识的partition by无关
 .orderBy(col("Quantity").desc) //分组内的排序规则
 .rowsBetween(Window.unboundedPreceding, Window.currentRow) //frame配置即window中的数据范围,本例中指包含当前行和之前所有行的数据范围
//第二步:编写业务需求,求有史以来最大购买量和对应客户
val maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)
//创建购买量排名,使用dense_rank而不是rank,是为了避免在有等值的情况下避免排序结果不连续
import org.apache.spark.sql.functions.{dense_rank, rank}
val purchaseDenseRank = dense_rank().over(windowSpec)
val purchaseRank = rank().over(windowSpec)
//第三步:使用select执行并查看计算结果
import org.apache.spark.sql.functions.col
dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")
 .select(
 col("CustomerId"),
 col("date"),
 col("Quantity"),
 purchaseRank.alias("quantityRank"),
 purchaseDenseRank.alias("quantityDenseRank"),
 maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()

3)分组集 GROUPING SET
分组集是用于将多组聚合操作组合在一起的底层工具,使得能够在group-by语句中创建任意的聚合操作。分组集需要过滤null值,否则会得到错误的结果。分组集就是实现了将各种分组统计的结果union到一起。

//去掉可空值,准备测试数据
val dfNoNull = dfWithDate.drop()
dfNoNull.createOrReplaceTempView("dfNoNull")
--SQL实现分组集操作
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode
ORDER BY CustomerId DESC, stockCode DESC
--GROUPING SET 实现分组集
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode))
ORDER BY CustomerId DESC, stockCode DESC
--多分组聚合
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode),())
ORDER BY CustomerId DESC, stockCode DESC

GROUPING SET只能在SQL中使用,在DF操作中,需要使用rollup和cube实现。
rollup
当我们设置分组的key为多个列时,Spark会分析这些列,并根据各列中存在的实际数值,确定列值组合作为分组的key。而rollup分组聚合是一种多维聚合操作,可以执行多种不同group-by风格的计算。

//根据Date和Country创建rollup分组。结果包含:Date分组聚合的数据、以Date和Country分组聚合的数据以及所有数据聚合的数据。
val rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity"))
 .selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")
 .orderBy("Date")
rolledUpDF.show()
//得到的结果中有列存在null值:每列的null值表示不区分该列的总数(比如Country列为null值表示该日期所有地点的总数),而如果在两列中都是null值则表示所有日期和地点的总数

cube
cube分组聚合是对所有参与的列值进行所有维度的全组合聚合。

//获得所有Date所有Country的聚合数据、每个Date所有Country的聚合数据、每个Date每个Country的聚合数据、所有Date每个Country的聚合数据
dfNoNull.cube("Date", "Country").agg(sum(col("Quantity")))
 .select("Date", "Country", "sum(Quantity)").orderBy("Date").show()

对元数据进行分组
使用cube和rollup进行分组聚合时,可以增加一列grouping_id表示聚合级别。示例:

[图片上传失败...(image-ebab88-1640316912601)]

import org.apache.spark.sql.functions.{grouping_id, sum, expr}
dfNoNull.cube("customerId", "stockCode").agg(grouping_id(), sum("Quantity"))
.orderBy(expr("grouping_id()").desc)
.show()

透视转换
透视转换可以根据某列中的不同行创建多个列。

//使用透视转换,在使用了透视转换后,现在DataFrame会为每一个Country和数值型列组合产生一个新列,以及之前的date列。
val pivoted = dfWithDate.groupBy("date").pivot("Country").sum()
//查看透视转换后的数据
pivoted.where("date > '2011-12-05'").select("date" ,"`USA_sum(Quantity)`").show()
  • 用户自定义聚合函数 UDAF
    若要创建UDAF,必须继承UserDefinedAggregateFunction基类并实现以下方法:
    inputSchema用于指定输入参数,输入参数类型为StructType。
    bufferSchema用于指定UDAF中间结果,中间结果类型为StructType。
    dataType用于指定返回结果,返回结果的类型为DataType。
    deterministic是一个布尔值,它指定此UDAF对于某个输入是否会返回相同的结果。
    initialize初始化聚合缓冲区的初始值。
    update描述应如何根据给定行更新内部缓冲区。
    merge描述应如何合并两个聚合缓冲区。
    evaluate将生成聚合最终结果。
//编写UDAF函数,实现返回(给定列)所有的行是否为 true;如果不是,则返回 false
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
class BoolAnd extends UserDefinedAggregateFunction{
    def inputSchema:org.apache.spark.sql.types.StructType = 
    StructType(StructField("value",BooleanType) :: Nil)
    def bufferSchema:StructType = StructType(
        StructField("result",BooleanType) :: Nil
    )
    def dataType: DataType = BooleanType
    def determinstic: Boolean = true
    def initialize(buffer:MutableAggregationBuffer):Unit = {
        buffer(0) = true
    }
    def update(buffer:MutableAggregationBuffer,input:Row):Unit = {
        buffer(0) = buffer.getAs[Boolean](0) && input.getAs[Boolean](0)
    }
    def merge(buffer1:MutableAggregationBuffer,buffer2:Row):Unit = {
        buffer1(0) = buffer1.getAs[Boolean](0) && buffer2.getAs[Boolean](0)
    }
    def evaluate(buffer:Row):Any = {
        buffer(0)
    }
}
//实例化该类并注册成函数在表达式中调用
val ba = new BoolAnd
spark.udf.register("booland", ba)
import org.apache.spark.sql.functions._
spark.range(1)
 .selectExpr("explode(array(TRUE, TRUE, TRUE)) as t")
 .selectExpr("explode(array(TRUE, FALSE, TRUE)) as f", "t")
 .select(ba(col("t")), expr("booland(f)"))
 .show()

【注意】UDAF仅在Scala和java中可用。

7、连接操作

连接操作不局限于单个数据集,结合spark的能力,可以实现各种数据使用。

Spark中提供的连接类型:

  1. inner join,内连接,保留左、右数据集内某个键都存在的行
  2. outer join,外链接,保留左侧或右侧数据集中具有某个键的行
  3. left outer join,左外连接,保留左侧数据集中具有某个键的行
  4. right outer join,右外连接,保留右侧数据集中具有某个键的行
  5. left semi join,左半连接,如果某键在右侧数据行中出现,则保留且仅保留左侧数据行
  6. left anti join,左反连接,如果某键在右侧数据行中没出现,则保留且仅保留左侧数据行
  7. nutural join,自然连接,通过隐式匹配两个的数据集之间具有相同名称的列来执行连接
  8. cross join,笛卡尔连接,将左侧数据集中的每一行与右侧数据集中的每一行匹配
//准备示例数据
val person = Seq(
 (0, "Bill Chambers", 0, Seq(100)),
 (1, "Matei Zaharia", 1, Seq(500, 250, 100)),
 (2, "Michael Armbrust", 1, Seq(250, 100)))
 .toDF("id", "name", "graduate_program", "spark_status")
val graduateProgram = Seq(
 (0, "Masters", "School of Information", "UC Berkeley"),
 (2, "Masters", "EECS", "UC Berkeley"),
 (1, "Ph.D.", "EECS", "UC Berkeley"))
 .toDF("id", "degree", "department", "school")
val sparkStatus = Seq(
 (500, "Vice President"),
 (250, "PMC Member"),
 (100, "Contributor"))
 .toDF("id", "status")
 
person.createOrReplaceTempView("person")
graduateProgram.createOrReplaceTempView("graduateProgram")
sparkStatus.createOrReplaceTempView("sparkStatus")
//1、内连接
//编写连接条件
val joinExpression = person.col("graduate_program") === graduateProgram.col("id")
//使用连接,默认就是内连接
person.join(graduateProgram, joinExpression).show()
var joinType = "inner" //指定连接参数
//使用连接参数指定内连接
person.join(graduateProgram, joinExpression, joinType).show()
//相当于SQL
SELECT * FROM person INNER JOIN graduateProgram ON person.graduate_program = graduateProgram.id
//2、外链接
joinType = "outer"
person.join(graduateProgram, joinExpression, joinType).show()
//相当于SQL
SELECT * FROM person FULL OUTER JOIN graduateProgram ON graduate_program = graduateProgram.id
//3、左外连接
joinType = "left_outer"
graduateProgram.join(person, joinExpression, joinType).show()
//4、右外连接
joinType = "right_outer"
person.join(graduateProgram, joinExpression, joinType).show()
//5、左半连接
joinType = "left_semi"
graduateProgram.join(person, joinExpression, joinType).show()
val gradProgram2 = graduateProgram.union(Seq(
 (0, "Masters", "Duplicated Row", "Duplicated School")).toDF())
gradProgram2.createOrReplaceTempView("gradProgram2")
gradProgram2.createOrReplaceTempView("gradProgram2")
gradProgram2.join(person, joinExpression, joinType).show()
//6、左反连接
joinType = "left_anti"
graduateProgram.join(person, joinExpression, joinType).show()
//7、自然连接
//自然连接(natural join)不必指定连接条件,它对你执行连接操作中可能要基于的列进行隐式猜测(往往是根据相同的列名),找出匹配列并返回连接结果。目前自然连接支持左连接、右连接和外连接。隐示的操作始终是危险的!
//8、笛卡尔连接(交叉连接)
joinType = "cross"
graduateProgram.join(person, joinExpression, joinType).show()
//也可以显式地使用笛卡尔连接
person.crossJoin(graduateProgram).show()
//【注意】只有在真的百分之百需要交叉连接的时候才使用它。在Spark中定义交叉连接时,需要清楚这种做法很危险!数据量会剧增。
连接操作常见问题与解决方案
  • 对复杂类型的连接操作
    任何返回Boolean值的表达式都是有效的连接。
import org.apache.spark.sql.functions.expr
person.withColumnRenamed("id", "personId")
 .join(sparkStatus, expr("array_contains(spark_status, id)")).show()
//相当于SQL
SELECT * FROM
 (select id as personId, name, graduate_program, spark_status FROM person)
 INNER JOIN sparkStatus ON array_contains(spark_status, id)
  • 处理重复的列名
    当两个DF关联时,如果判断条件中的列名相同会报错。
//构建测试数据集
val gradProgramDupe = graduateProgram.withColumnRenamed("id", "graduate_program")
val joinExpr = gradProgramDupe.col("graduate_program") === person.col(
 "graduate_program")

person.join(gradProgramDupe,joinExpr).select("graduate_program").show() //直接使用就会报错
//解决办法一:将布尔表达式改为字符串表达式
person.join(gradProgramDupe,"graduate_program").select("graduate_program").show()
//解决办法二:在链接后删除同名列
person.join(gradProgramDupe, joinExpr).drop(person.col(“graduate_program"))
 .select(“graduate_program").show()
//解决办法三:在链接前将同名列重命名
val gradProgram3 = graduateProgram.withColumnRenamed("id","grad_id")
val joinExpr = person.col("graduate_program") === gradProgram3.col("grad_id")
person.join(gradProgram3,joinExpr).show()

8、数据源

spark的核心数据源:
1)CSV
2)JSON
3)Parquet
4)ORC
5)JDBC/ODBC连接
6)纯文本文件
社区数据源:
1)Cassandra
2)HBase
3)MongoDB
4)AWS Redshift
5)XML
6)其他数据源

8.1 数据源API(DataSource API)
  1. Read API的结构:
DataFrameReader.format(...).option("key", "value").schema(...).load()

此结构可以用来读取所有的数据源。参数介绍:

   DataFrameReader:通过sparkSession的read属性得到,spark.read
   format:指定要解析的数据源,默认为Parquet
   schema:选择指定的schema
   option:配置k-v对来执行读取数据的方式
spark.read.format(“csv")
 .option(“mode", “FAILFAST")
 .option(“inferSchema", “true")
 .option(“path", “path/to/file(s)")
 .schema(someSchema)
 .load()

读取模式参考:

[图片上传失败...(image-55e993-1640316912601)]
默认是permissive。

  1. Write API结构
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
  DataFrameWriter:通过DataFrame的write属性来获取,DF.write
  format:写数据格式,默认为Parquet
  option:k-v对配置写出数据的方法,PartitionBy,bucketBy和sortBy仅适用于基于文件的数据源,可以基于这些方法控制写出目标文件的具体结构
  save模式:option中配置的保存模式
dataframe.write.format(“csv")
 .option(“mode", “OVERWRITE")
 .option(“dateFormat", “yyyy-MM-dd")
 .option(“path", “path/to/file(s)")
 .save()

保存模式参考:


image.png

默认值为errorIfExists,也就是说如果目标路径已有数据存在数据则Spark立即写入失败。
对各种数据源的option选项参见《权威指南》157页-167页

关于SQL数据库、纯文本文件、高级IO分片分桶等介绍见1665页-177页

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,907评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,987评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,298评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,586评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,633评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,488评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,275评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,176评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,619评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,819评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,932评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,655评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,265评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,871评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,994评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,095评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,884评论 2 354

推荐阅读更多精彩内容