Pair Operations on RDDs
That is, we operate on an RDD built from two-element tuples, e.g. rdd = ((a, b), (c, b)). These are tuples, not dicts, so indexing is positional: rdd[0] == (a, b) and rdd[0][0] == a.
Spark treats the first element of each tuple as the key and the second as the value. Pair RDDs provide a set of extra operations on top of plain RDDs (for example, counting the reviews for each product, grouping records that share a key, or combining two different RDDs by key).
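As a quick illustration, the key and value parts of a pair RDD can be pulled out with keys() and values(). A minimal sketch, assuming a live SparkContext sc (created the same way as in the full examples below); the sample data is made up:
# Minimal sketch: pulling keys and values out of a pair RDD (sample data is hypothetical).
pairs = sc.parallelize(((1, 2), (3, 4), (3, 6)))
print(pairs.keys().collect())     # [1, 3, 3]
print(pairs.values().collect())   # [2, 4, 6]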
import sys
from pyspark import SparkContext

if __name__ == "__main__":
    master = "local"
    if len(sys.argv) == 2:
        master = sys.argv[1]
    # Stop any SparkContext left over from a previous run (e.g. in a notebook).
    try:
        sc.stop()
    except Exception:
        pass
    sc = SparkContext(master, "WordCount")
    nums = sc.parallelize(((1, 2), (3, 4), (3, 6)))
    # reduceByKey merges the values of each key: (3, 4) and (3, 6) become (3, 10).
    sumCount = nums.reduceByKey(lambda x, y: x + y)
    print(sumCount.collect())
Result:
[(1, 2), (3, 10)]
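The name sumCount hints at the classic per-key average pattern built on reduceByKey; a minimal sketch of that pattern, reusing the nums RDD from above:
# Minimal sketch: per-key average via mapValues + reduceByKey, reusing nums.
sum_count = nums.mapValues(lambda v: (v, 1)) \
                .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
# Divide each key's running sum by its count.
print(sum_count.mapValues(lambda p: p[0] / p[1]).collect())   # [(1, 2.0), (3, 5.0)]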
Common pair operations:
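Before moving on to two-RDD operations, here is a minimal sketch of a few common single-RDD pair transformations (groupByKey, sortByKey, countByKey), on the same toy data as above; output ordering may vary:
# Minimal sketch of common single-RDD pair operations (toy data as above).
pairs = sc.parallelize(((1, 2), (3, 4), (3, 6)))
# groupByKey gathers all values sharing a key into one iterable.
print(pairs.groupByKey().mapValues(list).collect())   # [(1, [2]), (3, [4, 6])]
# sortByKey returns a new RDD ordered by key.
print(pairs.sortByKey().collect())                    # [(1, 2), (3, 4), (3, 6)]
# countByKey is an action: it returns per-key counts to the driver as a dict.
print(pairs.countByKey())                             # defaultdict: {1: 1, 3: 2}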
Operations on two RDDs:
import sys
from pyspark import SparkContext

if __name__ == "__main__":
    master = "local"
    if len(sys.argv) == 2:
        master = sys.argv[1]
    # Stop any SparkContext left over from a previous run.
    try:
        sc.stop()
    except Exception:
        pass
    sc = SparkContext(master, "WordCount")
    RDD1 = sc.parallelize(((1, 2), (3, 4), (3, 6)))
    print(RDD1.collect())
    RDD2 = sc.parallelize(((3, 9),))
    print(RDD2.collect())
    # join is an inner join on keys: only key 3 appears in both RDDs.
    RDD3 = RDD1.join(RDD2)
    print(RDD3.collect())
Result:
[(1, 2), (3, 4), (3, 6)]
[(3, 9)]
[(3, (4, 9)), (3, (6, 9))]
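Note that join is an inner join: key 1 exists only in RDD1 and is dropped, while key 3 is matched once per value pair. A minimal sketch of the other two-RDD operations, reusing RDD1 and RDD2 from above (output ordering may vary):
# Minimal sketch of other two-RDD operations, reusing RDD1 and RDD2.
# leftOuterJoin keeps every key of RDD1; unmatched right values become None.
print(RDD1.leftOuterJoin(RDD2).collect())
# [(1, (2, None)), (3, (4, 9)), (3, (6, 9))]
# cogroup groups both RDDs' values by key without pairing them one-to-one.
print(RDD1.cogroup(RDD2).mapValues(lambda v: (list(v[0]), list(v[1]))).collect())
# [(1, ([2], [])), (3, ([4, 6], [9]))]
# subtractByKey removes the pairs whose key also appears in the other RDD.
print(RDD1.subtractByKey(RDD2).collect())
# [(1, 2)]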
import sys
from pyspark import SparkContext

if __name__ == "__main__":
    master = "local"
    if len(sys.argv) == 2:
        master = sys.argv[1]
    # Stop any SparkContext left over from a previous run.
    try:
        sc.stop()
    except Exception:
        pass
    sc = SparkContext(master, 'test')
    RDD1 = sc.parallelize(((1, '012345678'),
                           (2, '0123457'),
                           (3, '012345'),
                           (4, '01234')))
    print(RDD1.collect())
    # Each pair reaches the lambda as a plain tuple; keyValue[1] is the value,
    # so this keeps only pairs whose value string is shorter than 8 characters.
    RDD1 = RDD1.filter(lambda keyValue: len(keyValue[1]) < 8)
    print(RDD1.collect())
This shows that each pair can be treated as an ordinary Python tuple:
[(1, '012345678'), (2, '0123457'), (3, '012345'), (4, '01234')]
[(2, '0123457'), (3, '012345'), (4, '01234')]
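Because each pair is an ordinary tuple, the generic map transformation also works on pair RDDs; mapValues is a shorthand for the common case where only the value changes. A minimal sketch, reusing the filtered RDD1 from above:
# Minimal sketch: map vs. mapValues on a pair RDD, reusing the filtered RDD1.
# map sees the whole (key, value) tuple, so the pair must be rebuilt by hand.
print(RDD1.map(lambda kv: (kv[0], len(kv[1]))).collect())
# [(2, 7), (3, 6), (4, 5)]
# mapValues rewrites only the value and keeps the key as-is.
print(RDD1.mapValues(len).collect())
# [(2, 7), (3, 6), (4, 5)]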