5W字总结Spark(一)(建议收藏)

本文目录:

一、Spark 基础
二、Spark Core
三、Spark SQL
四、Spark Streaming
五、Structured Streaming
六、Spark 两种核心 Shuffle
七、Spark 底层执行原理
八、Spark 数据倾斜
九、Spark 性能调优
十、Spark 故障排除
十一、Spark大厂面试真题

一、Spark 基础

1. 激动人心的 Spark 发展史

2. Spark 为什么会流行

  • 原因 1:优秀的数据模型和丰富计算抽象

简而言之,Spark 借鉴了 MapReduce 思想发展而来,保留了其分布式并行计算的优点并改进了其明显的缺陷。让中间数据存储在内存中提高了运行速度、并提供丰富的操作数据的 API 提高了开发速度。

  • 原因 2:完善的生态圈-fullstack

目前,Spark 已经发展成为一个包含多个子项目的集合,其中包含 SparkSQL、Spark Streaming、GraphX、MLlib 等子项目。

Spark Core:实现了 Spark 的基本功能,包含 RDD、任务调度、内存管理、错误恢复、与存储系统交互等模块。

Spark SQL:Spark 用来操作结构化数据的程序包。通过 Spark SQL,我们可以使用 SQL 操作数据。

Spark Streaming:Spark 提供的对实时数据进行流式计算的组件。提供了用来操作数据流的 API。

3. Spark VS Hadoop

Hadoop Spark
类型 分布式基础平台, 包含计算, 存储, 调度 分布式计算工具
场景 大规模数据集上的批处理 迭代计算, 交互式计算, 流计算
价格 对机器要求低, 便宜 对内存有要求, 相对较贵
编程范式 Map+Reduce, API 较为底层, 算法适应性差 RDD 组成 DAG 有向无环图, API 较为顶层, 方便使用
数据存储结构 MapReduce 中间计算结果存在 HDFS 磁盘上, 延迟大 RDD 中间运算结果存在内存中 , 延迟小
运行方式 Task 以进程方式维护, 任务启动慢 Task 以线程方式维护, 任务启动快

💖 注意:
尽管 Spark 相对于 Hadoop 而言具有较大优势,但 Spark 并不能完全替代 Hadoop,Spark 主要用于替代 Hadoop 中的 MapReduce 计算模型。存储依然可以使用 HDFS,但是中间结果可以存放在内存中;调度可以使用 Spark 内置的,也可以使用更成熟的调度系统 YARN 等。
实际上,Spark 已经很好地融入了 Hadoop 生态圈,并成为其中的重要一员,它可以借助于 YARN 实现资源调度管理,借助于 HDFS 实现分布式存储。
此外,Hadoop 可以使用廉价的、异构的机器来做分布式存储与计算,但是,Spark 对硬件的要求稍高一些,对内存与 CPU 有一定的要求。

3. Spark 特点

与Hadoop的MapReduce相比,Spark基于内存的运算要快100倍以上,基于硬盘的运算也要快10倍以上。Spark实现了高效的DAG执行引擎,可以通过基于内存来高效处理数据流。计算的中间结果是存在于内存中的。

  • 易用

Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的Shell,可以非常方便地在这些Shell中使用Spark集群来验证解决问题的方法。

  • 通用

Spark提供了统一的解决方案。Spark可以用于,交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。减少了开发和维护的人力成本和部署平台的物力成本。

  • 兼容性

Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark的强大处理能力。

4. Spark 运行模式

详见文章
https://www.jianshu.com/p/3d47d58dd48e

二、Spark Core

详解RDD

详见 连载文章
https://www.jianshu.com/p/9c116975ba61

三、Spark SQL

1. 数据分析方式

1) 命令式

在前面的 RDD 部分, 非常明显可以感觉的到是命令式的, 主要特征是通过一个算子, 可以得到一个结果, 通过结果再进行后续计算。

sc.textFile("...")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
  .collect()
  1. 命令式的优点
  • 操作粒度更细,能够控制数据的每一个处理环节;

  • 操作更明确,步骤更清晰,容易维护;

  • 支持半/非结构化数据的操作。

  1. 命令式的缺点
  • 需要一定的代码功底;

  • 写起来比较麻烦。

