《Spark指南》一、快速开始

本文主要翻译至链接且不局限于该文内容,也加入了笔者实践内容,翻译水平有限,欢迎指正,转载请注明出处。

本教程提供了使用Spark的快速介绍。 我们将首先通过Spark的交互式shell(在Python或Scala中)介绍部分API,然后演示如何使用Java,Scala和Python编写应用程序。 有关更完整的参考,请参阅编程指南

你可以先从Spark网站下载Spark的打包版本。 由于本文中我们不会使用HDFS,因此下载时不需要关注Hadoop的版本。

使用Spark Shell进行交互式分析

基础

Spark的shell环境提供了一个简单的方法来学习API,同时它也是一个强大的交互式分析数据的工具。 它可以在Scala(Scala在Java VM上运行,因此可以方便的使用现有的Java库)或Python中使用。 (本文以Scala语言为例)通过在Spark目录中运行以下命令来启动它:

./bin/spark-shell

Spark上运行的主要抽象是一个称为RDD(Resilient Distributed Dataset,弹性分布式数据集)的集合,RDDs可以从Hadoop的InputFormats(例如HDFS文件)中创建,或者从其他的RDDs转换。我们先用如下命令以Spark目录下的README文件作为数据源创建一个RDD:

scala> val textFile = sc.textFile("README.md")
textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:25

返回的RDDs很一些方法可以执行,参考文档1:actions文档2:transformations,其中actions返回普通的值,transformations返回新的RDD。例如,下面是两个action:

scala> textFile.count() // RDD中有多少行数
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first() // RDD的第一行
res1: String = # Apache Spark

下面这个例子使用filter转换,并返回一个新的RDD,它是README文件的一个子集:

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:27

我们也可以链式调用这些方法:

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

更多的一些RDD操作

RDD的actions和transformations可以用来执行更复杂的运算,例如我们想找出出现最多的单词:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

第一行创建了一个新的RDD,然后将每一行映射到一个整数值。reduce函数链式处理该RDD并找到最大行计数。 其中map和reduce的参数是Scala中的语法(闭包),这里也可以使用任何Scala / Java语言的其他特性或库。 例如,下面的例子中,我们使用Math.max()函数来使这段代码更容易理解:

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

一种常见的数据流模式是MapReduce,是从Hadoop流行起来的。 Spark可以轻松实现MapReduce流程:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:28

在这里,我们将flatMap,map和reduceByKey命令结合起来,作为(String,Int)对的RDD来计算文件中的每个字计数。 要在我们的shell中收集字数,我们可以使用collect操作:

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

缓存

Spark也允许对处理中的数据集进行缓存,数据可以缓存在集群范围内的节点内存中,以便可以对一些“热数据”快速访问。示例代码如下:

scala> linesWithSpark.cache()
res7: linesWithSpark.type = MapPartitionsRDD[2] at filter at <console>:27

scala> linesWithSpark.count()
res8: Long = 15

scala> linesWithSpark.count()
res9: Long = 15

Spark的功能绝对不仅限于处理这种只有几百行的小数据,更具有吸引力的是所有这些函数都支持在超大规模的数据集上工作,即使这些数据分布在数十或数百个节点上。你可以通过bin/spark-shell脚本连接的Spark集群中操作这些数据,详细的描述请参考编程指南

自包含应用程序

假设我们想要使用Spark API写一段自包含的应用程序,下面依次看几段示例代码:

Scala(使用sbt构建)

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    sc.stop()
  }
}

上面这个例子创建了一个名为SimpleApp.scala的文件。注意,应用程序应该定义一个main()方法,而不是继承scala.App,继承的这种方式可能无法正常工作。

该程序实现的功能是计算Spark README文件中包含字符‘a’的行数和包含字符‘b’的行数。如果要执行这个程序,请替换正确的YOUR_SPARK_HOME路径。与前面的Spark shell初始化自己的SparkContext的例子不同,这里我们需要手动初始化一个SparkContext。程序的配置信息则通过一个SparkConf对象传递给SparkContext的构造器。

我们的程序依赖Spark API,因此我们需要准备一个sbt的配置文件,simple.sbt,它将描述Spark是程序的依赖项。这个文件也添加了一个Spark依赖的存储库:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"

为了使sbt正常工作,我们按正常的代码目录格式分布文件SimpleApp.scala和simple.sbt,完成后,我们就可以将该应用程序打包成一个jar文件,然后使用spark-submit脚本提交到Spark执行。

# Your directory layout should look like this
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.11/simple-project_2.11-1.0.jar
...
Lines with a: 46, Lines with b: 23

Java(使用Maven构建)

/* SimpleApp.java */
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    SparkConf conf = new SparkConf().setAppName("Simple Application");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> logData = sc.textFile(logFile).cache();

    long numAs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("a"); }
    }).count();

    long numBs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("b"); }
    }).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
    
    sc.stop();
  }
}

这个列子使用了Maven来编译和构建一个名为SimpleApp.java的JAR程序,它实现的功能和上文Scala一致。你也可以使用其他任意可行的构建系统。

与上文Scala一样,我们需要初始化一个SparkContext对象,上文的例子中使用了一个更友好的JavaSparkContext对象,然后创建了RDDs对象(即JavaRDD)并在他们上执行了transformations方法。最后,我们给Spark传递了继承至spark.api.java.function.Function的匿名类来执行作业。更详细的功能请参考Spark编程指南

为了构建这个程序,我们需要编写一个pom.xml文件并添加Spark作为依赖项,注意,Spark的artifacts使用了Scala的版本标记(2.11表示的是scala的版本)。

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.1.0</version>
    </dependency>
  </dependencies>
</project>

按目录组织这些文件,如:

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

然后,使用maven命令进行编译和构建,之后就可以使用spark-submit脚本提交到Spark上执行:

# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23

Python

"""SimpleApp.py"""
from pyspark import SparkContext

logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system
sc = SparkContext("local", "Simple App")
logData = sc.textFile(logFile).cache()

numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

sc.stop()

类似的,python版本创建了一个SimpleApp.py(使用了pyspark,Spark Python API),功能与前述一致。python版本同样需要创建一个SparkContext,然后用它来创建RDDs,之后向它传递lambda表示的函数。如果应用程序使用了第三方的库,则需要我们将它们达成zip包,并在执行spark-submit时添加--py-files选项。在这个例子中,由于没有依赖第三方库,因此我们可以直接提交应用程序:

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --master local[4] \
  SimpleApp.py
...
Lines with a: 46, Lines with b: 23

进一步学习

如果你按照前文进行了实践,那么恭喜你已经成功运行了你的第一个Spark应用程序。接下来,你可以:

  • 学习Spark programming guide以进一步了解如果编写更丰富的功能
  • 想要了解如果在集群中提交应用程序,可以参考deployment overview
  • 最后,Spark的安装包也包含了一些实例,位于example目录(Scala, Java, Python, R),你可以像下面这样执行它们:
# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R

相关的文章

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,816评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,729评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,300评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,780评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,890评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,084评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,151评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,912评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,355评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,666评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,809评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,504评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,150评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,121评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,628评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,724评论 2 351

推荐阅读更多精彩内容