本文发表于2010 IEEE 2nd International Conference on Cloud Computing Technology and Science (CloudCom 2010),是Spark系列论文的开篇之作。文中主要介绍了当时解决大规模数据的分布式框架存在的局限性,并针对这些问题提出了Spark的解决方案。下面是针对该篇论文所做的阅读笔记。
1. 摘要和引言
现有的大规模数据解决方案(主要指MapReduce)针对如下两类问题时,存在着局限性:
1)迭代式作业:虽然每次迭代可以都可以表示一个MR任务,但是每一次迭代必须从磁盘加载数据;
2)交互式数据分析:SQL虽然也可以转换成MR任务,但是每一次MR任务都要从磁盘加载数据。
这里都提到MR在解决问题的时候,都是从磁盘不停的加载数据,而磁盘IO的代价是非常高的,因此MR在解决上述问题的时候效率其实并不高。
针对上述问题,本文提出了一种新的大规模数据计算方案Spark,弹性分布式数据集(RDD)可以用来解决迭代式作业的问题;而Spark是基于Scala进行构建的,而Scala可以提供交互式的操作,可以很好的解决交互式的数据分析。
2. 编程模型
为了使用Spark,开发人员需要编写驱动程序(Driver),它的作用是控制应用程序的执行流程并在并行的环境中执行一系列的并行操作。Spark主要提供了两类抽象:RDD 和并行算子(parallel operation)。此外,Spark还提供了两种受限的共享变量。
2.1. 弹性分布式数据集(RDD)
RDD的特点:
1)跨计算机间的可分区的只读对象集合;
2)分区丢失之后可以重建(因为RDD不需要物化在物理存储上,相反可以通过物理存储上的数据来构建RDD);
3)可以持久化RDD,供后续计算来使用。
如何创建RDD?
1)从HDFS这样的分布式文件系统创建;
2)通过并行的读取Scala集合来创建;
3)从另一个RDD转化而来;
4)改变现有RDD的持久性。
RDD默认是惰性并且临时的,但是可以通过特定的操作来改变其持久性,如何改变?
1)Cache action:将数据保存在内存中,以便后期重用时,可以快速的使用。
2)Save action:将数据持久化到像HDFS这样的分布式文件存统上,这个被保存的版本也可以在后期的操作中重用。
2.2. 并行操作
可以在RDD上执行一系列的并行操作,如reduce, collect, foreach等等。
2.3. 共享变量
Spark提供了两种共享变量:
1)广播变量:这种变量只会被广播到每一个Worker一次;
2)累加器:可以在Worker节点间共享该变量,可以用来作为计数器。
3. 示例
下面会列举三个示例来显示如何使用上述特性。
3.1. 文本搜索
val file = spark.textFile("hdfs://...")
val errs = file.filter(_.contains("ERROR"))
val ones = errs.map(_ => 1)
val count = ones.reduce(_+_)
假设需要对存储在HDFS中的大型日志文件中包含的错误行进行统计。上面的代码示例使用Spark的方式实现了MapReduce操作。与MapReduce的操作不同的是,Spark可以保存中间数据。如果我们想保存errs数据,就可以使用如下方式创建一个缓存的RDD:
val cachedErrs = errs.cache()
这样如果后续我们需要读errs数据进行更多的操作,就会大大的提高执行效率了。
3.2. 逻辑回归
// 从文本文件中读取点数据,并缓存在内存中
val points = spark.textFile(...).map(parsePoint).cache()
// Initialize w to random D-dimensional vector
var w = Vector.random(D)
// Run multiple iterations to update w
for (i <- 1 to ITERATIONS) {
//将梯度设置成累计,可以在所有的worker之间累加该数据
val grad = spark.accumulator(new Vector(D))
//scala的for是语法糖,因此如下的代码会被转换成points.foreach来执行,是一个并行操作
for (p <- points) {
// Runs in parallel
val s = (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y
grad += s*p.x
}
w -= grad.value
}
LR是一种迭代算法,因此可将迭代数据缓存在内存中从而提高执行效率(迭代ITERATIONS次,每次points都是从cache到内存的数据来读)。将梯度设置成累加器变量,这样其就可以在并行的环境下进行累加了。
3.3. 最小二乘法
//每次计算的时候R都是被当作参数传进去,所以这里将数据集R设置成广播变量
val Rb = spark.broadcast(R)
for (i <- 1 to ITERATIONS) {
U = spark.parallelize(0 until u)
.map(j => updateUser(j, Rb, M)).collect()
M = spark.parallelize(0 until m)
.map(j => updateUser(j, Rb, U)).collect()
}
计算U和M时,都是通过并行化的方式进行计算的,而计算的过程中每一次循环,都需要数据集R,因此我们可以把数据集R设置成广播变量,在程序启动之后,数据集R只会被driver向所有参与计算的worker节点发送一次。