一、RDD概念
RDD(Resillient Distributed Dataset):弹性分布式数据集,为抽象对象
RDD可分为多个分区,每个分区分布在集群中的不同节点上(分区即partition),从而让RDD中的数据可以被并行操作
RDD提供了容错性,可以自动从节点失败中恢复过来。(如果某个节点上的RDD partition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算partition。)
RDD的数据默认的情况下是存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘。(弹性的特性)
二、RDD创建方式
1、使用程序中的集合创建RDD
主要用于进行测试,可以在实际部署到集群运行之前,自己使用集合构造测试数据,来测试后面的spark应用的流程
2、使用本地文件创建RDD
主要用于的场景为:在本地临时性地处理一些存储了大量数据的文件
3、使用HDFS文件创建RDD
主要可以针对HDFS上存储的大数据,进行离线批处理操作
4、通过消息源(例如kafka)创建RDD
主要用于流式处理应用
三、示例代码
1、通过parallelize()方法创建
针对程序中的集合,调用SparkContext中的parallelize()方法。Spark会将集合中的数据拷贝到集群上去,形成一个分布式的RDD(数据集合)。
// 并行化创建RDD部分代码
// 实现1到5的累加求和
val arr = Array(1,2,3,4,5)
val rdd = sc.parallelize(arr)
val sum = rdd.reduce(_ + _)
注意点:
在调用parallelize()方法时,有一个重要的参数可以指定,就是要将集合切分成多少个partition。Spark会为每一个partition运行一个task来进行处理。Spark官方的建议是,为集群中的每个CPU创建2-4个partition。Spark默认会根据集群的情况来设置partition的数量。但是也可以在调用parallelize()方法时,传入第二个参数,来设置RDD的partition数量。比如,parallelize(arr, 10)
2、通过textFile方法创建
Spark是支持使用任何Hadoop支持的存储系统上的文件创建RDD的,比如说HDFS、Cassandra、HBase以及本地文件。通过调用SparkContext的textFile()方法,可以针对本地文件或HDFS文件创建RDD。Spark是支持使用任何Hadoop支持的存储系统上的文件创建RDD的,比如说HDFS、Cassandra、HBase以及本地文件。通过调用SparkContext的textFile()方法,可以针对本地文件或HDFS文件创建RDD。
// 实现文件字数统计
// textFile()方法中,输入本地文件路径或是HDFS路径
// HDFS:hdfs://spark1:9000/data.txt
// local:/home/hadoop/data.txt
val rdd = sc.textFile(“/home/hadoop/data.txt”)
val wordCount = rdd.map(line => line.length).reduce(_ + _)
一些来创建RDD的特例方法:
(a)SparkContext的wholeTextFiles()方法,可以针对一个目录中的大量小文件,返回由(fileName,fileContent)组成的pair,即pairRDD,而不是普通的RDD。该方法返回的是文件名字和文件中的具体内容;而普通的textFile()方法返回的RDD中,每个元素就是文本中一行文本。
(b)SparkContext的sequenceFile<K,V>方法,可以针对SequenceFile创建RDD,K和V泛型类型就是SequenceFile的key和value的类型。K和V要求必须是Hadoop的序列化机制,比如IntWritable、Text等。
(c)SparkContext的hadoopRDD()方法,对于Hadoop的自定义输入类型,可以创建RDD。该方法接收JobConf、InputFormatClass、Key和Value的Class。
(d)SparkContext的objectFile()方法,可以针对之前调用的RDD的saveAsObjectFile()创建的对象序列化的文件,反序列化文件中的数据,并创建一个RDD。
注意点:
(a)如果是针对本地文件的话:
- 如果是在Windows上进行本地测试,windows上有一份文件即可;
- 如果是在Spark集群上针对Linux本地文件,那么需要将文件拷贝到所有worker节点上(就是在spark-submit上使用—master指定了master节点,使用standlone模式进行运行,而textFile()方法内仍然使用的是Linux本地文件,在这种情况下,是需要将文件拷贝到所有worker节点上的);
(b)Spark的textFile()方法支持针对目录、压缩文件以及通配符进行RDD创建
(c)Spark默认会为hdfs文件的每一个block创建一个partition,但是也可以通过textFile()的第二个参数手动设置分区数量,只能比block数量多,不能比block数量少
3、通过DStream对象的foreachRDD创建
通过DStream对象将流数据按时间窗口进行切分,每个窗口数据为一个Rdd
部分示例代码:
kafkaStream.foreachRDD(
rdd => {
this.processRdd(rdd)
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
)
四、参考资料
1、https://blog.csdn.net/lemonZhaoTao/article/details/77923337