Python 操作Spark —— 基本使用

pySpark API文档

输入文件内容

zhangsan,77,88,99
lisi,56,78,89
wanger,78,77,67

1.map

一对一处理函数, 应用于RDD的每个元素,并返回一个RDD

import sys
from pyspark import SparkConf,SparkContext

conf = SparkConf().setMaster("local").setAppName("api_test")
sc = SparkContext(conf=conf)

lines = sc.textFile("/test_file")

def sp(line):
        ss = line.strip().split(",")
        name =  str(ss[0])
        eng = str(ss[1])
        math = str(ss[2])
        lang = str(ss[3])
        return name,eng,math,lang

rdd = lines.map(sp).collect()
#print (rdd)
[('zhangsan', '77', '88', '99'), ('lisi', '56', '78', '89'), ('wanger', '78', '77', '67')]

for line in rdd:
        print (line)

('zhangsan', '77', '88', '99')
('lisi', '56', '78', '89')
('wanger', '78', '77', '67')

2. flatMap

一对多函数, 将数据按照定义的规则拆分成多条记录 并返回一个新的RDD

import sys
from pyspark import SparkConf,SparkContext

conf = SparkConf().setMaster("local").setAppName("api_test")
sc = SparkContext(conf=conf)

def flatMap_func(x):
        return x.strip().split(",")

lines = sc.textFile("/test_file")
rdd = lines.flatMap(flatMap_func).collect()
print (rdd)
#把一个文件里的内容按行读取  并按照“,”分割成单词
[u'zhangsan', u'77', u'88', u'99', u'lisi', u'56', u'78', u'89', u'wanger', u'78', u'77', u'67']

3.filter

过滤掉不符合条件的元素,返回一个新的RDD 函数需传入两个参数

import sys
from pyspark import SparkConf,SparkContext

conf = SparkConf().setMaster("local").setAppName("api_test")
sc = SparkContext(conf=conf)


def sp(line):
        ss = line.strip().split(",")
        name =  str(ss[0])
        eng = int(ss[1])
        math = int(ss[2])
        lang = int(ss[3])
        return name,eng,math,lang


def flatMap_func(x):
        return x.strip().split(",")

def filter_func(x):
        if x[3] > 80:
                return x

lines = sc.textFile("/test_file")
rdd = lines.map(sp) \
        .filter(filter_func)\
        .collect()

print (rdd)

#过滤掉了lang小于80的记录
[('zhangsan', 77, 88, 99), ('lisi', 56, 78, 89)]

4.reduce

汇总RDD的所有元素

import sys
from pyspark import SparkConf,SparkContext

conf = SparkConf().setMaster("local").setAppName("api_test")
sc = SparkContext(conf=conf)

li = [1,2,3,4,5]
vec = sc.parallelize(li)

rdd = vec.reduce(lambda x,y:int(x)+int(y))
print (rdd)
#15

li = ["asd","asd","word"]
vec = sc.parallelize(li)

rdd = vec.reduce(lambda x,y:x+y)
print (rdd)
#asdasdword

5. countByValue

统计RDD中每个元素出现的次数

import sys
from pyspark import SparkConf,SparkContext

conf = SparkConf().setMaster("local").setAppName("api_test")
sc = SparkContext(conf=conf)

li = ["asd","ad","asd","hello","word","word"]
vec = sc.parrallelize(li)

re = vec.countByValue()
print (re)
#defaultdict(<type 'int'>, {'asd': 2, 'word': 2, 'hello': 1, 'ad': 1})

6.reduceByKey

按key聚合 可自定义函数

import sys
from pyspark import SparkConf,SparkContext

conf = SparkConf().setMaster("local").setAppName("api_test")
sc = SparkContext(conf=conf)


def sp(line):
        ss = line.strip().split(",")
        name =  str(ss[0])
        eng = int(ss[1])
        math = int(ss[2])
        lang = int(ss[3])
        return name,eng,math,lang


def flatMap_func(x):
        return x.strip().split(",")

def filter_func(x):
        if x[3] > 80:
                return x

lines = sc.textFile("/test_file")
rdd = lines.flatMap(flatMap_func) \
        .map(lambda x:(x,1)) \
        .reduceByKey(lambda x,y:x+y) \
        .collect()

print (rdd)

#[(u'77', 2), (u'wanger', 1), (u'56', 1), (u'99', 1), (u'lisi', 1), (u'88', 1), (u'89', 1), (u'67', 1), (u'zhangsan', 1), (u'78', 2)]

7.sortBy 排序
from pyspark import SparkConf,SparkContext
import sys

reload(sys)
sys.setdefaultencoding("utf8")


conf = SparkConf().setAppName("Name").setMaster("local")
sc = SparkContext(conf=conf)

infile_path = "file:///home/njliu/prc/pyspark/RDD/The_Man_of_Property.txt"
infile = sc.textFile(infile_path)
re = infile.flatMap(lambda l:l.strip().split(" ")) \
        .map(lambda x:(x,1)) \
        .reduceByKey(lambda x,y:x +y) \
        .sortBy(lambda x:x[1],False) \

for line in re.collect():
        print ("\t".join([line[0],str(line[1])]))


7.groupByKey
from pyspark import SparkConf,SparkContext

def sub_process(k,v):
        tmp_list = []
        for tu in v:
                tmp_list.append(tu)
        res_list = sorted(tmp_list,key=lambda x:x[1],reverse=True)
        res_list.insert(0,k)
        return res_list

if __name__ == "__main__":
        conf = SparkConf().setAppName("A Name").setMaster("local")
        sc = SparkContext(conf=conf)

        infiles = sc.textFile("file:///home/njliu/prc/pyspark/RDD/uis.data")
        res = infiles.map(lambda line:line.strip().split("\t")) \
                .filter(lambda x:x[2]>1.5) \
                .map(lambda x:(x[0],(str(x[1]),str(x[2])))) \
                .groupByKey() \
                .map(lambda (k,v):sub_process(k,v))\

        for line in res.collect():
                print (line)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,444评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,421评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,363评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,460评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,502评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,511评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,280评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,736评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,014评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,190评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,848评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,531评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,159评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,411评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,067评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,078评论 2 352