转载请注明出处,谢谢合作~
RDD 编程指南
- 概述(Overview)
- 启用 Spark(Linking with Spark)
- 初始化 Spark(Initializing Spark)
- 使用 Spark Shell(Using the Shell)
- 弹性分布式数据集【Resilient Distributed Datasets (RDDs)】
- 并行化的数据集合(Parallelized Collections)
- 外部数据集(External Datasets)
- RDD 算子(RDD Operations)
- 基本操作(Basics)
- 传递函数(Passing Functions to Spark)
- 理解闭包(Understanding closures)
- 示例(Example)
- 本地模式 vs 集群模式(Local vs. cluster modes)
- 打印 RDD 的数据(Printing elements of an RDD)
- 使用键值对(Working with Key-Value Pairs)
- Transformation 算子(Transformations)
- Action 算子(Actions)
- Shuffle 算子(Shuffle operations)
- 底层原理(Background)
- 性能影响(Performance Impact)
- RDD 持久化(RDD Persistence)
- 如何选择存储级别(Which Storage Level to Choose?)
- 删除数据(Removing Data)
- 共享变量(Shared Variables)
- 广播变量(Broadcast Variables)
- 累加器(Accumulators)
- 集群部署(Deploying to a Cluster)
- 启动 Java/Scala 应用程序(Launching Spark jobs from Java / Scala)
- 单元测试(Unit Testing)
- 接下来干点啥呢(Where to Go from Here)
概述
从高层的角度来看,每一个 Spark 应用程序都包含一个 driver 程序,它执行用户的 main
方法,在集群中对数据进行各种各样的并行计算(parallel operations)。Spark 提出的最主要的抽象是弹性分布式数据集(resilient distributed dataset),简称 RDD。RDD 是一个在集群范围内被分区,从而可以进行并行处理的数据集合。RDD 可以通过 Hadoop 文件系统(或者其他 Hadoop 支持的文件系统)中的文件创建,也可以将 driver 程序中的 Scala 集合转换成一个 RDD。用户还会有在内存从缓存 RDD 的需求,从而在并行计算中高效的复用 RDD。RDD 还能够从失败的任务中自动重新计算。
Spark 中的第二个抽象叫做共享变量(shared variables),被用于在并行计算中传递变量。默认情况下,函数的计算过程以任务集合的形式在不同的节点上并行执行,Spark 会给每个节点传递它所需要的变量。有时一个变量需要在子任务之间共享,或者在子任务和 driver 程序之间共享。Spark 提供了两种类型的共享变量:广播变量(broadcast variables)和累加器(accumulators),广播变量用于在各个节点的内存中缓存同一个对象,累加器用于可以在子任务之间被更新的对象,比如说计数和求和。
这篇文档以 Spark 所支持的不同语言展示了上述特色,最简单的方式就是通过交互式 Spark shell 进行测试,Scala 版本的 bin/spark-shell
和 Python 版本的 bin/pyspark
都可以。
启用 Spark
Scala
默认情况下 Spark 3.0.0 通过 Scala 2.12 版本构建(Spark 还可以通过其他版本的 Scala 构建)。编写 Scala 版本的应用程序需要使用相应版本的 Scala(e.g. 2.12.X)。
编写 Spark 应用程序需要引入 Spark 的 Maven 依赖,标识符和版本信息如下:
groupId = org.apache.spark
artifactId = spark-core_2.12
version = 3.0.0
另外,如果涉及到访问 HDFS 集群,还需要添加与 HDFS 版本相应的 hadoop-client
依赖。
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
最后,需要在代码中引入 Spark 相关的类,如下所示:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
(在 Spark 1.3.0 版本之前,还需要显示的引入 import org.apache.spark.SparkContext._
来使用基本的隐式转换)
Java
Spark 3.0.0 版本支持 lambda 表达式(lambda expressions)来简化函数的编写,不然则需要通过 org.apache.spark.api.java.function 中的类和接口来实现。
注意在 Spark 2.2.0 版本中已经移除了对 Java 7 的支持。
编写 Spark 应用程序需要引入 Spark 的 Maven 依赖,标识符和版本信息如下:
groupId = org.apache.spark
artifactId = spark-core_2.12
version = 3.0.0
另外,如果涉及到访问 HDFS 集群,还需要添加与 HDFS 版本相应的 hadoop-client
依赖。
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
最后,需要在代码中引入 Spark 相关的类,如下所示:
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
Python
Spark 3.0.0 版本支持 Python 2.7+ 或者 Python 3.4+,可以使用标准的 CPython 解释器,所以像 NumPy 之类的 C 依赖库也可以被使用, Spark 3.0.0 还支持 PyPy 2.3+。
注意对 Python 2 的支持在 Spark 3.0.0 版本中已经被标记为弃用。
Python 版本的 Spark 应用程序可以通过 bin/spark-submit
脚本提交执行,或者在你的文件中添加 PySpark 依赖:
install_requires=[
'pyspark=={site.SPARK_VERSION}'
]
如果没有通过 pip 安装 PySpark,请使用 Spark 根目录下的 bin/spark-submit
脚本。该脚本会加载 Spark 的 Java/Scala 依赖库,能够提交应用程序到集群上去运行。还可以使用 bin/pyspark
脚本启动交互式的 Python shell。
如果涉及到访问 HDFS 数据,需要继集成相应版本 HDFS 的 PySpark 。Prebuilt packages 安装包中提供了集成主流 HDFS 版本的 Spark。
最后,你需要在你的代码中引入 Spark 相关的类,如下所示:
from pyspark import SparkContext, SparkConf
PySpark 需要在 driver 程序中使用和 worker 相同的 Python 版本,Spark 会使用 PATH 环境变量中默认的 Python 版本,可以通过定义 PYSPARK_PYTHON
环境变量使用你想要的 Python 版本,例如:
$ PYSPARK_PYTHON=python3.4 bin/pyspark
$ PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python/pi.py
初始化 Spark
Scala
Spark 应用程序的第一件事情就是要创建一个 SparkContext 对象,该对象会决定如何访问一个集群。创建 SparkContext
对象首先需要构建一个 SparkConf 对象,其中包含了有关应用程序的一些信息。
在一个 JVM 中只允许有一个 SparkContext
对象处于激活状态,在创建一个新的 SparkContext
对象之前必须调用 stop()
方法停止旧的那个。
val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)
参数 appName
定义了显示在集群 UI 上的应用程序名称,参数 master
指定集群地址,可以是 Spark, Mesos or YARN cluster URL 或者设置为「local」采用本地模式运行。在实践中,集群环境下硬编码 master
参数到应用程序代码中不是一个明智的选择,更推荐的做法是在通过 spark-submit
脚本提交应用程序( launch the application with spark-submit
)时指定。不过对于测试和单元测试的场景,常常通过「local」模式运行 Spark 应用程序。
Java
Spark 应用程序的第一件事情就是要创建一个 JavaSparkContext 对象,该对象会决定如何访问一个集群。创建 JavaSparkContext
对象首先需要构建一个 SparkConf 对象,其中包含了有关应用程序的一些信息。
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);
参数 appName
定义了显示在集群 UI 上的应用程序名称,参数 master
指定集群地址,可以是 Spark, Mesos or YARN cluster URL 或者设置为「local」采用本地模式运行。在实践中,集群环境下硬编码 master
参数到应用程序代码中不是一个明智的选择,更推荐的做法是在通过 spark-submit
脚本提交应用程序( launch the application with spark-submit
)时指定。不过对于测试和单元测试的场景,常常通过「local」模式运行 Spark 应用程序。
Python
Spark 应用程序的第一件事情就是要创建一个 SparkContext 对象,该对象会决定如何访问一个集群。创建 SparkContext
对象首先需要构建一个 SparkConf 对象,其中包含了有关应用程序的一些信息。
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
参数 appName
定义了显示在集群 UI 上的应用程序名称,参数 master
指定集群地址,可以是 Spark, Mesos or YARN cluster URL 或者设置为「local」采用本地模式运行。在实践中,集群环境下硬编码 master
参数到应用程序代码中不是一个明智的选择,更推荐的做法是在通过 spark-submit
脚本提交应用程序( launch the application with spark-submit
)时指定。不过对于测试和单元测试的场景,常常通过「local」模式运行 Spark 应用程序。
使用 Spark Shell
Scala
在 Spark Shell 当中,一个专用于解释器作用范围内的 SparkContext
对象已经被创建好了,对象名称叫做 sc
。创建新的 SparkContext
对象将不会生效。可以通过 --master
参数指定需要 SparkContext
对象连接的集群地址,还可以通过 --jars
参数传递指定的 JAR 文件(以逗号分隔)到 classpath 当中。甚至可以通过 --packages
参数添加额外的 Maven 依赖(以逗号分隔),通过 --repositories
参数指定额外的 Maven 仓库。例如,启动 bin/spark-shell
并为其分配 4 个核:
$ ./bin/spark-shell --master local[4]
或者,添加 code.jar
到应用程序的 classpath 当中:
$ ./bin/spark-shell --master local[4] --jars code.jar
通过 Maven 坐标引入其他依赖:
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
完整选项可以通过运行 spark-shell --help
,spark-shell
的底层调用了 spark-submit
脚本。
Python
在 PySpark shell 当中,一个专用于解释器作用范围内的 SparkContext
对象已经被创建好了,对象名称叫做 sc
。创建新的 SparkContext
对象将不会生效。可以通过 --master
参数指定需要 SparkContext
对象连接的集群地址,还可以通过 --py-files
参数传递额外的 Python 依赖文件(以逗号分隔),可以是 .zip,.egg 或者 .py 文件。甚至可以通过 --packages
参数添加额外的 Maven 依赖(以逗号分隔),通过 --repositories
参数指定额外的 Maven 仓库。Spark 需要的任何 Python 依赖(列举在 requirements.txt 文件中)在被用到的时候必须通过 pip 安装。例如,启动 bin/spark-shell
并为其分配 4 个核:
$ ./bin/pyspark --master local[4]
或者,添加 code.py
文件到搜索路径中(为了在之后可以被引用):
$ ./bin/pyspark --master local[4] --py-files code.py
完整选项可以通过运行 pyspark --help
,pyspark
的底层调用了 spark-submit
脚本。
还可以在 IPython(加强版的 Python 解释器)中启动 PySpark,PySpark 需要 IPython 1.0.0 版本或以上。在启动 PySpark 之前需要设置环境变量 PYSPARK_DRIVER_PYTHON
为 ipython
:
$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark
如果要使用 Jupyter notebook(之前被叫做 IPython notebook):
$ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark
可以通过环境变量 PYSPARK_DRIVER_PYTHON_OPTS
配置你自己的 ipython
或者 jupyter
解释器。
当 Jupyter Notebook server 启动之后,可以通过「Files」tab 页创建一个新的「Python 2」 notebook 。在该 notebook 中,尝试 Spark 之前可以输入 %pylab inline
命令作为 notebook 的一部分。
弹性分布式数据集
Spark 的核心概念是弹性分布式数据集(resilient distributed dataset),简称 RDD,RDD 是一个容错的、可以被并行计算的数据集合。有两种方式可以创建 RDD:在 driver 程序中 parallelizing 已经存在的数据集合,或者通过一个外部存储系统生成一个数据集,比如说共享文件系统,HDFS,HBase,或者其他支持 Hadoop InputFormat 的数据源。
并行化的数据集合
Scala
RDD 可以通过调用 SparkContext
对象的 parallelize
方法获得,参数为一个已经在 driver 程序中存在的数据集合(Sacla Seq
)。集合中的元素会被拷贝到一个可以被并行计算的分布式数据集中。例如,创建一个包含数字 1 到 5 的 RDD:
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
一旦被创建,分布式数据集对象(distData
)就能够被并行化计算。例如,可以调用 distData.reduce((a, b) => a + b)
方法对数组中的数字进行求和。稍后讨论分布式数据集上的操作。
分布式数据集的一个重要参数是分区的数量,Spark 会在集群中为每个分区生成一个子任务,一般情况下可以为每个 CPU 分配 2 到 4 个分区。通常 Spark 会基于集群的配置自动的设置分区数量,也可以手动传递一个参数来指定分区数(e.g. sc.parallelize(data, 10)
)。注意:为了向后兼容,代码中有些地方使用术语 slice(与分区含义相同)。
Java
RDD 可以通过调用 JavaSparkContext
对象的 parallelize
方法获得,参数为一个已经在 driver 程序中存在的数据集合(Java Collection
)。集合中的元素会被拷贝到一个可以被并行计算的分布式数据集中。例如,创建一个包含数字 1 到 5 的 RDD:
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
一旦被创建,分布式数据集对象(distData
)就能够被并行化计算。例如,可以调用 distData.reduce((a, b) -> a + b)
方法对数组中的数字进行求和。稍后讨论分布式数据集上的操作。
分布式数据集的一个重要参数是分区的数量,Spark 会在集群中为每个分区生成一个子任务,一般情况下可以为每个 CPU 分配 2 到 4 个分区。通常 Spark 会基于集群的配置自动的设置分区数量,也可以手动传递一个参数来指定分区数(e.g. sc.parallelize(data, 10)
)。注意:为了向后兼容,代码中有些地方使用术语 slice(与分区含义相同)。
Python
RDD 可以通过调用 SparkContext
对象的 parallelize
方法获得,参数为一个已经在 driver 程序中存在的迭代器或者集合。集合中的元素会被拷贝到一个可以被并行计算的分布式数据集中。例如,创建一个包含数字 1 到 5 的 RDD:
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
一旦被创建,分布式数据集对象(distData
)就能够被并行化计算。例如,可以调用 distData.reduce(lambda a, b: a + b)
方法对数组中的数字进行求和。稍后讨论分布式数据集上的操作。
分布式数据集的一个重要参数是分区的数量,Spark 会在集群中为每个分区生成一个子任务,一般情况下可以为每个 CPU 分配 2 到 4 个分区。通常 Spark 会基于集群的配置自动的设置分区数量,也可以手动传递一个参数来指定分区数(e.g. sc.parallelize(data, 10)
)。注意:为了向后兼容,代码中有些地方使用术语 slice(与分区含义相同)。
外部数据集
Scala
Spark 可以从任何支持 Hadoop 文件系统的数据源中创建 RDD,包括本地文件系统,HDFS,Cassandra, HBase,Amazon S3 等等。Spark 支持文本文件,SequenceFiles,以及其他 Hadoop InputFormat。
文本文件的 RDD 可以通过 SparkContext
对象的 textFile
方法获得。该方法接收一个 URI 地址(本地路径,或 者 hdfs://
,s3a://
,etc URI)当作参数,将地址中的文件读取为文本行的集合。例如:
scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
一旦创建成功,distFile
对象便可以执行 dataset 的操作。例如,可以通过 map
和 reduce
方法计算所有文本行的长度之和:distFile.map(s => s.length).reduce((a, b) => a + b)
。
Spark 读取文件的一些提醒:
- 如果使用本地文件系统的路径,该路径在 worker 节点上也必须是可读的。可以将文件拷贝到所有的 worker 节点或者使用网络共享文件系统。
- Spark 中包括
textFile
在内的所有文件类型输入方法,支持输入参数为目录、压缩文件,也支持通配符表达式。例如,可以使用textFile("/my/directory")
,textFile("/my/directory/*.txt")
,textFile("/my/directory/*.gz")
。 -
textFile
方法可以接收第二个参数来控制文件的分区数量。默认情况下 Spark 为每个 block(在 HDFS 中默认 128M)生成一个分区,但是也可以指定一个较大的数字作为分区数。注意分区数量不会少于 block 数量。
除了文本文件之外,Spark 的 Scala API 还支持其他几种数据格式:
-
SparkContext.wholeTextFiles
方法可以读取指定目录中多个比较小的文本文件,返回一个 (filename, content) 对的 RDD,而不是像textFile
那样返回文件中的每一行文本。在某些情况下,分区数量由数据的本地性控制从而导致较少的分区,此时可以通过wholeTextFiles
方法的第二个参数控制最小的分区数量。 - 对于 SequenceFiles,请使用
SparkContext
的sequenceFile[K, V]
方法,其中 K 和 V 分别指定文件中 key 和 value 的数据类型,它们应该是 Hadoop Writable 接口的子类,比如说 IntWritable 和 Text。此外,可以指定一些常用的基础数据类型,例如,sequenceFile[Int, String]
会自动读取 IntWritable 和 Text。 - 对于其他的 Hadoop InputFormat,可以调用
SparkContext.hadoopRDD
方法获取 RDD,该方法需要一些参数:JobConf
对象,input format class,key class 和 value class。这些参数可以按照 Hadoop job 的方式传入。还可以通过SparkContext.newAPIHadoopRDD
方法创建基于新 MapReduce API(org.apache.hadoop.mapreduce
)的 RDD。 -
RDD.saveAsObjectFile
和SparkContext.objectFile
方法支持把一个 RDD 存储为序列化后的 Java 对象,相比于 Avro 这不是一个高效的序列化方式,Avro 提供了一种简单的方式存储任何 RDD。
Java
Spark 可以从任何支持 Hadoop 文件系统的数据源中创建 RDD,包括本地文件系统,HDFS,Cassandra, HBase,Amazon S3 等等。Spark 支持文本文件,SequenceFiles,以及其他 Hadoop InputFormat。
文本文件的 RDD 可以通过 SparkContext
对象的 textFile
方法获得。该方法接收一个 URI 地址(本地路径,或 者 hdfs://
,s3a://
,etc URI)当作参数,将地址中的文件读取为文本行的集合。例如:
JavaRDD<String> distFile = sc.textFile("data.txt");
一旦创建成功,distFile
对象便可以执行 dataset 的操作。例如,可以通过 map
和 reduce
方法计算所有文本行的长度之和:distFile.map(s -> s.length()).reduce((a, b) -> a + b)
。
Spark 读取文件的一些提醒:
- 如果使用本地文件系统的路径,该路径在 worker 节点上也必须是可读的。可以将文件拷贝到所有的 worker 节点或者使用网络共享文件系统。
- Spark 中包括
textFile
在内的所有文件类型输入方法,支持输入参数为目录、压缩文件,也支持通配符表达式。例如,可以使用textFile("/my/directory")
,textFile("/my/directory/*.txt")
,textFile("/my/directory/*.gz")
。 -
textFile
方法可以接收第二个参数来控制文件的分区数量。默认情况下 Spark 为每个 block(在 HDFS 中默认 128M)生成一个分区,但是也可以指定一个较大的数字作为分区数。注意分区数量不会少于 block 数量。
除了文本文件之外,Spark 的 Java API 还支持其他几种数据格式:
-
JavaSparkContext.wholeTextFiles
方法可以读取指定目录中多个比较小的文本文件,返回一个 (filename, content) 对的 RDD,而不是像textFile
那样返回文件中的每一行文本。 - 对于 SequenceFiles,请使用
SparkContext
的sequenceFile[K, V]
方法,其中 K 和 V 分别指定文件中 key 和 value 的数据类型,它们应该是 Hadoop Writable 接口的子类,比如说 IntWritable 和 Text。此外,可以指定一些常用的基础数据类型,例如,sequenceFile[Int, String]
会自动读取 IntWritable 和 Text。 - 对于其他的 Hadoop InputFormat,可以调用
JavaSparkContext.hadoopRDD
方法获取 RDD,该方法需要一些参数:JobConf
对象,input format class,key class 和 value class。这些参数可以按照 Hadoop job 的方式传入。还可以通过SparkContext.newAPIHadoopRDD
方法创建基于新 MapReduce API(org.apache.hadoop.mapreduce
)的 RDD。 -
JavaRDD.saveAsObjectFile
和JavaSparkContext.objectFile
方法支持把一个 RDD 存储为序列化后的 Java 对象,相比于 Avro 这不是一个高效的序列化方式,Avro 提供了一种简单的方式存储任何 RDD。
Python
PySpark 可以从任何支持 Hadoop 文件系统的数据源中创建 RDD,包括本地文件系统,HDFS,Cassandra, HBase,Amazon S3 等等。Spark 支持文本文件,SequenceFiles,以及其他 Hadoop InputFormat。
文本文件的 RDD 可以通过 SparkContext
对象的 textFile
方法获得。该方法接收一个 URI 地址(本地路径,或 者 hdfs://
,s3a://
,etc URI)当作参数,将地址中的文件读取为文本行的集合。例如:
>>> distFile = sc.textFile("data.txt")
一旦创建成功,distFile
对象便可以执行 dataset 的操作。例如,可以通过 map
和 reduce
方法计算所有文本行的长度之和:distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)
。
Spark 读取文件的一些提醒:
- 如果使用本地文件系统的路径,该路径在 worker 节点上也必须是可读的。可以将文件拷贝到所有的 worker 节点或者使用网络共享文件系统。
- Spark 中包括
textFile
在内的所有文件类型输入方法,支持输入参数为目录、压缩文件,也支持通配符表达式。例如,可以使用textFile("/my/directory")
,textFile("/my/directory/*.txt")
,textFile("/my/directory/*.gz")
。 -
textFile
方法可以接收第二个参数来控制文件的分区数量。默认情况下 Spark 为每个 block(在 HDFS 中默认 128M)生成一个分区,但是也可以指定一个较大的数字作为分区数。注意分区数量不会少于 block 数量。
除了文本文件之外,Spark 的 Python API 还支持其他几种数据格式:
-
SparkContext.wholeTextFiles
方法可以读取指定目录中多个比较小的文本文件,返回一个 (filename, content) 对的 RDD,而不是像textFile
那样返回文件中的每一行文本。 -
RDD.saveAsPickleFile
andSparkContext.pickleFile
支持把一个 RDD 存储为 pickled Python 对象,pickle 序列化采用批量模式,默认的 batch size 是 10。 - SequenceFiles 以及其他 Hadoop Input/Output Formats。
注意该功能目前被标记为 Experimental
而且是针对高级开发者的,在将来的版本中可能会被移除,替换为基于 Spark SQL 的读写支持,Spark SQL 是更好的一种方式。
Writable 支持
PySpark SequenceFile 支持在 Java 中加载键值对形式的 RDD,将 Writable 转换为 Java 数据类型,然后通过 Pyrolite pickle Java 对象。当将键值对形式的 RDD 存储到 SequenceFile 中时刚好反过来,unpickle Python 对象到 Java 对象,然后再转换成 Writable。下述 Writable 类型可以自动转换:
Writable Type | Python Type |
---|---|
Text | unicode str |
IntWritable | int |
FloatWritable | float |
DoubleWritable | float |
BooleanWritable | bool |
BytesWritable | bytearray |
NullWritable | None |
MapWritable | dict |
数组类型并不是开箱即用的,在读写的时候需要指定 ArrayWritable
的子类型。在写出时需要指定把数组转换为 ArrayWritable
子类型的转换器,在读取时默认的转换器可以将 ArrayWritable
子类型转换为 Java Object[]
然后再 pickle 成 Python tuple。如果要获取基础数据类型 Python 数组的 array.array
,需要指定转换器。
存储和加载 SequenceFiles
与文本文件类似,可以通过指定路径来存储和加载 SequenceFiles。可以指定 key class 和 value class,但对于标准的 Writable 可以不用指定。
Similarly to text files, SequenceFiles can be saved and loaded by specifying the path. The key and value classes can be specified, but for standard Writables this is not required.
>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]
存储和加载其他 Hadoop Input/Output Formats
PySpark 还可以读取任意 Hadoop InputFormat 或者写出 Hadoop OutputFormat,对新老 Hadoop Mapreduce API 都适用。如果需要,还能以 Python dict 的形式传递一个 Hadoop Configuration 参数。下面是一个使用 Elasticsearch ESInputFormat 的例子:
$ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar
>>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
"org.apache.hadoop.io.NullWritable",
"org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=conf)
>>> rdd.first() # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
{u'field1': True,
u'field2': u'Some Text',
u'field3': 12345})
注意,如果 InputFormat 只依赖于 Hadoop 配置信息 和/或 输入路径,而且 key class 和 value class 出现在上面的表格当中,采用以上这种方式就问题不大。
如果需要处理自定义序列化方式的二进制数据(比如从 Cassandra / HBase 中加载数据),则需要首先将数据转换成 Scala/Java 类型,以便 Pyrolite’s pickler 处理。Converter 特质就是针对这种场景的,继承该特质然后实现其中的 convert
方法来处理转换逻辑,记住转换器类和跟自定义的 InputFormat
相关的依赖都要被打包在 Spark job 的 jar 文件中,也会包含在 PySpark 的 classpath 中。
关于使用自定义转换器操作 Cassandra / HBase InputFormat
和 OutputFormat
的示例参见 Python examples 和 the Converter examples。
RDD 算子
RDD 支持两种类型的操作算子:能够从一个 dataset 得到另一个新的 dataset 的 transformation 算子,以及在计算后能够返回数据到 driver 程序的 action 算子。例如,transformation 算子 map
把一个指定函数作用于 RDD 中的每个元素,将结果生成一个新的 RDD。另一方面,action 算子 reduce
通过一个相同函数将 RDD 中的每个元素聚合成一个结果值到 driver 程序中(还有一个并行化的 reduceByKey
算子返回一个新的 RDD)。
在 Spark 当中所有的 transformation 操作都是惰性的,它们并不是立即计算出结果,而是先记录下应用在 dataset 上的 transformation 操作。transformation 算子的计算只会被一个 action 算子触发,action 算子的计算结果会返回到 driver 程序。这样的设计可以让 Spark 应用程序更高效的运行。例如,你会发现通过 map
算子得到的 RDD 会被 reduce
算子采用,后者只把聚合结果返回给 driver,而不是巨大的 mapped dataset。
默认情况下,如果一个 transformation 操作被多个 action 算子使用,transformation 操作将会被重复计算多次。不过可以通过 persist
(或者 cache
)方法把一个 RDD 持久化到内存中,这样在下一次 action 算子用到该 RDD 的时候就可以直接从内存读取。Spark 也支持将 RDD 持久化到硬盘,或者在集群的其他节点上存储副本。
基本操作
Scala
来用一个简单的例子说明 RDD 的基本操作:
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
第一行从一个外部文件中定义了一个 RDD,在定义后这个数据集并没有被加载到内存中或是被触发计算,lines
变量仅仅是指向那个文件。第二行定义了一个新的变量 lineLengths
来表示 transformation 算子操作后的 RDD,同样出于惰性计算的原因,lineLengths
也没有被立即计算。最终,执行 reduce
这个 action 算子,此时 Spark 将整个计算链条分解成多个子任务在不同的节点上分布式的运行,每个节点都会计算 map
的一部分和相应的本地 reduce
操作,只把结果返回给 driver 程序。
如果之后还会用到 lineLengths
这个 RDD,可以将其缓存:
lineLengths.persist()
这个操作需要在 reduce
之前,lineLengths
的结果在第一次运行之后被缓存。
Java
来用一个简单的例子说明 RDD 的基本操作:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
第一行从一个外部文件中定义了一个 RDD,在定义后这个数据集并没有被加载到内存中或是被触发计算,lines
变量仅仅是指向那个文件。第二行定义了一个新的变量 lineLengths
来表示 transformation 算子操作后的 RDD,同样出于惰性计算的原因,lineLengths
也没有被立即计算。最终,执行 reduce
这个 action 算子,此时 Spark 将整个计算链条分解成多个子任务在不同的节点上分布式的运行,每个节点都会计算 map
的一部分和相应的本地 reduce
操作,只把结果返回给 driver 程序。
如果之后还会用到 lineLengths
这个 RDD,可以将其缓存:
lineLengths.persist(StorageLevel.MEMORY_ONLY());
这个操作需要在 reduce
之前,lineLengths
的结果在第一次运行之后被缓存。
Python
来用一个简单的例子说明 RDD 的基本操作:
lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
第一行从一个外部文件中定义了一个 RDD,在定义后这个数据集并没有被加载到内存中或是被触发计算,lines
变量仅仅是指向那个文件。第二行定义了一个新的变量 lineLengths
来表示 transformation 算子操作后的 RDD,同样出于惰性计算的原因,lineLengths
也没有被立即计算。最终,执行 reduce
这个 action 算子,此时 Spark 将整个计算链条分解成多个子任务在不同的节点上分布式的运行,每个节点都会计算 map
的一部分和相应的本地 reduce
操作,只把结果返回给 driver 程序。
如果之后还会用到 lineLengths
这个 RDD,可以将其缓存:
lineLengths.persist()
这个操作需要在 reduce
之前,lineLengths
的结果在第一次运行之后被缓存。
传递函数
Scala
Spark API 十分依赖从 driver 程序中传递函数到集群上运行。有两种推荐的方式:
Spark’s API relies heavily on passing functions in the driver program to run on the cluster. There are two recommended ways to do this:
- 匿名函数(Anonymous function syntax), 适用于代码很少的函数。
- 单例对象中的静态方法。例如,你可以定义一个
object MyFunctions
然后传递方法MyFunctions.func1
,如下所示:
object MyFunctions {
def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)
注意也可以传递一个实例对象(而不是单例对象)的方法引用,这种方式需要将包含目标方法的实例对象传递出去,例如:
class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
这里,如果创建一个新的 MyClass
实例然后调用 doStuff
方法,其中的 map
方法又引用了该实例对象的 func1
方法,所以这个实例对象需要被发送到集群中去参与计算。相当于 rdd.map(x => this.func1(x))
。
类似的,访问外部对象的实例字段也会引用整个对象:
class MyClass {
val field = "Hello"
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
相当于 rdd.map(x => this.field + x)
,引用了整个实例对象 this
。为了避免这种情况,最简单的方式是把实例字段赋值给一个 local 变量,而不是直接访问外部对象的实例字段。
def doStuff(rdd: RDD[String]): RDD[String] = {
val field_ = this.field
rdd.map(x => field_ + x)
}
Java
Spark API 十分依赖从 driver 程序中传递函数到集群上运行。在 Java 中,函数需要被定义为实现 org.apache.spark.api.java.function 包中接口的子类。有两种方式可以创建这样的函数:
- 实现 Function 接口或者使用匿名以及非匿名的内部类,之后传递一个实例给 Spark。
- 使用 lambda 表达式(lambda expressions)定义一个实现类实例。
尽管文档中处于简洁的考虑使用 lambda 语法,使用完整的写法也很简单。例如:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
});
或者,如果觉得上述写法太过笨重,还可以:
class GetLength implements Function<String, Integer> {
public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
public Integer call(Integer a, Integer b) { return a + b; }
}
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());
注意 Java 中的匿名内部类也可以在闭包作用域内使用标记为 final
的变量,Spark 会传递这些变量分副本到其他节点,就像其他语言一样。
Python
Spark API 十分依赖从 driver 程序中传递函数到集群上运行。有三种推荐的实现方式:
- Lambda 表达式(Lambda expressions),对于简单的函数可以将其写成一个表达式。(Lambda 不支持多语句函数或者不返回单值的语句)
- 对于复杂的函数,可以使用在函数中用
def
定义一个函数。 - 模块中的顶级函数。
例如,传递一个定义的函数:
"""MyScript.py"""
if __name__ == "__main__":
def myFunc(s):
words = s.split(" ")
return len(words)
sc = SparkContext(...)
sc.textFile("file.txt").map(myFunc)
注意也可以传递一个实例对象(而不是单例对象)的方法引用,这种方式需要将包含目标方法的实例对象传递出去,例如:
class MyClass(object):
def func(self, s):
return s
def doStuff(self, rdd):
return rdd.map(self.func)
这里,如果创建一个新的 MyClass
实例然后调用 doStuff
方法,其中的 map
方法又引用了该实例对象的 func1
方法,所以这个实例对象需要被发送到集群中去参与计算。
类似的,访问外部对象的实例字段也会引用整个对象:
class MyClass(object):
def __init__(self):
self.field = "Hello"
def doStuff(self, rdd):
return rdd.map(lambda s: self.field + s)
为了避免这种情况,最简单的方式是把实例字段赋值给一个 local 变量,而不是直接访问外部对象的实例字段。
def doStuff(self, rdd):
field = self.field
return rdd.map(lambda s: field + s)
理解闭包
在 Spark 中理解在集群中参与计算的变量和方法的作用域和生命周期是一个困难的事情,更新 RDD 算子作用域之外的变量是问题的源头所在。下面的例子展示了在 foreach()
算子中增加计数的场景,其他的算子也会有相同的问题。
示例
下面是一个计算 RDD 中元素数量的例子,计算的结果取决于子任务的执行是不是在同一个 JVM 当中。将本地(local
)模式(--master = local[n]
)和集群模式(e.g. 通过 spark-submit 脚本提交到 YARN)做个对比:
Scala
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
Java
int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);
// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);
println("Counter value: " + counter);
Python
counter = 0
rdd = sc.parallelize(data)
# Wrong: Don't do this!!
def increment_counter(x):
global counter
counter += x
rdd.foreach(increment_counter)
print("Counter value: ", counter)
本地模式 vs. 集群模式
上面代码的行为是不确定的,结果可能不符合预期。对于一个 Job,Spark 会把被操作的 RDD 分解成多个子任务,每个子任务会在一个 execuror 中运行。在运行之前,Spark 会计算子任务的闭包变量。闭包变量是指 RDD 计算过程(在这里是指 foreach()
))中需要用到的变量和方法。闭包变量会被序列化之后传递给每个 executor。
由于传递给 executor 的闭包变量是一个副本,当变量 counter 在方法 foreach
中被访问的时候,它已经不在是 driver 程序中的那个 counter 了。driver 端端内存中还存在着一个 counter 变量,但是它无法被其他 executor 访问了!Executors 只能看到被序列化之后传递过来的副本。所以,counter 变量的最终值还是 0,因为在 executor 端被更新的只是它被序列化之后的闭包变量。
在本地模式的某些情况下,foreach
方法是在一个 JVM 中执行的,可以访问到 driver 端端那个 counter 变量,然后对它做更新操作。
为了保证这类场景下的语义正确,应该使用累加器(Accumulator
)。Spark 当中的累加器提供了一种安全更新共享变量的机制,专门应对任务分散到集群中不同节点上执行的情况。文档中的累加器板块有详细的讨论。
一般来说,闭包变量——被应用在循环语句中或者本地定义的方法,不应该更新外部的全局作用域的状态。在 Spark 中引用外部可变对象的行为是未定义的。有些代码在本地模式下是可行的,但那只是巧合,这样的行为不会发生在集群模式下。如果需要全局聚合的支持,请使用累加器。
打印 RDD 的数据
另一个常见的误区是通过 rdd.foreach(println)
或者 rdd.map(println)
方法打印 RDD 的元素。在单台机器上,执行结果将会符合预期,RDD 中所有的元素都会打印在控制台。然而,在集群模式下,stdout
中的输出是由 executor 执行的,结果会打印在 executor 端的标准输出中,而不是在 driver 端,所以 driver 端不会显示那些数据。如果需要在 driver 端打印,应该先 collect()
调用方法把 RDD 中所有的数据都拉取到 driver 端;如果你只需要打印 RDD 中的几个元素,更安全的方式是调用 take()
方法:rdd.take(100).foreach(println)
。
使用键值对
Scala
尽管大多数的 RDD 算子可以包含任意类型的对象,还有一些特殊的算子只能够作用于键值对类型的 RDD。最常见的就是「shuffle」操作,比如说通过 key 来进行分组聚合。
在 Scala 中,对于 Tuple2 对象(语法内置的元组类型,可以通过 (a, b)
快速创建)类型的 RDD 这些算子是即开即用的。操作键值对类型的算子都在类 PairRDDFunctions 中,会自动包装二元组为键值对形式。
例如,下面的代码使用了键值对类型的 reduceByKey
算子来计算相同的一行在文件中出现了多少次:
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
还可以通过 counts.sortByKey()
方法来给这些键值对按字典序排序,最后通过 counts.collect()
方法将结果作为一个数组汇聚到 driver 端。
注意:当使用自定义类的对象作为 key 时,需要保证该类重写了 equals()
和相应的 hashCode()
方法。详情参见 Object.hashCode() documentation。
Java
尽管大多数的 RDD 算子可以包含任意类型的对象,还有一些特殊的算子只能够作用于键值对类型的 RDD。最常见的就是「shuffle」操作,比如说通过 key 来进行分组聚合。
在 Java 中,键值对使用 Scala 标准库中的 scala.Tuple2 类表示,可以通过构造方法 new Tuple2(a, b)
创建一个二元组,通过 tuple._1()
和 tuple._2()
访问 kv 字段。
键值对类型的 RDD 通过 JavaPairRDD 类表示,可以通过像 mapToPair
和 flatMapToPair
这样的特殊的 map
算子把一个 JavaRDD 转换成 JavaPairRDD。JavaPairRDD 可以使用标准的 RDD 算子和特有的 kv 操作算子。
例如,下面的代码使用了键值对类型的 reduceByKey
算子来计算相同的一行在文件中出现了多少次:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
还可以通过 counts.sortByKey()
方法来给这些键值对按字典序排序,最后通过 counts.collect()
方法将结果作为一个数组汇聚到 driver 端。
注意:当使用自定义类的对象作为 key 时,需要保证该类重写了 equals()
和相应的 hashCode()
方法。详情参见 Object.hashCode() documentation。
Python
尽管大多数的 RDD 算子可以包含任意类型的对象,还有一些特殊的算子只能够作用于键值对类型的 RDD。最常见的就是「shuffle」操作,比如说通过 key 来进行分组聚合。
在 Python 中,这些算子支持内置元组类型的 RDD,比如 (1, 2)
,创建好元组类型的 RDD 之后就可以使用相关的算子。
例如,下面的代码使用了键值对类型的 reduceByKey
算子来计算相同的一行在文件中出现了多少次:
lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
还可以通过 counts.sortByKey()
方法来给这些键值对按字典序排序,最后通过 counts.collect()
方法将结果作为一个数组汇聚到 driver 端。
Transformation 算子
下面的表格中列出了 Spark 中常用的 transformation 算子,详情参见 RDD API doc(Scala, Java, Python, R)和键值对 RDD functions doc(Scala, Java)。
Transformation | Meaning |
---|---|
map(func) | 把每个元素经过函数 func 的映射结果生成一个新的 RDD |
filter(func) | 把每个元素经过过滤函数 func 的计算,返回 true 的元素生成一个新的 RDD |
flatMap(func) | 与 map 类似,但是每一个输入值都可以被映射成 0 或者多个新的元素(func 的返回值应该是一个序列(Seq)而不是单值)。 |
mapPartitions(func) | 与 map 类似,但是 RDD 的每个分区独立计算,所以 func 的类型应该是 Iterator<T> => Iterator<U>。 |
mapPartitionsWithIndex(func) | 与 mapPartitions 类似,额外提供一个表示分区 ID 的整数,所以 func 的类型应该是 (Int, Iterator<T>) => Iterator<U>。 |
sample(withReplacement, fraction, seed) | 按照比例对 RDD 进行数据采样,需要指定一个元素能否被采样多次,还需要提供一个随机数生成种子。 |
union(otherDataset) | 合并两个 RDD 成为一个。 |
intersection(otherDataset) | 计算两个 RDD 的交集。 |
distinct([numPartitions])) | Return a new dataset that contains the distinct elements of the source dataset. |
groupByKey([numPartitions]) | 对 kv 类型的 RDD 按 key 进行分分组,返回一个新的 (K, Iterable<V>) 类型的 RDD。注意:如果在分组之后有聚合操作(比如说求和或者取平均值),那么 reduceByKey 和 aggregateByKey 会有更好的性能。注意:默认情况下新的 RDD 的分区数与旧 RDD 相同,不过你可以通过可选参数 numPartitions 指定新 RDD 的分区数。 |
reduceByKey(func, [numPartitions]) | 适用于 kv 类型的 RDD,对每个 key 的所有 value 值应用聚合函数 func 求值,聚合函数的类型必须为 (V,V) => V,跟 groupByKey 一样,可以通过可选参数 numPartitions 指定 reduce 的分区数。 |
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]) | 适用于 kv 类型的 RDD,对每个 key 的所有 value 值进行聚合操作,聚合值的类型可以和 V 不同。聚合操作由分区内聚合函数和分区间融合函数以及一个中立的初始值共同协作完成。跟 groupByKey 一样,可以通过可选参数 numPartitions 指定聚合操作的分区数。 |
sortByKey([ascending], [numPartitions]) | 适用于 kv 类型的 RDD,按照 key 对分区内的数据进行排序,返回排序后的 RDD。可以通过 boolean 类型的参数ascending 指定正序或者逆序。 |
join(otherDataset, [numPartitions]) | 适用于 kv 类型的 RDD,调用者和参数分别是 (K, V) 和 (K, W) 类型的 RDD,返回一个 (K, (V, W)) 类型的 RDD,根据 key 进行连接操作。外连接目前支持 leftOuterJoin ,rightOuterJoin 和 fullOuterJoin 。 |
cogroup(otherDataset, [numPartitions]) | 适用于 kv 类型的 RDD,调用者和参数分别是 (K, V) 和 (K, W) 类型的 RDD,返回一个 (K, (Iterable<V>, Iterable<W>)) 类型的 RDD,也被叫做 groupWith 算子。 |
cartesian(otherDataset) | 调用者和参数分别是 T 和 U 类型的 RDD,计算两者的笛卡尔积。 |
pipe(command, [envVars]) | 通过一个 shell 命令操作每个分区中的数据,RDD 中的每个元素被输入处理进程的标准输入,将处理进程的标准输出作为新 RDD 中的元素。 |
coalesce(numPartitions) | 将 RDD 的分区减少到指定数量。适用于过滤了一个大数据集之后只留下少部分数据的场景。 |
repartition(numPartitions) | 对 RDD 进行重分区操作,将数据随机分散到各个分区中去。这个算子总是会对所有数据进行 shuffle 操作,造成相应的网络开销。 |
repartitionAndSortWithinPartitions(partitioner) | 通过指定的分区器对 RDD 进行重分区操作,在每个重分区后的分区中,按照 key 进行排序。该算子比 repartition 之后再排序更高效,因为在排序发生在 shuffle 过程中。 |
Action 算子
下面的表格中列出了 Spark 中常用的 action 算子,详情参见 RDD API doc(Scala, Java, Python, R)和键值对 RDD functions doc(Scala, Java)。
Action | Meaning |
---|---|
reduce(func) | 按照聚合函数 func 对 RDD 中的元算进行聚合。聚合函数的算法需要满足交换律和结合律从而能够正确的并行计算。 |
collect() | 返回包含 RDD 中所有元素的数组,通常是用在计算之后数据量比较小的情况下。 |
count() | 返回 dataset 中元素的计数。 |
first() | 返回 RDD 中的第一个元算(与 take(1) 相同)。 |
take(n) | 返回一个包含 RDD 中 n 个元素的数组。 |
takeSample(withReplacement, num, [seed]) | 返回 RDD 中随机取样的 n 个元素的数组,需要指定一个元素能否被采样多次,还可以提供一个可选的随机数生成种子。 |
takeOrdered(n, [ordering]) | 根据默认的排序规器或者指定的排序器返回 RDD 中前 n 个元素。 |
saveAsTextFile(path) | 将 RDD 中的元素以文本的格式写入到指定的本地文件系统,HDFS 或者其他 Hadoop 支持的文件系统。Spark 会调用元素的 toString 方法将其转换为文本文件中的一行。 |
saveAsSequenceFile(path) (Java and Scala) | 将 RDD 中的元素以 Hadoop SequenceFile 的格式写入到指定的本地文件系统,HDFS 或者其他 Hadoop 支持的文件系统。适用于 kv 类型的 RDD,其中 key 和 value 的类型需要实现 Hadoop Writable 接口。 在 Scala 中,也适用于可以隐式转换为 Writable 类型的基础数据类型(例如 Int,Double,String 等等)。 |
saveAsObjectFile(path) (Java and Scala) | 按照 Java 序列化的格式将 RDD 中的元素写入文件,之后可以通过SparkContext.objectFile() 加载。 |
countByKey() | 适用于 kv 类型的 RDD,返回一个 (K, Int) 类型的 HashMap,value 为 key 的计数。 |
foreach(func) | 对 RDD 中的每个元素进行调用 func 函数。该算子通常用来处理计算过程中的副作用(状态变更或者 IO),比如说更新累加器(Accumulator)或者与外部存储系统交互。 注意:更新非累加器类型的外部变量的结果是不确定的,详情参见 Understanding closures 。 |
Spark 的 RDD API 中的某些 action 算子还有一个异步的版本,比如 foreachAsync
之于 foreach
, 异步版本立即返回一个 FutureAction
对象而不是一直阻塞直到 action 算子计算完成。这种方式可以控制异步执行 action 算子时的等待时间。
Shuffle 算子
Spark 中有一个很关键的概念叫做 shuffle,shuffle 是 Spark 中重分布数据的一个机制,以便于在不同的分区间进行分组操作,这就会导致在不同的节点之间拷贝数据,于是 shuffle 成了一个复杂而且开销很大的操作。
底层原理
为了理解在 shuffle 过程中究竟发生了什么,在此以 reduceByKey
算子为例。reduceByKey
算子会生成一个新的 RDD,其中相同 key 的 value 会融合成一个二元组——key 值和通过聚合函数对 value 计算出的聚合值。挑战在于所有相同 key 对应的 value 值并不一定在同一个分区内或者在同一节点上,但是它们必须被重新整合来进行计算。
在 Spark 中,数据通常不会按照特定算子的需要分布在它们应该在的位置。在计算过程中,一个单独的子任务会在一个分区上运行,所以 reduceByKey
的 reduce 子任务要运行,就需要把各个节点上的数据重分布到不同的分区里。需要在每个分区中找到相同 key 的 value 值,再把这些 value 值汇聚到同一个分区中才能为这个 key 计算聚合值,这个过程就叫做 shuffle。
尽管在 shuffle 刚结束时每个分区中的数据集是确定的,分区的顺序也是确定的,但是分区中的数据却不一定是有序的。如果要求 shuffle 之后数据是严格有序的,可以采用以下几种方式:
- 通过
mapPartitions
算子对每个分区内的数据进行排序,例如使用.sorted
方法 - 通过
repartitionAndSortWithinPartitions
算子在重分区的同时高效的对数据进行排序 - 通过
sortBy
算子得到一个全局有序的 RDD
会导致 shuffle 发生的算子包括像 repartition
和 coalesce
这样的重分区(repartition)算子,像 groupByKey
和 reduceByKey
这样的基于键(‘ByKey)的算子,以及像 cogroup
和 join
这样的连接(join)算子。
性能影响
Shuffle 是一个开销很大的算子,因为它会引发磁盘 I/O,数据序列化以及网络 I/O。为了安排 shuffle 需要的数据,Spark 会生成子任务集合—— map 子任务来组织数据,reduce 子任务来执行聚合操作。子任务的命名规则来源于 MapReduce 思想而不是指 Spark 当中的 map
和 reduce
算子。
在内部,一个 map 任务的计算结果保存在内存中直到装不下为止。之后这些计算结果会基于目标分区(下游的 reduce 分区)进行排序,写入到一个单独的文件中。在 reduce 端,会拉取相应分区的 map 端计算结果,也就是排序后的文件中的一部分。
在 map 端的计算结果被传递的前后,Shuffle 操作会在内存中组织管理这些数据,所以会消耗可观的堆内存。具体来说,reduceByKey
和 aggregateByKey
算子在 map 端组织这些数据,而 'ByKey
类型的算子在 reduce 端进行。当内存装不下中间计算结果的时候,Spark 会把数据溢写到硬盘,因而导致磁盘 I/O 以及更多的垃圾回收。
Shuffle 还会在磁盘上生成大量的中间文件。对于 Spark 1.3 版本,这些文件直到相应的 RDD 不在被需要时才会被清理。这样做的好处是避免在中间结果被重用的时候进行重复的计算。如果应用程序保持了对 RDD 的引用(通过可达性分析判定)或者 GC 发生的并不频繁,清理工作可能在很长一段时间之后才会进行。这意味着长期运行的 Spark 程序可能会占用很多的磁盘空间。在配置 Spark 任务的上下文时,临时目录通过参数 spark.local.dir
指定。
Shuffle 过程可以通过一系列的配置参数进行调优,参见 Spark Configuration Guide 章节的「Shuffle Behavior」部分。
RDD 持久化
将 RDD 持久化到内存是 Spark 中一个很重要的功能。一旦持久化了一个 RDD,每个节点在内存中存储 RDD 在该节点的部分分区,在其他 action 算子用到的时候重复利用。这样可以加速之后的 action 算子的计算(通常会有 10 倍以上的提升)。在迭代计算和快速交互式响应的场景下缓存是一个关键的工具。
你可以通过 persist()
或者 cache()
方法将一个 RDD 标记为需要被持久化。当 RDD 在 action 算子中第一次被计算之后,计算结果将会被保留在集群上各个节点的内存中。Spark 的缓存具备容错能力——如果某个 RDD 的分区丢失,该分区会根据已经创建好的转换操作自动重算。
另外,每一个被持久化的 RDD 可以选择不同的存储级别(storage level),例如,可以持久化到磁盘,以序列化 Java 对象的方式(为了节省内存)保存到内存中,或者在其他节点备份。这些存储级别可以通过传递一个 StorageLevel
(Scala, Java, Python) 参数给 persist()
方法来设置。 cache()
方法使用默认存储级别: StorageLevel.MEMORY_ONLY
(以未序列化 Java 对象的方式存储到内存),完整的存储级别如下表所示:
Storage Level | Meaning |
---|---|
MEMORY_ONLY | 以未序列化 Java 对象的方式存储到 JVM中。如果内存不够用,有些分区则不会被缓存,在被重用的时候会被重新计算。该选项是默认存储级别。 |
MEMORY_AND_DISK | 以未序列化 Java 对象的方式存储到 JVM中。如果内存不够用,存储多出来的部分到硬盘,这部分数据在被重用的时候会从硬盘读取。 |
MEMORY_ONLY_SER (Java and Scala) | 以序列化 Java 对象的方式存储到 JVM中(每个分区存储为一个字节数组)。相比于未序列化,这种方式可以提高空间利用率 ,尤其在使用快速序列化器(fast serializer)的时候,但是在读取的时候会消耗更多的 CPU 资源。 |
MEMORY_AND_DISK_SER (Java and Scala) | 与 MEMORY_ONLY_SER 类似,但是在内存不够用的时候会把多出来的部分溢写到硬盘而不是在被重用的时候再次计算。 |
DISK_ONLY | 存储 RDD 到硬盘。 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | 与上述级别类型,但是会复制一份到集群中的另一个节点做备份用。 |
OFF_HEAP (experimental) | 与 MEMORY_ONLY_SER类似,但是使用堆外内存(off-heap memory),这种方式需要开启堆外内存的使用。 |
注意:在 Python 中,总会通过 Pickle 库对存储对象进行序列化,所以选择哪种存储级别并不对对其造成影响。Python 中支持的存储级别包括MEMORY_ONLY
,MEMORY_ONLY_2
,MEMORY_AND_DISK
,MEMORY_AND_DISK_2
, DISK_ONLY
和 DISK_ONLY_2
。
Spark 还会自动的持久化 shuffle 算子的一些中间计算结果(e.g. reduceByKey
),即使用户不调用 persist
方法。这样做是为了避免在节点宕机时重算整个输入。不过还是建议用户在需要重用 RDD 的时候调用 persist
方法来持久化。
Spark also automatically persists some intermediate data in shuffle operations (e.g. reduceByKey
), even without users calling persist
. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call persist
on the resulting RDD if they plan to reuse it.
如何选择存储级别
Spark 中的存储级别旨在提供针对内存使用和 CPU 资源的不同的权衡方案,建议以下面的流程来选择存储级别:
- 如果 RDD 的大小在内存中装得下,那就使用默认的存储级别(
MEMORY_ONLY
),这是对于 CPU 来说最高效的方案,可以让 RDD 算子按最快的速度运行。 - 如果不是,可以考虑
MEMORY_ONLY_SER
以及选取一个快速的序列化库(selecting a fast serialization library)来提高空间利用率,同时还可以提供相对较快的访问速度。(Java 和 Scala) - 除非计算 RDD 的代价很高或者过滤了很大体量的数据,不要选择溢写到硬盘的方案。否则重新计算可能比从硬盘读取要快。
- 如果需要快速从失败中恢复,可以考虑备份存储级别的方案(比如使用 Spark 为 web 应用提供服务的场景)。所有的存储级别都支持通过重新计算的方式进行容错,而备份存储级别可以不用等待重新计算失效的分区。
删除数据
Spark 自动监控集群中各个节点上缓存的使用,通过 LRU 策略删除老的数据分区。如果需要手动移除 RDD 而不是等待自动清理,可以调用 RDD.unpersist()
方法。注意该方法默认情况下不会阻塞,如果需要阻塞直到空间被释放,请在方法中指定参数 blocking=true
。
共享变量
通常,一个函数被传递给 Spark 算子(比如 map
或者 reduce
)到集群中的节点上运行,函数所用到的变量都是被复制的独立副本。这些变量被拷贝到集群中的各个节点,对其的更新都不会反映在 driver 中的同名变量上。支持子任务之间常规的可读可写的共享变量是一个很低效的方式。然而 Spark 还是提供了两种有所限制的针对特定场景的共享变量(shared variables):广播变量和累加器。
广播变量
广播变量允许程序可以访问一个在集群中每个节点都有一份缓存的只读变量,而不是由每个子任务在运行时拷贝一份传递过去。例如,广播变量可以用来以一种高效的方式在集群中拷贝一个比较大的数据集。Spark 还尝试通过广播变量实现广播算法来减少 reduce 端的网络传输开销。
Spark 的 action 算子的执行由一系列的阶段构成,这些阶段由 shuffle 操作界定。Spark 会自动的将每个阶段的子任务需要用到的相同数据广播到集群中去。被广播的变量以序列化后的方式缓存并在被使用前进行反序列化。这说明只有在不同的阶段中都用到同一份数据或者以未序列化方式保存数据很重要的时候才需要显示的定义广播变量。
通过调用方法 SparkContext.broadcast(v)
可以创建一个保存变量 v
的广播变量。广播变量是原始变量的包装器,真实的数据可以通过调用 value
方法获得。如下所示:
Scala
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
Java
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
broadcastVar.value();
// returns [1, 2, 3]
Python
>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>
>>> broadcastVar.value
[1, 2, 3]
在广播变量被创建之后,在分布式计算过程中所有使用到变量 v
的地方都要替换为对广播变量的操作,从而避免变量 v
在集群中被传递多次。另外,在使用过程中变量 v
不应该被更新,以保证所有的节点拿到的都是相同数据的广播变量(比如说变量在之后被传递给了一个新的节点)。
调用 .unpersist()
方法可以释放广播变量在所有 executor 上占用的空间。如果之后该广播变量又被用到了,它会被再次广播。调用 .destroy()
方法来永久的释放该广播变量所占用的资源。在此之后广播变量就无法被使用了。注意这些方法默认情况下不会阻塞,如果需要阻塞直到资源被释放,请在方法中指定参数 blocking=true
。
累加器
累加器是一种应用满足交换律和结合律的算法只能被「加」的变量,所以可以在并行环境下高效执行。它们可以用来实现计数(就像在 MapReduce 中那样)和求和。Spark 原生支持数字类型的累加器,自定义类型的累加器需要用户实现特定的接口。
用户可以创建命名或者匿名的累加器。如下图所示,一个命名的累加器(在这里名称为 counter
)会展示在 web 界面中更新它的那个 stage 页面,在 「Tables」表格中展示了每个子任务该累加器所做的更新。
在界面上追踪累加器的更新可以帮助理解程序在运行阶段的进度。(注意:目前 Python 还不支持)
Scala
数字类型的累加器可以通过调用 SparkContext.longAccumulator()
或者 SparkContext.doubleAccumulator()
方法来创建一个 Long 或者 Double 类型的累加器。之后在集群中运行的子任务就可以通过 add
方法对其进行更新。但是在子任务中不能读取累加器的值,只有 driver 程序可以通过 value
方法读取累加器的值。
下面的代码展示了如何使用累加器对数组内的元算求和:
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Long = 10
尽管代码中使用的是内置的 Long 类型的累加器,程序员也可以通过继承 AccumulatorV2 来实现自己的累加器。抽象类 AccumulatorV2 有一些必须重写的方法:reset
方法用来将累加器重置为零值,add
方法用来更新累加器的值,merge
方法用来融合另一个同类型的累加器。其他必须重写的方法参考 API documentation。例如,假设有一个类 MyVector
代表数学中的向量:
class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
private val myVector: MyVector = MyVector.createZeroVector
def reset(): Unit = {
myVector.reset()
}
def add(v: MyVector): Unit = {
myVector.add(v)
}
...
}
// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")
注意,当程序员定义自己的累加器的时候,结果类型可以跟累加数据的类型不一样。
Java
数字类型的累加器可以通过调用 SparkContext.longAccumulator()
或者 SparkContext.doubleAccumulator()
方法来创建一个 Long 或者 Double 类型的累加器。之后在集群中运行的子任务就可以通过 add
方法对其进行更新。但是在子任务中不能读取累加器的值,只有 driver 程序可以通过 value
方法读取累加器的值。
下面的代码展示了如何使用累加器对数组内的元算求和:
LongAccumulator accum = jsc.sc().longAccumulator();
sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
accum.value();
// returns 10
尽管代码中使用的是内置的 Long 类型的累加器,程序员也可以通过继承 AccumulatorV2 来实现自己的累加器。抽象类 AccumulatorV2 有一些必须重写的方法:reset
方法用来将累加器重置为零值,add
方法用来更新累加器的值,merge
方法用来融合另一个同类型的累加器。其他必须重写的方法参考 API documentation。例如,假设有一个类 MyVector
代表数学中的向量:
class VectorAccumulatorV2 implements AccumulatorV2<MyVector, MyVector> {
private MyVector myVector = MyVector.createZeroVector();
public void reset() {
myVector.reset();
}
public void add(MyVector v) {
myVector.add(v);
}
...
}
// Then, create an Accumulator of this type:
VectorAccumulatorV2 myVectorAcc = new VectorAccumulatorV2();
// Then, register it into spark context:
jsc.sc().register(myVectorAcc, "MyVectorAcc1");
注意,当程序员定义自己的累加器的时候,结果类型可以跟累加数据的类型不一样。
Python
一个累加器通过调用 SparkContext.accumulator(v)
方法并传入一个初始值 v
来创建,之后在集群中运行的子任务就可以通过 add
方法或者 +=
操作符对其进行更新。但是在子任务中不能读取累加器的值,只有 driver 程序可以通过 value
方法读取累加器的值。
下面的代码展示了如何使用累加器对数组内的元算求和:
>>> accum = sc.accumulator(0)
>>> accum
Accumulator<id=0, value=0>
>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
>>> accum.value
10
尽管代码中使用的是内置的 Int 类型的累加器,程序员也可以通过继承 AccumulatorParam 来实现自己的累加器。接口 AccumulatorV2 有两个方法:zero
方法用来提供一个相应数据类型的零值,addInPlace
方法用来两个值加到一起。例如,假设有一个类 MyVector
代表数学中的向量:
class VectorAccumulatorParam(AccumulatorParam):
def zero(self, initialValue):
return Vector.zeros(initialValue.size)
def addInPlace(self, v1, v2):
v1 += v2
return v1
# Then, create an Accumulator of this type:
vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())
注意,当程序员定义自己的累加器的时候,结果类型可以跟累加数据的类型不一样。
警告:当子任务结束的时候,Spark 会尝试融合累加器在该子任务中的更新。如果操作失败,Spark 会忽略失败并且将该子任务标记为成功,之后继续运行其他的子任务。所以,一个有问题的累加器并不会影响整个 Spark 作业;但是一个累加器有可能会被错误的更新,即使整个作业成功了。
对于只在 action 算子(actions only)中更新的累加器,Spark 可以保证每个子任务中的累加器只会被更新一次,也就是说,重新计算子任务不会更新累加器的值。在 transformation 算子中,用户应该了解如果任务所在阶段被重新计算的话,累加器的更新操作可能不止一次。
累加器并不改变 Spark 中惰性求值的计算模型。如果累加器在某个子任务中被更新,其值的更新只会在 action 算子计算过程中触发一次。所以对于像 map()
这样的 transformation 算子中,累加器的更新并不能保证被执行。下面的代码片段展示了这种特性:
Scala
val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.
Java
LongAccumulator accum = jsc.sc().longAccumulator();
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.
Python
accum = sc.accumulator(0)
def g(x):
accum.add(x)
return f(x)
data.map(g)
# Here, accum is still 0 because no actions have caused the `map` to be computed.
集群部署
应用程序提交指南(application submission guide)阐述了如何提交应用程序到集群上运行。简而言之,一旦将你自己的应用程序打包成了一个 JAR 文件(Java/Scala)或者一些压缩以及未压缩的 .py
文件(Python),bin/spark-submit
脚本能够将应用程序提交到任何支持的集群管理系统中。
启动 Java/Scala 应用程序
org.apache.spark.launcher 依赖包提供了一些通过 Java API 以子进程的方式启动 Spark 任务类库。
单元测试
Spark 支持任意常用的单元测试框架。只要在测试程序中定义一个 local
模式的 SparkContext
,运行你的算子,之后调用 SparkContext.stop()
清理上下文环境即可。Spark 不支持在同一个程序中存在两个对象实例,所以需要确保在 finally
代码块或者单元测试框架的 tearDown
方法中停止了 SparkContext
对象。
接下来干点啥呢
可以在 Spark 网站页面看到一些示例程序(example Spark programs)。另外,Spark 安装包的 examples
目录中包含了一些示例代码 (Scala, Java, Python, R)。你可以通过将类名传递给 bin/run-example
脚本来运行示例程序,例如:
./bin/run-example SparkPi
对于 Python 示例程序,请使用 spark-submit
脚本:
./bin/spark-submit examples/src/main/python/pi.py
对于 R 示例程序,请使用 spark-submit
脚本:
./bin/spark-submit examples/src/main/r/dataframe.R
对于调优你的应用程序,configuration 和 tuning 文档提供了一些最佳实践。这些建议特别重要,可以保证你的数据以一种高效的方式存储在内存中。对于应用程序的部署,cluster mode overview 文档阐述了分布式计算中的组件和所支持的集群管理系统。