PySpark DataFrame 入门

1 创建数据

from pyspark.sql import *
from pyspark.sql import functions as F
Employee = Row("firstName", "lastName", "email", "salary","depid")
employee1 = Employee('Basher', 'armbrust', 'bash@edureka.co', 100000,1)
employee2 = Employee('Daniel', 'meng', 'daniel@stanford.edu', 120000,1 )
employee3 = Employee('Muriel', None, 'muriel@waterloo.edu', 140000 ,2)
employee4 = Employee('Rachel', 'wendell', 'rach_1@edureka.co', 160000,3)
employee5 = Employee('Rachel', 'galifianakis', 'rach_2@edureka.co', 160000,4 )
df_employee = spark.createDataFrame((employee1,employee2,employee3,employee4,employee5))
# cache 一下,避免反复运行
df_employee.cache()
df_employee.count()

>>> df_employee.show()
+---------+------------+-------------------+------+-----+
|firstName|    lastName|              email|salary|depid|
+---------+------------+-------------------+------+-----+
|   Basher|    armbrust|    bash@edureka.co|100000|    1|
|   Daniel|        meng|daniel@stanford.edu|120000|    1|
|   Muriel|        null|muriel@waterloo.edu|140000|    2|
|   Rachel|     wendell|  rach_1@edureka.co|160000|    3|
|   Rachel|galifianakis|  rach_2@edureka.co|160000|    4|
+---------+------------+-------------------+------+-----+

2 Distinct 去重

df_employee.select('firstName').distinct().show()
+---------+
|firstName|
+---------+
|   Muriel|
|   Basher|
|   Rachel|
|   Daniel|
+---------+
>>> df_employee.select('firstName','salary').distinct().show()
+---------+------+
|firstName|salary|
+---------+------+
|   Rachel|160000|
|   Muriel|140000|
|   Daniel|120000|
|   Basher|100000|
+---------+------+
# select count(distinct(firstName)) from employee
df_employee.select('firstName').distinct().count()

3 聚合Group by

# select depid,count(1) as count from employee group by depid
df_employee.groupby('depid').count().show()
# group by 多个字段
df_employee.groupby('depid','firstName').count().show()
+-----+---------+-----+
|depid|firstName|count|
+-----+---------+-----+
|    1|   Basher|    1|
|    3|   Rachel|    1|
|    4|   Rachel|    1|
|    2|   Muriel|    1|
|    1|   Daniel|    1|
+-----+---------+-----+
# group by 聚合sum
# select depid,firstName,sum(salary) from employee group by depid,firstName;
>>> df_employee.groupby('depid','firstName').sum('salary').show()
+-----+---------+-----------+
|depid|firstName|sum(salary)|
+-----+---------+-----------+
|    1|   Basher|     100000|
|    3|   Rachel|     160000|
|    4|   Rachel|     160000|
|    2|   Muriel|     140000|
|    1|   Daniel|     120000|
+-----+---------+-----------+
# 可以看到sum(salary)这个name非常不合理,有没有办法提供alias
df_employee.groupby('depid','firstName').agg(F.sum(F.col('salary')).alias('total_salary')).show()

# agg 这个接口更通用,多个groupby指标,也可以使用

agg_stat = [
F.sum(F.col('salary')).alias('total_salary'),
F.max(F.col('salary')).alias('max_salary'),
F.count(F.col('salary')).alias('n')
]

df_employee.groupby('depid').agg(*agg_stat).show()
+-----+------------+----------+---+
|depid|total_salary|max_salary|  n|
+-----+------------+----------+---+
|    1|      220000|    120000|  2|
|    3|      160000|    160000|  1|
|    2|      140000|    140000|  1|
|    4|      160000|    160000|  1|
+-----+------------+----------+---+

4 Filter/ Where 按条件删选

>>> df_employee.filter(df_employee.firstName=='Rachel').show()
+---------+------------+-----------------+------+
|firstName|    lastName|            email|salary|
+---------+------------+-----------------+------+
|   Rachel|     wendell|rach_1@edureka.co|160000|
|   Rachel|galifianakis|rach_2@edureka.co|160000|
+---------+------------+-----------------+------+
## 下面的方式也是可以的
from pyspark.sql import functions as F
df_employee.filter(F.col('firstName')=='Rachel').show()

# 多个条件 and: & or: |
>>> df_employee.filter((F.col('firstName')=='Rachel') | (F.col('firstName')=='Muriel')).show()
+---------+------------+-------------------+------+
|firstName|    lastName|              email|salary|
+---------+------------+-------------------+------+
|   Muriel|        null|muriel@waterloo.edu|140000|
|   Rachel|     wendell|  rach_1@edureka.co|160000|
|   Rachel|galifianakis|  rach_2@edureka.co|160000|
+---------+------------+-------------------+------+
# and
df_employee.filter((F.col('firstName')=='Rachel') & (F.col('lastName')=='wendell')).show()

