pyspark常用操作

加载 读取数据

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('data_processing').getOrCreate()
df = spark.read_csv('data.csv',inferSchema=True,header=True) # inferSchema用于自行推断数据类型
-------------
df.columns #查看列名
(df.count,len(df.columns))#查看形状
df.printSchema() #类似info
df.show(5) #类似head
df.select('age','name').show()
df.describe().show()
-------------

添加一列新列

df = df.withColumn('age_after_10_years',(df['age']+10))

转换数据类型

from pyspark.sql.types import StringType,DoubleType
df['age'].cast(DoubleType())
-------------

筛选数据

df.filter(df['age']==20).show()
df.filter((df['age']==20) & (df['name']=='biob')).show()
求unique
df.select('name').distinct().show()
-------------

数据分组

df.groupBy('age').count().orderBy('age',ascending=False).show()
df.groupBy('age').mean() #返回 所有除去age的其他列的mean值
聚合
df.groupBy('age').agg({'salary':'sum'})
------------

自定义函数

from pyspark.sql.function import udf
def func():
    ***
    ***
    return **
UDF = udf(func,StringType())  #func的返回指的类型 可以是 StringType 也可以是 IntegerType() 看具体情况
df.withColumn('salary_new',UDF(df['salary']))  #新加一列 对df['salary'] 进行定义的函数操作
-----------
UDF = udf(lambda x:....,StringType())
df.withColumn('salary_new',UDF(df['salary']))
-----------

使用pandas_udf速度更快

from pyspark.sql.function import pandas_udf
def func():
    ...
UDF = pandas_udf(func,IntegerType())
df.withColumn(df['name'],UDF(df['name']))

函数的方法也可适用于多列

-----------

去重

df = df.dropDuplicates()
-----------

删除列

df = df.drop('salary')

写入数据 csv格式

path = 'result/train'
df.coalesce(1).write.format('csv').option('header','true').save(path)
-----------

写入数据 嵌套结构 适用于数据集特别大 可以对其进行压缩

path = 'result/train'
df.write.format('parquet').save(path)
------------

多列特征合并

from  pyspark.ml.linalg import Vector
from pyspark.ml.feature import VectorAssembler #VectorAssembler可将所有的特征汇成一列特征  

vec_assembler = VectorAssembler(inputCols=['city','career','age','gender'],outputCol='people_feature')
df = vec_assembler.transform(df)
-----------

划分数据集

train,test = df.randomSplit([0.8,0.2])
-----------
from pyspark.ml.regression import LinearRegression
linear_reg = LinearRegression(labelCol='label')
model = linear_reg.fit(train)
model_ = model.evaluate(train)
r2 = model_.r2 #还可以取更多的指标

res = model.evaluate(test).predictions
-----------

特征工程

from spark.ml.feature import StringIndexer  #类似于sklearn 的 label encoder
from spark.ml.feature import OneHotEncoder
from spark.ml.feature import VectorAssembler

df = StringIndexer(inputCol='city',outputCol='city_').fit_transform(df)
-----------

查看数据分布 这里用不了value_counts

df.groupBy('label').count().show()
-----------
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier

model = RandomForestClassifier(labelCol='label',numTrees=50).fit(train)
-----------

查看准确率 精度 auc

from spark.ml.evaluation import MulticlassClassificationEvaluator
from spark.ml.evaluation import BinaryClassificationEvaluator

predictions = model.transform(test)
accuracy = MulticlassClassificationEvaluator(labelCol='label',metricName='accuracy').evaluate(predictions) # metricName = 'weightedPrecision' 精度

auc = BinaryClassificationEvaluator(labelCol='label').evaluate(predictions)
----------

推荐算法  ALS(基于矩阵分解的一种方法)

from pyspark.ml.recommendation import ALS
rec = ALS(maxIter=10, regParam=0.01, userCol='userID', itemCol='itemID', ratingCol='rating', nonnegative=True, coldStartStrategy='drop')

其中 nonnegative 表示 不会在推荐系统中创建负数评分  coldStartStrategy可以防止生成任何nan评分预测
-----------
rec_model = rec.fit(train)
predicted_ratings = rec_model.transfrom(train)

from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName='rmse',predictionCol='prediction',labelCol='rating')
rmase = evaluator.evaluate(predicted_ratings)

进行到这一步还没有完成推荐  只是完成了对测试集用户评分的预测 
-----------

接下来
推荐用户可能喜欢的排名靠前的电影 
先创建一系列独立的电影
unique_movies = df.select('movie').distinct()

假如我们对一位特定的userid 推荐电影  先过滤掉 他看过的电影
watched_movies = df.filter(df['userid']==userid).select('movie').distinct()
-----------

然后通过合并两张表 过滤空值 找出可以推荐的电影

total_movies = unique_movies.join(watched_movies,unique_movies.movie == watched_movies.movie)
remaining_movies = total_movies.where(col('watched_movies.movie').isNull()).select(unique.movies).distinct()
-----------

然后再用之前的模型对其进行评分预测 

recommendations = rec_model.transform(remaing_movies).orderBy('prediction',ascending=False)

之后还可以用 IndexToString 反变换  把推荐的电影数字 映射为 电影名字

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,837评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,551评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,417评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,448评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,524评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,554评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,569评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,316评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,766评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,077评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,240评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,912评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,560评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,176评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,425评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,114评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,114评论 2 352

推荐阅读更多精彩内容