Spark算子

一、算子分类
1、transformation算子:这类算子并不触发提交作业,完成作业中间过程处理
Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,可以理解为懒加载,需要等到有 Action 操作的时候才会真正触发运算。
2、action算子:这类算子会触发 SparkContext 提交 Job 作业
Action 算子会触发 Spark 提交作业(Job),spark job的划分就是依据action算子
RDD中算子的运行过程:
输入:
在Spark程序运行中,数据从外部数据空间(如分布式存储:textFile读取HDFS等,parallelize方法输入Scala集合或数据)输入Spark,数据进入Spark运行时数据空间,转化为Spark中的数据块,通过BlockManager进行管理。
运行:
在Spark数据输入形成RDD后便可以通过变换算子,如filter等,对数据进行操作并将RDD转化为新的RDD,通过Action算子,触发Spark提交作业,。如果数据需要复用,可以通过Cache算子,将数据缓存到内存。
输出:
程序运行结束,数据会输出Spark运行时的空间,存储到分布式存储中(如saveAsTextFile输出到HDFS),或Scala数据或集合中(collect输出到Scala集合,count返回Scala Int型数据)

注意点:
如何区分transformation算子和action算子:transformation算子一定会返回一个rdd,action大多没有返回值,也可能有返回值,但是一定不是rdd。

二、transformation算子介绍
1、常用transformation方法

操作 介绍
map 将RDD中的每个元素传入自定义函数,获取一个新的元素,然后用新的元素组成新的RDD
filter 对RDD中每个元素进行判断,如果返回true则保留,返回false则剔除
flatMap 与map类似,但是对每个元素都可以返回一个或多个新元素
groupByKey 根据key进行分组,每个key对应一个Iterable<value>
reduceByKey 对每个Key对应的value进行reduce操作
sortByKey 对每个key对应的value进行排序操作
join 对两个包含<key,value>的RDD进行join操作,每个key join上的pair,都会传入自定义函数进行处理
cogroup 同join,但是每个key对应的Itreable<value>都会传入自定义函数进行处理

2、示例代码

package test
 
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext, TaskContext}
 
/**
  * spark的RDD的算子详解;
  */
object rddDemo {
  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("localTest").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("D:\\test.txt",2)
    //因为一些算子只能对PairRDD操作,所以在这我就直接转成PairRDD了
    val pair_rdd = rdd.flatMap(_.split(" ")).map((_,1))
    //---------------------------------------------transformation算子----------------------------------------
 
    /**
      * 1.map(func):返回一个新的 distributed dataset(分布式数据集),它由每个 source(数据源)中的元素应用一个函数
      * func 来生成对RDD每个元素转换
      */
    val map_rdd = pair_rdd.map(x=>(x._1,x._2*2))
 
    /**
      * 2.filter(func):返回一个新的 distributed dataset(分布式数据集),它由每个 source(数据源)中应用一个函数 func
      * 且返回值为 true 的元素来生成.
      */
    val filter_rdd = pair_rdd.filter(x=> x._1.equals("hello"))
 
    /**
      * 3.flatMap(func):与 map 类似,但是每一个输入的 item 可以被映射成 0 个或多个输出的 items(所以 func 应该返回一个 Seq
      * 而不是一个单独的 item)对RDD每个元素转换, 然后再扁平化(即将所有对象合并为一个对象)
      * 相当于先map然后flat.
      */
    val flatMap_rdd = pair_rdd.flatMap(_._1.split(" "))
 
    /**
      * 4.mapPartitions(func):与 map 类似,但是单独的运行在在每个 RDD 的 partition(分区,block)上,
      * 所以在一个类型为 T 的 RDD 上运行时 func 必须是 Iterator<T> => Iterator<U> 类型.
      */
    val mapPartitions_rdd = pair_rdd.mapPartitions(testMapPartition)
 
    /**
      * 5.mapPartitionsWithIndex(func):与 mapPartitions 类似,但是也需要提供一个代表 partition 的 index(索引)的
      * interger value(整型值)作为参数的 func,所以在一个类型为 T 的 RDD 上运行时 func 必须是 (Int, Iterator<T>) => Iterator<U> 类型.
      */
    val mapPartitionsWithIndex_rdd = pair_rdd.mapPartitionsWithIndex((x,it)=>{
      var result = List[Int]()
      var i = 0
      while(it.hasNext){
        i += it.next()._2
      }
      result.::(x + "|" + i).iterator
    })
 
