本教程提供了如何使用 Spark 的快速入门介绍。首先通过运行 Spark 交互式的 shell(在 Python 或 Scala 中)来介绍 API, 然后展示如何使用 Java , Scala 和 Python 来编写应用程序。
为了继续阅读本指南, 首先从Spark 官网下载 Spark 的发行包。因为我们将不使用 HDFS, 所以你可以下载一个任何 Hadoop 版本的软件包。
请注意, 在 Spark 2.0 之前, Spark 的主要编程接口是弹性分布式数据集(RDD)。 在 Spark 2.0 之后, RDD 被 Dataset 替换, 它是像RDD 一样的 strongly-typed(强类型), 但是在引擎盖下更加优化。 RDD 接口仍然受支持, 您可以在RDD 编程指南中获得更完整的参考。 但是, 我们强烈建议您切换到使用 Dataset(数据集), 其性能要更优于 RDD。 请参阅SQL 编程指南获取更多有关 Dataset 的信息。
Spark shell 提供了一种来学习该 API 比较简单的方式, 以及一个强大的来分析数据交互的工具。在 Scala(运行于 Java 虚拟机之上, 并能很好的调用已存在的 Java 类库)或者 Python 中它是可用的。通过在 Spark 目录中运行以下的命令来启动它:
./bin/spark-shell
Spark 的主要抽象是一个称为 Dataset 的分布式的 item 集合。Datasets 可以从 Hadoop 的 InputFormats(例如 HDFS文件)或者通过其它的 Datasets 转换来创建。让我们从 Spark 源目录中的 README 文件来创建一个新的 Dataset:
scala>valtextFile=spark.read.textFile("README.md")textFile:org.apache.spark.sql.Dataset[String]=[value:string]
您可以直接从 Dataset 中获取 values(值), 通过调用一些 actions(动作), 或者 transform(转换)Dataset 以获得一个新的。更多细节, 请参阅API doc。
scala>textFile.count()// Number of items in this Datasetres0:Long=126// May be different from yours as README.md will change over time, similar to other outputsscala>textFile.first()// First item in this Datasetres1:String=#ApacheSpark
现在让我们 transform 这个 Dataset 以获得一个新的。我们调用filter以返回一个新的 Dataset, 它是文件中的 items 的一个子集。
scala>vallinesWithSpark=textFile.filter(line=>line.contains("Spark"))linesWithSpark:org.apache.spark.sql.Dataset[String]=[value:string]
我们可以链式操作 transformation(转换)和 action(动作):
scala>textFile.filter(line=>line.contains("Spark")).count()// How many lines contain "Spark"?res3:Long=15
Dataset actions(操作)和 transformations(转换)可以用于更复杂的计算。例如, 统计出现次数最多的单词 :
scala>textFile.map(line=>line.split(" ").size).reduce((a,b)=>if(a>b)aelseb)res4:Long=15
第一个 map 操作创建一个新的 Dataset, 将一行数据 map 为一个整型值。在 Dataset 上调用reduce来找到最大的行计数。参数map与reduce是 Scala 函数(closures), 并且可以使用 Scala/Java 库的任何语言特性。例如, 我们可以很容易地调用函数声明, 我们将定义一个 max 函数来使代码更易于理解 :
scala>importjava.lang.Mathimportjava.lang.Mathscala>textFile.map(line=>line.split(" ").size).reduce((a,b)=>Math.max(a,b))res5:Int=15
一种常见的数据流模式是被 Hadoop 所推广的 MapReduce。Spark 可以很容易实现 MapReduce:
scala>valwordCounts=textFile.flatMap(line=>line.split(" ")).groupByKey(identity).count()wordCounts:org.apache.spark.sql.Dataset[(String,Long)]=[value:string,count(1):bigint]
在这里, 我们调用了flatMap以 transform 一个 lines 的 Dataset 为一个 words 的 Dataset, 然后结合groupByKey和count来计算文件中每个单词的 counts 作为一个 (String, Long) 的 Dataset pairs。要在 shell 中收集 word counts, 我们可以调用collect:
scala>wordCounts.collect()res6:Array[(String,Int)]=Array((means,1),(under,2),(this,3),(Because,1),(Python,2),(agree,1),(cluster.,1),...)
Spark 还支持 Pulling(拉取)数据集到一个群集范围的内存缓存中。例如当查询一个小的 “hot” 数据集或运行一个像 PageRANK 这样的迭代算法时, 在数据被重复访问时是非常高效的。举一个简单的例子, 让我们标记我们的linesWithSpark数据集到缓存中:
scala>linesWithSpark.cache()res7:linesWithSpark.type=[value:string]scala>linesWithSpark.count()res8:Long=15scala>linesWithSpark.count()res9:Long=15
使用 Spark 来探索和缓存一个 100 行的文本文件看起来比较愚蠢。有趣的是, 即使在他们跨越几十或者几百个节点时, 这些相同的函数也可以用于非常大的数据集。您也可以像编程指南. 中描述的一样通过连接bin/spark-shell到集群中, 使用交互式的方式来做这件事情。
假设我们希望使用 Spark API 来创建一个独立的应用程序。我们在 Scala(SBT), Java(Maven)和 Python 中练习一个简单应用程序。
我们将在 Scala 中创建一个非常简单的 Spark 应用程序 - 很简单的, 事实上, 它名为SimpleApp.scala:
/* SimpleApp.scala */importorg.apache.spark.sql.SparkSessionobjectSimpleApp{defmain(args:Array[String]){vallogFile="YOUR_SPARK_HOME/README.md"// Should be some file on your systemvalspark=SparkSession.builder.appName("Simple Application").getOrCreate()vallogData=spark.read.textFile(logFile).cache()valnumAs=logData.filter(line=>line.contains("a")).count()valnumBs=logData.filter(line=>line.contains("b")).count()println(s"Lines with a:$numAs, Lines with b:$numBs")spark.stop()}}
注意, 这个应用程序我们应该定义一个main()方法而不是去扩展scala.App。使用scala.App的子类可能不会正常运行。
该程序仅仅统计了 Spark README 文件中每一行包含 ‘a’ 的数量和包含 ‘b’ 的数量。注意, 您需要将 YOUR_SPARK_HOME 替换为您 Spark 安装的位置。不像先前使用 spark shell 操作的示例, 它们初始化了它们自己的 SparkContext, 我们初始化了一个 SparkContext 作为应用程序的一部分。
我们调用SparkSession.builder以构造一个 [[SparkSession]], 然后设置 application name(应用名称), 最终调用getOrCreate以获得 [[SparkSession]] 实例。
我们的应用依赖了 Spark API, 所以我们将包含一个名为build.sbt的 sbt 配置文件, 它描述了 Spark 的依赖。该文件也会添加一个 Spark 依赖的 repository:
name:="Simple Project"version:="1.0"scalaVersion:="2.11.8"libraryDependencies+="org.apache.spark"%%"spark-sql"%"2.2.0"
为了让 sbt 正常的运行, 我们需要根据经典的目录结构来布局SimpleApp.scala和build.sbt文件。在成功后, 我们可以创建一个包含应用程序代码的 JAR 包, 然后使用spark-submit脚本来运行我们的程序。
# Your directory layout should look like this$ find .../build.sbt./src./src/main./src/main/scala./src/main/scala/SimpleApp.scala# Package a jar containing your application$ sbt package...[info]Packaging{..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar# Use spark-submit to run your application$ YOUR_SPARK_HOME/bin/spark-submit\--class"SimpleApp"\--master local[4]\target/scala-2.11/simple-project_2.11-1.0.jar...Lines with a:46, Lines with b:23
恭喜您成功的运行了您的第一个 Spark 应用程序!
更多 API 的深入概述, 从RDD programming guide和SQL programming guide这里开始, 或者看看 “编程指南” 菜单中的其它组件。
为了在集群上运行应用程序, 请前往deployment overview.
最后, 在 Spark 的examples目录中包含了一些 (Scala,Java,Python,R) 示例。您可以按照如下方式来运行它们:
# 针对 Scala 和 Java, 使用 run-example:./bin/run-example SparkPi# 针对 Python 示例, 直接使用 spark-submit:./bin/spark-submit examples/src/main/python/pi.py# 针对 R 示例, 直接使用 spark-submit:./bin/spark-submit examples/src/main/r/dataframe.R
原文地址: http://spark.apachecn.org/docs/cn/2.2.0/quick-start.html
网页地址: http://spark.apachecn.org/
github: https://github.com/apachecn/spark-doc-zh(觉得不错麻烦给个 Star,谢谢!~)