spark RDD

job由stage构成,stage由task构成。
job:一个action就是一个job
job-划分->stage:当遇到宽依赖,则划分一个stage。
stage-划分->task:task对等partition概念。

Spark 的核心是建立在统一的抽象弹性分布式数据集(Resiliennt Distributed Datasets,RDD)之上的.

RDD:不可变、只读的,可被分区的数据集

RDD特性:

  • 只读:不能修改,只能通过转换操作生成新的 RDD。
  • 分布式:可以分布在多台机器上进行并行处理。
  • 弹性:计算过程中内存不够时它会和磁盘进行数据交换。
  • 基于内存:可以全部或部分缓存在内存中,在多次计算间重用。
RDD分区及分区与工作节点的分布关系

图 1 RDD 分区及分区与工作节点的分布关系

RDD基本操作:transformation + action

transformation:惰性、实际没有执行、直到action操作才真正运行
表 1 RDD转换操作(rdd1={1, 2, 3, 3},rdd2={3,4,5})

  • 函数名 作用 示例 结果
  • map() 将函数应用于 RDD 的每个元素,返回值是新的 RDD rdd1.map(x=>x+l) {2,3,4,4}
  • flatMap() 将函数应用于 RDD 的每个元素,将元素数据进行拆分,变成迭代器,返回值是新的 RDD rdd1.flatMap(x=>x.to(3)) {1,2,3,2,3,3,3}
  • filter() 函数会过滤掉不符合条件的元素,返回值是新的 RDD rdd1.filter(x=>x!=1) {2,3,3}
  • distinct() 将 RDD 里的元素进行去重操作 rdd1.distinct() (1,2,3)
  • union() 生成包含两个 RDD 所有元素的新的 RDD rdd1.union(rdd2) {1,2,3,3,3,4,5}
  • intersection() 求出两个 RDD 的共同元素 rdd1.intersection(rdd2) {3}
  • subtract() 将原 RDD 里和参数 RDD 里相同的元素去掉 rdd1.subtract(rdd2) {1,2}
  • cartesian() 求两个 RDD 的笛卡儿积 rdd1.cartesian(rdd2) {(1,3),(1,4)……(3,5)}

action操作:行动操作接受 RDD,但是返回非 RDD,即输出一个值或者结果

  • collect() 返回 RDD 的所有元素 rdd.collect() {1,2,3,3}
  • count() RDD 里元素的个数 rdd.count() 4
  • countByValue() 各元素在 RDD 中的出现次数 rdd.countByValue() {(1,1),(2,1),(3,2})}
  • take(num) 从 RDD 中返回 num 个元素 rdd.take(2) {1,2}
  • top(num) 从 RDD 中,按照默认(降序)或者指定的排序返回最前面的 + num 个元素 rdd.top(2) {3,3}
  • reduce() 并行整合所有 RDD 数据,如求和操作 rdd.reduce((x,y)=>x+y) 9
     val wordCounts = textFile.flatMap(line => line.split(" ")) //flatmap:  将一行文本 split 成多个单词
           .map(word => (word, 1))
          .reduceByKey((a, b) => a + b)
  • fold(zero)(func) 和 reduce() 功能一样,但需要提供初始值 rdd.fold(0)((x,y)=>x+y) 9
  • foreach(func) 对 RDD 的每个元素都使用特定函数 rdd1.foreach(x=>printIn(x)) 打印每一个元素
  • saveAsTextFile(path) 将数据集的元素,以文本的形式保存到文件系统中 rdd1.saveAsTextFile(file://home/test)
  • saveAsSequenceFile(path) 将数据集的元素,以顺序文件格式保存到指 定的目录下 saveAsSequenceFile(hdfs://home/test)
  • groupByKey() 只是将输入的tuple按照tuple[0]进行分组,将tuple[1]堆成一个sequence,groupByKey本身不能自定义操作函数。 例如:
(python)
    data = [('tom',90),('jerry',97),('luck',92),('tom',78),('luck',64),('jerry',50)]
    rdd = sc.parallelize(data)
    print rdd.groupByKey().map(lambda x: (x[0],list(x[1]))).collect()
   # 输出:
   [('tom', [90, 78]), ('jerry', [97, 50]), ('luck', [92, 64])]
  • reduceByKey(func) : 要把分布在集群各个节点上的数据中的同一个key,对应的values,都给 集中到一个节点的一个executor的一个task中,对集合起来的value执行传入的函数进行 reduce操作,最后变成一个value---这属于spark调优的shuffle优化,这样就避免了shuffle,提高了spark执行的效率.
    例如:tupleRDD.reduceByKey((x,y)=>x+y)
        // reduceByKey,按照相同的key进行reduce操作
        List<String> list = Arrays.asList("key1", "key1", "key2", "key2", "key3");
        JavaRDD<String> stringRDD = javaSparkContext.parallelize(list);
        //转为key-value形式
        JavaPairRDD<String, Integer> pairRDD = stringRDD.mapToPair(k -> new Tuple2<>(k, 1));
        List list1 = pairRDD.reduceByKey((x, y) -> x + y).collect();
        System.out.println(list1)

RDD依赖类型:血缘关系的依赖分为窄依赖和宽依赖。

窄依赖是指父 RDD 的每个分区 最多 会被1个子 RDD 的分区所使用。
宽依赖是指父 RDD 的每个分区 会被多个子分区所依赖。

map、filter、union 等操作是窄依赖,而 groupByKey、reduceByKey 等操作是宽依赖。

join 操作有两种情况,如果 join 操作中使用的每个 Partition 仅仅和固定个 Partition 进行 join,则该 join 操作是窄依赖,其他情况下的 join 操作是宽依赖。

所以可得出一个结论,窄依赖不仅包含一对一的窄依赖,还包含一对固定个数的窄依赖,也就是说,对父 RDD 依赖的 Partition 不会随着 RDD 数据规模的改变而改变。

  1. 窄依赖
    1)子 RDD 的每个分区依赖于常数个父分区(即与数据规模无关)。
    2)输入输出一对一的算子,且结果 RDD 的分区结构不变,如 map、flatMap。
    3)输入输出一对一的算子,但结果 RDD 的分区结构发生了变化,如 union。
    4)从输入中选择部分元素的算子,如 filter、distinct、subtract、sample。

  2. 宽依赖
    1)子 RDD 的每个分区依赖于所有父 RDD 分区。
    2)对单个 RDD 基于 Key 进行重组和 reduce,如 groupByKey、reduceByKey。
    3)对两个 RDD 基于 Key 进行 join 和重组,如 join。

Spark 的这种依赖关系设计,使其具有了天生的容错性,大大加快了 Spark 的执行速度。RDD 通过血缘关系记住了它是如何从其他 RDD 中演变过来的。当这个 RDD 的部分分区数据丢失时,它可以通过血缘关系获取足够的信息来重新运算和恢复丢失的数据分区,从而带来性能的提升。

相对而言,窄依赖的失败恢复更为高效,它只需要根据父 RDD 分区重新计算丢失的分区即可,而不需要重新计算父 RDD 的所有分区。而对于宽依赖来讲,单个结点失效,即使只是 RDD 的一个分区失效,也需要重新计算父 RDD 的所有分区,开销较大。

常用算子

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,284评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,115评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,614评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,671评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,699评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,562评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,309评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,223评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,668评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,859评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,981评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,705评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,310评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,904评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,023评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,146评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,933评论 2 355

推荐阅读更多精彩内容