    /**
      * 6.sample(withReplacement, fraction, seed):样本数据,设置是否放回(withReplacement), 采样的百分比(fraction)
      * 使用指定的随机数生成器的种子(seed).
      * withReplacement:元素可以多次抽样(在抽样时替换)
      * fraction:期望样本的大小作为RDD大小的一部分,
      * 当withReplacement=false时:选择每个元素的概率;分数一定是[0,1] ;
      * 当withReplacement=true时:选择每个元素的期望次数; 分数必须大于等于0。
      * seed:随机数生成器的种子
      */
    val sample_rdd = pair_rdd.sample(true,0.5)
 
    /**
      * 7.union(otherDataset):返回一个新的 dataset,它包含了 source dataset(源数据集)和 otherDataset(其它数据集)的并集.
      */
    val union_rdd = pair_rdd.union(map_rdd)
 
    /**
      * 8.intersection(otherDataset):返回一个新的 RDD,它包含了 source dataset(源数据集)和 otherDataset(其它数据集)的交集.
      */
    val intersection_rdd = pair_rdd.intersection(pair_rdd)
 
    /**
      * 9.distinct([numTasks])):返回一个新的 dataset,它包含了 source dataset(源数据集)中去重的元素.
      */
    val distinct_rdd = pair_rdd.distinct()
 
    /**
      * 10.groupByKey([numTasks]):在一个 (K, V) pair 的 dataset 上调用时,返回一个 (K, Iterable<V>) .
      * Note: 如果分组是为了在每一个 key 上执行聚合操作(例如,sum 或 average),此时使用 reduceByKey 或 aggregateByKey 来计算性能会更好.
      * Note: 默认情况下,并行度取决于父 RDD 的分区数。可以传递一个可选的 numTasks 参数来设置不同的任务数.
      */
    val groupByKey_rdd = pair_rdd.groupByKey().map(x=>(x._1,x._2.size))
 
    /**
      * 11.reduceByKey(func, [numTasks]):在 (K, V) pairs 的 dataset 上调用时, 返回 dataset of (K, V) pairs 的 dataset,
      * 其中的 values 是针对每个 key使用给定的函数 func 来进行聚合的, 它必须是 type (V,V) => V 的类型. 像 groupByKey 一样,
      * reduce tasks 的数量是可以通过第二个可选的参数来配置的.
      */
    val reduceByKey_rdd = pair_rdd.reduceByKey(_+_)
 
