Spark from Beginner to Expert: RDDs, Accumulators, Broadcast Vars
Git repository for the code in this article
-
Initialize Spark and suppress INFO-level log output
import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}

val spark = SparkSession.builder()
  .master("local")
  .appName("RDDs, Accumulators, Broadcast Vars")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// Raise the root log level so that only ERROR messages are printed
val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)

val sc = spark.sparkContext
-
Creating RDDs
- parallelize
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
val rdd = sc.parallelize(1 to 5)
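parallelize also accepts an optional second argument for the number of partitions; a minimal sketch (the partition count 10 is just an illustrative value):
val distData10 = sc.parallelize(data, 10)   // spread the collection over 10 partitions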
- External files (textFile)
val distFile = sc.textFile("/Users/didi/learn/learnSpark/src/main/resources/people.txt")
-
RDD Operations
-
Printing an RDD
rdd.foreach(x => print(x + " "))   // in cluster mode this prints on the executors, not the driver
rdd.collect().foreach(println)     // collect everything to the driver first (only safe for small RDDs)
rdd.take(n).foreach(println)       // print only the first n elements
-
Transformations: lazily evaluated
-
map
-
map
val rdd = sc.parallelize(1 to 5)
val map = rdd.map(x => x * 2)
-
flatMap: each input element can be mapped to zero or more output elements, and the results are then "flattened" into a single output
val fm = sc.parallelize(1 to 5).flatMap(x => 1 to x)
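For intuition, collecting fm shows the flattening: each x expands into the range 1 to x, and all of these ranges are concatenated into one RDD:
fm.collect()   // Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)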
-
-
Sampling: sample(withReplacement, fraction, seed)
val sample1 = sc.parallelize(1 to 20).sample(false, 0.5, 1)   // fraction is an expected proportion, not an exact count
-
Union and intersection of RDDs
-
Union: union(otherDataset)
val unionRDD = sc.parallelize(1 to 3).union(sc.parallelize(3 to 5))
-
Intersection: intersection(otherDataset)
val inRDD = sc.parallelize(1 to 3).intersection(sc.parallelize(3 to 5))
-
-
Deduplication: distinct
val disRDD = sc.parallelize(List(1, 2, 2, 3)).distinct()
-
Cartesian product: cartesian(otherDataset)
val cartRDD = sc.parallelize(1 to 2).cartesian(sc.parallelize(2 to 3))   // pairs: (1,2), (1,3), (2,2), (2,3)
-
Filtering: filter
val ft = sc.parallelize(1 to 5).filter(x => x > 2)
-
Key-Value
-
reduceByKey
val counts = distFile.map(s => (s, 1))
  .reduceByKey((a, b) => a + b)
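Note that the snippet above counts identical lines, because each whole line s becomes a key. A sketch of the more common word count, assuming whitespace-separated text in distFile:
val wordCounts = distFile
  .flatMap(line => line.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)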
-
sortByKey
val scounts = distFile.map(s => (s, 1))
  .reduceByKey((a, b) => a + b)
  .sortByKey()
-
-
-
Actions: trigger computation
-
reduce: first aggregates the elements within each partition using the function func, then combines the per-partition results; func takes two arguments and returns a single value, which is passed back into func together with the next element until every element has been processed
val actionRDD = sc.parallelize(1 to 10, 2)
val reduceRDD = actionRDD.reduce(_ + _)   // 55
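To see how the two partitions contribute to this result, glom (which returns each partition's contents as an array) gives a quick check; the exact split shown here assumes the default even partitioning of parallelize:
actionRDD.glom().collect()   // Array(Array(1, 2, 3, 4, 5), Array(6, 7, 8, 9, 10))
// reduce first sums each partition (15 and 40), then combines the partial results: 15 + 40 = 55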
-
collect: returns all elements of the dataset as an array on the driver
actionRDD.collect().foreach(x => print(x + " "))
-
count: returns the number of elements in the dataset
val countRDD = actionRDD.count()
-
first: returns the first element of the dataset
val firstRDD = actionRDD.first()
-
take(n): returns the first n elements of the dataset as an array
val takeRDD = actionRDD.take(5)
-
top(n): returns the first n elements according to the default or a specified ordering; by default the output is in descending order
val topRDD = actionRDD.top(3)
-
takeOrdered(n, [ordering]): returns the first n elements in natural order or according to a specified ordering
val takeOrderedRDD = actionRDD.takeOrdered(3)
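The optional ordering can also be supplied explicitly; for example, passing a reversed Ordering makes takeOrdered behave like top (a minimal sketch):
val takeOrderedDescRDD = actionRDD.takeOrdered(3)(Ordering[Int].reverse)   // Array(10, 9, 8)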
-
Key-Value
val keyRDD = sc.parallelize(List(("a", 1), ("b", 2), ("a", 2), ("b", 3)))
-
countByKey: counts the number of elements for each key
val countByKeyRDD = keyRDD.countByKey()
-
collectAsMap: returns the key-value pairs as a Map on the driver; the result does not contain duplicate keys
val collectAsMapRDD = keyRDD.collectAsMap()
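With the keyRDD above, each duplicate key keeps only a single value (the later pair overwrites the earlier one), so the result is expected to be:
keyRDD.collectAsMap()   // Map(a -> 2, b -> 3)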
-
lookup(k): returns all values associated with the given key k
val lookupRDD = keyRDD.lookup("a")
-
-
saveAsTextFile(path): saves the result to the specified HDFS directory
rdd.saveAsTextFile(path)
-
-
Persistence (persist)
-
persist()
rdd.persist()
-
cache()
rdd.cache()
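cache() is shorthand for persist with the default MEMORY_ONLY storage level; an explicit storage level can be passed to persist instead (a minimal sketch):
import org.apache.spark.storage.StorageLevel

rdd.persist(StorageLevel.MEMORY_AND_DISK)   // spill partitions that don't fit in memory to disk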
-
-
Functions
-
Anonymous functions
val preFileLength = distFile.map(x => "pre" + x)
  .map(s => s.length)
  .reduce((a, b) => a + b)
-
Static methods in a singleton object
object MyFunctions {
  def fun1(s: String): String = {
    "pre" + s
  }
}

val funFileLength = distFile.map(MyFunctions.fun1)
  .map(s => s.length)
  .reduce((a, b) => a + b)
-
Shuffle operations: a shuffle is Spark's mechanism for redistributing data so that it can be grouped differently across partitions. It usually involves copying data between executors and machines, which makes the shuffle a complex and expensive operation. Operations that can trigger a shuffle include repartition, coalesce, groupByKey, reduceByKey, cogroup, and join; a small illustration follows.
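A quick way to see a shuffle boundary is to inspect an RDD's lineage with toDebugString; in this sketch, groupByKey introduces a ShuffledRDD stage:
val pairs = sc.parallelize(1 to 10).map(x => (x % 2, x))
val grouped = pairs.groupByKey()   // groupByKey redistributes data across partitions
println(grouped.toDebugString)     // the lineage shows a ShuffledRDD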
-
Removing cached data
rdd.unpersist()
-
-
Shared variables
-
Broadcast variables
val broadcastVar = sc.broadcast(Array(1, 2, 3))
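The broadcast value is read through .value, which returns the shared, read-only copy shipped to each executor; a minimal sketch of using it inside a transformation:
broadcastVar.value   // Array(1, 2, 3)
val indexed = sc.parallelize(0 to 2).map(i => broadcastVar.value(i))   // each task reads the local broadcast copy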
-
Accumulators
val accum = sc.longAccumulator("my Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
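Tasks on the executors can only add to an accumulator; its value should be read back on the driver after the action completes:
accum.value   // 10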
-