啥?RDD?
相信有小伙伴第一次看到这个名词的时候也是一脸懵逼,我先不说话,静静看着官网的解释:
Resilient Distributed Datasets (RDD) is a fundamental data structure of Spark. It is an immutable distributed collection of objects. Each dataset in RDD is divided into logical partitions, which may be computed on different nodes of the cluster. RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes.
RDD是Spark的基本数据结构。它是一个不可修改的对象集合。每一个在RDD里面的dataset都被切分成不同的partition,这些partitions会在集群的不同节点中计算。RDD们包含了Python或者Java或者Scala的对象,包括用户自定义对象。
一句话,RDD就是一个对象dataset(我的理解,欢迎更正)。
先抛开RDD不说,Spark将所有的复杂逻辑都封装在底层了,并且能让你的数据处理过程具有弹性,高容错的性质。
如果一个在某一个node(节点)这个process挂了,那么此时Spark可以返回最近的一个正常的节点然后通知你,“嘿,这里挂了,想想办法,数据我先退回上一步,没事的bro,you got me...”
so that end of the day an RDD is just a giant set of data basically row after row after row after row of information.
And that can be lines of raw text or can be key value informations
(says our prof.)
Spark Context
上下文?
小伙伴:你是不是又要说Hadoop?
我:……嗯
Hadoop的context跟这个Spark Context 差不多(我自己感觉,欢迎指正),都是一个MR所运行的上下文。
但是Spark Context需要我们自己去创建,而Hadoop的只需要我们作为参数传入,底层的包会帮我们把这些处理好。
这个SC(Spark Context)做了什么
- 创建Driver App
- 对你的RDD的弹性+分布特性负责
- 创建RDD
当然,这个SC是底层的SparkCore实现的。
上代码:
# 在spark shell中运行的哦,一些示例而已
# line 1
> val nums = parallelize(List(1,2,3,4))
> sc.textFile('hdfs://hadoopserver/file/path')
> hiveCtx = HiveContext(sc)
# or > rows = hiveCtx.sql("SELECT name, age FROM users")
看到了吧,数组,文件,还有Hive都可以处理,看起来是极好的。
RDD Transformation
说起transformation(咳咳,这里不说Hadoop了),熟悉python和Rlang的小伙伴!有福了!基本上差不多!如果你用过numpy或者pandas或者r语言!
一共有好几种transformation:
- map(func)
- flatMap(func)
- filter(func)
- distinct([numverPartitions])
- sample(withReplacement, fraction, seed)
- union, intersection, subtract, cartesian (和Apache pig很像)
map
map就是普通的MR中的map,比如一个数据表,line by line,一行一行的映射,然后map return给我们的是一个新的RDD。
这个RDD和你map的RDD是一行一行对应的。比如rddA -> rddB, 那么rddA和rddB的数据量是相同的。
val rdd = sc.parallelized(List(1,2,3,4))
val squares = rdd.map(x => x*x) // 这里有点像java的lambda对不对
// 1, 4, 9, 16
flatmap
flatmap也是map的一种,但是非一行一行映射。
如果你有原始数据集rddA,结果数据集rddB,那么rddB和rddA数据量不一定相同,可以在flatmap里面执行trim down的操作。
filter
与flatmap相似,filter接受的是一个boolean值。返回的也是一个trim down的RDD
disitinct
获取独一值得RDD
sample
获取一个随机的sample数据集。
我觉得这个在做测试的时候还是挺有用的。
RDD Actions
Action是一个重头戏。为啥?
因为Spark只有在action触发的时候,才会真正的运算并且返回结果!
好比说你上了高速,服务区是你的Transformation(可以让你肚子里多点东西),但是到了收费站才是Action(说明到了,),才真正到达目的。
可能不太准确哈……勿喷……
大致通常会用到的actions:
- collect
- count
- take(n)
- top
- reduce
- ... and more ...
collect
开始让Spark回到你的Driver App。
return你的最终的RDD。
但是不要用!!, 因为返回的是整个数据集RDD!除非你的RDD不是big data。
count
返回dataset的元素的个数
take(n)
返回一个长度为n的数组。有n个dataset的元素。
reduce(func)
这是很重要的一个方法,对应了Transformation的map方法。
所以我们看到,在运行map的时候,Spark其实不启动计算,直到reduce出现,然后开始回到Driver,然后通过DAG(有向无环图,以后说)找出最优路径,然后把reduce结果聚合,返回。
总结一下
- RDD是一个高容错,高弹性的数据集。
- RDD在Spark中有两种运行类型,Transformation 和 Action
- Transformation不触发Spark的运算
- Action才出发Spark运算
- 通常是 Transformation -> Transformation -> Actions, 然后给出结果
- DAG是实现RDD Transformation 和 Actions的核心概念。
- 是的这篇没有啥代码,都是理论,记住RDD是个数据集就行了