    /**
      * 12.aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]):在 (K, V) pairs 的 dataset 上调用时, 返回 (K, U) pairs 的 dataset,
      * 其中的 values 是针对每个 key 使用给定的 combine 函数以及一个 neutral "0" 值来进行聚合的. 允许聚合值的类型与输入值的类型不一样,
      * 同时避免不必要的配置. 像 groupByKey 一样, reduce tasks 的数量是可以通过第二个可选的参数来配置的.
      */
    val data = List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8))
    val rdd1 = sc.parallelize(data)
    val aggregateByKey_rdd : RDD[(Int,Int)] = rdd1.aggregateByKey(0)(
      math.max(_,_),
      _+_
    )
 
    /**
      * 13.sortByKey([ascending], [numTasks]):在一个 (K, V) pair 的 dataset 上调用时,其中的 K 实现了 Ordered,返回一个按 keys
      * 升序或降序的 (K, V) pairs 的 dataset, 由 boolean 类型的 ascending 参数来指定.
      */
    val sortByKey_rdd = pair_rdd.sortByKey(true)
 
    /**
      * 14.join(otherDataset, [numTasks]):在一个 (K, V) 和 (K, W) 类型的 dataset 上调用时,返回一个 (K, (V, W)) pairs 的 dataset,
      * 它拥有每个 key 中所有的元素对。Outer joins 可以通过 leftOuterJoin, rightOuterJoin 和 fullOuterJoin 来实现.
      */
    val join_rdd = pair_rdd.join(map_rdd)
 
    /**
      * 15.cogroup(otherDataset, [numTasks]):在一个 (K, V) 和的 dataset 上调用时,返回一个 (K, (Iterable<V>, Iterable<W>)) tuples
      * 的 dataset. 这个操作也调用了 groupWith.
      */
    val cogroup_rdd = pair_rdd.cogroup(map_rdd,4)
 
    /**
      * 16.cartesian(otherDataset):在一个 T 和 U 类型的 dataset 上调用时,返回一个 (T, U) pairs 类型的 dataset(所有元素的 pairs,即笛卡尔积).
      */
    val cartesian_rdd = pair_rdd.cartesian(map_rdd)
 
    /**
      * 18.coalesce(numPartitions):Decrease(降低)RDD 中 partitions(分区)的数量为 numPartitions。对于执行过滤后一个大的 dataset 操作是更有效的.
      */
    val coalesce_rdd = pair_rdd.coalesce(1)
 
    /**
      * 19.repartition(numPartitions):Reshuffle(重新洗牌)RDD 中的数据以创建或者更多的 partitions(分区)并将每个分区中的数据尽量保持均匀.
      * 该操作总是通过网络来 shuffles 所有的数据.
      */
    val repartition_rdd = pair_rdd.repartition(10)
 
    /**
      * 20.repartitionAndSortWithinPartitions:根据给定的 partitioner(分区器)对 RDD 进行重新分区,并在每个结果分区中,按照 key 值对记录排序。
      * 这比每一个分区中先调用 repartition 然后再 sorting(排序)效率更高,因为它可以将排序过程推送到 shuffle 操作的机器上进行.
      */
    val repartitionAndSortWithinPartitions_rdd = pair_rdd.zipWithIndex().repartitionAndSortWithinPartitions(new HashPartitioner(4))
    repartitionAndSortWithinPartitions_rdd.foreachPartition(pair=>{
      println("第几个分区-------------" + TaskContext.get.partitionId)
      pair.foreach(p=>{
        println(p._1 +"---------" +p._2)
      })
    })
  }

三、action算子介绍
1、常用action算法

操作 介绍
reduce(func) 通过函数func聚集集合中的所有的元素。func函数接收2个同构的元素,返回一个值。这个函数必须是关联性的,确保可以被正确地并发执行。这个算子不像reduceByKey一样通过key进行分组,所以其是一个全量的操作。
collect() 在Driver的程序中,以数组的形式,返回数据集的所有元素。但是,请注意,这个只能在返回一个较小的数据子集时才能使用,不然会很容易导致OOM异常。
count() 返回数据集的元素个数(Long类型的数)。
take(n) 返回数据集中前n(Int类型)个元素组成的一个数组。注意,这个操作并不在多个节点上运行,而是在Driver所在的节点上。如果要拿到的数据量较大,尽量不要使用该算子,会导致Driver所在节点压力过大。
first() 返回数据集的第一个元素(类似于take(1))。
saveAsTextFile(path) 将数据集中的所有元素以textfile的格式保存到本地,hdfs等文件系统中的指定目录下。Spark会调用toString()方法将每一个元素转换为一行文本保存。
saveAsSequenceFile(path) 将数据集中的所有元素以sequencefile的格式保存到本地,hdfs等文件系统中的指定的目录下。但是,这种方法需要RDD的元素必须是key-value对组成,并实现Writable接口或隐性可以转换为Writable(Spark中的基本类型包含了该转换)。
foreach(func) 在数据集中的每一个元素,运行函数func。
countByKey 和reduceByKey效果相同,只是reduceByKey是一个Transformation算子。

2、示例代码

package test
 
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext, TaskContext}
 
/**
  * spark的RDD的算子详解;
  */
object rddDemo {
  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("localTest").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("D:\\test.txt",2)
    //因为一些算子只能对PairRDD操作,所以在这我就直接转成PairRDD了
    val pair_rdd = rdd.flatMap(_.split(" ")).map((_,1))
    //--------------------------------------------Action算子---------------------------------------
 
    /**
      * 1.reduce(func):使用函数 func 聚合 dataset 中的元素,这个函数 func 输入为两个元素,返回为一个元素。这个函数应该是可交换(commutative )
      * 和关联(associative)的,这样才能保证它可以被并行地正确计算.
      */
    val reduce_rdd = pair_rdd.reduce((a,b) =>{
      (a._1+ "---" +b._1,a._2+b._2)
    })
 
