PySpark RDD API 文档
输入文件 /test_file 的内容:
zhangsan,77,88,99
lisi,56,78,89
wanger,78,77,67
1.map
一对一转换:将函数应用于 RDD 的每个元素,返回由结果组成的新 RDD
import sys
from pyspark import SparkConf,SparkContext
# Create a Spark context in local mode named "api_test" and load the input file.
conf = SparkConf().setAppName("api_test").setMaster("local")
sc = SparkContext(conf=conf)
lines = sc.textFile("/test_file")
def sp(line):
    """Split one "name,eng,math,lang" line into a 4-tuple of strings.

    Surrounding whitespace is stripped first. Only the first four
    comma-separated fields are used; fewer than four raises IndexError.
    """
    fields = line.strip().split(",")
    return (str(fields[0]), str(fields[1]), str(fields[2]), str(fields[3]))
# Apply sp to every line; collect() returns the mapped records to the driver as a list.
records = lines.map(sp).collect()
# print (records)
# [('zhangsan', '77', '88', '99'), ('lisi', '56', '78', '89'), ('wanger', '78', '77', '67')]
for record in records:
    print (record)
# Output:
# ('zhangsan', '77', '88', '99')
# ('lisi', '56', '78', '89')
# ('wanger', '78', '77', '67')
2. flatMap
一对多转换:对每个元素应用函数,函数返回一个序列,序列中的各项被展平为多条记录,最终返回一个新的 RDD
import sys
from pyspark import SparkConf,SparkContext
# Local-mode Spark context named "api_test".
conf = SparkConf().setAppName("api_test").setMaster("local")
sc = SparkContext(conf=conf)
def flatMap_func(x):
    """Return the comma-separated fields of line ``x`` with outer whitespace removed."""
    stripped = x.strip()
    return stripped.split(",")
lines = sc.textFile("/test_file")
words = lines.flatMap(flatMap_func).collect()
print (words)
# Reads the file line by line and splits each line on "," into individual words,
# all flattened into a single list:
# [u'zhangsan', u'77', u'88', u'99', u'lisi', u'56', u'78', u'89', u'wanger', u'78', u'77', u'67']
3.filter
过滤掉不符合条件的元素,返回一个新的 RDD。传入的函数只接收一个参数(RDD 中的一个元素),返回值为真的元素被保留
import sys
from pyspark import SparkConf,SparkContext
# Local-mode Spark context named "api_test".
conf = SparkConf().setAppName("api_test").setMaster("local")
sc = SparkContext(conf=conf)
def sp(line):
    """Parse one "name,eng,math,lang" line into (name, eng, math, lang).

    The name stays a string; the three scores are converted to int so they
    can be compared numerically.
    """
    fields = line.strip().split(",")
    name = str(fields[0])
    eng, math, lang = [int(fields[idx]) for idx in (1, 2, 3)]
    return (name, eng, math, lang)
def flatMap_func(x):
    """Return the comma-separated fields of line ``x`` with outer whitespace removed."""
    stripped = x.strip()
    return stripped.split(",")
def filter_func(x):
    """Predicate for RDD.filter: True when the lang score (field 3) exceeds 80.

    ``x`` is a (name, eng, math, lang) record with integer scores. The
    original returned the record itself or an implicit None; an explicit
    boolean states the intent directly.
    """
    return x[3] > 80
lines = sc.textFile("/test_file")
kept = (
    lines.map(sp)
    .filter(filter_func)
    .collect()
)
print (kept)
# Records whose lang score is not greater than 80 were dropped:
# [('zhangsan', 77, 88, 99), ('lisi', 56, 78, 89)]
4.reduce
用给定的二元函数两两合并 RDD 的所有元素,返回一个汇总结果(reduce 是 action,直接返回值而不是 RDD)
import sys
from pyspark import SparkConf,SparkContext
conf = SparkConf().setAppName("api_test").setMaster("local")
sc = SparkContext(conf=conf)

# Combine integers pairwise with addition.
nums = sc.parallelize([1, 2, 3, 4, 5])
total = nums.reduce(lambda a, b: int(a) + int(b))
print (total)
# 15

