参考Spark官方文档:http://spark.apache.org/docs/latest/rdd-programming-guide.html
Spark shell以及SparkContext
以下代码可在Spark shell中运行,在Spark shell中默认已经为用户创建了一个特殊的SparkContext,叫sc。
RDD数据结构
RDD全称是resilient distributed dataset,弹性分布式数据集合,这个是一个可以容错并且能够被多个节点同时操作的数据集合。
创建RDD的两种途径
- 在driver程序中同步一个已经存在的集合
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
通过调用shell为我们创建的SparkContext的实例sc的parallelize方法。
- 从外部以文件的形式导入数据
scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
通过调用textFile方法,该方法传入一个文件的URI作为参数,URI可以是本地的路径也可以是其他机器上的比如 hdfs:// 等。该方法将读取到文件视为一行行数据的集合。
在RDD上的操作
RDD支持两种类型的操作:transformations & actions
transformations :该类操作将从一个已经存在的RDD上创建一个新的RDD,可以将map理解为是transformations 类操作,因为map通过将一个dataset中的元素逐个传入某个map函数之后,返回一个新的RDD来表示经过函数处理过的结果。
actions:该类操作干的事情就是在对dataset执行一些计算活动之后,向driver程序返回计算的结果。reduce就是属于actions的操作,reduce通过函数归并 RDD中的元素并将最终结果返回给driver程序。
在Spark中transformations 操作被定义为懒操作,因为transformations 并不会立即去计算结果。只有当一个actions操作需要向driver返回结果的时候,transformations 才会被计算。
举个🌰说明一下:
如下一段scala代码:
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
这是从外部文件获取数据创建RDD并对其进行计算,整个三句话干的事情就是读取data.txt文件,并对文件中每一行的长度进行计算最后累加求得总长度。
第二行的map是一个transformations操作,它实际上并不会立刻计算出每一行的值并放入lineLength中,即lineLength目前没有值,到第三行的时候遇到了reduce,是一个actions方法,这时候Spark会将计算分解为多个tasks,分别在独立的机器上运行,每台机器运行格子的map 和本地的reduce,最终将结果返回driver程序。
如果在后面还要继续用到lineLengths的值,可以
lineLengths.persist()
将这个变量的值进行缓存。