spark算子

val func1 = (index:Int,iter:Iterator[(Int)]) => {} //和下面的等同
def func1(index:Int,iter:Iterator[(Int)]): Iterator[String] = {
iter.toList.map(x => "partId:" + index + " val:"+x).iterator
}
val rdd.mapPartitionWithIndex(func1)

aggregate(0)(math.max(_,_),_+_) 将每个分区内的最大值求出,并相加,0是每个partition的初始值,
就是每个partition 的第一个元素和0比,最后合并的时候也是0加第一partition的最大值相加再往后加     action

val rdd1 = sc.parallelize(List("a","b","c","d","e","f"), 2)    
rdd1.aggregate("|")(_+_,_+_)   
// ||abc|def 也有可能是 ||def|abc 并行任务

val rdd2 = sc.parallelize(List("12","23","345","4567"),2)
rdd2.aggregate("")((x,y) =>math.max(x.length,y.length).toString,(x,y)=>x + y)
//24 或者42  并行的,不知道谁先出结果

val rdd3 = sc.parallelize(List("12","23","345",""),2)
rdd2.aggregate("")((x,y) =>math.min(x.length,y.length).toString,(x,y)=>x + y)
//10 或者01  并行的,不知道谁先出结果, 第一个partition初始值是“”,length是0,第二轮再比 0.length是1(第一轮 “”,“12”比的结果是0,tostring是“0”; 第二轮 “0”,“23”比的结果是 1,tostring是“1”)


val arr = List("","12","23")
arr.reduce((x, y ) => math.min(x.length, y.length).toString)
//1
reduce  要求返回值必须和arr中定义的类型一致

val pairRDD = sc.parallelize(List(("cat", 2),("cat,5"),("mouse",4),("cat",12),("dog",12),("mouse", 2)),2)
pairRDD.aggregateByKey(0)(_+_,_+_).collect

 (Cat,19) (mouse,6)(dog,12)

 PairRDD.aggregateByKey(0)(math.math(_,_),_+_).collect

 (Cat,17)(mouse,6)(dog,12)
Partition内局部操作,再整体操作

aggregate是action,aggregatebykey是transformation

PairRDD.aggregateByKey(100)(math.math(_,_),_+_).collect

(Cat,200) (mouse,200)(dog,100)
在每个分区内的key初始值都是100,最后出来汇总的时候没有初始值了

pairRDD.reduceByKey(_+_).collect

和上面的aggregateByKey是一样的最终,只不过一个传2组参数一个传一组参数

他们这两个方法最终都是调combineByKey

wordcount

sc.textfile().flatMap(_.split(" ")).map(_,1).reduceByKey(_+_)

GroupByKey也可以,但是没有partition内的局部求和,直接在整体求了

spark从hdfs读数据获取rdd的partition数和文件在hdfs中的block数相同。比如两个文件都有两个block那读这两个文件是4个partition。文件128m一个block。

(文件大小在128-256间则两个block)
问题,这样的文件在显示上是几个!

combineByKey

  def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
  partitioner, mapSideCombine, serializer)(null)
  }

createCombiner 中V是每个partition内数据(key,value)中的第一条数据的value值,经过一个函数变换,将这第一个value变换成数据类型C,
mergeValue是在同一个partition内部做的操作函数,以createcombiner变化出的第一条数据(类型为c)的结果作为初始值,对这个partititon进行处理,V是一条接着一条数据的(key,value)中的value值,最后这个partition返回一个值,数据类型是C
mergeCombiners是对最后各个partition结果的合并再操作

val rdd10 = sc.textFile().flatMap(_.split(" ")).map((_,1)).combineByKey(x=> x + 10, (m:Int, n:Int) => (m + n), (a:Int, b:Int) => (a + b)).collect

e.g rdd11内的数据是一组tuple

