这里unified指的是生态栈,包含了很多组件
- 概念
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark
弹性的分布式数据集(RDD),是spark中最基本的抽象
解释
其中弹性指的是:
RDDA =map=> RDDB =map=> RDDC,当RDDC的机器failure, 可以从RDDB计算出RDDC, 所以弹性指的是从错误恢复的特性
represents an immutable, partitioned collection of elements that can be operated on in parallel
代表一个不可变的,可以并行操作的分区的元素的集合
解释
(1) 不可变: A => map => B, 但是在这里A和B不一样
也就是说RDD只要创建了就不会改变
(2) 分区的集合:
eg. List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
(1, 2, 3) 在第一台机器上(第一个partition)
(4, 5, 6) 在第二台机器上(第二个partition)
(7, 8, 9, 10) 在第三台机器上(第三个partition)
This class contains the basic operations available on all RDDs such as map
, filter
, and persist
class RDD 包含了在所有RDD上可用的基本操作 如map, filter和persist
In addition, org.apache.spark.rdd.PairRDDFunctions contains operations available only on RDDs of key-value pairs, such as groupByKey
and join
;
除此之外,org.apache.spark.rdd.PairRDDFunctions
包含了只在key-value形式的RDD上才可以使用的操作
如groupby和join
解释:
如 select from * A join B on [a.id](http://a.id) = [b.id](http://b.id); 如果在spark里执行类型这种语句,id就是key,其他则为value
Internally, each RDD is characterized by five main properties:
内部上说,每个RDD特性都由以下五个主要性质决定的
(1) A list of partitions
有一组分区
(2) A function for computing each split
有函数可以作用到每个分区上
(3) A list of dependencies on other RDDs
有一组作用在其他RDDs上的依赖
(4) Optionally, a Partitioner for key-value RDDs(e.g. to say that the RDD is hash-partitioned)
可选,对于key-value RDDs有一个分区器
(5) Optionally, a list of preferred locations to compute each split on(e.g. block locations foran HDFS file)
可选,有对于计算每个split都有一组喜好的位置
RDD五大特性解释
(1) 一个RDD是由分区构成的
(2) 对RDD所使用的函数其实是作用到其所有分区上
eg .rdd.map() <==>rdd.split1.map() + rdd.split2.map()
在spark里partition <==> task, 有几个partition就有几个task
(3) 对于一个有三副本的文件,其存在三台机器上
eg. file1: hadoop001 => replica1
hadoop002 => replica2
hadoop003 => replica3
对于(3)的解释:
最好的是将计算放在这三个机器上
但是有时由于三台机器繁忙,没有资源进行计算
此时有两种选择,拷贝数据或者等待, 但是最好选择等待,
因为拷贝数据花费时间更多,消耗资源更大,所以移动计算优于移动数据
- 操作
Spark revolves around the concept of a resilient distributed dataset (RDD)
Spark是围绕着RDD概念展开的
which is a fault-tolerant collection of elements that can be operated on in parallel.
RDD就是一个可以并行操作的容错的集合
There are two ways to create RDDs
有两种方式可以创建RDD
parallelizing an existing collection in your driver program
在driver program中并行化一个现有的collection
该方式经常在测试时使用
referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat
在外部存储系统(如HDFS、HBASE或者其它能提供InputFormat的数据源)refer一个dataset
- 如何在代码中使用RDD
3.1 代码
val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
//此时将Array分成4片来执行
val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10),4)
3.1.1 代码解释
在启动spark-shell时,设置master为local[2],
所以默认是将Array分成2片来执行
代码需要用parallelize函数转换的原因:
Array是一个集合,但是Spark不能识别集合,只能识别RDD
3.2 parallelize源代码解析
3.2.1 源代码
def parallelize[T: ClassTag](
//seq可以理解为数组
seq: Seq[T],
//numSlices有默认值,默认值为defaultParallelism
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
3.2.2 源代码注释
Methods for creating RDDs
创建RDD的方法
Distribute a local Scala collection to form an RDD
为了生成RDD,提供本地的scala集合
3.3 ParallelCollectionRDD源代码
//ParallelCollectionRDD继承与RDD
//说明ParallelCollectionRDD也是RDD
private[spark] class ParallelCollectionRDD[T: ClassTag](
sc: SparkContext,
@transient private val data: Seq[T],
numSlices: Int,
locationPrefs: Map[Int, Seq[String]])
extends RDD[T](sc, Nil) {......}
3.4 打印RDD
3.4.1 代码
rdd.collect().foreach(println)
3.4.2 collect源代码注释
Return an array that contains all of the elements in this RDD
返回一个包含在此RDD的所有元素的array
3.4.3 collect源代码
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
3.5 从外部数据源读取文件
3.5.1 代码
//读取本地文件
sc.textFile("file:///home/hadoop/test/first.txt")
//读取hdfs上文件
sc.textFile("hdfs://hadoop:9000//henryData/first/pom.xml")
3.5.2 错误代码示例
sc.textFile("/henryData/first/pom.xml").collect
3.5.2.1 console输出
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist
3.5.2.2 错误原因
path参数没有file://或者hdfs://前缀
3.5.3 使用textFile注意事项
如果spark是一个有三台机器组成的集群,集群的机器名分别为HDFS1 HDFS2 HDFS3,
如果使用textFile()读取文件,必须要保证该文件在三台机器上都存在,否则的话会报
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist错误
3.6 textFile源代码
3.6.1 textFile源代码注释
Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings.
从HDFS,一个本地文件文件系统(需要读取的文件在每个节点上都有拷贝),
或者一个支持hadoop的文件系统的URI读取text文件,并且返回一个String的RDD
3.6.2 textFile源代码
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
注
要保证所有节点上相同路径相同的数据
3.7 使用spark读文件注意事项
If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.
如果使用本地文件系统,该文件必须能在其他的节点上的相同路径上能被访问
要么将文件拷贝到所有的工作节点上 要么使用网络共享文件系统(如HDFS)
All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/.txt"), and textFile("/my/directory/.gz").
所有spark的基于文件的input方法,包括textFile()方法,
支持读取目录,压缩文件并且支持正则表达式
The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value.
textFile方法可以接受第二个可选的参数 该参数能够控制文件分区的数量,
即minPartitions 默认情况下,
Spark能够为文件的每个block创建一个分区 (在HDFS上,一个block是128M)
但是可以通过传递一个更大的值来要求更多的分区数量 也就是说并行度提高了,
同一时刻处理数据的job多了, 在资源足够情况下,性能也就越高
Note that you cannot have fewer partitions than blocks.
注意不能让分区数比block小
但是分区数可以比block小,但是会造成每一个task处理的数据就更多了,
因为会发生一个文件会被拆分成很大的+很小的
!!!暂时不知道会造成什么问题,以后解决 (默认情况下,
eg.一个有10个block的文件可以被拆分为10个分区
如果分区数比block小,
partition1是整个block1+一小部分block2 一个task运行一个partition,
此时partition就会比原来的大 )
Apart from text files, Spark’s Scala API also supports several other data formats:
除了text file,spark也支持其他的数据类型:
SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file. Partitioning is determined by data locality which, in some cases, may result in too few partitions. For those cases, wholeTextFiles provides an optional second argument for controlling the minimal number of partitions.
SparkContext.wholeTextFiles让你读一个包含多个小的text文件的目录
以(filename, content)形式返回一个key-value对。 这个是与textFiles进行对比,
它的一个record就是任意文件的一行
分区是由数据位置决定的,可能会造成分区过小
所以,wholeTextFiles 提供另一个参数来控制最小分区
For [SequenceFiles], use SparkContext’s sequenceFile[K, V]
method where K
and V
are the types of key and values in the file
对于sequenceFile,使用SparkContext的sequenceFile[K, V]方法
其中k是key, v是value
For other Hadoop InputFormats, you can use the SparkContext.hadoopRDD method
对于其他Hadoop的InputFormats, 可以使用SparkContext.hadoopRDD 方法
RDD.saveAsObjectFile and SparkContext.objectFile support saving an RDD in a simple format consisting of serialized Java objects
RDD.saveAsObjectFile和SparkContext.objectFile支持
已由序列化的java object所组成的简单形式来保存RDD
3.8 wholeTextFiles
3.8.1 代码
sc.wholeTextFiles("hdfs://hadoop:9000/henryData/first/abc.txt").collect
3.8.2 代码输出
//一个(key, value)形式的数组
Array((hdfs://hadoop:9000/henryData/first/abc.txt, first of first second of second))
3.8.3 wholeTextFiles源代码注释
Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned in a key-value pair, where the key is the path of each file, the value is the content of each file.
读取一个HDFS(或本地文件系统或任意Hadoop支持的文件系统的URI)上的text文件的目录
每一个文件当读取时都会被当成单个记录 返回是一个key-value对
key就是每个文件的路径,value就是每个文件的内容
Small files are preferred, large file is also allowable, but may cause bad performance
小文件适合用(但是不常用),大文件也会被允许使用,但是会造成性能上的损失
3.8.4 wholeTextFiles源代码
def wholeTextFiles(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
assertNotStopped()
val job = NewHadoopJob.getInstance(hadoopConfiguration)
// Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
// comma separated files as input. (see SPARK-7155)
NewFileInputFormat.setInputPaths(job, path)
val updateConf = job.getConfiguration
new WholeTextFileRDD(
this,
classOf[WholeTextFileInputFormat],
classOf[Text],
classOf[Text],
updateConf,
minPartitions).map(record => (record._1.toString, record._2.toString)).setName(path)
}