转载请注明出处,谢谢合作~
快速上手
- 安全模式(Security)
- 通过 Spark Shell 进行交互式分析
- 基础概念(Basics)
- 更多的 Dataset 操作方式(More on Dataset Operations)
- 缓存(Caching)
- 独立应用程序(Self-Contained Applications)
- 接下来干点啥呢(Where to Go from Here)
该教程可以让你快速上手 Spark 应用。首先介绍通过 Spark 的交互式 shell(Python 或者 Scala)模块展示 Spark 的 API,然后展示如果通过 Java,Scala,Python 语言编写应用程序。
为了接下来的练习,请首先在下载页面(Spark website)下载 Spark,由于不会用到 HDFS,你可以下载集成任意 Hadoop 版本的 Spark。
需要注意的是在 Spark 2.0 版本之前,主要的编程入口是弹性分布式数据集(Resilient Distributed Dataset),简称 RDD;在 Spark 2.0 版本之后,RDDs 被 Dataset 取代,Dataset 和 RDD 一样是强类型的,但是背后提供了更多的优化。RDD 编程接口依旧是可用的,详情参见 RDD programming guide。然而我们强烈建议你切换到 Dataset,Dataset 拥有更好的性能表现,详情参见 SQL programming guide 。
安全模式
Spark 中的安全机制默认是关闭的,这意味着在默认情况下你的 Spark 系统很容易收到攻击。请在下载和使用之前参考 Spark Security 。
通过 Spark Shell 进行交互式分析
基础概念
Spark shell 提供了一种简单的方式来学习 API,同时它也是一个强大的交互式分析工具。可以通过 Scala(运行在 JVM 之上,依赖 Java 类库) 或者 Python 启动 Spark shell,在 Spark 根目录启动如下命令:
Scala
./bin/spark-shell
Spark 抽象出来的主要概念叫做 Dataset 的分布式数据集,Datasets 可以通过 Hadoop InputFormats(例如 HDFS 文件)创建,或者从其他的 Datasets 转化而来。让我们通过 Spark 源码目录下的 README 文件中的文本创建一个新的 Dataset :
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]
你可以通过调用 action 算子直接从 Dataset 中获取数据,或者通过 transform 算子获得一个新的 Dataset,详情参见 API doc。
scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark
现在通过 transform 算子在这个 Dataset 的基础上得到一个新的 Dataset,调用 filter
方法来返回一个包含了 README 文件中部分文本的 Dataset。
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
可以把 transform 算子和 action 算子连接起来使用:
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
Python
./bin/pyspark
如果 PySpark 已经通过 pip 安装:
pyspark
Spark 抽象出来的主要概念叫做 Dataset 的分布式数据集,Datasets 可以通过 Hadoop InputFormats(例如 HDFS 文件)创建,或者从其他的 Datasets 转化而来。通过 Spark 源码目录下的 README 文件中的文本创建一个新的 Dataset :
>>> textFile = spark.read.text("README.md")
可以通过调用 action 算子直接从 Dataset 中获取数据,或者通过 transform 算子获得一个新的 Dataset,详情参见 API doc。
>>> textFile.count() # Number of rows in this DataFrame
126
>>> textFile.first() # First row in this DataFrame
Row(value=u'# Apache Spark')
现在通过 transform 算子在这个 Dataset 的基础上得到一个新的 Dataset,我们调用 filter
方法来返回一个包含了 README 文件中部分文本的 Dataset。
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
可以把 transform 算子和 action 算子连接起来使用:
>>> textFile.filter(textFile.value.contains("Spark")).count() # How many lines contain "Spark"?
15
更多的 Dataset 操作方式
Dataset 的 transform 算子和 action 算子可以被应用的更复杂的计算中,比如说找出单词最多的那个文本行的单词数量:
Scala
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15
这个操作首先将每一行映射成一个整数,得到一个新的 Dataset,调用新的 Dataset 的 reduce
方法来找出一行的最大单词数。map
和 reduce
算子的参数是 Scala 函数字面量(闭包),可以使用任意语言特性和类库。例如,可以轻松的调用在别处定义的函数,在此使用 Math.max()
函数让逻辑更清晰:
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
由 Hadoop 提出的数据流处理模型叫做 MapReduce,Spark 可以轻松实现 MapReduce 的思路:
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
这里,调用方法把文本行的 Dataset 转换为单词的 Dataset,然后通过 groupByKey
和 count
算子计算文件中每个单词的计数,保存为 (String, Long) 元组的 Dataset。可以通过调用 collect
方法汇聚 word count 的计算结果:
scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
Python
>>> from pyspark.sql.functions import *
>>> textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect()
[Row(max(numWords)=15)]
这个操作首先将每一行映射成一个整数,并将这些整数当作一列命名为「numWords」,由此创建了一个新的 DataFrame。调用该 DataFrame 的 agg
方法来找到最大的那个数字。select
和 agg
方法的参数都是 Column 类型,可以通过 df.colName
来从一个 DataFrame 中获取一列。还可以导入 import pyspark.sql.functions
,这个依赖库中提供了大量便捷的函数,这些函数能够从一个 Column 生成另一个 Column。
由 Hadoop 提出的数据流处理模型叫做 MapReduce,Spark 可以轻松实现 MapReduce 的思路:
>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).alias("word")).groupBy("word").count()
这里,在 select
方法中使用explode
方法来把一个文本行的 Dataset 转换成一个单词的 Dataset,然后通过 groupByKey
和 count
算子计算文件中每个单词的计数,保存为一个包含「word」和「count」两列的 DataFrame。可以通过调用 collect
方法汇聚 word count 的计算结果:
>>> wordCounts.collect()
[Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]
Caching
Spark 还支持将数据集缓存到集群中(分布式缓存),在数据被重复使用的场景下缓存很有帮助,比如说高频查询小批量的数据或者执行像 PageRank 一样的迭代算法的时候。举例来说,把 Dataset 标记为需要被缓存:
Scala
scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]
scala> linesWithSpark.count()
res8: Long = 15
scala> linesWithSpark.count()
res9: Long = 15
用 Spark 缓存 100 行的文本文件看上去很 SB,其实重要的是缓存可以应用在很大体量的数据集上,即使它们分布在数千个计算节点上。还可以通过 bin/spark-shell
连接到一个集群上做缓存操作,详情参见 RDD programming guide。
Python
>>> linesWithSpark.cache()
>>> linesWithSpark.count()
15
>>> linesWithSpark.count()
15
用 Spark 缓存 100 行的文本文件看上去很 SB,其实重要的是缓存可以应用在很大体量的数据集上,即使它们分布在数千个计算节点上。还可以通过 bin/pyspark
连接到一个集群上做缓存操作,详情参见 RDD programming guide。
独立应用程序
要想用 Spark API 写一个独立应用程序,需要通过一个简单的应用程序感受一下。
Scala
通过 Scala 编写一个非常简单的 Spark 应用程序,文件命名为 SimpleApp.scala
:
/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession
object SimpleApp {
def main(args: Array[String]) {
val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
val logData = spark.read.textFile(logFile).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println(s"Lines with a: $numAs, Lines with b: $numBs")
spark.stop()
}
}
注意应用程序需要定义一个 main()
方法而不是继承自 scala.App
, scala.App
的子类实例可能无法正常工作。
该程序片段只是统计了 Spark README 文件中文本行包含「a」字符以及包含「b」字符的行数,注意你需要将 YOUR_SPARK_HOME 替换为你安装 Spark 的根目录。不同于前面的示例(Spark shell 会初始化自己的 SparkSession 实例),这里把 SparkSession 的初始化当作应用程序的一部分。
调用 SparkSession.builder
方法来构建一个 SparkSession 实例,配置应用程序名称之后调用 getOrCreate
方法获取一个 SparkSession 实例。
应用程序依赖于 Spark API,所以还需要包括一个 sbt 配置文件 build.sbt
,该文件声明了 Spark 依赖,该文件还添加了 Spark 所依赖的仓库:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.10"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0"
为了让 sbt 正常工作,需要把 SimpleApp.scala
和 build.sbt
文件整理为经典目录结构。一旦操作完成,就可以创建一个包含应用程序的 JAR 文件,然后通过 spark-submit
脚本运行我们的程序。
# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.12/simple-project_2.12-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/scala-2.12/simple-project_2.12-1.0.jar
...
Lines with a: 46, Lines with b: 23
Java
在该示例中会通过 Maven 来编译应用程序 JAR 文件,也可以通过其他类似的构建系统打包。
创建一个非常简单的 Spark 应用程序 SimpleApp.java
:
/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
public class SimpleApp {
public static void main(String[] args) {
String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
Dataset<String> logData = spark.read().textFile(logFile).cache();
long numAs = logData.filter(s -> s.contains("a")).count();
long numBs = logData.filter(s -> s.contains("b")).count();
System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
spark.stop();
}
}
该程序片段只是统计了 Spark README 文件中文本行包含「a」字符以及包含「b」字符的行数,注意你需要将 YOUR_SPARK_HOME 替换为你安装 Spark 的根目录。不同于前面的示例(Spark shell 会初始化自己的 SparkSession 实例),这里把 SparkSession 的初始化当作应用程序的一部分。
构建该程序片段还需要编写一个 Maven 的 pom.xml
文件,其中包含 Spark 依赖。注意 Spark 依赖的 artifacts 中带有 Scala 版本标记。
<project>
<groupId>edu.berkeley</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
需要把这些文件整理为经典 Maven 目录结构:
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java
现在,通过 Maven 打包程序然后通过脚本 ./bin/spark-submit
执行程序。
# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23
Python
现在展示如何用 Python API (PySpark)编写 Spark 应用程序。
如果你构建一个 PySpark 应用程序hove类库你可以把它添加到 setup.py 文件中:
install_requires=[
'pyspark=={site.SPARK_VERSION}'
]
例如,创建一个简单的 Spark 应用程序 SimpleApp.py
:
"""SimpleApp.py"""
from pyspark.sql import SparkSession
logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()
numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
spark.stop()
该程序片段只是统计了 Spark README 文件中文本行包含「a」字符以及包含「b」字符的行数,注意你需要将 YOUR_SPARK_HOME 替换为你安装 Spark 的根目录。与 Scala 和 Java 版本类似,通过一个 SparkSession 实例来创建 Datasets。对于依赖自定义类库或者第三方类库的应用程序,可以将它们压缩成 .zip 文件,之后通过spark-submit
脚本的 --py-files
参数添加依赖(详情参见 spark-submit --help
)。SimpleApp
过于简单,不需要指定其他依赖。
通过 bin/spark-submit
脚本运行该应用程序:
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--master local[4] \
SimpleApp.py
...
Lines with a: 46, Lines with b: 23
如果通过 pip 安装了 PySpark(e.g., pip install pyspark
),还可以通过 Python 解释器运行你的应用程序,或者想用 spark-submit
也行,你开心就好。
# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with a: 46, Lines with b: 23
接下来干点啥呢
运行你的第一个 Spark 应用程序!
想深入了解 API,参见 RDD programming guide 和 SQL programming guide,或者点击「Programming Guides」菜单查看其他组件。
如果想在集群上运行应用程序,参见 deployment overview。
-
最后,下载的 Spark 安装包中包含了一些示例代码 在
examples
目录 (Scala, Java, Python, R),可以通过下面这几种方式运行:# For Scala and Java, use run-example: ./bin/run-example SparkPi # For Python examples, use spark-submit directly: ./bin/spark-submit examples/src/main/python/pi.py # For R examples, use spark-submit directly: ./bin/spark-submit examples/src/main/r/dataframe.R