RDD(一)

1.png

这里unified指的是生态栈,包含了很多组件

  1. 概念

A Resilient Distributed Dataset (RDD), the basic abstraction in Spark

弹性的分布式数据集(RDD),是spark中最基本的抽象

解释

其中弹性指的是: 
RDDA =map=> RDDB =map=> RDDC,当RDDC的机器failure, 可以从RDDB计算出RDDC, 所以弹性指的是从错误恢复的特性

represents an immutable, partitioned collection of elements that can be operated on in parallel

代表一个不可变的,可以并行操作的分区的元素的集合

解释

  (1) 不可变: A => map => B, 但是在这里A和B不一样 
      也就是说RDD只要创建了就不会改变

  (2) 分区的集合: 
      eg. List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
      (1, 2, 3) 在第一台机器上(第一个partition)
      (4, 5, 6) 在第二台机器上(第二个partition)
      (7, 8, 9, 10) 在第三台机器上(第三个partition)

This class contains the basic operations available on all RDDs such as map, filter, and persist

class RDD 包含了在所有RDD上可用的基本操作 如map, filter和persist

In addition, org.apache.spark.rdd.PairRDDFunctions contains operations available only on RDDs of key-value pairs, such as groupByKey and join;

除此之外,org.apache.spark.rdd.PairRDDFunctions 
包含了只在key-value形式的RDD上才可以使用的操作
如groupby和join

解释:

如 select from * A join B on [a.id](http://a.id) = [b.id](http://b.id); 如果在spark里执行类型这种语句,id就是key,其他则为value

Internally, each RDD is characterized by five main properties:

内部上说,每个RDD特性都由以下五个主要性质决定的

(1) A list of partitions

有一组分区

(2) A function for computing each split

有函数可以作用到每个分区上

(3) A list of dependencies on other RDDs

有一组作用在其他RDDs上的依赖

(4) Optionally, a Partitioner for key-value RDDs(e.g. to say that the RDD is hash-partitioned)

可选,对于key-value RDDs有一个分区器

(5) Optionally, a list of preferred locations to compute each split on(e.g. block locations foran HDFS file)

可选,有对于计算每个split都有一组喜好的位置

RDD五大特性解释

(1) 一个RDD是由分区构成的
(2) 对RDD所使用的函数其实是作用到其所有分区上
      eg .rdd.map() <==>rdd.split1.map() + rdd.split2.map()
      在spark里partition <==> task, 有几个partition就有几个task 
(3) 对于一个有三副本的文件,其存在三台机器上
      eg. file1: hadoop001 => replica1
                 hadoop002 => replica2
                 hadoop003 => replica3
对于(3)的解释:
    最好的是将计算放在这三个机器上
    但是有时由于三台机器繁忙,没有资源进行计算
    此时有两种选择,拷贝数据或者等待, 但是最好选择等待,
    因为拷贝数据花费时间更多,消耗资源更大,所以移动计算优于移动数据
  1. 操作

Spark revolves around the concept of a resilient distributed dataset (RDD)

Spark是围绕着RDD概念展开的

which is a fault-tolerant collection of elements that can be operated on in parallel.

RDD就是一个可以并行操作的容错的集合

There are two ways to create RDDs

有两种方式可以创建RDD

parallelizing an existing collection in your driver program

在driver program中并行化一个现有的collection
该方式经常在测试时使用

referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat

在外部存储系统(如HDFS、HBASE或者其它能提供InputFormat的数据源)refer一个dataset
  1. 如何在代码中使用RDD

3.1 代码

val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
//此时将Array分成4片来执行
val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10),4)

3.1.1 代码解释

在启动spark-shell时,设置master为local[2], 
所以默认是将Array分成2片来执行

代码需要用parallelize函数转换的原因:
    Array是一个集合,但是Spark不能识别集合,只能识别RDD

3.2 parallelize源代码解析

3.2.1 源代码

def parallelize[T: ClassTag](
      //seq可以理解为数组
      seq: Seq[T],
     //numSlices有默认值,默认值为defaultParallelism
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}

3.2.2 源代码注释

Methods for creating RDDs

创建RDD的方法

Distribute a local Scala collection to form an RDD

为了生成RDD,提供本地的scala集合

3.3 ParallelCollectionRDD源代码

//ParallelCollectionRDD继承与RDD
//说明ParallelCollectionRDD也是RDD
private[spark] class ParallelCollectionRDD[T: ClassTag](
    sc: SparkContext,
    @transient private val data: Seq[T],
    numSlices: Int,
    locationPrefs: Map[Int, Seq[String]])
    extends RDD[T](sc, Nil) {......}                 

3.4 打印RDD

3.4.1 代码

rdd.collect().foreach(println)

3.4.2 collect源代码注释

Return an array that contains all of the elements in this RDD

返回一个包含在此RDD的所有元素的array

3.4.3 collect源代码

def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
}

3.5 从外部数据源读取文件

3.5.1 代码

//读取本地文件
sc.textFile("file:///home/hadoop/test/first.txt")
//读取hdfs上文件
sc.textFile("hdfs://hadoop:9000//henryData/first/pom.xml")

3.5.2 错误代码示例

sc.textFile("/henryData/first/pom.xml").collect

3.5.2.1 console输出

org.apache.hadoop.mapred.InvalidInputException: Input path does not exist

3.5.2.2 错误原因

path参数没有file://或者hdfs://前缀

3.5.3 使用textFile注意事项