    /**
      * 2.collect():在 driver 程序中,以一个 array 数组的形式返回 dataset 的所有元素。这在过滤器(filter)或其他操作(other operation)
      * 之后返回足够小(sufficiently small)的数据子集通常是有用的.
      */
    val collect_ = pair_rdd.collect()
 
    /**
      * 3.count():返回 dataset 中元素的个数.
      */
    val count_ = pair_rdd.count()
 
    /**
      * 4.first():返回 dataset 中的第一个元素类似于 take(1).
      */
    val first_ = pair_rdd.first()
 
    /**
      * 5.take(n):将数据集中的前 n 个元素作为一个 array 数组返回.
      */
    val take_ = pair_rdd.take(2)
 
    /**
      * 6.takeSample(withReplacement, num, [seed]):对一个 dataset 进行随机抽样,返回一个包含 num 个随机抽样(random sample)元素的数组,
      * 参数 withReplacement 指定是否有放回抽样,参数 seed 指定生成随机数的种子.
      */
    val takeSample_ = pair_rdd.takeSample(true,5)
 
    /**
      * 7.takeOrdered(n, [ordering]):返回 RDD 按自然顺序(natural order)或自定义比较器(custom comparator)排序后的前 n 个元素
      */
    val takeOrdered_ = pair_rdd.takeOrdered(3)
 
    /**
      * 8.saveAsTextFile(path):将 dataset 中的元素以文本文件(或文本文件集合)的形式写入本地文件系统、HDFS 或其它 Hadoop
      * 支持的文件系统中的给定目录中。
      * Spark 将对每个元素调用 toString 方法,将数据元素转换为文本文件中的一行记录.
      */
 
    /**
      * 9.saveAsTextFile(path):将 dataset 中的元素以文本文件(或文本文件集合)的形式写入本地文件系统、HDFS 或其它 Hadoop
      * 支持的文件系统中的给定目录中。
      * Spark 将对每个元素调用 toString 方法,将数据元素转换为文本文件中的一行记录.
      */
    val saveAsTextFile_ = pair_rdd.saveAsTextFile("D:\\result1.txt")
 
 
    /**
      * 10.saveAsSequenceFile(path):将 dataset 中的元素以 Hadoop SequenceFile 的形式写入到本地文件系统、HDFS
      * 或其它 Hadoop 支持的文件系统指定的路径中。该操作可以在实现了 Hadoop 的 Writable 接口的键值对(key-value pairs)
      * 的 RDD 上使用。在 Scala 中,它还可以隐式转换为 Writable
      * 的类型(Spark 包括了基本类型的转换,例如 Int, Double, String 等等).
      * (Java and Scala)
      */
    val saveAsSequenceFile_ = pair_rdd.saveAsSequenceFile("D:\\result2.txt")
 
    /**
      * 11.saveAsObjectFile(path):使用 Java 序列化(serialization)以简单的格式(simple format)编写数据集的元素,
      * 然后使用 SparkContext.objectFile() 进行加载.
      * (Java and Scala)
      */
    val saveAsObjectFile_ = pair_rdd.saveAsObjectFile("D:\\result3.txt")
 
    /**
      * 12.countByKey():仅适用于(K,V)类型的 RDD 。返回具有每个 key 的计数的 (K , Int)pairs 的 hashmap.
      */
    val countByKey_ = pair_rdd.countByKey()
 
    /**
      * 13.foreach(func):对 dataset 中每个元素运行函数 func 。这通常用于副作用(side effects),例如更新一个 Accumulator(累加器)
      * 或与外部存储系统(external storage systems)进行交互。Note:修改除 foreach()之外的累加器以外的变量(variables)
      * 可能会导致未定义的行为(undefined behavior)。详细介绍请阅读 Understanding closures(理解闭包) 部分.
      */
    val foreach_ = pair_rdd.foreach(println)
  }
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,372评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,368评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,415评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,157评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,171评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,125评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,028评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,887评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,310评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,533评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,690评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,411评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,004评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,812评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,693评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,577评论 2 353