pySpark DataFrame入门

DataFrame是一种不可变的分布式数据集,这种数据被组织成指定的列,类似于关系数据库中的表。Spark DataFrame与Python pandas 中的DataFrame类似,通过在分布式数据集上施加结构,让 spark 用户利用spark SQL来查询结构化的数据或使用spark表达式方法。

1、创建DataFrame

可以通过直接读入json或parquet等文件来创建DataFrame,还可以通过RDD来创建DataFrame。

df = spark.read.parquet(parquet_file)
df = spark.read.csv(csv_file)
df = spark.read.json(json_file)

df = spark.createDataFrame(RDD, schema)
df = rdd.toDF(*cols)

2、DataFrame数据初步查看

通过printSchema可以查看DataFrame各列的数据类型,而describe则可以查看各列数据的统计情况。

# 查看DataFrame数据结构
df = spark.createDataFrame([('1', 'Joe', '70000', '1'), ('2', 'Henry', '80000', None)],
                           ['Id', 'Name', 'Sallary', 'DepartmentId'])
df.printSchema()
# 输出
root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sallary: string (nullable = true)
 |-- DepartmentId: string (nullable = true)

# 查看数据基本统计情况
df.describe().show()
# 输出
+-------+------------------+-----+-----------------+------------+
|summary|                Id| Name|          Sallary|DepartmentId|
+-------+------------------+-----+-----------------+------------+
|  count|                 2|    2|                2|           1|
|   mean|               1.5| null|          75000.0|         1.0|
| stddev|0.7071067811865476| null|7071.067811865475|         NaN|
|    min|                 1|Henry|            70000|           1|
|    max|                 2|  Joe|            80000|           1|
+-------+------------------+-----+-----------------+------------+

from pyspark.sql.functions import count
# 查看各列非空记录的数量
df.agg(*[count(c).alias(c) for c in df.columns]).show()
# 输出
+---+----+-------+------------+
| Id|Name|Sallary|DepartmentId|
+---+----+-------+------------+
|  2|   2|      2|           1|
+---+----+-------+------------+

3、操作DataFrame

3.1 选择DataFrame子集

在很多时候我们不需要分析全部的DataFrame元素,只需要其中一部分,这时候便需要对其列进行选择。pyspark DataFrame筛选子集的方法很多:

  • df.select(), 根据列名来选择子集;
  • df.selectExpr(), 用来选择某列并对某列进行变换,返回变换后的值;
  • df.where(), df.filter(), 这两个函数的用法相同,都是用来提取符合特定条件的记录(行);
  • df.distinct(), 用来过滤重复的记录(行),返回不含重复记录的DataFrame子集;
  • df.sample(withReplacement, fraction, seed=None),随机抽样;
  • df.sampleBy(col, fractions, seed=None),根据某一列类别来进行抽样,用来进行分层抽样;
  • df.withColumn(colName, col),用来对某一列进行操作,如转换数据类型,根据某一列创建新列等;
  • withColumnRenamed(existing, new), 重命名列;
from pyspark.sql.functions import *
df = spark.createDataFrame([('a',[1,2,3]),('b',[2,3,4])], ['key','value'])
df.show()
df.select(df.key, explode(df.value)).show()
+---+---------+
|key|    value|
+---+---------+
|  a|[1, 2, 3]|
|  b|[2, 3, 4]|
+---+---------+

+---+---+
|key|col|
+---+---+
|  a|  1|
|  a|  2|
|  a|  3|
|  b|  2|
|  b|  3|
|  b|  4|
+---+---+

df = spark.createDataFrame([('a',1),('a',2),('a',3),('a',1),('b',1),('b',2)],['key', 'val'])
df.show()
|key|val|
+---+---+
|  a|  1|
|  a|  2|
|  a|  3|
|  a|  1|
|  b|  1|
|  b|  2|
+---+---+

df.select('key').show()
df.selectExpr('length(key)').show()
+---+
|key|
+---+
|  a|
|  a|
|  a|
|  a|
|  b|
|  b|
+---+

+-----------+
|length(key)|
+-----------+
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
+-----------+

df.filter(df.key=='a').show()
df.where(df.key=='a').show()
+---+---+
|key|val|
+---+---+
|  a|  1|
|  a|  2|
|  a|  3|
|  a|  1|
+---+---+

+---+---+
|key|val|
+---+---+
|  a|  1|
|  a|  2|
|  a|  3|
|  a|  1|
+---+---+

df.distinct().show()
+---+---+
|key|val|
+---+---+
|  a|  1|
|  b|  1|
|  a|  2|
|  a|  3|
|  b|  2|
+---+---+

 df.sample(withReplacement=False, fraction=0.5, seed=666).show()
+---+---+
|key|val|
+---+---+
|  a|  1|
|  a|  2|
|  a|  1|
|  b|  2|
+---+---+

