Common Scala Functions

1. Common Functions

  • takeWhile
// takeWhile starts from the first element and takes elements while the predicate holds, stopping at the first element that fails it
val s1 = List(1,2,3,4,10,20,30,40,5,6,7,8,50,60,70,80)
val r1 = s1.takeWhile( _ < 10)
r1: List[Int] = List(1, 2, 3, 4)
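Its companions dropWhile and span follow the same prefix rule; a minimal runnable sketch (note the later 5, 6, 7, 8 stay dropped because only the leading prefix matters):

```scala
val s1 = List(1, 2, 3, 4, 10, 20, 30, 40, 5, 6, 7, 8)

// takeWhile keeps the longest prefix satisfying the predicate
val taken = s1.takeWhile(_ < 10)

// dropWhile removes that same prefix and keeps everything after it
val dropped = s1.dropWhile(_ < 10)

// span does both in one pass, returning the pair (takeWhile, dropWhile)
val (prefix, rest) = s1.span(_ < 10)

println(taken)   // List(1, 2, 3, 4)
println(dropped) // List(10, 20, 30, 40, 5, 6, 7, 8)
```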
  • The drop function (Iterator-style)
val it = List.range(0, 10, 2).map { i => i.toString }
// drop(1) removes the head, dropRight(1) removes the last element; zipping them pairs each element with its predecessor
it.drop(1).zip(it.dropRight(1))
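What that drop/dropRight zip evaluates to can be sketched locally; sliding(2) is an equivalent way to get adjacent pairs:

```scala
val it = List.range(0, 10, 2).map(_.toString)
// it = List("0", "2", "4", "6", "8")

// Pair each element with the element before it
val pairs = it.drop(1).zip(it.dropRight(1))
println(pairs) // List((2,0), (4,2), (6,4), (8,6))

// The same adjacent pairing via sliding windows of size 2
val pairs2 = it.sliding(2).map { case List(a, b) => (b, a) }.toList
```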
  • List
// adding elements to a List
it3 :+ (1000,2000)    // append to the end
(1000,2000) +: it3    // prepend to the head ((1000,2000) :: it3 also prepends; note :: is right-associative, so the element goes on the left)
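Since it3 is not defined in the text, a runnable sketch with a hypothetical it3 of (Int, Int) pairs:

```scala
// it3 is an example list of (Int, Int) pairs
val it3 = List((1, 2), (3, 4))
val p = (1000, 2000)

val appended  = it3 :+ p   // add to the end
val prepended = p :: it3   // add to the head (the new element goes on the left of ::)

println(appended)  // List((1,2), (3,4), (1000,2000))
println(prepended) // List((1000,2000), (1,2), (3,4))
```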
  • reduceByKey
// reduce does not run in the order the map produced elements; if ordering matters, groupBy can be used instead
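The groupBy-based aggregation mentioned here can be sketched with plain Scala collections (reduceByKey itself is a Spark RDD operation):

```scala
val pairs = List(("a", 1), ("b", 2), ("a", 3), ("b", 4))

// Local-collection equivalent of reduceByKey(_ + _):
val summed = pairs
  .groupBy { case (k, _) => k }                    // Map(a -> List((a,1),(a,3)), ...)
  .map { case (k, kvs) => k -> kvs.map(_._2).sum } // sum the values per key

println(summed) // a -> 4, b -> 6 (Map iteration order is unspecified)
```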
  • Differences between cogroup | join | groupByKey

github: reference link

Join() returns a dataset of [key, leftValue, rightValue], where [key, leftValue] comes from one dataset and [key, rightValue] from the other.

CoGroup() returns a dataset of [key, leftValues, rightValues], where the [key, leftValue] entries from one dataset are grouped into [key, leftValues], the [key, rightValue] entries from the other dataset are grouped into [key, rightValues], and both grouped entries are combined into [key, leftValues, rightValues].

GroupByKey() returns a dataset of [key, values], where the [key, value] entries of one dataset are grouped together.

Join(), GroupByKey() and CoGroup() all depend on Partition(). Both input datasets should be partitioned by the same key and into the same number of shards; otherwise a relatively costly partitioning will be performed.

The join process consists of a cogroup step followed by a flatMap step, as illustrated in the figure below:
Source: join(otherRDD, numPartitions)

join computation flow
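Since the figure is not reproduced here, the cogroup-then-flatMap structure of join can be sketched with local lists (a simplification of the RDD semantics):

```scala
val left  = List((1, "a"), (2, "b"), (2, "c"))
val right = List((1, "x"), (3, "y"))

// cogroup step: for each key, gather the values from both sides
val keys = (left.map(_._1) ++ right.map(_._1)).distinct
val cogrouped = keys.map { k =>
  (k, (left.filter(_._1 == k).map(_._2), right.filter(_._1 == k).map(_._2)))
}

// join step: flatMap the cogrouped entries into the per-key cross product;
// keys missing on either side contribute nothing
val joined = cogrouped.flatMap { case (k, (ls, rs)) =>
  for (l <- ls; r <- rs) yield (k, (l, r))
}

println(joined) // List((1,(a,x)))
```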
// After the broadcast variable is created, it should be used instead of the value v in any functions run on the cluster, so that v is not shipped to the nodes more than once.
// In addition, the object v should not be modified after it is broadcast, in order to ensure that all nodes get the same value of the broadcast variable
// (e.g. if the variable is shipped to a new node later).
// In other words: broadcast variables let the programmer cache a read-only variable on each machine instead of passing it between tasks.
// They can be used to efficiently give every node a copy of a large input dataset.
// Spark also tries to distribute broadcast variables with efficient broadcast algorithms, reducing communication overhead.

 scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
 broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

 scala> broadcastVar.value
 res0: Array[Int] = Array(1, 2, 3)
  • foldLeft
Accumulates from the left.
0 is the initial value (recall that numbers is a List[Int], here (1 to 10).toList); m acts as the accumulator.

Observe the execution directly:
scala> numbers.foldLeft(0) { (m: Int, n: Int) => println("m: " + m + " n: " + n); m + n }
m: 0 n: 1
m: 1 n: 2
m: 3 n: 3
m: 6 n: 4
m: 10 n: 5
m: 15 n: 6
m: 21 n: 7
m: 28 n: 8
m: 36 n: 9
m: 45 n: 10
res0: Int = 55
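The accumulator is not limited to Int; a sketch where foldLeft builds a Map (a word count):

```scala
val words = List("a", "b", "a", "c", "b", "a")

// Count occurrences; the accumulator is a Map[String, Int]
val counts = words.foldLeft(Map.empty[String, Int]) { (acc, w) =>
  acc + (w -> (acc.getOrElse(w, 0) + 1))
}

println(counts) // Map(a -> 3, b -> 2, c -> 1)
```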
  • Option
scala> val myMap: Map[String, (String, Boolean)] = Map("key1" -> ("value", true))
myMap: Map[String,(String, Boolean)] = Map(key1 -> (value,true))
scala> val vs = myMap.get("key1")
vs: Option[(String, Boolean)] = Some((value,true))

// The value above is an Option containing a tuple; ways to extract the tuple's fields:

// Method 1: pattern match
val (v2, s2) = vs match {
    case Some((v,s)) => (v, s)
    case _        => ("null", "null")
}

// Method 2: map + getOrElse
// If the Option holds no element, map is skipped, but the function chained after it still runs:
val (v2, s2) = vs.map { case (s, b) => (s, b.toString)}.getOrElse((null, null))
# val (v2, s2) = vs.map { case (s, b) => (s, b.toString)}.getOrElse(("null", "null"))

// Note: in Method 2, null is not a String, so s2 cannot call String methods afterwards; the following examples help with null and its type conversions
// null cannot call toString, but None can

scala> null.toString
java.lang.NullPointerException
scala> None.toString
res42: String = None

// The type of null, and how it is used:
scala> "null"
res38: String = null
scala> null
res39: Null = null
scala> null.asInstanceOf[String]
res40: String = null
scala> Array("a",null).mkString(",")
res41: String = a,null
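A pattern that avoids null entirely is to use String defaults in getOrElse and to wrap possibly-null values with Option(...); a sketch:

```scala
val myMap: Map[String, (String, Boolean)] = Map("key1" -> ("value", true))

// map + getOrElse with String defaults keeps everything a real String
val (v2, s2) = myMap.get("key1")
  .map { case (v, b) => (v, b.toString) }
  .getOrElse(("null", "null"))

println(v2) // value
println(s2) // true

// Option(x) wraps a possibly-null value safely: null becomes None
val maybe: Option[String] = Option(null)
println(maybe) // None
```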
  • Option[Boolean]
scala> val myMap: Map[String, Boolean] = Map("key1" -> true)
myMap: Map[String,Boolean] = Map(key1 -> true)
scala> val myMap2 = myMap + ("k2" -> false)

// Note the difference in the inferred result types below: mapping to String first keeps the result a String, while getOrElse(null) on an Option[Boolean] widens the type to Any
scala> myMap2.get("k8").map(_.toString).getOrElse(null)
res160: String = null

scala> myMap2.get("k8").getOrElse(null)
res161: Any = null

  • HashMap
scala> import scala.collection.mutable
import scala.collection.mutable

scala> val map1 = mutable.HashMap[String, String]()
map1: scala.collection.mutable.HashMap[String,String] = Map()

scala> map1.put("a1","aa1")
res104: Option[String] = None

scala> map1
res105: scala.collection.mutable.HashMap[String,String] = Map(a1 -> aa1)

scala> map1("a2") = "aa2"

scala> map1
res108: scala.collection.mutable.HashMap[String,String] = Map(a1 -> aa1, a2 -> aa2)
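mutable.HashMap also provides getOrElseUpdate, which returns the existing value or inserts a default; a small sketch:

```scala
import scala.collection.mutable

val cache = mutable.HashMap[String, Int]()

// getOrElseUpdate returns the existing value, or inserts and returns the default
val a1 = cache.getOrElseUpdate("a", 1)  // inserts "a" -> 1, returns 1
val a2 = cache.getOrElseUpdate("a", 99) // key exists, returns 1; 99 is not inserted

println(a1) // 1
println(a2) // 1
```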
  • immutable.Map
// myMap is an immutable Map, i.e. it cannot be modified in place; elements cannot be added to it
scala> val myMap = Map("k1" -> true)
myMap: scala.collection.immutable.Map[String,Boolean] = Map(k1 -> true)

// but an immutable Map can be combined with additional entries, returning a new Map
scala> val myMap2 = myMap + ("k2" -> false)
myMap2: scala.collection.immutable.Map[String,Boolean] = Map(k1 -> true, k2 -> false)

scala> myMap2.get("k8").isEmpty
res147: Boolean = true
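Besides +, an immutable Map supports updated and -, each returning a new Map; sketch:

```scala
val m1 = Map("k1" -> true)
val m2 = m1 + ("k2" -> false)    // add a binding
val m3 = m2.updated("k1", false) // replace a binding
val m4 = m3 - "k2"               // remove a key

println(m4) // Map(k1 -> false)
```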

// sortBy
// create data locally to test this function
scala> val data = List(3,1,90,3,5,12)
data: List[Int] = List(3, 1, 90, 3, 5, 12)
 
scala> val rdd = sc.parallelize(data)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:14
 
scala> rdd.collect
res0: Array[Int] = Array(3, 1, 90, 3, 5, 12)
 
scala> rdd.sortBy(x => x).collect
res1: Array[Int] = Array(1, 3, 3, 5, 12, 90)
 
scala> rdd.sortBy(x => x, false).collect
res3: Array[Int] = Array(90, 12, 5, 3, 3, 1)
 
scala> val result = rdd.sortBy(x => x, false)
result: org.apache.spark.rdd.RDD[Int] = MappedRDD[23] at sortBy at <console>:16
 
// the default number of partitions here is 6
scala> result.partitions.size
res9: Int = 6
 
// the number of partitions can also be set explicitly
scala> val result = rdd.sortBy(x => x, false, 1)
result: org.apache.spark.rdd.RDD[Int] = MappedRDD[26] at sortBy at <console>:16
 
scala> result.partitions.size
res10: Int = 1
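Local collections have the same sortBy shape (no Spark needed); a quick sketch:

```scala
val data = List(3, 1, 90, 3, 5, 12)

val asc  = data.sortBy(identity)                        // ascending
val desc = data.sortBy(identity)(Ordering[Int].reverse) // descending via a reversed Ordering

println(asc)  // List(1, 3, 3, 5, 12, 90)
println(desc) // List(90, 12, 5, 3, 3, 1)
```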


// sortByKey

scala> val a = sc.parallelize(List("wyp", "iteblog", "com", "397090770", "test"), 2)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[84] at parallelize at <console>:25

scala> val b = sc.parallelize(1 to a.count.toInt, 2)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[86] at parallelize at <console>:27

scala> b.collect
res60: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val c = b.zip(a)
c: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[92] at zip at <console>:29

// zipping the other way round gives String keys, so sortByKey sorts lexicographically:
scala> a.zip(b).sortByKey().collect
res61: Array[(String, Int)] = Array((397090770,4), (com,3), (iteblog,2), (test,5), (wyp,1))

// top returns the top N elements in descending key order; note that no prior sortBy is needed, top handles the ordering itself
scala> c.top(3)
res63: Array[(Int, String)] = Array((5,test), (4,397090770), (3,com))

// ascending order by default
scala> c.sortByKey().collect
res64: Array[(Int, String)] = Array((1,wyp), (2,iteblog), (3,com), (4,397090770), (5,test))

scala> c.sortByKey(false).collect
res66: Array[(Int, String)] = Array((5,test), (4,397090770), (3,com), (2,iteblog), (1,wyp))

// top: note that for an RDD of (k, v) pairs, sorting with top requires an implicit Ordering; v may not contain an Array[Long] (arrays have no Ordering), but it may contain a plain Long
scala> val rdd2 = sc.parallelize(List((10, ("a", Array(1,2))), (9, ("b", Array(3,5))), (1, ("c", Array(6,0)))))
rdd2: org.apache.spark.rdd.RDD[(Int, (String, Array[Int]))] = ParallelCollectionRDD[134] at parallelize at <console>:26

scala> rdd2.top(1)
<console>:29: error: No implicit Ordering defined for (Int, (String, Array[Int])).
       rdd2.top(1)

// but it may contain a plain Long/Int
scala> val rdd2 = sc.parallelize(List((10, ("a", 11)), (9, ("b", 10)), (100, ("c", 20))))
rdd2: org.apache.spark.rdd.RDD[(Int, (String, Int))] = ParallelCollectionRDD[138] at parallelize at <console>:26

scala> rdd2.top(2)
res209: Array[(Int, (String, Int))] = Array((100,(c,20)), (10,(a,11)))
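A local-collection analogue of top is sorted with a reversed Ordering plus take; note that tuples get an Ordering only when every component has one, which is why the Array example above fails:

```scala
val xs = List((10, ("a", 11)), (9, ("b", 10)), (100, ("c", 20)))

// Largest 2 pairs by the tuple Ordering (compares the Int key first)
val top2 = xs.sorted(Ordering[(Int, (String, Int))].reverse).take(2)

println(top2) // List((100,(c,20)), (10,(a,11)))
```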
  • Array.grouped
// split an array into fixed-size groups (grouped(n) yields chunks of size n):
scala> val a = (1 to 9).toArray
a: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)

scala> a.grouped(3).toArray
res178: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9))
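For comparison, sliding(n) yields overlapping windows where grouped(n) yields disjoint chunks; sketch:

```scala
val a = (1 to 6).toArray

val chunks  = a.grouped(3).toList  // disjoint groups of size 3
val windows = a.sliding(3).toList  // overlapping windows of size 3

println(chunks.map(_.toList))  // List(List(1, 2, 3), List(4, 5, 6))
println(windows.map(_.toList)) // List(List(1, 2, 3), List(2, 3, 4), List(3, 4, 5), List(4, 5, 6))
```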

  • The zip function
// Before:
reduceByKey{case ((s1, c1), (s2, c2)) =>
            val n1 = s1.split("\t")(0).toLong + s2.split("\t")(0).toLong
            val n2 = s1.split("\t")(1).toLong + s2.split("\t")(1).toLong
            val n3 = s1.split("\t")(2).toLong + s2.split("\t")(2).toLong
            val n4 = s1.split("\t")(3).toLong + s2.split("\t")(3).toLong
            val n5 = s1.split("\t")(4).toLong + s2.split("\t")(4).toLong
            val statusTrueNumStr = Array(n1, n2, n3, n4, n5).mkString("\t")
            val count = c1 + c2
            (statusTrueNumStr, count)
        }
// After using zip (the five counts stay in an array and are summed element-wise):
        val rddLastOneWeek2 = rddLastOneWeek.map{case (_, bigVersion, arrStatusTrueNum, isStable, count) =>
            ((bigVersion, isStable), (arrStatusTrueNum, count))
        }.reduceByKey{case ((arr1, count1), (arr2, count2)) =>
            val arr = arr1.zip(arr2).map{case (x,y) => x+y}
            val count = count1 + count2
            (arr, count)
        }
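The element-wise sum at the heart of the refactor, as a standalone runnable sketch:

```scala
// Element-wise sum of two equal-length count arrays, as in the reduceByKey body
val arr1 = Array(1L, 2L, 3L, 4L, 5L)
val arr2 = Array(10L, 20L, 30L, 40L, 50L)

val summed = arr1.zip(arr2).map { case (x, y) => x + y }

println(summed.mkString("\t")) // 11	22	33	44	55
```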
  • zipWithIndex
scala> val l = List(1, 2, 3, 4)
l: List[Int] = List(1, 2, 3, 4)
scala> l.zipWithIndex
res22: List[(Int, Int)] = List((1,0), (2,1), (3,2), (4,3))
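A common use is keeping the index through a map; sketch (the format string is just illustrative):

```scala
val l = List("a", "b", "c")

// Attach 0-based indices, then format each element with its position
val labeled = l.zipWithIndex.map { case (x, i) => s"$i:$x" }

println(labeled) // List(0:a, 1:b, 2:c)
```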