2) SQL

3) 总结

SQL 擅长数据分析和通过简单的语法表示查询,命令式操作适合过程式处理和算法性的处理。

在 Spark 出现之前,对于结构化数据的查询和处理, 一个工具一向只能支持 SQL 或者命令式,使用者被迫要使用多个工具来适应两种场景,并且多个工具配合起来比较费劲。

而 Spark 出现了以后,统一了两种数据处理范式是一种革新性的进步。

2. SparkSQL 前世今生

1) 发展历史

  • Hive

解决的问题:

Hive 实现了 SQL on Hadoop,使用 MapReduce 执行任务 简化了 MapReduce 任务。

新的问题:

Hive 的查询延迟比较高,原因是使用 MapReduce 做计算。

3. Hive 和 SparkSQL

Hive 是将 SQL 转为 MapReduce。

SparkSQL 可以理解成是将 SQL 解析成:“RDD + 优化” 再执行。

4. 数据分类和 SparkSQL 适用场景

  • 数据分类总结
定义 特点 举例
结构化数据 有固定的 Schema 有预定义的 Schema 关系型数据库的表
半结构化数据 没有固定的 Schema,但是有结构 没有固定的 Schema,有结构信息,数据一般是自描述的 指一些有结构的文件格式,例如 JSON
非结构化数据 没有固定 Schema,也没有结构 没有固定 Schema,也没有结构 指图片/音频之类的格式
  • Spark 处理什么样的数据

RDD 主要用于处理非结构化数据 、半结构化数据、结构化;

SparkSQL 主要用于处理结构化数据(较为规范的半结构化数据也可以处理)。

5. Spark SQL 数据抽象

1) DataFrame

  • 什么是 DataFrame

DataFrame 的前身是 SchemaRDD,从 Spark 1.3.0 开始 SchemaRDD 更名为 DataFrame。并不再直接继承自 RDD,而是自己实现了 RDD 的绝大多数功能。

DataFrame 是一种以 RDD 为基础的分布式数据集,类似于传统数据库的二维表格,带有 Schema 元信息(可以理解为数据库的列名和类型)。

  • 总结:

DataFrame 就是一个分布式的表

DataFrame = RDD - 泛型 + SQL 的操作 + 优化

2) DataSet

  • DataSet

DataSet 是在 Spark1.6 中添加的新的接口。

与 RDD 相比,保存了更多的描述信息,概念上等同于关系型数据库中的二维表。

与 DataFrame 相比,保存了类型信息,是强类型的,提供了编译时类型检查。

调用 Dataset 的方法先会生成逻辑计划,然后被 spark 的优化器进行优化,最终生成物理计划,然后提交到集群中运行!

DataSet 包含了 DataFrame 的功能。

Spark2.0 中两者统一,DataFrame 表示为 DataSet[Row],即 DataSet 的子集。

DataFrame 其实就是 Dateset[Row]

3) RDD、DataFrame、DataSet 的区别

  1. 结构图解
  • RDD[Person]:

    以 Person 为类型参数,但不了解 其内部结构。

  • DataFrame:

    提供了详细的结构信息 schema 列的名称和类型。这样看起来就像一张表了。

  • DataSet[Person]

    不光有 schema 信息,还有类型信息。

  1. 数据图解
  • 假设 RDD 中的两行数据长这样:

     RDD[Person]:
    
  • 那么 DataFrame 中的数据长这样:

    DataFrame = RDD[Person] - 泛型 + Schema + SQL 操作 + 优化:

  • 那么 Dataset 中的数据长这样:

    Dataset[Person] = DataFrame + 泛型:

  • Dataset 也可能长这样:Dataset[Row]:

    即 DataFrame = DataSet[Row]:

4) 总结

DataFrame = RDD - 泛型 + Schema + SQL + 优化

DataSet = DataFrame + 泛型

DataSet = RDD + Schema + SQL + 优化