# The same action concatenates strings when the function is "+" on str.
words = sc.parallelize(["asd", "asd", "word"])
joined = words.reduce(lambda a, b: a + b)
print (joined)
# asdasdword
5. countByValue
统计 RDD 中每个元素出现的次数,结果以 {元素: 次数} 的字典形式返回给 driver
import sys
from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local").setAppName("api_test")
sc = SparkContext(conf=conf)
li = ["asd", "ad", "asd", "hello", "word", "word"]
# SparkContext.parallelize turns the local list into an RDD
# (the misspelled "parrallelize" does not exist and raised AttributeError).
vec = sc.parallelize(li)
# countByValue returns a dict mapping each distinct element to its count.
# Named "counts" rather than "re" to avoid shadowing the stdlib module name.
counts = vec.countByValue()
print (counts)
# defaultdict(<type 'int'>, {'asd': 2, 'word': 2, 'hello': 1, 'ad': 1})
6.reduceByKey
按 key 聚合:对 (key, value) 形式的 RDD,使用自定义函数合并同一 key 下的所有 value
import sys
from pyspark import SparkConf,SparkContext
# Local-mode Spark context named "api_test".
conf = SparkConf().setAppName("api_test").setMaster("local")
sc = SparkContext(conf=conf)
def sp(line):
    """Parse one "name,eng,math,lang" line into (name, eng, math, lang).

    The name stays a string; the three scores are converted to int so they
    can be compared numerically.
    """
    fields = line.strip().split(",")
    name = str(fields[0])
    eng, math, lang = [int(fields[idx]) for idx in (1, 2, 3)]
    return (name, eng, math, lang)
def flatMap_func(x):
    """Return the comma-separated fields of line ``x`` with outer whitespace removed."""
    stripped = x.strip()
    return stripped.split(",")
def filter_func(x):
    """Predicate for RDD.filter: True when the lang score (field 3) exceeds 80.

    ``x`` is a (name, eng, math, lang) record with integer scores. The
    original returned the record itself or an implicit None; an explicit
    boolean states the intent directly.
    """
    return x[3] > 80
lines = sc.textFile("/test_file")
# Word count: emit (word, 1) for every field, then add up the 1s per word.
counts = (
    lines.flatMap(flatMap_func)
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
    .collect()
)
print (counts)
#[(u'77', 2), (u'wanger', 1), (u'56', 1), (u'99', 1), (u'lisi', 1), (u'88', 1), (u'89', 1), (u'67', 1), (u'zhangsan', 1), (u'78', 2)]
7. sortBy 排序(按 key 函数的返回值排序,第二个参数 False 表示降序)
from pyspark import SparkConf,SparkContext
import sys
# Python 2 only: switch the default codec to utf-8, presumably so that joining
# and printing non-ASCII words does not fail. reload()/setdefaultencoding()
# do not exist on Python 3, so the hack is guarded.
if sys.version_info[0] == 2:
    reload(sys)
    sys.setdefaultencoding("utf8")
conf = SparkConf().setAppName("Name").setMaster("local")
sc = SparkContext(conf=conf)
infile_path = "file:///home/njliu/prc/pyspark/RDD/The_Man_of_Property.txt"
infile = sc.textFile(infile_path)
# Word count sorted by frequency, most frequent first.
# split() with no argument splits on any whitespace run; split(" ") produced
# empty-string "words" for repeated spaces and blank lines.
# Parentheses replace the backslash continuations: the original trailing "\"
# after sortBy(...) glued the following "for" line onto the expression
# (SyntaxError).
word_counts = (
    infile.flatMap(lambda text: text.split())
    .map(lambda x: (x, 1))
    .reduceByKey(lambda x, y: x + y)
    .sortBy(lambda x: x[1], False)
)
for line in word_counts.collect():
    print ("\t".join([line[0], str(line[1])]))
8. groupByKey
from pyspark import SparkConf,SparkContext
def sub_process(k, v):
    """Build one output row for a key grouped by groupByKey.

    Args:
        k: the group key.
        v: iterable of (item, score) pairs for that key; score is expected to
           be a numeric string, as produced by ``str(x[2])`` in the pipeline.

    Returns:
        A list whose first element is ``k``, followed by the pairs ordered by
        score, highest first (ties keep their input order).
    """
    # Compare scores as numbers: sorting the raw strings would rank "9.5"
    # above "10.0".
    ranked = sorted(v, key=lambda tu: float(tu[1]), reverse=True)
    return [k] + ranked
if __name__ == "__main__":
    conf = SparkConf().setAppName("A Name").setMaster("local")
    sc = SparkContext(conf=conf)
    infiles = sc.textFile("file:///home/njliu/prc/pyspark/RDD/uis.data")
    # Lines are tab-separated; x[0] is the group key and (x[1], x[2]) the pair
    # passed to sub_process.
    # - float(x[2]): comparing the raw string to 1.5 is always True on
    #   Python 2 and raises TypeError on Python 3.
    # - lambda kv: tuple-parameter unpacking "lambda (k, v)" is a syntax error
    #   on Python 3.
    # - Parentheses instead of "\": the original trailing backslash joined the
    #   "for" line onto the expression.
    res = (
        infiles.map(lambda line: line.strip().split("\t"))
        .filter(lambda x: float(x[2]) > 1.5)
        .map(lambda x: (x[0], (str(x[1]), str(x[2]))))
        .groupByKey()
        .map(lambda kv: sub_process(kv[0], kv[1]))
    )
    for line in res.collect():
        print (line)