本文简要介绍一下Spark。首先通过交互式shell介绍Spark API,然后是如何使用Scala编写应用程序。
要注意的是,Spark 2.0之前,Spark的主要编程接口是弹性分布式数据集(RDD)。而Spark 2.0之后,RDD被Dataset取代,Dataset是一种强类型的RDD,但是底层做了很多优化。Spark依然支持RDD接口,可以在[RDD编程指南]中查看更多内容。不过Spark更推荐使用Dataset,它的性能比RDD更好,更多内容包含在[SQL编程指南]中。
使用Spark shell进行交互式分析
基本用法
Spark shell即可用于学习API,也是一个交互式分析数据的利器。这里选用Scala shell,首先执行脚本启动shell:
./spark-shell
Spark的主要抽象是Dataset(分布式数据项集合)。Dataset可以从Hadoop输入格式创建,也可以从其他Dataset转换而来。下面代码从README文件创建了一个Dataset:
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]
可以直接调用动作算子从Dataset获取值,也可以使用转换算子将Dataset转换成新Dataset。
scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark
下面代码使用filter
算子返回一个原Dataset的子集:
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
也可以将多个算子结合起来使用:
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
更多Dataset操作
Dataset算子可以应用于更复杂的计算。假设我们想找到单词数最多的行:
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Long = 15
首先map
算子将行映射成单词数,生成一个新Dataset,接着调用reduce
算子找到最大单词数。map
和reduce
的参数都是Scala函数直接量(闭包),也可以使用Scala的类库。例如,可以直接使用预先声明的函数:
scala> import java.lang.Math
import java.lang.Math
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
更常见的数据流是MapReduce,Spark也可以很容易的实现:
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
这里,flatMap
算子将行Dataset转换成单词Dataset,之后将groupByKey
和count
结合起来计算单词出现次数。collect
算子可以打印结果:
scala> wordCounts.collect()
缓存
Spark可以在内存中缓存数据。对于那些需要被频繁查询的“热点”数据,或者运行PageRank这样的迭代算法时,这一特性很有用。下面是一个简单示例:
scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]
scala> linesWithSpark.count()
res8: Long = 15
scala> linesWithSpark.count()
res9: Long = 15
自包含应用程序
这里使用Scala创建一个简单的应用程序。
/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession
object SimpleApp {
def main(args: Array[String]) {
val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
val logData = spark.read.textFile(logFile).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println(s"Lines with a: $numAs, Lines with b: $numBs")
spark.stop()
}
}
应用程序应当直接定义main
方法,不要继承scala.App
类,scala.App
的子类可能无法正常工作。
该程序统计了包含字母'a'和'b'的行数。不同于Spark shell,程序中需要初始化一个SparkSession。
调用SparkSession.builder
方法直接构造一个SparkSession
,之后设置应用程序名,然后调用getOrCreate
创建一个SparkSession
实例。
应用程序依赖Spark API,需要在sbt配置文件中添加依赖。
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"
如果sbt工作正常,将源代码打包,之后可以使用spark-submit
运行应用程序。
# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala
# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/scala-2.11/simple-project_2.11-1.0.jar
...
Lines with a: 46, Lines with b: 23