- RDD支持两种类型的操作:
- 转 化 操 作:会由一个 RDD 生成一个新的 RDD。例如map()或者filter()操作
- 行 动 操 作:行动操作会对 RDD 计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(如 HDFS)中。例如count()和first()操作。
- 转化操作和行动操作的区别在于 Spark 计算 RDD 的方式不同。虽然你可以在任何时候定义新的 RDD, 但 Spark 只会惰性计算这些 RDD。 它们只有第一次在一个行动操作中用到时,才会真正计算。
- Spark 的 RDD 会在你每次对它们进行行动操作时重新计算。如果想
在多个行动操作中重用同一个 RDD, 可以使用 RDD.persist() 让 Spark 把这个 RDD 缓存下来。 - 两种创建 RDD 的方式:
- 读取外部数据集:例如sc.textFile("/path/to/REDEME.md")
- 在驱动器程序中对一个集合进行并行化:parallelize
- RDD 还有一个 collect() 函数,可以用来获取整个 RDD 中的数据。如果你的程序把 RDD 筛选到一个很小的规模,并且你想在本地处理这些数据时, 就可以使用它。在大多数情况下, RDD 不能通过 collect() 收集到驱动器进程中,因为它们一般都很大。每当我们调用一个新的行动操作时,整个 RDD 都会从头开始计算。要避免这种低效的行为,用户可以将中间结果持久化。
- 可以随时通过运行一个行动操作来强制Spark 执行 RDD 的转化操作,比如使用 count()。但一般不要去这么做。
- 传递函数给Spark:传递比较短的函数时,可以使用lambda 表达式来传递, 如例 3-2 和例 3-18 所示。除了 lambda 表达式,我们也可以传递顶层函数或是定义的局部函数。
注意:
当你传递的对象是某个对象的成员, 或者包含了对某个对象中一个字段的
引用时(例如 self.field), Spark 就会把整个对象发到工作节点上,这可
能比你想传递的东西大得多有时,如果传递的类里面包含 Python 不知道
如何序列化传输的对象,也会导致你的程序失败。
class SearchFunctions(object):
def __init__(self, query):
self.query = query
def isMatch(self, s):
return self.query in s
def getMatchesFunctionReference(self, rdd):
# 问题:在"self.isMatch"中引用了整个self
return rdd.filter(self.isMatch)
def getMatchesMemberReference(self, rdd):
# 问题:在"self.query"中引用了整个self
return rdd.filter(lambda x: self.query in x)
应该使用下面这种方式:
class WordFunctions(object):
...
def getMatchesNoReference(self, rdd):
# 安全:只把需要的字段提取到局部变量中
query = self.query
return rdd.filter(lambda x: query in x)
-
常见的RDD转换操作:
-
常见的RDD行动操作: