01 Quick Start

转载请注明出处,谢谢合作~

快速上手

该教程可以让你快速上手 Spark 应用。首先介绍通过 Spark 的交互式 shell(Python 或者 Scala)模块展示 Spark 的 API,然后展示如果通过 Java,Scala,Python 语言编写应用程序。

为了接下来的练习,请首先在下载页面(Spark website)下载 Spark,由于不会用到 HDFS,你可以下载集成任意 Hadoop 版本的 Spark。

需要注意的是在 Spark 2.0 版本之前,主要的编程入口是弹性分布式数据集(Resilient Distributed Dataset),简称 RDD;在 Spark 2.0 版本之后,RDDs 被 Dataset 取代,Dataset 和 RDD 一样是强类型的,但是背后提供了更多的优化。RDD 编程接口依旧是可用的,详情参见 RDD programming guide。然而我们强烈建议你切换到 Dataset,Dataset 拥有更好的性能表现,详情参见 SQL programming guide

安全模式

Spark 中的安全机制默认是关闭的,这意味着在默认情况下你的 Spark 系统很容易收到攻击。请在下载和使用之前参考 Spark Security

通过 Spark Shell 进行交互式分析

基础概念

Spark shell 提供了一种简单的方式来学习 API,同时它也是一个强大的交互式分析工具。可以通过 Scala(运行在 JVM 之上,依赖 Java 类库) 或者 Python 启动 Spark shell,在 Spark 根目录启动如下命令:

Scala

./bin/spark-shell

Spark 抽象出来的主要概念叫做 Dataset 的分布式数据集,Datasets 可以通过 Hadoop InputFormats(例如 HDFS 文件)创建,或者从其他的 Datasets 转化而来。让我们通过 Spark 源码目录下的 README 文件中的文本创建一个新的 Dataset :

scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

你可以通过调用 action 算子直接从 Dataset 中获取数据,或者通过 transform 算子获得一个新的 Dataset,详情参见 API doc

scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark

现在通过 transform 算子在这个 Dataset 的基础上得到一个新的 Dataset,调用 filter 方法来返回一个包含了 README 文件中部分文本的 Dataset。

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

可以把 transform 算子和 action 算子连接起来使用:

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15

Python

./bin/pyspark

如果 PySpark 已经通过 pip 安装:

pyspark

Spark 抽象出来的主要概念叫做 Dataset 的分布式数据集,Datasets 可以通过 Hadoop InputFormats(例如 HDFS 文件)创建,或者从其他的 Datasets 转化而来。通过 Spark 源码目录下的 README 文件中的文本创建一个新的 Dataset :

>>> textFile = spark.read.text("README.md")

可以通过调用 action 算子直接从 Dataset 中获取数据,或者通过 transform 算子获得一个新的 Dataset,详情参见 API doc

>>> textFile.count()  # Number of rows in this DataFrame
126

>>> textFile.first()  # First row in this DataFrame
Row(value=u'# Apache Spark')

现在通过 transform 算子在这个 Dataset 的基础上得到一个新的 Dataset,我们调用 filter 方法来返回一个包含了 README 文件中部分文本的 Dataset。

>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))

可以把 transform 算子和 action 算子连接起来使用:

>>> textFile.filter(textFile.value.contains("Spark")).count()  # How many lines contain "Spark"?
15

更多的 Dataset 操作方式

Dataset 的 transform 算子和 action 算子可以被应用的更复杂的计算中,比如说找出单词最多的那个文本行的单词数量:

Scala

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15

这个操作首先将每一行映射成一个整数,得到一个新的 Dataset,调用新的 Dataset 的 reduce 方法来找出一行的最大单词数。mapreduce 算子的参数是 Scala 函数字面量(闭包),可以使用任意语言特性和类库。例如,可以轻松的调用在别处定义的函数,在此使用 Math.max() 函数让逻辑更清晰:

scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15

由 Hadoop 提出的数据流处理模型叫做 MapReduce,Spark 可以轻松实现 MapReduce 的思路:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

这里,调用方法把文本行的 Dataset 转换为单词的 Dataset,然后通过 groupByKeycount 算子计算文件中每个单词的计数,保存为 (String, Long) 元组的 Dataset。可以通过调用 collect 方法汇聚 word count 的计算结果:

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)

Python

>>> from pyspark.sql.functions import *
>>> textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect()
[Row(max(numWords)=15)]

这个操作首先将每一行映射成一个整数,并将这些整数当作一列命名为「numWords」,由此创建了一个新的 DataFrame。调用该 DataFrame 的 agg 方法来找到最大的那个数字。selectagg方法的参数都是 Column 类型,可以通过 df.colName 来从一个 DataFrame 中获取一列。还可以导入 import pyspark.sql.functions,这个依赖库中提供了大量便捷的函数,这些函数能够从一个 Column 生成另一个 Column。

由 Hadoop 提出的数据流处理模型叫做 MapReduce,Spark 可以轻松实现 MapReduce 的思路:

>>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).alias("word")).groupBy("word").count()

这里,在 select 方法中使用explode 方法来把一个文本行的 Dataset 转换成一个单词的 Dataset,然后通过 groupByKeycount 算子计算文件中每个单词的计数,保存为一个包含「word」和「count」两列的 DataFrame。可以通过调用 collect 方法汇聚 word count 的计算结果:

>>> wordCounts.collect()
[Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]

Caching

Spark 还支持将数据集缓存到集群中(分布式缓存),在数据被重复使用的场景下缓存很有帮助,比如说高频查询小批量的数据或者执行像 PageRank 一样的迭代算法的时候。举例来说,把 Dataset 标记为需要被缓存:

Scala

scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]

scala> linesWithSpark.count()
res8: Long = 15