df.sampleBy('key', fractions={'a':0.2,'b':0.6}, seed=123).show()
+---+---+
|key|val|
+---+---+
|  a|  3|
|  b|  2|
+---+---+

add1 = udf(lambda x: x+1)
df.withColumn('val1', add1('val')).show()
df.withColumn('val', df.val.cast('float')).show()
+---+---+----+
|key|val|val1|
+---+---+----+
|  a|  1|   2|
|  a|  2|   3|
|  a|  3|   4|
|  a|  1|   2|
|  b|  1|   2|
|  b|  2|   3|
+---+---+----+

+---+---+
|key|val|
+---+---+
|  a|1.0|
|  a|2.0|
|  a|3.0|
|  a|1.0|
|  b|1.0|
|  b|2.0|
+---+---+

df.withColumnRenamed('key', 'kk').show()
+---+---+
| kk|val|
+---+---+
|  a|  1|
|  a|  2|
|  a|  3|
|  a|  1|
|  b|  1|
|  b|  2|
+---+---+
3.2 处理NA(空值)元素

pyspark中提供了df.na.drop方法来丢掉空值行,使用df.na.fill方法来使用某些值来替换空值。

df.show()
+---+-----+-------+------------+
| Id| Name|Sallary|DepartmentId|
+---+-----+-------+------------+
|  1|  Joe|  70000|           1|
|  2|Henry|  80000|        null|
+---+-----+-------+------------+

# df.fillna('666') 效果与下面相同
df.na.fill('666')
+---+-----+-------+------------+
| Id| Name|Sallary|DepartmentId|
+---+-----+-------+------------+
|  1|  Joe|  70000|           1|
|  2|Henry|  80000|         666|
+---+-----+-------+------------+

# df.dropna()与下面结果相同
df.na.drop()
+---+----+-------+------------+
| Id|Name|Sallary|DepartmentId|
+---+----+-------+------------+
|  1| Joe|  70000|           1|
+---+----+-------+------------+
3.3 连接DataFrame

与大多数关系数据表相同,spark中的DataFrame也提供了join功能。

df1 = spark.createDataFrame([('a',1),('b',2),('c',3)],['x1','x2'])
df2 = spark.createDataFrame([('a','T'),('b','F'),('d','T')],['x1','x3'])
df1.show();df2.show()
+---+---+   
| x1| x2|
+---+---+
|  a|  1|
|  b|  2|
|  c|  3|
+---+---+

+---+---+
| x1| x3|
+---+---+
|  a|  T|
|  b|  F|
|  d|  T|
+---+---+

join支持的方式有:'inner', 'outer', 'full', 'fullouter', 'full_outer', 'leftouter', 'left', 'left_outer', 'rightouter', 'right', 'right_outer', 'leftsemi', 'left_semi', 'leftanti', 'left_anti', 'cross':

print('left:')
df1.join(df2, on='x1', how='left').show()
df1.join(df2, df1.x1==df2.x1, how='left').show()
print('right:')
df1.join(df2, on='x1', how='right').show()
print('outer:')
df1.join(df2, on='x1', how='outer').show()
print('inner:')
df1.join(df2, on='x1', how='inner').show()
print('leftsemi:')
df1.join(df2, on='x1', how='leftsemi').show()
print('leftanti:')
df1.join(df2, on='x1', how='leftanti').show()

left:
+---+---+----+
| x1| x2|  x3|
+---+---+----+
|  c|  3|null|
|  b|  2|   F|
|  a|  1|   T|
+---+---+----+

+---+---+----+----+
| x1| x2|  x1|  x3|
+---+---+----+----+
|  c|  3|null|null|
|  b|  2|   b|   F|
|  a|  1|   a|   T|
+---+---+----+----+

right:
+---+----+---+
| x1|  x2| x3|
+---+----+---+
|  d|null|  T|
|  b|   2|  F|
|  a|   1|  T|
+---+----+---+

outer:
+---+----+----+
| x1|  x2|  x3|
+---+----+----+
|  d|null|   T|
|  c|   3|null|
|  b|   2|   F|
|  a|   1|   T|
+---+----+----+

inner:
+---+---+---+
| x1| x2| x3|
+---+---+---+
|  b|  2|  F|
|  a|  1|  T|
+---+---+---+

leftsemi:
+---+---+
| x1| x2|
+---+---+
|  b|  2|
|  a|  1|
+---+---+

leftanti:
+---+---+
| x1| x2|
+---+---+
|  c|  3|
+---+---+

需要注意的是,join后的DataFrame是乱序的。

3.4 集合操作

DataFrame也支持常见的集合操作:union, intersection, subtract。用法如下:

df1 = spark.createDataFrame([('a',1),('b',2),('c',3)],['x1','x2'])
df2 = spark.createDataFrame([('a','T'),('b','F'),('d','T')],['x1','x2'])
df1.show();df2.show()
+---+---+
| x1| x2|
+---+---+
|  a|  1|
|  b|  2|
|  c|  3|
+---+---+

