翻译:《Spark: The Definitive Guide 》
Author:Bill Chambers and Matei Zaharia
译者: 雨钓(有增改)
Spark Applications
一個spark应用程序包含一个driver process 程序和一系列 executor processes,driver process负责在你集群的一个节点上执行你的main()方法,他的主要功能如下:
- 维护Spark Application的一些信息;
- 响应用户的程序或者输入;
- 分析分配以及调度executors之间的工作(稍后讨论);
Driver process 是至关重要的——他在Spark Application整个生命周期中维护应用信息的核心。
Executors负责实际执行driver分配给他的任务。这就意味着每一个executor仅仅负责两件事:
- 执行driver分配(assigned)给他的任务;
- 报告当前executor的计算状态给driver node。
Figure 2-1示范了Cluster Manager如何控制物理服务器,以及给Spark Application分配资源。常见的Cluster Manger有:Spark·s standalone cluster manager,Yarn 以及Mesos。通过Cluster Manger可以同时有多个Spark Application在集群上并发执行,第四部分我们将讨论更多的关于Cluster Manger的内容。
上图Figure 2-1 中我们可以看到左边的一个Driver和右边的四个Executors,在这个图(diagram)中,我们移除了集群节点的概念。用户通过配置可以指定(specify)每个节点上有多少个executors.
Note:
Spark 除了集群模式(cluster mode)之外,也存在本地模式(local mode),此时Driver和Executor是非常相似的程序,二者作为线程运行在你的个人电脑上而非集群上,此书中主要以local mode为主,所以你可以在单一的机器上运行本书提到的所有的内容。
这里有关于理解Spark Application的几个关键点:
- Spark 使用(employs)一个Cluster Manger来检测跟踪集群可用资源;
- Driver process主要负责执行drivevr程序命令并通过Executors去完成给定的任务。
Executors 通常将执行Spark code;然而,driver通过Spark 的API可以支持自多种不同的语言。下一节介绍。
Spark`s Lnaguage APIs
它使得你可以使用多种语言运行Spark code。大多数情况下,Spark在每个语言中都有一些核心的概念;这些概念被翻译成可以运行在集权上的Spark代码。如果你按照规范使用这些API,你可以期望所有的语言都有类似的特性:
Scala:
Spark主要由scala写的,这使得它成为Spark的默认语言。这本数据将包含Scala语言代码。
Java
虽然Spark是使用Scala写的,Spark的作者们确保你可以使用Java写SparkCode。本书主要以Scala为主,但也会涉及Java代码。
Python
Python 支持的结构与Scala非常相似,本书包含python 代码和Python API
SQL
Spark 支持标准的ANSISQL2003标准。这就使得分析师和非程序猿也可以利用Spark的大数据优势。
R
Spark有两个通用的R 库:SparkR,sparklyr(R community-driven package)
Figure 2-2对这种关系给出了一个简单的说明:
每一种语言的API都维护着我们前面提到的几个核心概念,其中SparkSession类是对用户可用的,它是运行Spark代码的入口。当使用Python或者R编写Spark程序时,你不需要编写明确的JVM指令;取而代之的是,你只需要编写Python和R代码,Spark会将它翻译成可以在executor 的JVM上执行的代码。
Spark`s APIs
虽然你可以通过多种语言来使用Spark,但是不同的语言间可以提供哪些特性是值得一提的。Spark有两组基本的API:low-level 非结构化API,和higher-level 的结构化API。我们在本书中都将讨论,但是重点讨论higher-level结构化APIS.
Starting Spark
至此我们提到了Spark基本的概念,这些都是概念性的,但我们实际编写我们Spark程序的时候,我们需要一个方法发送我们的命令和数据给它。这就需要我们首先创建一个SparkSession。
Note
我们以Spark的Local mode为例介绍,这也就意味着运行./bin/spark-shell,即通过Scala控制台去启动一个交互式连接,当然你也可以通过./bin/pyspark启动一个Python的控制台。这样就可以启动一个交互式的Spark Application。同时spark通过一个程序提交standalone application到Spark的,这个程序的名字叫做:spark-submit;通过spark-submit你可以提交一个预编译的程序到spark,我们之后会详细介绍。
当你使用交互模式启动Spark的时候,后台会创建一个SparkSession用来管理Spark Application。但是,如果你通过Standalone application启动spark的时候你就必须在你的应用程序代码中创建一个SparkSession 类。
The SparkSession
前面介绍了,你可以通过一个名叫SparkSession的类控制你的Spark Application,这个SparkSession instance是Spark在集群上执行用户自定义程序的途径。在Scala和Python中,当你启动一个控制台时,Spark这个变量是可用的。我们来看看SparkSession在Python和 Scala中的情况:
IN Scala你将看到如下的内容:
res0: org.apache.spark.sql.Spark Session = org.apache.spark.sql.Spark Session@...
IN Python你将看到如下:
<pyspark.sql.session.Spark Session at 0x7efda4c1ccd0>
现在我们执行一个简单的任务:创建一个range of number,这个range of number相当于电子表格中的一个列。
//in Scala
val myRange = spark.range(1000).toDF("number")
//in Python
myRange = spark.range(1000).toDF("number")
我们创建了一个DataFrame,即一个列,包含1000行,且数值是0-999之间的数。这个range of number 代表了一个分布式集合(distribution collection)。当在集群中执行时,这个range of number的每一部分存在在不同的executor上,这就是Spark的DataFrame
DataFrames
DataFrame是一个最常见的Structure API,也是最简单的表示,由列和行组成的表的表示形式。一个定义这些列及其类型的list被称作Schema,你可以将DataFrame看做是有列名称的电子表格。Figure 2-3说明了根本区别:电子表格位于一台计算机的特定位置上,但是Spark 的DateFrame可以分布在数千台计算机上。将数据放在多台计算机上是有原因的:数据量可能太大导致一台机器放不下,其次在一台计算机上执行计算需要耗费大量的时间。
DataFrame的概念在Spark中并不是唯一的。R 和Python也有相似的概念,然而,Python和R的DataFrame存储在单台计算机上而不是多台,这就限定了你对于给定的存在在特定机器上的DataFrame的处理。然而,因为Spark拥有Python和R的接口,所以无论是将Python 中Pandas 的DataFrame转化成Spark的DataFrame,还是讲R 的DataFrame转换成Spark DataFrame都是很简单的。
NOTE
Spark有几个核心的抽象:Datasets,DataFrames,SQL Tbale,以及Rsilient Distributed DataSets(RDDS).这些不同的抽象都是分布式数据集的代表,最简单和最有影响的是DataFrames,他在所有的语言中都是可用的,第二部分将介绍DataSets,第三部分将介绍RDDS.
partitions
为了使所有executor并发的执行任务,Spark将数据分解成块并命名为Partitions。一个Partition就是一个行的集合,并且分布在集群中的一台物理服务器上。一个DataFrame的partitions意味着在执行期间数据在集群之间是如何物理分布的,如果你有很多PartITion但是只有一个Executor,Spark仍然只有一个并行度,因为仅仅只有一个计算资源。
一个非常重要的事情是,关于DataFrame,在大多数情况下你不需要手动的或显示的操作Partitions,你只需要简单的指定数据的high-level transformation操作即可,Spark决定任务怎样在集群上执行该操作,lower-level APIs也是如此(基于RDD interface),第三部分介绍。
Transformations:
在Spark中核心的数据结构是不可变的(immutable),也就意味着当他们被创建之后,这些数据结构是不能修改的,乍一看,这似乎是一个奇怪的概念:如果你不能修改他,你如何使用他?要想“Change” 一个DataFrame,你需要通知Spark你想怎样修改他。这些通知被称为transformations,我们执行一个简单的Transformation来查找当前DataFrame中所有的偶数:
//in Scala
val divisBy2 = myRange.where("number % 2 = 0")
# in Python
divisBy2 = myRange.where("number % 2 = 0")
注意这里没有return输出,这是因为仅指定了一个抽象的transformation,并且Spark在你调用一个action之前,不会执行任何transformation。transformation是你在使用spark时用于表达你的业务逻辑的核心,有两类transformation: narrow dependencies 以及wide dependencies.
Transformation包含的narrow dependencies(我们也称为narrow transformations)指的是每一个input partition将生成一个output partition。上面的代码中,where声明指定一个narrow dependency,一个partition最多生成一个output partition,如图Figure 2-4:
wide dependency(or wide transformation)指的是一个input partition可以生成很多output partitions。正如你经常听到的这被称为shuffle,通过他,Spark可以跨集群交换分区数据,这意味着我们在DataFrame上指定多个filters(过滤器),他们都将在内存中执行。当我们执行一个shuffle,Spark将结果写到磁盘。wide transformation如Figure 2-5所示:
之后你将会看到很多关于transformation的讨论,但是现在最重要的是你需要知道有两种transformation就可以了,你现在将看到在针对数据指定一系列不同的操作时,transformation是多么的简单,这就引出了一个话题叫做:lazy evaluation
Lazy Evaluation
lazy evaluation意思是Spark将等到最后一刻才去执行计算指令的DAG。在Spark中,当你表达一些操作时,并不是立即执行数据修改,而是建立一个transformation的执行计划来表达你想怎样使用你的数据。
直到最后执行时,Spark编译整个执行计划,将你的raw Dataframe transformations 编译成流线型的物理执行计划,整个物理的流式计划将在集群中有效的执行。这提供了巨大的好处,因为Spark可以从头到尾优化整个数据流,其中的一个例子就是Dataframes 的predicate pushdown(谓词下推):如果你构建了一个大的Spark任务,但是在最后指定了一个过滤器,要求从原数据中抽取一行数据,执行整个任务的最有效的方法是访问当行记录,Spark实际上会自动通过predicate pushdown来为我们进行优化。
Actions
transformations允许我们建立一个针对业务逻辑的transformation计划。我们通过执行一个action来引发计算。一个 action 命令Spark 计算一系列transformation的结果。一个简单的action就是Count,他返回DataFrame中所有的记录的总数。
divisBy2.count()
当然,count 并不是唯一的action,Action分三种 :
- 在控制台( console)中查看数据的Actions
- 用相应的语言收集数据到本机对象的Actions
- 写输出数据源的Actions
针对上面指定的那个Action,我们开始一个Spark job,执行我们的filter trasformation(一个narrow transformation),之后aggregation(一个wide transformation),这些执行基于每一个partition,之后执行collect将我们的结果收集到本地对象中。可以看到所有的过程通过SparkUI进行监控,SparkUI是一个内置在Spark中的一个工具,通过他你可以监控运行在Spark上的作业。
Spark UI
Spark UI在driver节点的4040端口是可用的。如果你使用的是local模式,那么就是:http://localhost:4040。SparkUI显示Sparkjob的状态,环境,和集权状态等信息。这很有用,尤其是debugging的时候。Figure 2-6显示了一个例子,两个包含9个tasks的stages正在执行。
18章将详细介绍SparkUI,
Spark job 代表了一系列的transformations,他们被一个独特的action所引爆,并且你可以通过SparkUI监控Spark任务的执行状态。
An End-to-End Example
上面的例子中数据太简单,我们现在使用美国运输统计局的大数据进行更加实际的实例,
使用CSV的文件,之后将会你讲看到有一些其他不同格式的文件,这里主要关注CSV数据文件。
每一个CSV文件中多有一些数据,且数据格式相似,每一行代表DataFrame中的一行:
$ head /data/flight-data/csv/2015-summary.csv
DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Spark具有对大数据源进行读写的能力。我们将使用DataFrameReader读取数据,它与SparkSession相关,在此过程中我们将制定文件格式以及任何我们想制定的东西。在我们的例子中,我们需要做一件被称为:schema inference的设置,它意味这我们想要Spark执行数据时预测我们DataFrame数据的schema。同时我们指定文件中的第一行是header,此外,我们也可以指定其他的选项。
当我们开启schema inference的设置后,spark为了获取Schema信息会读取少量的数据并尝试使用他支持的数据类型去解析这些数据。当然这种预测的方式无法保证百分百正确,有时也会出错。所以当你读取数据时,你也可以选择为你的数据指定一个schema(推荐使用指定的方式)。
// in Scala
val flightData2015 = spark
.read
.option("inferSchema","true")
.option("header","true")
.csv("/data/flight-data/csv/2015-summary.csv")
//in Python
flightData2015 = park\
.read\
.option("inferSchema","true")\
.option("header","true")\
csv("/data/flight-data/csv/2015-summary.csv")
每一个DataFrame都有一组列,但是没有指定具体的行数,原因就是读取数据是一个transformations操作,因此它是一个lazy operation。Spark只查看几行数据并推测他可能是什么数据类型,Figure 2-7给出了一个例子,CSV被入去到一个DataFrame中,之后被转换成一个本地的针对于行的array或者list。
如果我们在这个DataFramte上执行action,我们将看到与上面结果一样:
flightData2015.take(3)
Array([United States,Romania,15], [United States,Croatia...
让我们指定一些更多的信息!现在根据count列对我们的数据进行排序,该列是integer类型,Figure 2-8 显示了该过程:
Note:
记住,sort并不是modify DataFrame,我们使用sort作为transformation,他通过转换先前的Dataframe返回一个新的DataFrame
我们调用sort数据没有发生任何改变,因为它仅仅是一个transformation。然而通过Explain plan我们可以看到Spark正在构建一个计划,即他将怎样在集群上执行整个任务的计划。我们可以在任何DataFrame中执行explain,去查看DataFrame的血缘关系(lineage)或者说Spark怎样执行整个query 的。
fightData2015.sort("count").explain()
== Physical Plan ==
*Sort [count#195 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#195 ASC NULLS FIRST, 200)
+- *File Scan csv [DEST_COUNTRY_NAME#193,ORIGIN_COUNTRY_NAME#194,count#195] ...
***Explain ***有点神秘,但是稍加联系就能理解,你可以从头到尾阅读执行计划,顶部是最终结果,底部是数据来源,在这个例子中,每一行的第一个单词是Sort、exchange、FileScan。这是因为对我们的数据进行排序是一个wide transformation,因为不同partition中的行需要进行比较。
即使上面所有的内容无法理解也不用太过担心,因为Explain仅仅是一个非常有用的工具尤其是在Debug和提升你对Spark程序的了解上。
正如上面我们所做的,我们可以指定一个action 去开始这个计划(kick off this plan),然而,在做这个之前,我们需要进行配置,默认的,当我们执行shuffle时,Spark输出200个shuffle partitions,让我们这是这个shuffle的输出partition为5个reduce:
spark.conf.set("spark.sql.shuffle.partitions","5")
flightData2015.sort("count").take(2)
... Array([United States,Singapore,1], [Moldova,United States,1])
Figure 2-9展示这个操作,注意,除了transformation,还包含物理分区计数。
我们建立的这个逻辑transformation定义关于DataFrame的血缘(lineage),这样在任何给定的时间点,Spark知道怎样通过执行之前在相同的操作去重新计算任何partition。这是Spark函数式编程的核心,当数据transformation保持不变时,任何相同的输入总是可以导致相同的输出(因为RDD是不可变的,这是前提)。
我们没有操作物理数据,取而代之,通过我们刚才设置的shuffle.partition参数配置物理执行特性。我们最终得到五个输出分区,即我们在shuffle.partition中指定的值,你可以修改这个来帮助你控制你的Spark jobs的物理执行特性,你可以尝试不同的值并查看分区的数量。在尝试不同的值时,你将看到不同的运行状态。记住你可以使用SparkUI来查看Spark任务的物理和逻辑执行特征。
DataFrame AND SQL
前面我们简单介绍了DataFrame,现在我们研究一个更复杂的问题,并跟踪DataFrame和SQL。Spark可以使用完全一样的方式执行相同的transformation,无论是使用什么语言。你可以使用DataFrame(无论是Java,R,Scala还是Python)或者SQL表达你的业务逻辑,在真正执行你的代码前,Spark将该逻辑编译成底层执行计划(可以在explain plan中查看)。使用SQL你可以注册任何DataFrame作为表或者视图,并且使用纯SQL对他们进行查询。编写SQL查询和编写DataFrame代码之间没有任何性能差异,他们都会编译成相同的底层执行计划,你可以使用一个简单的方法调用将任何DataFrame转化成表或者视图:
flightData2015.createOrReplaceTempView("flight_date_2015")
现在,我们使用SQL查询我们的数据。我们将使用saprk.sql函数(注意spark是我们SparkSession的参数)这样会便利的返回一个新的DataFrame。尽管这在逻辑上看起来有点循环——针对DataFrame的SQL查询返回另一个DataFrame——他实际上相当强大。这使得你可以在任何时间点以最方便的方式指定transformation,并且不牺牲任何效率。为了理解发生了什么,我们看一个例子:
// in Scala
val sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME,COUNT(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")
val dataFrameWay = flightData2015
.groupBy('DEST_COUNTRY_NAME')
.count()
sqlWay.explain
dataFrameWay.explain
//IN Python
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME,COUNT(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")
dataFrameWay = flightData2015\
.groupBy('DEST_COUNTRY_NAME')\
.count()
sqlWay.explain()
dataFrameWay.explain()
== Physical Plan ==
*Hash Aggregate(keys=[DEST_COUNTRY_NAME#182], functions=[count(1)])
+-Exchange hashpartitioning(DEST_COUNTRY_NAME#182, 5)
+- *Hash Aggregate(keys=[DEST_COUNTRY_NAME#182], functions=[partial_count(1)])
+- *File Scan csv [DEST_COUNTRY_NAME#182] ...
== Physical Plan ==
*Hash Aggregate(keys=[DEST_COUNTRY_NAME#182], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#182, 5)
+- *Hash Aggregate(keys=[DEST_COUNTRY_NAME#182], functions=[partial_count(1)])
+- *File Scan csv [DEST_COUNTRY_NAME#182] ...
上面的执行计划中你可能已经注意到:这些不同语言编写的程序会被编译成完全相同的底层执行计划。
让我们从数据中提取一些有趣的数据。需要理解的一点是,Spark的DataFrame和Sql中已经有大量的操作是可以直接使用的。你可以使用和导入数百个函数来帮助你快速的解决大数据问题。我们将使用Max函数,来确定航班数最大值。这仅仅扫描DataFrame中所有的相关列的值并且检查该值是否比之前的所有值都大。这是一个Transformation,且我们有效的输出只有一行:
spark.sql("SELECT max(count) from flight_date_2015").take(1)
//in Scala
import org.apache.spark.sql.functions.max
flightData2015.select(max("count")).take(1)
//in python
from pyspark.sql.functions import max
flightData2015.select(max("count")).take(1)
正确的结果将会是 370002。让我们执行更多的操作,找出航班数据中排名前五的目的地城市,这是我们第一个并发的transformation查询,因此我们一步一步进行,让我们从一个非常简单的SQL aggregation开始:
//in Scala
val maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME,sum(count) As destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")
maxSql.show()
# in Python
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME,sum(count) As destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")
maxSql.show()
DEST_COUNTRY_NAME | destination_total |
---|---|
United States | 411352 |
Canada | 8399 |
Mexico | 7140 |
United Kingdom | 2025 |
Japan | 1548 |
现在让我们来看看DataFrame语法,在语义上类似但是在实现和顺序上不同。但是,正如我们所知道的,底层执行计划是相同的,:
//in Scala
import org.apache.spark.sql.function.desc
flightData2015
.groupBy("DEST_COUNTRY_NAME")
.sum("count")
.withColumnRenamed("sum(count)","destination_total")
.sort(desc("destination_total"))
.limit(5)
.show()
//in Python
import pyspark.sql.function import desc
flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)","destination_total")\
.sort(desc("destination_total"))\
.limit(5)\
.show()
DEST_COUNTRY_NAME | destination_total |
---|---|
United States | 411352 |
Canada | 8399 |
Mexico | 7140 |
United Kingdom | 2025 |
Japan | 1548 |
如上总共有7步操作,让我们回到数据源你可以看到在这个DataFrame的explain plan,Figure2-10展示了我们执行的一系列操作,真正的执行计划(执行explain所显示的)将与图中所示的有所不同,这主要是因为物理执行中的优化;然而这个展示是一个很好的起点。这个执行计划是一个transformation的DAG(directed acyclic graph),每一步都产生一个不可变的DataFrame,在每一个DataFrame中我们都可以调用action操作生成一个结果。
第一步是读取数据。我们之前定义了一个DataFrmae,但是作为一个提示,Spark不会真的去读取数据,除非针对这个DataFrame的一个action被调用,或者来自原始的DataFrame的action被调用。
第二步是grouping,从技术上将我们称为groupBy,这个方法会返回一个RelationalGroupedDataset,它是一个专用名用于指定分组的DataFrame,但是需要用户在进一步查询之前指定聚合,通常我们指定一个Key或者一组Key进行聚合,现在我们指定一个Key。
第三步是指定聚合,让我们使用Sum聚合方法。他接收一个输入列表,或者简单的说,他接收一个列名。sum方法调用的结果是返回一个新的DataFrame,你将看到他有一个新的schema,但是他并不知道每一列的数据类型。重要的补充是没有任何计算被执行。这只是我们表达过的一个变换,Spark可以通过他跟踪我们的类型信息。
第四步:是简单的重命,我们使用 withColumnRenamed方法。他包含了两个参数,原始的列名称和新的列名称。当然,他不执行计算,这是另一种transformation。
第五步,对数据进行排序,这样我们从DataFrame顶部获取结果,可以获得destination_total列中最大的值。
你可能注意到了我们有一个非常重要分函数:desc.它不是返回一个string而是返回一个Column ,通常,很多DataFrame方法接收String或者Column类型或者表达式,Columns 和表达式(expresssions )实际上是同一件事。
倒数第二步:我们指定了一个限制,我们仅仅想返回最后获取的DataFrame中的前5个值,而不是全部的数据。
最后,我们真正开始收集数据集的结果,并且Spark将用我们使用的语言给我们返回一个list 或者array 。让我们来看看前面查询的执行计划。
// in Scala
flightData2015
.groupBy("DEST_COUNTRY_NAME")
.sum("count")
.withColumnRenamed("sum(count)", "destination_total")
.sort(desc("destination_total"))
.limit(5)
.explain()
// in Python
flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)", "destination_total")\
.sort(desc("destination_total"))\
.limit(5)\
.explain()
== Physical Plan ==
Take Ordered And Project(limit=5, order By=[destination_total#16194L DESC], outpu...
+- *Hash Aggregate(keys=[DEST_COUNTRY_NAME#7323], functions=[sum(count#7325L)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#7323, 5)
+- *Hash Aggregate(keys=[DEST_COUNTRY_NAME#7323], functions=[partial_sum...
+- In Memory Table Scan [DEST_COUNTRY_NAME#7323, count#7325L]
+- In Memory Relation [DEST_COUNTRY_NAME#7323, ORIGIN_COUNTRY_NA...
+- *Scan csv [DEST_COUNTRY_NAME#7578,ORIGIN_COUNTRY_NAME...
虽然这个解释计划与我们确切的“概念计划”不匹配,但是所有的部分都在那里。您可以看到limit语句以及order By(在第一行)。 您还可以看到聚合是如何在第二个阶段发生的,即在partial_sum调用时。 这是因为对数字列表求和是可交换的,Spark可以按分区执行求和。 当然,我们也能看到我们是如何读取Data Frame的。
当然,我们并不总是需要collect结果数据。我们也可以将它写入Spark支持的任何数据源。例如,假设我们希望将信息存储在postgre SQL这样的数据库中,或者将它们写到另一个文件中。
Conclusion
本章介绍了Apache Spark的基础知识, 我们讨论了transformations 和action,以及Spark如何惰性地执行transformations的DAG,以便优化dataframe上的执行计划。 我们还讨论了如何将数据组织成分区,并为处理更复杂的转换设置了条件。 在第3章中,我们将带您参观Spark生态系统,并查看Spark中可用的一些更高级的概念和工具,从streaming到机器学习。