# filter 太长了,简略一些
filters = (F.col('firstName')=='Rachel') & (F.col('lastName')=='wendell')
df_employee.filter(filters).show()

# filters 再长一点怎么办,分行写
filters = (
(
 (F.col('firstName')=='Rachel') 
 | 
 (F.col('lastName')=='wendell')
 ) & (F.col('salary') >  140000)
)
df_employee.filter(filters).show()
# 直接传字符串

df_employee.filter('(firstName=="Rachel" or lastName is null) or firstName=="Daniel"').show()
+---------+------------+-------------------+------+-----+
|firstName|    lastName|              email|salary|depid|
+---------+------------+-------------------+------+-----+
|   Daniel|        meng|daniel@stanford.edu|120000|    1|
|   Muriel|        null|muriel@waterloo.edu|140000|    2|
|   Rachel|     wendell|  rach_1@edureka.co|160000|    3|
|   Rachel|galifianakis|  rach_2@edureka.co|160000|    4|
+---------+------------+-------------------+------+-----+
# where 等同于filter
df_employee.where('salary>140000').show()

5 排序Order By

df_employee.orderBy('salary',ascending=False)

## 多个字段
df_employee.orderBy(['firstName','salary'],ascending=True).show()

## 多个字段排序方式不同
df_employee.orderBy([F.col('firstName').desc(),F.col('salary').asc()]).show()
## 更简单的方式
df_employee.orderBy([F.desc('firstName'),F.asc('salary')]).show()

6 Join

# 创建表
department1 = Row(id=1, name='HR',bonus=0.2)
department2 = Row(id=2, name='OPS',bonus=0.3)
department3 = Row(id=3, name='FN',bonus=0.3)
department4 = Row(id=4, name='DEV',bonus=0.35)
department5 = Row(id=5, name='AD',bonus=0.21)

df_dep = spark.createDataFrame((department1,department2,department3,department4,department5))
df_dep.cache()
df_dep.count()
>>> df_dep.show()
+-----+---+----+
|bonus| id|name|
+-----+---+----+
|  0.2|  1|  HR|
|  0.3|  2| OPS|
|  0.3|  3|  FN|
| 0.35|  4| DEV|
| 0.21|  5|  AD|
+-----+---+----+
# 默认inner join
>>> df_employee.join(df_dep,df_dep.id==df_employee.depid).show()
+---------+------------+-------------------+------+-----+-----+---+----+
|firstName|    lastName|              email|salary|depid|bonus| id|name|
+---------+------------+-------------------+------+-----+-----+---+----+
|   Basher|    armbrust|    bash@edureka.co|100000|    1|  0.2|  1|  HR|
|   Daniel|        meng|daniel@stanford.edu|120000|    1|  0.2|  1|  HR|
|   Muriel|        null|muriel@waterloo.edu|140000|    2|  0.3|  2| OPS|
|   Rachel|     wendell|  rach_1@edureka.co|160000|    3|  0.3|  3|  FN|
|   Rachel|galifianakis|  rach_2@edureka.co|160000|    4| 0.35|  4| DEV|
+---------+------------+-------------------+------+-----+-----+---+----+
# outer join
>>> df_employee.join(df_dep,df_dep.id==df_employee.depid,how='outer').show()
+---------+------------+-------------------+------+-----+-----+---+----+
|firstName|    lastName|              email|salary|depid|bonus| id|name|
+---------+------------+-------------------+------+-----+-----+---+----+
|     null|        null|               null|  null| null| 0.21|  5|  AD|
|   Basher|    armbrust|    bash@edureka.co|100000|    1|  0.2|  1|  HR|
|   Daniel|        meng|daniel@stanford.edu|120000|    1|  0.2|  1|  HR|
|   Rachel|     wendell|  rach_1@edureka.co|160000|    3|  0.3|  3|  FN|
|   Muriel|        null|muriel@waterloo.edu|140000|    2|  0.3|  2| OPS|
|   Rachel|galifianakis|  rach_2@edureka.co|160000|    4| 0.35|  4| DEV|
+---------+------------+-------------------+------+-----+-----+---+----+


# 还可选 left_outer, right_outer, leftsemi,
# 另,如果join的key相同,可以直接传入column name 
df1.join(df2, ['id1','id2']).show()

问题记录

1 partition 字段首字母消失

dataframe 通过 partition存储后,再读回来,发现原来的partition首字母会被忽略。如"0100" 成了 100
原因是,spark有类型推断,如果字符串像数值,则转为了数值。之后再进行统一类型为字符。
所以,如果出现这种情况,请关闭这个开关

spark.conf.set('spark.sql.sources.partitionColumnTypeInference.enabled','false')
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容