spark学习(三)RDD初窥

The main abstraction Spark provides is a resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.
以上时spark官网对RDD的描述,下面让我们进入RDD的世界吧!

  • RDD:每个spark应用都有一个driver端,主要运行main函数并负责任务分配。RDD是spark中一组分布式存储数据,它可以并行操作。RDD可以从文件中读取(例如分布式文件系统HDFS),也可以由driver端转化(转化操作)本地数据获得。如果要反复使用RDD数据,最好对这个RDD数据进行缓存,这样下次使用时(多个行动操作)可以直接从缓存读取,不然需要依靠lineage重新计算RDD,费时。同样当节点出现故障后,RDD可以通过lineage重新建立起来,有很强的容错性。

  • 共享变量:并行计算的程序中共享变量十分重要,因此引入了广播和累加器。其中广播变量可以在不同节点的内存中换成数据;累加器变量可以在集群中做一些加法操作;

  • 初始化spark

    SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
    JavaSparkContext sc = new JavaSparkContext(conf);
    
    • 通过以上代码完成了spark程序的初始化,setAppName()方法用于设置spark程序的名称,用户可以在web UI界面看到spark程序名称。
  • 初始化RDD

    List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
    JavaRDD<Integer> distData = sc.parallelize(data);   
    JavaRDD<String> distFile = sc.textFile("data.txt");
    
    • 以上代码完成了RDD的初始化,可以通过driver端本地创建RDD也可以通过读取外部文件完成RDD的创建。
    • 本例中,前两行代码是通过driver端本地创建RDD,第三行代码通过读取外部文件创建RDD,外部文件指driver端可以访问的外部文件,包括HDFS和s3上的文件等(如访问HDFS文件需要添加前缀hdfs://*** ,访问s3文件也需要添加前缀s3://*** ,访问本地文件可以添加前缀file://***)并且支持文本文件和Hadoop所支持的输入文件。如果读取本地文件时,需要保证各个节点都可以访问本地文件。spark支持读取各种路径,例如文件夹、通配符、gz压缩文件(gz压缩文件不可以分片,bz2可以,所有在程序中尽量读取bz2文件)等。默认情况下,spark读取文件后,为每一个block创建的一个partiton,例如HDFS上每个block为128M;当然用户也可以自定义创建更多的partition,但注意不能创建比默认值少的partition。
  • 转化操作:从已有的RDD中创建一个新的RDD;例如map操作,对RDD中每个元素做一次函数操作然后返回一个新的RDD。

  • 行动操作:计算RDD,并将结果返回到driver端;例如reduce操作,它将聚合所有的RDD元素做一次函数操作,最后将结果返回给driver。

  • 转化操作算子

    • map(func);返回一个新的分布式数据集,其中每个元素都是由源RDD中一个元素经func转换得到的。
    • filter(func);返回一个新的数据集,其中包含的元素来自源RDD中元素经func过滤后(func返回true时才选中)的结果;
    • flatMap(func);类似于map,但每个输入元素可以映射到0到n个输出元素(所以要求func必须返回一个Seq而不是单个元素)。
    • mapPartitions(func);类似于map,但基于每个RDD分区(或者数据block)独立运行,所以如果RDD包含元素类型为T,则 func 必须是 Iterator<T> => Iterator<U> 的映射函数。使用此方法要小心,相对于map算子,mapPartitions会对每个partition进行操作,容易造成OOM问题。
    • mapPartitionsWithIndex(func);类似于 mapPartitions,只是func 多了一个整型的分区索引值,因此如果RDD包含元素类型为T,则 func 必须是 Iterator<T> => Iterator<U> 的映射函数。
    • sample(withReplacement, fraction, seed);采样部分(比例取决于 fraction )数据,同时可以指定是否使用回置采样(withReplacement:true抽取放回,false抽取不放回。),以及随机数种子(seed)。
    • union(otherDataset);返回源数据集和参数数据集(otherDataset)的并集。
    • intersection(otherDataset);返回源数据集和参数数据集(otherDataset)的交集。
    • distinct([numTasks]));返回对源数据集做元素去重后的新数据集。
    • groupByKey([numTasks]);只对包含键值对的RDD有效,如源RDD包含 (K, V) 对,则该算子返回一个新的数据集包含 (K, Iterable<V>) 对。注意:如果你需要按key分组聚合的话(如sum或average),推荐使用 reduceByKey或者 aggregateByKey 以获得更好的性能。注意:默认情况下,输出计算的并行度取决于源RDD的分区个数。当然,你也可以通过设置可选参数 numTasks 来指定并行任务的个数。
    • reduceByKey(func, [numTasks]);如果源RDD包含元素类型 (K, V) 对,则该算子也返回包含(K, V) 对的RDD,只不过每个key对应的value是经过func聚合后的结果,而func本身是一个 (V, V) => V 的映射函数。另外,和 groupByKey 类似,可以通过可选参数 numTasks 指定reduce任务的个数。
    • aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]);如果源RDD包含 (K, V) 对,则返回新RDD包含 (K, U) 对,其中每个key对应的value都是由 combOp 函数 和 一个“0”值zeroValue 聚合得到。允许聚合后value类型和输入value类型不同,避免了不必要的开销。和 groupByKey 类似,可以通过可选参数 numTasks 指定reduce任务的个数。
    • sortByKey([ascending], [numTasks]);如果源RDD包含元素类型 (K, V) 对,其中K可排序,则返回新的RDD包含 (K, V) 对,并按照 K 排序(升序还是降序取决于 ascending 参数)。
    • join(otherDataset, [numTasks]);如果源RDD包含元素类型 (K, V) 且参数RDD(otherDataset)包含元素类型(K, W),则返回的新RDD中将包含内关联后key对应的 (K, (V, W)) 对。外关联(Outer joins)操作请参考 leftOuterJoin、rightOuterJoin 以及 fullOuterJoin 算子。
    • cogroup(otherDataset, [numTasks]);如果源RDD包含元素类型 (K, V) 且参数RDD(otherDataset)包含元素类型(K, W),则返回的新RDD中包含 (K, (Iterable<V>, Iterable<W>))。该算子还有个别名:groupWith
      cartesian(otherDataset) 如果源RDD包含元素类型 T 且参数RDD(otherDataset)包含元素类型 U,则返回的新RDD包含前二者的笛卡尔积,其元素类型为 (T, U) 对。
    • pipe(command, [envVars]);以shell命令行管道处理RDD的每个分区,如:Perl 或者 bash 脚本。RDD中每个元素都将依次写入进程的标准输入(stdin),然后按行输出到标准输出(stdout),每一行输出字符串即成为一个新的RDD元素。
    • coalesce(numPartitions); 将RDD的分区数减少到numPartitions。当以后大数据集被过滤成小数据集后,减少分区数,可以提升效率。
    • repartition(numPartitions);将RDD数据重新混洗(reshuffle)并随机分布到新的分区中,使数据分布更均衡,新的分区个数取决于numPartitions。该算子总是需要通过网络混洗所有数据。
    • repartitionAndSortWithinPartitions(partitioner);根据partitioner(spark自带有HashPartitioner和RangePartitioner等)重新分区RDD,并且在每个结果分区中按key做排序。这是一个组合算子,功能上等价于先 repartition 再在每个分区内排序,但这个算子内部做了优化(将排序过程下推到混洗同时进行),因此性能更好。
  • 行动操作算子

    • reduce(func);将RDD中元素按func进行聚合(func是一个 (T,T) => T 的映射函数,其中T为源RDD元素类型,并且func需要满足 交换律 和 结合律 以便支持并行计算)。
    • collect();将数据集中所有元素以数组形式返回驱动器(driver)程序。通常用于,在RDD进行了filter或其他过滤操作后,将一个足够小的数据子集返回到驱动器内存中。使用此操作时需要注意driver内存是否能够存储所有返回数据,不然可能会造成OOM。
    • count();返回数据集中元素个数。
    • first();返回数据集中首个元素(类似于 take(1) )。
    • take(n);返回数据集中前 n 个元素。
    • takeSample(withReplacement,num, [seed]);返回数据集的随机采样子集,最多包含 num 个元素,withReplacement 表示是否使用回置采样,最后一个参数为可选参数seed,随机数生成器的种子。
    • takeOrdered(n, [ordering]);按元素排序(可以通过 ordering 自定义排序规则)后,返回前 n 个元素。
    • saveAsTextFile(path);将数据集中元素保存到指定目录下的文本文件中(或者多个文本文件),支持本地文件系统、HDFS 或者其他任何Hadoop支持的文件系统。保存过程中,Spark会调用每个元素的toString方法,并将结果保存成文件中的一行。
    • saveAsSequenceFile(path)(Java and Scala);将数据集中元素保存到指定目录下的Hadoop Sequence文件中,支持本地文件系统、HDFS 或者其他任何Hadoop支持的文件系统。适用于实现了Writable接口的键值对RDD。在Scala中,同样也适用于能够被隐式转换为Writable的类型(Spark实现了所有基本类型的隐式转换,如:Int,Double,String 等)。
    • saveAsObjectFile(path)(Java and Scala);将RDD元素以Java序列化的格式保存成文件,保存结果文件可以使用 SparkContext.objectFile 来读取。
    • countByKey();只适用于包含键值对(K, V)的RDD,并返回一个哈希表,包含 (K, Int) 对,表示每个key的个数。
    • foreach(func);在RDD的每个元素上运行 func 函数。通常被用于累加操作,如:更新一个累加器(Accumulator ) 或者 和外部存储系统互操作。
  • 所有的转化操作都是惰性求值,也就是说转化操作不会马上执行,当有一个行动操作要将计算结果返回个driver时,转化操作才会得到执行。这种设计使得spark有更高的执行效率,因为转化操作不用马上算出结果,它只需要在行动操作时将结果传递给行动操作。如果一组转化操作需要支撑多个行动操作,那么每个行动操作都会触发一遍转化操作,很浪费时间,这个时候可以对最后一个转化操作的RDD进行缓存,这样多个行动操作只需要执行一次转化操作用于生成RDD,后面的行动操作可以从缓存中获得RDD。

  • shuffle操作

    • shuffle是spark中的一种机制,它的作用是重新分配数据,结果是对不同分区的数据按照规则分组。shuffle操作时会在不同的executor和机器间进行I/O操作,导致性能降低。
    • 当执行shuffle算子时会进行stage划分,其中shuffle算子会将相同的key写在同一个或临近的磁盘上,此过程为shuffle write;在下一个stage中会读取这些数据,此过程了shuffle read 。执行shuffle操作时由于key分布不均匀,容易造成数据倾斜
    • 由于shuffle算子操作会对性能产生影响,开发中尽量减少shuffle类算子使用,并且优先使用map-side预聚合的shuffle操作,例如reduceByKey等,这类操作首先在自己节点上聚合,在整体聚合。
    • 常见的的shuffle操作包括,groupByKey,cogroup,subtractByKey,reduceByKey,foldByKey,sortByKey,partitionBy,coalesce,repartitionAndSortWithinPartitions,aggregateByKey。
  • 好了,这节就分享到这里吧,下个章节会进一步的介绍RDD相关操作,RDD缓存,广播变量和累加器等。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,240评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,328评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,182评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,121评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,135评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,093评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,013评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,854评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,295评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,513评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,398评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,989评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,636评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,657评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352

推荐阅读更多精彩内容