6. Spark SQL 应用

  • 在 spark2.0 版本之前

    SQLContext 是创建 DataFrame 和执行 SQL 的入口。

    HiveContext 通过 hive sql 语句操作 hive 表数据,兼容 hive 操作,hiveContext 继承自 SQLContext。

  • 在 spark2.0 之后

    这些都统一于 SparkSession,SparkSession 封装了 SqlContext 及 HiveContext;

    实现了 SQLContext 及 HiveContext 所有功能;

    通过 SparkSession 还可以获取到 SparkConetxt。

1) 创建 DataFrame/DataSet

  • 读取文本文件
  1. 在本地创建一个文件,有 id、name、age 三列,用空格分隔,然后上传到 hdfs 上。

vim /root/person.txt

1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 tianqi 35
6 kobe 40
  1. 打开 spark-shell

spark/bin/spark-shell

创建 RDD

val lineRDD= sc.textFile("hdfs://node1:8020/person.txt").map(_.split(" ")) //RDD[Array[String]]

  1. 定义 case class(相当于表的 schema)

case class Person(id:Int, name:String, age:Int)

  1. 将 RDD 和 case class 关联

val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)) //RDD[Person]

  1. 将 RDD 转换成 DataFrame

val personDF = personRDD.toDF //DataFrame

  1. 查看数据和 schema

personDF.show

+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan| 20|
|  2|    lisi| 29|
|  3|  wangwu| 25|
|  4| zhaoliu| 30|
|  5|  tianqi| 35|
|  6|    kobe| 40|
+---+--------+---+

personDF.printSchema

  1. 注册表

personDF.createOrReplaceTempView("t_person")

  1. 执行 SQL

spark.sql("select id,name from t_person where id > 3").show

  1. 也可以通过 SparkSession 构建 DataFrame
val dataFrame=spark.read.text("hdfs://node1:8020/person.txt")
dataFrame.show //注意:直接读取的文本文件没有完整schema信息
dataFrame.printSchema
  • 读取 json 文件:

val jsonDF= spark.read.json("file:///resources/people.json")

接下来就可以使用 DataFrame 的函数操作

jsonDF.show

注意:直接读取 json 文件有 schema 信息,因为 json 文件本身含有 Schema 信息,SparkSQL 可以自动解析。

  • 读取 parquet 文件

val parquetDF=spark.read.parquet("file:///resources/users.parquet")

接下来就可以使用 DataFrame 的函数操作

parquetDF.show

注意:直接读取 parquet 文件有 schema 信息,因为 parquet 文件中保存了列的信息。

2) 两种查询风格:DSL 和 SQL

  • 准备工作:

先读取文件并转换为 DataFrame 或 DataSet:

val lineRDD= sc.textFile("hdfs://node1:8020/person.txt").map(_.split(" "))
case class Person(id:Int, name:String, age:Int)
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
val personDF = personRDD.toDF
personDF.show
//val personDS = personRDD.toDS
//personDS.show
  • DSL 风格:

SparkSQL 提供了一个领域特定语言(DSL)以方便操作结构化数据

  1. 查看 name 字段的数据
personDF.select(personDF.col("name")).show
personDF.select(personDF("name")).show
personDF.select(col("name")).show
personDF.select("name").show
  1. 查看 name 和 age 字段数据
personDF.select("name", "age").show
  1. 查询所有的 name 和 age,并将 age+1
personDF.select(personDF.col("name"), personDF.col("age") + 1).show
personDF.select(personDF("name"), personDF("age") + 1).show
personDF.select(col("name"), col("age") + 1).show
personDF.select("name","age").show
//personDF.select("name", "age"+1).show
personDF.select($"name",$"age",$"age"+1).show
  1. 过滤 age 大于等于 25 的,使用 filter 方法过滤
personDF.filter(col("age") >= 25).show
personDF.filter($"age" >25).show
  1. 统计年龄大于 30 的人数
personDF.filter(col("age")>30).count()
personDF.filter($"age" >30).count()
  1. 按年龄进行分组并统计相同年龄的人数
personDF.groupBy("age").count().show
  • SQL 风格:

