Spark系列 —— 算子详解(二)

前言

本文接上一篇 Spark系列 —— 各类算子详解(一)
这篇主要来讲讲 Action 算子 以及 Cache 算子。

Action 算子

Spark 的执行算子,一个 Action算子 会触发一次 job 的生成。
这里需要注意的是,
Action 算子要么没有返回值,
如果有返回值,那么这个值是会被拉取到driver端的,
如果数据过大,你就得考虑下你的driver端是否装的下了...

  1. reduce(func)
    Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
    将RDD的数据进行聚合,并返回聚合后的值。
    执行逻辑类似于 reduceByKey
  1. collect()
    Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
    返回RDD中所有的数据,即 将RDD 的所有数据原封不动的拉回到 Driver 端

  2. count()
    Return the number of elements in the dataset.
    返回该 RDD 中的数据的条数。

  3. first()
    Return the first element of the dataset (similar to take(1)).
    返回 RDD 中的第一条数据。

  4. take(n)
    Return an array with the first n elements of the dataset.
    返回 RDD 中的前 N 条数据。
    n: 需要拿取数据的条数

  5. takeSample(withReplacement, num, [seed])
    Return an array with a random sample
    of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
    随机 返回 NUM 条数据。
    withReplacement:是否有放回抽样
    num: 抽取数据的条数
    seed:随机种子,相同的种子会有相同的随机数据

  6. takeOrdered(n, [ordering])
    Return the first n elements of the RDD using either their natural order or a custom comparator.
    根据 ordering 排序 ,然后返回 前 n 条数据。
    n:返回的数据
    ordering:排序函数

  7. saveAsTextFile(path)
    Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
    将RDD的 数据 以 txt 格式 保存到指定路径 path,
    path:保存路径,该路径可以是 local filesystem 或者 HDFS 或者 any other Hadoop-supported file system

  8. saveAsSequenceFile(path)
    (Java and Scala) | Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
    类似于 saveAsTextFile,不过格式是 Sequence
    Sequence 这是一种特殊的压缩格式。

  9. saveAsObjectFile(path)
    (Java and Scala)
    Write the elements of the dataset in a simple format using Java serialization, which can then be loaded usingSparkContext.objectFile().
    同上,不过一把用做保存 RDD 的里面的数据是 object 类型的 数据,
    这样加载的时候可以直接转换成对应的对象

  10. countByKey()
    Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
    这个就是个 Wordcount,不赘述

  11. foreach(func)
    Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.
    Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.
    遍历RDD的每一条数据。
    func : 遍历数据的逻辑。

  12. treeReduce(f: (T, T) => T, depth: Int = 2)
    这个算子是最近看优化的时候看到的,之前还真一直没怎么用过,
    一般也是用来优化 Reduce 操作的,
    不过因为是 Action 操作,会将数据拉回到Driver,所以用的地方一般不多,
    不过目前网上好像也没有能把这个算子讲明白的帖子,所以这里多啰嗦一下。
    f:这个是聚合逻辑,没什么好多说的。
    depth:从字面意思看起来有点像树的深度的意思。默认值是 2 。
    那么这个字段实际的作用是什么呢?
    首先我们来说说 treeReduce的作用。
    平时我们如果 reduceByKey 之后再 collectAsMap 将数据啦回Driver端,
    会有一个问题,如果分区数很多,那么每个分区的数据都要啦回 Driver,
    然后再进行聚合,Driver是单机的,所以很容易在数据拉回来之后直接把就OOM了,
    就算不OOM,单机做聚合的效率也肯定是比较慢的。
    所以这时候我们就应该用 treeReduce
    他会对我们的数据进行进行若干次 reduceByKey 操作,并慢慢减少分区数,
    假设我们现在有 100 个分区,设置 depth = 3;那么他会
    以 100/3 = 33(向下取整) 分区进行一次 reduceByKey
    再以 33/3 = 11(向下取整) 分区进行一次 reduceByKey
    再以 11/3 = 3(向下取整) 分区进行一次 reduceByKey
    这个时候才会将聚合好的数据拉回到Driver 端完成聚合。

Control 算子

控制类算子,也就是我们常说的 缓存类算子

  1. persist(StorageLevel)
    缓存算子,懒执行,返回一个 缓存类型的 RDD。
    当缓存 RDD 被 Action 算子执行后,
    该缓存RDD 会被储存起来,
    当再次需要该 RDD 执行其他job 的时候,
    就可以通过缓存直接读取数据了。
    StorageLevel:缓存级别

    • 关于缓存级别:我们可以来看下其构造函数,
      可以看到,其分为:
      _useDisk:是否使用磁盘
      _useMemory:是否使用内存
      _useOffHeap:是否使用堆外内存
      _deserialized:是否反序列化
      _replication:副本数
    class StorageLevel private(
        private var _useDisk: Boolean,
        private var _useMemory: Boolean,
        private var _useOffHeap: Boolean,
        private var _deserialized: Boolean,
        private var _replication: Int = 1)
    
    • 内置的缓存级别
      val NONE = new StorageLevel(false, false, false, false)
      val DISK_ONLY = new StorageLevel(true, false, false, false)
      val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
      val MEMORY_ONLY = new StorageLevel(false, true, false, true)
      val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
      val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
      val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
      val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
      val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
      val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
      val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
      val OFF_HEAP = new StorageLevel(false, false, true, false)
      可以根据构造函数来理解下这些内置的缓存级别。
      补充: 一般我们常用的缓存级别是:DISK_ONLY,MEMORY_ONLY_SER。
      副本数一般来说并没有很大的作用,当然如果你内存非常充足另说
      使用磁盘储存的话,会对效率有比较大的影响,
      当然如果你计算链确实很长,数据确实很多,那另说。
  2. cache
    其实就是 persist(MEMORY_ONLY),
    没什么好说的,一般缓存用这个就好了....

  1. checkpoint
    这严格来说不算是一个算子,
    不过这也是一种持久化的手段,
    其特点就是不会因为程序的结束而终止,
    需要人为的控制其生命周期,
    一般都是用在流式处理当中,
    这里稍微提一下,不做过多解释了。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 221,273评论 6 515
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,349评论 3 398
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 167,709评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,520评论 1 296
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,515评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,158评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,755评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,660评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,203评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,287评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,427评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,122评论 5 349
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,801评论 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,272评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,393评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,808评论 3 376
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,440评论 2 359

推荐阅读更多精彩内容