+---+---+
| x1| x2|
+---+---+
|  a|  T|
|  b|  F|
|  d|  T|
+---+---+

集合操作如下:

print('union:')
df1.union(df2).orderBy('x1', ascending=True).show()
print('intersect:')
df1.intersect(df2).orderBy('x1', ascending=True).show()
print('subtract:')
df1.subtract(df2).orderBy('x1', ascending=True).show()

union:
+---+---+
| x1| x2|
+---+---+
|  a|  T|
|  a|  1|
|  b|  2|
|  b|  F|
|  c|  3|
|  d|  T|
+---+---+

intersect:
+---+---+
| x1| x2|
+---+---+
+---+---+

subtract:
+---+---+
| x1| x2|
+---+---+
|  a|  1|
|  b|  2|
|  c|  3|
+---+---+
3.4 DataFrame的一些高级操作

拆分DataFrame单列

df = spark.createDataFrame([('a',[1,2,3]), ('b', [4,5,6])], ['key', 'values'])
df.show()
df.printSchema()

+---+---------+
|key|   values|
+---+---------+
|  a|[1, 2, 3]|
|  b|[4, 5, 6]|
+---+---------+

df.selectExpr('key', 'values[1]').show()
+---+---------+
|key|values[1]|
+---+---------+
|  a|        2|
|  b|        5|
+---+---------+

单列变多行

df = spark.createDataFrame([('a','1,2,3'),('b','4,5,6')],['key', 'values'])
df.show()
+---+------+
|key|values|
+---+------+
|  a| 1,2,3|
|  b| 4,5,6|
+---+------+

import pyspark.sql.functions as F
df.select("key", F.split("values", ",").alias("values"),
          F.posexplode(F.split("values", ",")).alias("pos", "val")).drop("val").select("key", F.expr("values[pos]").alias("val")).show()

+---+---+
|key|val|
+---+---+
|  a|  1|
|  a|  2|
|  a|  3|
|  b|  4|
|  b|  5|
|  b|  6|
+---+---+

多列变多行

from pyspark.sql.functions import *

def to_long(df, by):
    cols, dtypes = zip(*((c,t) for (c,t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes))==1, 'All columns have to be of the same type'
    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([struct(lit(c).alias('key1'), col(c).alias('val')) for c in cols])).alias('kvs')
    return df.select(by, kvs).select(by, 'kvs.key1', 'kvs.val')


df = spark.createDataFrame([('a',1,2,3),('b',4,5,6)],['key', 'c1', 'c2', 'c3'])
df.show()
+---+---+---+---+
|key| c1| c2| c3|
+---+---+---+---+
|  a|  1|  2|  3|
|  b|  4|  5|  6|
+---+---+---+---+

dd = to_long(df, 'key')
dd.show()
+---+---+---+
|key|key1|val|
+---+---+---+
|  a| c1|  1|
|  a| c2|  2|
|  a| c3|  3|
|  b| c1|  4|
|  b| c2|  5|
|  b| c3|  6|
+---+---+---+

分组统计

dd.show()
+---+----+---+
|key|key1|val|
+---+----+---+
|  a|  c1|  1|
|  a|  c2|  2|
|  a|  c3|  3|
|  b|  c1|  4|
|  b|  c2|  5|
|  b|  c3|  6|
+---+----+---+

dd.groupby('key').count().show()
+---+-----+
|key|count|
+---+-----+
|  b|    3|
|  a|    3|
+---+-----+

数据透视表

dd.show()
+---+----+---+
|key|key1|val|
+---+----+---+
|  a|  c1|  1|
|  a|  c2|  2|
|  a|  c3|  3|
|  b|  c1|  4|
|  b|  c2|  5|
|  b|  c3|  6|
+---+----+---+

dd.groupby('key').pivot('val').count().show()
+---+----+----+----+----+----+----+
|key|   1|   2|   3|   4|   5|   6|
+---+----+----+----+----+----+----+
|  b|null|null|null|   1|   1|   1|
|  a|   1|   1|   1|null|null|null|
+---+----+----+----+----+----+----+

聚合函数

dd.show()
+---+----+---+
|key|key1|val|
+---+----+---+
|  a|  c1|  1|
|  a|  c2|  2|
|  a|  c3|  3|
|  b|  c1|  4|
|  b|  c2|  5|
|  b|  c3|  6|
+---+----+---+

import pyspark.sql.functions as F
dd.agg(F.sum(dd.val), F.max(dd.val), F.min(dd.val)).show()
+--------+--------+--------+
|sum(val)|max(val)|min(val)|
+--------+--------+--------+
|      21|       6|       1|
+--------+--------+--------+

参考:
pyspark官方文档

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,496评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,407评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,632评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,180评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,198评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,165评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,052评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,910评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,324评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,542评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,711评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,424评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,017评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,668评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,823评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,722评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,611评论 2 353

推荐阅读更多精彩内容