A DataFrame is an immutable, distributed collection of data organized into named columns, much like a table in a relational database. A Spark DataFrame is similar to a pandas DataFrame in Python: by imposing structure on a distributed dataset, it lets Spark users query structured data with Spark SQL or manipulate it with DataFrame expression methods.
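All of the examples that follow assume an active SparkSession bound to the variable spark. A minimal sketch of creating one (the application name is arbitrary):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('dataframe-notes').getOrCreate()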
1. Creating a DataFrame
A DataFrame can be created by reading files such as Parquet, CSV, or JSON directly, or by converting an RDD.
df = spark.read.parquet(parquet_file)    # read a Parquet file
df = spark.read.csv(csv_file)            # read a CSV file
df = spark.read.json(json_file)          # read a JSON file
df = spark.createDataFrame(RDD, schema)  # build from an RDD plus a schema
df = rdd.toDF(cols)                      # cols is a list of column names
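As a concrete sketch of the last two calls (the sample data, column names, and schema below are made up purely for illustration):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

rdd = spark.sparkContext.parallelize([('Alice', 30), ('Bob', 25)])
schema = StructType([StructField('name', StringType(), True),
                     StructField('age', IntegerType(), True)])
df = spark.createDataFrame(rdd, schema)  # explicit schema
df = rdd.toDF(['name', 'age'])           # or just supply column names and let Spark infer the types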
2. A first look at the data in a DataFrame
printSchema shows the data type of each column of a DataFrame, while describe produces basic summary statistics for each column.
# Inspect the schema of the DataFrame
df = spark.createDataFrame([('1', 'Joe', '70000', '1'), ('2', 'Henry', '80000', None)],
['Id', 'Name', 'Sallary', 'DepartmentId'])
df.printSchema()
# Output
root
|-- Id: string (nullable = true)
|-- Name: string (nullable = true)
|-- Sallary: string (nullable = true)
|-- DepartmentId: string (nullable = true)
# Basic summary statistics for the data
df.describe().show()
# Output
+-------+------------------+-----+-----------------+------------+
|summary| Id| Name| Sallary|DepartmentId|
+-------+------------------+-----+-----------------+------------+
| count| 2| 2| 2| 1|
| mean| 1.5| null| 75000.0| 1.0|
| stddev|0.7071067811865476| null|7071.067811865475| NaN|
| min| 1|Henry| 70000| 1|
| max| 2| Joe| 80000| 1|
+-------+------------------+-----+-----------------+------------+
from pyspark.sql.functions import count
# Count the non-null records in each column
df.agg(*[count(c).alias(c) for c in df.columns]).show()
# Output
+---+----+-------+------------+
| Id|Name|Sallary|DepartmentId|
+---+----+-------+------------+
| 2| 2| 2| 1|
+---+----+-------+------------+
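A closely related sketch counts the null values per column instead, by summing a 0/1 null indicator (assuming the same df as above):

from pyspark.sql import functions as F

# isNull() gives a boolean column; cast it to 0/1 and sum it per column
df.agg(*[F.sum(F.col(c).isNull().cast('int')).alias(c) for c in df.columns]).show()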
3. Working with a DataFrame
3.1 Selecting a subset of a DataFrame
Much of the time we do not need to analyze the whole DataFrame, only part of it, so we need to select subsets of its columns and rows. PySpark DataFrames offer many ways to take subsets:

- df.select(): select columns by name;
- df.selectExpr(): select columns while applying an expression to them, returning the transformed values;
- df.where() / df.filter(): identical in usage; both keep only the rows that satisfy a given condition;
- df.distinct(): drop duplicate rows and return a DataFrame without duplicates;
- df.sample(withReplacement, fraction, seed=None): random sampling;
- df.sampleBy(col, fractions, seed=None): sample by the categories of a column, i.e. stratified sampling;
- df.withColumn(colName, col): operate on a single column, e.g. cast its data type or derive a new column from it;
- df.withColumnRenamed(existing, new): rename a column.
from pyspark.sql.functions import *
df = spark.createDataFrame([('a',[1,2,3]),('b',[2,3,4])], ['key','value'])
df.show()
df.select(df.key, explode(df.value)).show()
+---+---------+
|key| value|
+---+---------+
| a|[1, 2, 3]|
| b|[2, 3, 4]|
+---+---------+
+---+---+
|key|col|
+---+---+
| a| 1|
| a| 2|
| a| 3|
| b| 2|
| b| 3|
| b| 4|
+---+---+
df = spark.createDataFrame([('a',1),('a',2),('a',3),('a',1),('b',1),('b',2)],['key', 'val'])
df.show()
+---+---+
|key|val|
+---+---+
| a| 1|
| a| 2|
| a| 3|
| a| 1|
| b| 1|
| b| 2|
+---+---+
df.select('key').show()
df.selectExpr('length(key)').show()
+---+
|key|
+---+
| a|
| a|
| a|
| a|
| b|
| b|
+---+
+-----------+
|length(key)|
+-----------+
| 1|
| 1|
| 1|
| 1|
| 1|
| 1|
+-----------+
df.filter(df.key=='a').show()
df.where(df.key=='a').show()
+---+---+
|key|val|
+---+---+
| a| 1|
| a| 2|
| a| 3|
| a| 1|
+---+---+
+---+---+
|key|val|
+---+---+
| a| 1|
| a| 2|
| a| 3|
| a| 1|
+---+---+
df.distinct().show()
+---+---+
|key|val|
+---+---+
| a| 1|
| b| 1|
| a| 2|
| a| 3|
| b| 2|
+---+---+
df.sample(withReplacement=False, fraction=0.5, seed=666).show()
+---+---+
|key|val|
+---+---+
| a| 1|
| a| 2|
| a| 1|
| b| 2|
+---+---+
df.sampleBy('key', fractions={'a':0.2,'b':0.6}, seed=123).show()
+---+---+
|key|val|
+---+---+
| a| 3|
| b| 2|
+---+---+
add1 = udf(lambda x: x + 1)  # no returnType specified, so the new column defaults to string type
df.withColumn('val1', add1('val')).show()
df.withColumn('val', df.val.cast('float')).show()
+---+---+----+
|key|val|val1|
+---+---+----+
| a| 1| 2|
| a| 2| 3|
| a| 3| 4|
| a| 1| 2|
| b| 1| 2|
| b| 2| 3|
+---+---+----+
+---+---+
|key|val|
+---+---+
| a|1.0|
| a|2.0|
| a|3.0|
| a|1.0|
| b|1.0|
| b|2.0|
+---+---+
df.withColumnRenamed('key', 'kk').show()
+---+---+
| kk|val|
+---+---+
| a| 1|
| a| 2|
| a| 3|
| a| 1|
| b| 1|
| b| 2|
+---+---+
3.2 Handling NA (null) values
PySpark provides the df.na.drop method to discard rows containing null values, and the df.na.fill method to replace nulls with specified values.
df.show()
+---+-----+-------+------------+
| Id| Name|Sallary|DepartmentId|
+---+-----+-------+------------+
| 1| Joe| 70000| 1|
| 2|Henry| 80000| null|
+---+-----+-------+------------+
# df.fillna('666') has the same effect as the line below
df.na.fill('666').show()
+---+-----+-------+------------+
| Id| Name|Sallary|DepartmentId|
+---+-----+-------+------------+
| 1| Joe| 70000| 1|
| 2|Henry| 80000| 666|
+---+-----+-------+------------+
# df.dropna() gives the same result as the line below
df.na.drop().show()
+---+----+-------+------------+
| Id|Name|Sallary|DepartmentId|
+---+----+-------+------------+
| 1| Joe| 70000| 1|
+---+----+-------+------------+
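Both methods accept finer-grained arguments as well; a sketch against the same df (the replacement values are arbitrary):

# fill different columns with different values
df.na.fill({'DepartmentId': '0', 'Name': 'unknown'}).show()
# drop a row only if the listed columns contain nulls
df.na.drop(subset=['DepartmentId']).show()
# drop a row only if all of its columns are null
df.na.drop(how='all').show()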
3.3 Joining DataFrames
Like most relational tables, Spark DataFrames also provide join functionality.
df1 = spark.createDataFrame([('a',1),('b',2),('c',3)],['x1','x2'])
df2 = spark.createDataFrame([('a','T'),('b','F'),('d','T')],['x1','x3'])
df1.show();df2.show()
+---+---+
| x1| x2|
+---+---+
| a| 1|
| b| 2|
| c| 3|
+---+---+
+---+---+
| x1| x3|
+---+---+
| a| T|
| b| F|
| d| T|
+---+---+
The join types supported by join are: 'inner', 'outer', 'full', 'fullouter', 'full_outer', 'leftouter', 'left', 'left_outer', 'rightouter', 'right', 'right_outer', 'leftsemi', 'left_semi', 'leftanti', 'left_anti', 'cross':
print('left:')
df1.join(df2, on='x1', how='left').show()
df1.join(df2, df1.x1==df2.x1, how='left').show()
print('right:')
df1.join(df2, on='x1', how='right').show()
print('outer:')
df1.join(df2, on='x1', how='outer').show()
print('inner:')
df1.join(df2, on='x1', how='inner').show()
print('leftsemi:')
df1.join(df2, on='x1', how='leftsemi').show()
print('leftanti:')
df1.join(df2, on='x1', how='leftanti').show()
left:
+---+---+----+
| x1| x2| x3|
+---+---+----+
| c| 3|null|
| b| 2| F|
| a| 1| T|
+---+---+----+
+---+---+----+----+
| x1| x2| x1| x3|
+---+---+----+----+
| c| 3|null|null|
| b| 2| b| F|
| a| 1| a| T|
+---+---+----+----+
right:
+---+----+---+
| x1| x2| x3|
+---+----+---+
| d|null| T|
| b| 2| F|
| a| 1| T|
+---+----+---+
outer:
+---+----+----+
| x1| x2| x3|
+---+----+----+
| d|null| T|
| c| 3|null|
| b| 2| F|
| a| 1| T|
+---+----+----+
inner:
+---+---+---+
| x1| x2| x3|
+---+---+---+
| b| 2| F|
| a| 1| T|
+---+---+---+
leftsemi:
+---+---+
| x1| x2|
+---+---+
| b| 2|
| a| 1|
+---+---+
leftanti:
+---+---+
| x1| x2|
+---+---+
| c| 3|
+---+---+
Note that the rows of the joined DataFrame come back in no particular order.
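Also note that joining on the expression df1.x1 == df2.x1 keeps both x1 columns (see the second 'left' output above). A sketch of two common ways to tidy this up:

# joining on the column name deduplicates x1 automatically; orderBy restores a predictable order
df1.join(df2, on='x1', how='left').orderBy('x1').show()
# when joining on an expression, drop the duplicate column explicitly
df1.join(df2, df1.x1 == df2.x1, how='left').drop(df2.x1).orderBy('x1').show()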
3.4 Set operations
DataFrames also support the common set operations: union, intersect, and subtract. Usage is as follows:
df1 = spark.createDataFrame([('a',1),('b',2),('c',3)],['x1','x2'])
df2 = spark.createDataFrame([('a','T'),('b','F'),('d','T')],['x1','x2'])
df1.show();df2.show()
+---+---+
| x1| x2|
+---+---+
| a| 1|
| b| 2|
| c| 3|
+---+---+
+---+---+
| x1| x2|
+---+---+
| a| T|
| b| F|
| d| T|
+---+---+
The set operations work as follows:
print('union:')
df1.union(df2).orderBy('x1', ascending=True).show()
print('intersect:')
df1.intersect(df2).orderBy('x1', ascending=True).show()
print('subtract:')
df1.subtract(df2).orderBy('x1', ascending=True).show()
union:
+---+---+
| x1| x2|
+---+---+
| a| T|
| a| 1|
| b| 2|
| b| F|
| c| 3|
| d| T|
+---+---+
intersect:
+---+---+
| x1| x2|
+---+---+
+---+---+
subtract:
+---+---+
| x1| x2|
+---+---+
| a| 1|
| b| 2|
| c| 3|
+---+---+
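Note that union matches columns purely by position, which is why the 'T'/'F' strings and the integers end up stacked in the same x2 column above. When the frames share column names, unionByName matches by name instead; a sketch (df3 is made up here just to put the columns in a different order):

df3 = spark.createDataFrame([('T', 'd')], ['x2', 'x1'])  # same column names, different order
df1.unionByName(df3).show()                              # rows are aligned by column name, not position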
3.5 Some more advanced DataFrame operations
Splitting a single DataFrame column
df = spark.createDataFrame([('a',[1,2,3]), ('b', [4,5,6])], ['key', 'values'])
df.show()
df.printSchema()
+---+---------+
|key| values|
+---+---------+
| a|[1, 2, 3]|
| b|[4, 5, 6]|
+---+---------+
df.selectExpr('key', 'values[1]').show()
+---+---------+
|key|values[1]|
+---+---------+
| a| 2|
| b| 5|
+---+---------+
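The same element can also be picked out through Column indexing or getItem, without writing a SQL expression (a sketch):

df.select('key', df['values'][1]).show()
df.select('key', df['values'].getItem(1).alias('second')).show()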
Turning one column into multiple rows
df = spark.createDataFrame([('a','1,2,3'),('b','4,5,6')],['key', 'values'])
df.show()
+---+------+
|key|values|
+---+------+
| a| 1,2,3|
| b| 4,5,6|
+---+------+
import pyspark.sql.functions as F
df.select("key", F.split("values", ",").alias("values"),
F.posexplode(F.split("values", ",")).alias("pos", "val")).drop("val").select("key", F.expr("values[pos]").alias("val")).show()
+---+---+
|key|val|
+---+---+
| a| 1|
| a| 2|
| a| 3|
| b| 4|
| b| 5|
| b| 6|
+---+---+
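The same result can be obtained more directly by exploding the split array; a sketch equivalent to the pipeline above (reusing the F import):

df.select('key', F.explode(F.split('values', ',')).alias('val')).show()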
Turning multiple columns into multiple rows
from pyspark.sql.functions import *
def to_long(df, by):
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, 'All columns have to be of the same type'
    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([struct(lit(c).alias('key1'), col(c).alias('val')) for c in cols])).alias('kvs')
    return df.select(by, kvs).select(by, 'kvs.key1', 'kvs.val')
df = spark.createDataFrame([('a',1,2,3),('b',4,5,6)],['key', 'c1', 'c2', 'c3'])
df.show()
+---+---+---+---+
|key| c1| c2| c3|
+---+---+---+---+
| a| 1| 2| 3|
| b| 4| 5| 6|
+---+---+---+---+
dd = to_long(df, 'key')
dd.show()
+---+----+---+
|key|key1|val|
+---+----+---+
|  a|  c1|  1|
|  a|  c2|  2|
|  a|  c3|  3|
|  b|  c1|  4|
|  b|  c2|  5|
|  b|  c3|  6|
+---+----+---+
Grouped statistics
dd.show()
+---+----+---+
|key|key1|val|
+---+----+---+
| a| c1| 1|
| a| c2| 2|
| a| c3| 3|
| b| c1| 4|
| b| c2| 5|
| b| c3| 6|
+---+----+---+
dd.groupby('key').count().show()
+---+-----+
|key|count|
+---+-----+
| b| 3|
| a| 3|
+---+-----+
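groupby can be combined with arbitrary aggregations, not just count; a sketch over the same dd:

import pyspark.sql.functions as F

dd.groupby('key').agg(F.sum('val').alias('total'), F.avg('val').alias('avg_val')).show()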
Pivot tables
dd.show()
+---+----+---+
|key|key1|val|
+---+----+---+
| a| c1| 1|
| a| c2| 2|
| a| c3| 3|
| b| c1| 4|
| b| c2| 5|
| b| c3| 6|
+---+----+---+
dd.groupby('key').pivot('val').count().show()
+---+----+----+----+----+----+----+
|key| 1| 2| 3| 4| 5| 6|
+---+----+----+----+----+----+----+
| b|null|null|null| 1| 1| 1|
| a| 1| 1| 1|null|null|null|
+---+----+----+----+----+----+----+
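pivot also accepts an explicit aggregate, and the pivot column can just as well be key1; a sketch that rebuilds the original wide table:

import pyspark.sql.functions as F

dd.groupby('key').pivot('key1').agg(F.first('val')).show()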
Aggregate functions
dd.show()
+---+----+---+
|key|key1|val|
+---+----+---+
| a| c1| 1|
| a| c2| 2|
| a| c3| 3|
| b| c1| 4|
| b| c2| 5|
| b| c3| 6|
+---+----+---+
import pyspark.sql.functions as F
dd.agg(F.sum(dd.val), F.max(dd.val), F.min(dd.val)).show()
+--------+--------+--------+
|sum(val)|max(val)|min(val)|
+--------+--------+--------+
| 21| 6| 1|
+--------+--------+--------+
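To pull an aggregate back into Python rather than just displaying it, first() (or collect()) returns Row objects that can be indexed by name; a sketch:

total = dd.agg(F.sum('val').alias('total')).first()['total']
print(total)  # 21, matching the table above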
Reference: the official PySpark documentation