Spark文档 - 快速入门

本文简要介绍一下Spark。首先通过交互式shell介绍Spark API,然后是如何使用Scala编写应用程序。

要注意的是,Spark 2.0之前,Spark的主要编程接口是弹性分布式数据集(RDD)。而Spark 2.0之后,RDD被Dataset取代,Dataset是一种强类型的RDD,但是底层做了很多优化。Spark依然支持RDD接口,可以在[RDD编程指南]中查看更多内容。不过Spark更推荐使用Dataset,它的性能比RDD更好,更多内容包含在[SQL编程指南]中。

使用Spark shell进行交互式分析

基本用法

Spark shell即可用于学习API,也是一个交互式分析数据的利器。这里选用Scala shell,首先执行脚本启动shell:

./spark-shell

Spark的主要抽象是Dataset(分布式数据项集合)。Dataset可以从Hadoop输入格式创建,也可以从其他Dataset转换而来。下面代码从README文件创建了一个Dataset:

scala> val textFile = spark.read.textFile("README.md")

textFile: org.apache.spark.sql.Dataset[String] = [value: string]

可以直接调用动作算子从Dataset获取值,也可以使用转换算子将Dataset转换成新Dataset。

scala> textFile.count() // Number of items in this Dataset

res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first() // First item in this Dataset

res1: String = # Apache Spark

下面代码使用filter算子返回一个原Dataset的子集:

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))

linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

也可以将多个算子结合起来使用:

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?

res3: Long = 15

更多Dataset操作

Dataset算子可以应用于更复杂的计算。假设我们想找到单词数最多的行:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)

res4: Long = 15

首先map算子将行映射成单词数,生成一个新Dataset,接着调用reduce算子找到最大单词数。mapreduce的参数都是Scala函数直接量(闭包),也可以使用Scala的类库。例如,可以直接使用预先声明的函数:

scala> import java.lang.Math

import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))

res5: Int = 15

更常见的数据流是MapReduce,Spark也可以很容易的实现:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()

wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]

这里,flatMap算子将行Dataset转换成单词Dataset,之后将groupByKeycount结合起来计算单词出现次数。collect算子可以打印结果:

scala> wordCounts.collect()

缓存

Spark可以在内存中缓存数据。对于那些需要被频繁查询的“热点”数据,或者运行PageRank这样的迭代算法时,这一特性很有用。下面是一个简单示例:

scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]

scala> linesWithSpark.count()
res8: Long = 15

scala> linesWithSpark.count()
res9: Long = 15

自包含应用程序

这里使用Scala创建一个简单的应用程序。

/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}

应用程序应当直接定义main方法,不要继承scala.App类,scala.App的子类可能无法正常工作。

该程序统计了包含字母'a'和'b'的行数。不同于Spark shell,程序中需要初始化一个SparkSession。

调用SparkSession.builder方法直接构造一个SparkSession,之后设置应用程序名,然后调用getOrCreate创建一个SparkSession实例。

应用程序依赖Spark API,需要在sbt配置文件中添加依赖。

name := "Simple Project"

version := "1.0"

scalaVersion := "2.11.12"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"

如果sbt工作正常,将源代码打包,之后可以使用spark-submit运行应用程序。

# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.11/simple-project_2.11-1.0.jar
...
Lines with a: 46, Lines with b: 23
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352

推荐阅读更多精彩内容