rdd11.collect = Array((1,"dog"),(1,"cat"),(2,"gnu"),(2,"salmon"),(2,"rabbit"),(1,"turkey"),(1,"wolf"),(2,"bear"),(2,"bee"))
rdd11.combineByKey(List(_),(x:List[String],y:String) =>x :+ y, (a:List[String], b:List[String]) => a ++ b).collect

List()的意思是将各个partition的第一个tuple中的value放到一个新建的 List中,比如分3个区,(1,"dog")是第一区的第一个tuple,List() 意思是List("dog").

(x:List[String],y:String) =>x :+ y是说,现在已经有List("dog"),然后接着往里加 “cat” 变成List("dog","cat")所以第一区的结果是
(1, list("dog","cat"))
(2, list("gnu"))
第二区的结果是
(1, list("turkey"))
(2, list("salmon","rabbit"))
第二区的结果是
(1, list("wolf"))
(2, list("bear","bee"))

(a:List[String], b:List[String]) => a ++ b就是按key把各个分区结果合并得到result

Array[(Int, List[String])] = Array((1,List("dog","cat","turkey","wolf")),(2,List("gnu","salmon","rabbit","bear","bee")))

repartition

  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
  }

collectAsMap action
countByKey action 可以用在wordcount
countByValue

rdd13.collect = arr = List(("a",1),("b",2),("b",2))
rdd13.countByValue() = Map(("a",1) -> 1,("b",2) -> 2)

val rdd14 = sc.parallelize((List("e",5),(c,3),(d,4),(c,2),(a,1),(b,6)))
rdd14.filterByRange("b","d").collect = Array((c,3),(d,4),(c,2),(b,6))

val rdd15 = sc.parallelize(List("a","1 2"),(v, "3 4"))
rdd15.flatMapValues(_.split(" ")).collect = Array((a,1 ),(a,2),(v,3),(v,4))

val rdd16 =sc.parallelize(List("dog",wolf,cat,bear),2).map(x => (x.length,x)).foldByKey("")(_+_).collect() = Array((4,wolfbear),(3,catdog))


val rdd17 = sc.parallelize(List("dog",wolf,cat,bear),2).keyBy(_.length).collect = Array((4,bear),(4,wolf),(3,dog),(3,cat))

val rdd18 =sc.parallelize(List("dog",wolf,cat,bear),2).map(x => (x.length,x).keys.collect = Array(3,4,3,4)

val rdd19 =sc.parallelize(List("dog",wolf,cat,bear),2).map(x => (x.length,x).values.collect = Array(dog,wolf,cat,bear)



  /**
     * Return this RDD sorted by the given key function.
   */
  def sortBy[K]( //RDD[T] 是返回值类型 ,方法名后的[K]中间涉及的类型, 比如Set[T]
      f: (T) => K,
      ascending: Boolean = true,
      numPartitions: Int = this.partitions.length)
      (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =     withScope {
    this.keyBy[K](f)
    .sortByKey(ascending, numPartitions)
    .values
  }



 object  OrderContext {
  implicit val girlOrdering = new Ordering[Girl] {
    override def compare(x: Girl, y: Girl): Int = {
      x.faceValue - y.faceValue
    }
  }
}
case class Girl(val faceValue:Int) extends 
Ordered[Girl] with Serializable {
  override def compare(that: Girl): Int = {
    that.faceValue -that.faceValue
   }
}

object Test {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = new SparkSession.Builder().master("local[*]").getOrCreate()
    val rdd = sparkSession.sparkContext.parallelize(List(("girl1",10),("girl3",30),("girl2", 20)),3)
    import  OrderContext._
    val result = rdd.sortBy(x => Girl(x._2)) //x._2是
facevalue
    println(result.collect.toBuffer)
  }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,039评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,223评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,916评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,009评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,030评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,011评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,934评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,754评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,202评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,433评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,590评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,321评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,917评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,568评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,738评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,583评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,482评论 2 352

推荐阅读更多精彩内容