spark编程指南
可以使容器并行化
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
扩展数据集
- 可以直接使用文本文件
scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
- 或者其他分布式文件系统的文件
RDD操作
分为transformations和actions
- transformations
只记录操作,不产生计算,返回的也是RDD - actions
进行实际的计算,得到具体的值
基本用法
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
想缓存的话:
lineLengths.persist()
spark中传递函数
推荐2种
- 匿名函数
- 全局单例对象中的静态函数
object MyFunctions {
def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)
注意:同样可以传递类方法到spark
class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
如这个例子,在对一个实例ins调用doStuff方法时,由于其引用了func1,所以需要将ins对象传递给spark集群,这回带来一些问题。
解决办法是使用一个变量显示的捕获field,而不是传递整个对象:
def doStuff(rdd: RDD[String]): RDD[String] = {
val field_ = this.field
rdd.map(x => field_ + x)
}
理解闭包
捕获外部变量的闭包将导致错误的结果。
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
例如这个例子,counter将被复制一份发送给各个executor,它们是独立的进程,不会共享变量。所以并不会得到预期的结果。所以在有共享变量的需求时,需要使用Accumulator。
打印RDD的值
尝试使用rdd.foreach(println)
时,并不会在driver端产生预期输出,而是输出在每个executor的stdout。正确的方法是先求值到driver端,然后打印:
rdd.collect().foreach(println)
但是数据量大的情况下,这也是大多数情况,内存是不够用的。所以你应该限制输出的数量:
rdd.take(100).foreach(println)
键值对
RDD的少数操作只针对键值对,例如通过key进行分组聚合等操作。
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
注意:使用自定义对象作key时,要实现equal()方法和合适的hashcode()方法