如果spark是一个有三台机器组成的集群,集群的机器名分别为HDFS1 HDFS2 HDFS3,
如果使用textFile()读取文件,必须要保证该文件在三台机器上都存在,否则的话会报
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist错误

3.6 textFile源代码

3.6.1 textFile源代码注释

Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.

从HDFS,一个本地文件文件系统(需要读取的文件在每个节点上都有拷贝),
或者一个支持hadoop的文件系统的URI读取text文件,并且返回一个String的RDD

3.6.2 textFile源代码

def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
      assertNotStopped()
      hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
}

要保证所有节点上相同路径相同的数据

3.7 使用spark读文件注意事项

If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.

如果使用本地文件系统,该文件必须能在其他的节点上的相同路径上能被访问 
要么将文件拷贝到所有的工作节点上 要么使用网络共享文件系统(如HDFS)

All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/.txt"), and textFile("/my/directory/.gz").

所有spark的基于文件的input方法,包括textFile()方法, 
支持读取目录,压缩文件并且支持正则表达式

The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value.

textFile方法可以接受第二个可选的参数 该参数能够控制文件分区的数量,
即minPartitions 默认情况下,
Spark能够为文件的每个block创建一个分区 (在HDFS上,一个block是128M) 
但是可以通过传递一个更大的值来要求更多的分区数量 也就是说并行度提高了,
同一时刻处理数据的job多了, 在资源足够情况下,性能也就越高

Note that you cannot have fewer partitions than blocks.

注意不能让分区数比block小
但是分区数可以比block小,但是会造成每一个task处理的数据就更多了, 
因为会发生一个文件会被拆分成很大的+很小的
!!!暂时不知道会造成什么问题,以后解决 (默认情况下,
eg.一个有10个block的文件可以被拆分为10个分区 
如果分区数比block小,
partition1是整个block1+一小部分block2 一个task运行一个partition,
此时partition就会比原来的大 )

Apart from text files, Spark’s Scala API also supports several other data formats:

除了text file,spark也支持其他的数据类型:

SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file. Partitioning is determined by data locality which, in some cases, may result in too few partitions. For those cases, wholeTextFiles provides an optional second argument for controlling the minimal number of partitions.

SparkContext.wholeTextFiles让你读一个包含多个小的text文件的目录
以(filename, content)形式返回一个key-value对。 这个是与textFiles进行对比,
它的一个record就是任意文件的一行 
分区是由数据位置决定的,可能会造成分区过小 
所以,wholeTextFiles 提供另一个参数来控制最小分区

For [SequenceFiles], use SparkContext’s sequenceFile[K, V] method where K and V are the types of key and values in the file

对于sequenceFile,使用SparkContext的sequenceFile[K, V]方法
其中k是key, v是value

For other Hadoop InputFormats, you can use the SparkContext.hadoopRDD method

对于其他Hadoop的InputFormats, 可以使用SparkContext.hadoopRDD 方法

RDD.saveAsObjectFile and SparkContext.objectFile support saving an RDD in a simple format consisting of serialized Java objects

RDD.saveAsObjectFile和SparkContext.objectFile支持 
已由序列化的java object所组成的简单形式来保存RDD

3.8 wholeTextFiles

3.8.1 代码

sc.wholeTextFiles("hdfs://hadoop:9000/henryData/first/abc.txt").collect

3.8.2 代码输出

//一个(key, value)形式的数组
Array((hdfs://hadoop:9000/henryData/first/abc.txt, first of first second of second))

3.8.3 wholeTextFiles源代码注释

Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file.

读取一个HDFS(或本地文件系统或任意Hadoop支持的文件系统的URI)上的text文件的目录
每一个文件当读取时都会被当成单个记录 返回是一个key-value对 
key就是每个文件的路径,value就是每个文件的内容

Small files are preferred, large file is also allowable, but may cause bad performance

小文件适合用(但是不常用),大文件也会被允许使用,但是会造成性能上的损失

3.8.4 wholeTextFiles源代码

def wholeTextFiles(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
    assertNotStopped()
    val job = NewHadoopJob.getInstance(hadoopConfiguration)
    // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
    // comma separated files as input. (see SPARK-7155)
    NewFileInputFormat.setInputPaths(job, path)
    val updateConf = job.getConfiguration
    new WholeTextFileRDD(
      this,
      classOf[WholeTextFileInputFormat],
      classOf[Text],
      classOf[Text],
      updateConf,
      minPartitions).map(record => (record._1.toString, record._2.toString)).setName(path)
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,657评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,889评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,057评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,509评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,562评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,443评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,251评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,129评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,561评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,779评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,902评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,621评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,220评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,838评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,971评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,025评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,843评论 2 354

推荐阅读更多精彩内容

  • 版本: 2.3.0 原文链接: http://spark.apache.org/docs/latest/rdd-...
    金刚_30bf阅读 1,149评论 0 0
  • 概论 较高的层次上,每个Spark应用程序都包含一个驱动程序,该程序运行用户的main功能并在集群上执行各种并行操...
    Liam_ml阅读 287评论 0 1
  • Spark算子总结 算子分类 Transformation(转换)转换算子含义map(func)返回一个新的RDD...
    ronnie_yuan阅读 524评论 0 0
  • RDD 五大特性 A list of partitions一组分区:多个分区,在RDD中用分区的概念。 A fun...
    万事万物阅读 226评论 0 1
  • # 1 什么是RDD *Resilient Distributed Dataset弹性分布式数据集 *Repres...
    深海suke阅读 251评论 0 1