Spark 常用算子及代码

sc.parallelize 和 sc.markRDD

parallelize()源码

def parallelize[T: ClassTag](  
      seq: Seq[T],  
      numSlices: Int = defaultParallelism): RDD[T] = withScope {  
    assertNotStopped()  
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())  
  }  

makeRDD(),有两种重构方法

/** Distribute a local Scala collection to form an RDD.  
   *  
   * This method is identical to `parallelize`.  
   */  
  def makeRDD[T: ClassTag](  
      seq: Seq[T],  
      numSlices: Int = defaultParallelism): RDD[T] = withScope {  
    parallelize(seq, numSlices)  
  }  
/**  
   * Distribute a local Scala collection to form an RDD, with one or more  
   * location preferences (hostnames of Spark nodes) for each object.  
   * Create a new partition for each collection item.  
   */  
  def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {  
    assertNotStopped()  
    val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap  
    new ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 1), indexToPrefs)  
  }  

注释的意思为:分配一个本地Scala集合形成一个RDD,为每个集合对象创建一个最佳分区

测试使用

object MyTask2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("rdd maker").setMaster("local")
    val sc = new SparkContext(conf)
    val list = List(("A",List("a1","a2","a3")),("B",List("b1","b2","b3"),("C",List("c1","c2","c3"))))
    val rddmaker = sc.makeRDD(list)
    val rddP = sc.parallelize(list)

    println("rddmaker partitions size:",rddmaker.partitions.size)
    println("rddP partitions size:",rddP.partitions.size)
  }

}
//(rddmaker partitions size:,1)
//(rddP partitions size:,1)

distinct

代码

object MyTask3 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("task3"))
    println("rdd partitions size is ",rdd.partitions.size)
    val rdd = sc.parallelize(List("a","b","c","b","b","a"))
    rdd.distinct().collect().foreach(print(_))
  }
}
//(rdd partitions size is ,1)
//abc

union

代码

object MyTask4 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("task4"))
    val rddLeft = sc.parallelize(List("2","3","4","5"))
    val rddRight = sc.parallelize(List("1","3","5","7"))
    val rddUnion = rddLeft.union(rddRight)
    rddUnion.collect().foreach(item => print(item + ","))
  }
}
//2,3,4,5,1,3,5,7,

intersection 求交集

object MyTask5 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("task4"))
    val rddLeft = sc.parallelize(List("2","3","4","5"))
    val rddRight = sc.parallelize(List("1","3","5","7"))
    val rddIntersec = rddLeft.intersection(rddRight)
    rddIntersec.collect().foreach(item => print(item + ","))
  }
}
//5,3,

subtract 把Rdd中的与另一个Rdd相同的元素去掉

代码

object MyTask6 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("task6"))
    val rddLeft = sc.parallelize(List("2","3","4","5"))
    val rddRight = sc.parallelize(List("1","3","5","7"))
    val rddSubtract = rddLeft.subtract(rddRight)
    rddSubtract.collect().foreach(item => print(item + ","))
  }
}
//2,4,

cartesian 笛卡尔积

代码

object MyTask7 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("task7"))
    val rddLeft = sc.parallelize(List("2","3","4","5"))
    val rddRight = sc.parallelize(List("1","3","5","7"))
    val rddCartesian = rddLeft.cartesian(rddRight)
    rddCartesian.collect().foreach(item => print(item + ","))
  }
}
//(2,1),(2,3),(2,5),(2,7),(3,1),(3,3),(3,5),(3,7),(4,1),(4,3),(4,5),(4,7),(5,1),(5,3),(5,5),(5,7)

countByValue 求出value出现的次数

代码

object MyTask8 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("task8"))
    val rddLeft = sc.parallelize(List("2","3","4","5"))
    val rddRight = sc.parallelize(List("1","3","5","7"))
    val rddUnion = rddLeft.union(rddRight)
    val rddCountByValue:scala.collection.Map[String, scala.Long] = rddUnion.countByValue
    rddCountByValue.foreach(item => println(item._1 + "," + item._2))
  }
}
/*
4,1
5,2
1,1
2,1
7,1
3,2
*/

reduce 并行计算出函数

代码

object MyTask9 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("task9"))
    val rdd = sc.parallelize(1 to 11)
    val result = rdd.reduce((x,y) => x+y)
    println(result)
  }
}
//66

fold

代码

object MyTask10 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("task10"))
    val rdd = sc.parallelize(1 to 11,2)
    val result = rdd.fold(10)(_+_)
    println(result)
  }
}
//96

解释,与reduce类似,只是多了一个初始值。

aggregate

函数签名

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

解释:
aggregate先对每个分区的元素做聚集,然后对所有分区的结果做聚集,聚集过程中,使用的是给定的聚集函数以及初始值”zero value”。这个函数能返回一个与原始RDD不同的类型U,因此,需要一个合并RDD类型T到结果类型U的函数,还需要一个合并类型U的函数。这两个函数都可以修改和返回他们的第一个参数,而不是重新新建一个U类型的参数以避免重新分配内存。
参数zeroValue:seqOp运算符的每个分区的累积结果的初始值以及combOp运算符的不同分区的组合结果的初始值 - 这通常将是初始元素(例如“Nil”表的列表 连接或“0”表示求和)
参数seqOp: 每个分区累积结果的聚集函数。
参数combOp: 一个关联运算符用于组合不同分区的结果

代码

object MyTask11 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("task11"))
    val rdd = sc.parallelize(1 to 4,3)
    val result = rdd.aggregate((0,0,0))(
      (acc,number) => (acc._1+number,acc._1,acc._3+1),
      (x,y) => (x._1 + y._1,x._2 + y._2,x._3+y._3)
    )
    println(result)
  }
}
//(10,3,4)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,718评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,683评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,207评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,755评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,862评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,050评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,136评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,882评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,330评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,651评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,789评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,477评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,135评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,864评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,099评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,598评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,697评论 2 351

推荐阅读更多精彩内容