DataFrame 的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中使用 spark.sql() 来执行 SQL 查询,结果将作为一个 DataFrame 返回。

如果想使用 SQL 风格的语法,需要将 DataFrame 注册成表,采用如下的方式:

personDF.createOrReplaceTempView("t_person")
spark.sql("select * from t_person").show
  1. 显示表的描述信息
spark.sql("desc t_person").show
  1. 查询年龄最大的前两名
spark.sql("select * from t_person order by age desc limit 2").show
  1. 查询年龄大于 30 的人的信息
spark.sql("select * from t_person where age > 30 ").show
  1. 使用 SQL 风格完成 DSL 中的需求
spark.sql("select name, age + 1 from t_person").show
spark.sql("select name, age from t_person where age > 25").show
spark.sql("select count(age) from t_person where age > 30").show
spark.sql("select age, count(age) from t_person group by age").show
  • 总结
  1. DataFrame 和 DataSet 都可以通过 RDD 来进行创建

  2. 也可以通过读取普通文本创建--注意:直接读取没有完整的约束,需要通过 RDD+Schema

  3. 通过 json/parquet 会有完整的约束

  4. 不管是 DataFrame 还是 DataSet 都可以注册成表,之后就可以使用 SQL 进行查询了! 也可以使用 DSL!

3) Spark SQL 完成 WordCount

  • SQL 风格
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}


object WordCount {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt")
    val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt")
    //fileDF.show()
    //fileDS.show()
    //3.对每一行按照空格进行切分并压平
    //fileDF.flatMap(_.split(" ")) //注意:错误,因为DF没有泛型,不知道_是String
    import spark.implicits._
    val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正确,因为DS有泛型,知道_是String
    //wordDS.show()
    /*
    +-----+
    |value|
    +-----+
    |hello|
    |   me|
    |hello|
    |  you|
      ...
     */
    //4.对上面的数据进行WordCount
    wordDS.createOrReplaceTempView("t_word")
    val sql =
      """
        |select value ,count(value) as count
        |from t_word
        |group by value
        |order by count desc
      """.stripMargin
    spark.sql(sql).show()

    sc.stop()
    spark.stop()
  }
}
  • DSL 风格
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}


object WordCount {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt")
    val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt")
    //fileDF.show()
    //fileDS.show()
    //3.对每一行按照空格进行切分并压平
    //fileDF.flatMap(_.split(" ")) //注意:错误,因为DF没有泛型,不知道_是String
    import spark.implicits._
    val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正确,因为DS有泛型,知道_是String
    //wordDS.show()
    /*
    +-----+
    |value|
    +-----+
    |hello|
    |   me|
    |hello|
    |  you|
      ...
     */
    //4.对上面的数据进行WordCount
    wordDS.createOrReplaceTempView("t_word")
    val sql =
      """
        |select value ,count(value) as count
        |from t_word
        |group by value
        |order by count desc
      """.stripMargin
    spark.sql(sql).show()

    sc.stop()
    spark.stop()
  }
}

4) Spark SQL 多数据源交互

  • 读数据

读取 json 文件:

spark.read.json("D:\\data\\output\\json").show()

读取 csv 文件:

spark.read.csv("D:\\data\\output\\csv").toDF("id","name","age").show()

读取 parquet 文件:

spark.read.parquet("D:\\data\\output\\parquet").show()

读取 mysql 表:

val prop = new Properties()
    prop.setProperty("user","root")
    prop.setProperty("password","root")
spark.read.jdbc(
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop).show()

  • 写数据

写入 json 文件:

personDF.write.json("D:\\data\\output\\json")

写入 csv 文件:

personDF.write.csv("D:\\data\\output\\csv")

写入 parquet 文件:

personDF.write.parquet("D:\\data\\output\\parquet")

写入 mysql 表:

val prop = new Properties()
    prop.setProperty("user","root")
    prop.setProperty("password","root")
personDF.write.mode(SaveMode.Overwrite).jdbc(
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,444评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,421评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,363评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,460评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,502评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,511评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,280评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,736评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,014评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,190评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,848评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,531评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,159评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,411评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,067评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,078评论 2 352