scala> linesWithSpark.count()
res9: Long = 15

用 Spark 缓存 100 行的文本文件看上去很 SB,其实重要的是缓存可以应用在很大体量的数据集上,即使它们分布在数千个计算节点上。还可以通过 bin/spark-shell 连接到一个集群上做缓存操作,详情参见 RDD programming guide

Python

>>> linesWithSpark.cache()

>>> linesWithSpark.count()
15

>>> linesWithSpark.count()
15

用 Spark 缓存 100 行的文本文件看上去很 SB,其实重要的是缓存可以应用在很大体量的数据集上,即使它们分布在数千个计算节点上。还可以通过 bin/pyspark 连接到一个集群上做缓存操作,详情参见 RDD programming guide

独立应用程序

要想用 Spark API 写一个独立应用程序,需要通过一个简单的应用程序感受一下。

Scala

通过 Scala 编写一个非常简单的 Spark 应用程序,文件命名为 SimpleApp.scala

/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}

注意应用程序需要定义一个 main() 方法而不是继承自 scala.Appscala.App 的子类实例可能无法正常工作。

该程序片段只是统计了 Spark README 文件中文本行包含「a」字符以及包含「b」字符的行数,注意你需要将 YOUR_SPARK_HOME 替换为你安装 Spark 的根目录。不同于前面的示例(Spark shell 会初始化自己的 SparkSession 实例),这里把 SparkSession 的初始化当作应用程序的一部分。

调用 SparkSession.builder 方法来构建一个 SparkSession 实例,配置应用程序名称之后调用 getOrCreate 方法获取一个 SparkSession 实例。

应用程序依赖于 Spark API,所以还需要包括一个 sbt 配置文件 build.sbt,该文件声明了 Spark 依赖,该文件还添加了 Spark 所依赖的仓库:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.12.10"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0"

为了让 sbt 正常工作,需要把 SimpleApp.scalabuild.sbt 文件整理为经典目录结构。一旦操作完成,就可以创建一个包含应用程序的 JAR 文件,然后通过 spark-submit 脚本运行我们的程序。

# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.12/simple-project_2.12-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.12/simple-project_2.12-1.0.jar
...
Lines with a: 46, Lines with b: 23

Java

在该示例中会通过 Maven 来编译应用程序 JAR 文件,也可以通过其他类似的构建系统打包。

创建一个非常简单的 Spark 应用程序 SimpleApp.java

/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
    Dataset<String> logData = spark.read().textFile(logFile).cache();

    long numAs = logData.filter(s -> s.contains("a")).count();
    long numBs = logData.filter(s -> s.contains("b")).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

    spark.stop();
  }
}

该程序片段只是统计了 Spark README 文件中文本行包含「a」字符以及包含「b」字符的行数,注意你需要将 YOUR_SPARK_HOME 替换为你安装 Spark 的根目录。不同于前面的示例(Spark shell 会初始化自己的 SparkSession 实例),这里把 SparkSession 的初始化当作应用程序的一部分。

构建该程序片段还需要编写一个 Maven 的 pom.xml 文件,其中包含 Spark 依赖。注意 Spark 依赖的 artifacts 中带有 Scala 版本标记。

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.12</artifactId>
      <version>3.0.0</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
</project>

需要把这些文件整理为经典 Maven 目录结构:

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

现在,通过 Maven 打包程序然后通过脚本 ./bin/spark-submit 执行程序。

# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23

Python

现在展示如何用 Python API (PySpark)编写 Spark 应用程序。

如果你构建一个 PySpark 应用程序hove类库你可以把它添加到 setup.py 文件中:

    install_requires=[
        'pyspark=={site.SPARK_VERSION}'
    ]

例如,创建一个简单的 Spark 应用程序 SimpleApp.py

"""SimpleApp.py"""
from pyspark.sql import SparkSession

logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()

numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

spark.stop()

该程序片段只是统计了 Spark README 文件中文本行包含「a」字符以及包含「b」字符的行数,注意你需要将 YOUR_SPARK_HOME 替换为你安装 Spark 的根目录。与 Scala 和 Java 版本类似,通过一个 SparkSession 实例来创建 Datasets。对于依赖自定义类库或者第三方类库的应用程序,可以将它们压缩成 .zip 文件,之后通过spark-submit 脚本的 --py-files 参数添加依赖(详情参见 spark-submit --help )。SimpleApp 过于简单,不需要指定其他依赖。

通过 bin/spark-submit 脚本运行该应用程序:

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --master local[4] \
  SimpleApp.py
...
Lines with a: 46, Lines with b: 23

如果通过 pip 安装了 PySpark(e.g., pip install pyspark),还可以通过 Python 解释器运行你的应用程序,或者想用 spark-submit 也行,你开心就好。

# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with a: 46, Lines with b: 23

接下来干点啥呢

运行你的第一个 Spark 应用程序!

  • 想深入了解 API,参见 RDD programming guideSQL programming guide,或者点击「Programming Guides」菜单查看其他组件。

  • 如果想在集群上运行应用程序,参见 deployment overview

  • 最后,下载的 Spark 安装包中包含了一些示例代码 在examples 目录 (Scala, Java, Python, R),可以通过下面这几种方式运行:

    # For Scala and Java, use run-example:
    ./bin/run-example SparkPi
    
    # For Python examples, use spark-submit directly:
    ./bin/spark-submit examples/src/main/python/pi.py
    
    # For R examples, use spark-submit directly:
    ./bin/spark-submit examples/src/main/r/dataframe.R
    
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,185评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,652评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,524评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,339评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,387评论 6 391
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,287评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,130评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,985评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,420评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,617评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,779评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,477评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,088评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,716评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,857评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,876评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,700评论 2 354