RDD中伪集合操作 :去重操作就是去除RDD中重复元素。
RDD.distinct(): 去重操作
RDD1.union(RDD2): 返回包含两个 RDD 中所有元素的 RDD,未去重
RDD1.intersection(RDD2) :返回RDD1和RDD2的并集,有去重操作
RDD1.other(RDD2) ,返回 一个由只存在于第一个 RDD 中而不存在于第二个 RDD 中的所有元素组成的 RDD。有去重操作
关于scala中的'=>'的意思,
a=>b,其中a为输入参数,b为输出参数,相当与a被转化为b,并输出b。
scals:
lines.filter(line => line.contains("Python"))
python:
pythonLines = lines.filter(lambda line: "Python" in line)
下面为防止忘记上几张图,来自(Spark快速大数据分析):
转化操作
执行操作
reduce:减约操作,与python的reduce一样,注意这里的x,y均是list的值,
import sys
from pyspark import SparkContext
if __name__ == "__main__":
master = "local"
if len(sys.argv) == 2:
master = sys.argv[1]
try:
sc.stop()
except:
pass
sc = SparkContext(master, "WordCount")
rdd = sc.parallelize([1, 2, 3, 3])
sums = rdd.reduce(lambda x, y: x + y)
print(sums)
输出:
9
sums = rdd.fold(0,lambda x, y: x + y)
fold中第一次执行时的x是0,y才是list的值
aggregate() 函数可以对
import sys
from pyspark import SparkContext
if __name__ == "__main__":
master = "local"
if len(sys.argv) == 2:
master = sys.argv[1]
try:
sc.stop()
except:
pass
sc = SparkContext(master, "WordCount")
nums= sc.parallelize([1, 2, 3, 3])
sumCount = nums.aggregate((0, 1),
lambda acc, value: (acc[0] + value, acc[1] + 1),
lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))
print(sumCount)
print(sumCount[0] / float(sumCount[1]))
注意aggregate() 中的acc和acc1代表的是(0,1),value和acc2代表的是list的值:
lambda acc, value: (acc[0] + value, acc[1] + 1)大概为reduce()操作,执行后返回(9,5)
这里的acc, value为:(0,1)与(1, 2, 3, 3)
返回(9,5)
lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])大概reduce()操作,执行后返回(9,6)
这里的acc1, acc2为:(0,1)与(9,5)
返回(9,6)
结果:
(9, 6)
1.5