pyspark dataframe基本操作看这篇就够了

1 创建dataframe

1.1 读取文件创建

from pyspark.sql import SparkSession #sparkSession为同统一入口

#创建spakr对象
spark = SparkSession\
    .builder\
    .appName('readfile')\
    .getOrCreate()

# 1.读取csv文件
# 1.读取csv文件
logFilePath = 'births_train.csv'
log_df = spark.read.csv(logFilePath, 
                        encoding='utf-8', 
                        header=True, 
                        inferSchema=True,
                        sep=',')

logFilePath:这是我自定义的一个参数,为文件路径
encoding:文件编码格式,默认为utf-8
header:是否将文件第一行作为表头,True即将文件第一行作为表头
inferSchema:是否自动推断列类型
sep:列分割符

log_df.show()

展示结果如下图


image.png

1.2 手动创建

这种方式一般为测试的时候用,适用于数据量很小的时候

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("FirstApp").getOrCreate()

employees = [(1, "John", 25), (2, "Ray", 35), (3,"Mike", 24), (4, "Jane", 28), (5, "Kevin", 26), 
             (6, "Vincent", 35), (7,"James", 38), (8, "Shane", 32), (9, "Larry", 29), (10, "Kimberly", 29),
             (11, "Alex", 28), (12, "Garry", 25), (13, "Max",31)]
employees=spark.createDataFrame(employees, schema=["emp_id","name","age"])

这里创建了三列
employees为数据内容,schema为表头,这种方式比较简单,类型为spark推断类型

可能有的同学会见到如下表头的创建方式,类型可以自己指定

from pyspark.sql import SparkSession #sparkSession为同统一入口
from pyspark.sql.types import *

#创建spakr对象
spark = SparkSession\
    .builder\
    .appName('readfile')\
    .getOrCreate()

employees = [(1, "John", 25), (2, "Ray", 35), (3,"Mike", 24), (4, "Jane", 28), (5, "Kevin", 26), 
             (6, "Vincent", 35), (7,"James", 38), (8, "Shane", 32), (9, "Larry", 29), (10, "Kimberly", 29),
             (11, "Alex", 28), (12, "Garry", 25), (13, "Max",31)]
schema = StructType([StructField('emp_id',IntegerType(),True),
                    StructField('name',StringType(),True),
                    StructField('age',IntegerType(),True)])
df = spark.createDataFrame(employees,schema=schema)

StructType:即指定一个列类型的对象,里面包含列类型数组
StructField:指定每一个列,第一个参数为列名,第二个参数为列数据类型,从pyspark.sql.types里的数据类型引入
第三个参数为是否可以为空

1.3 从RDD创建

从rdd创建可以有如下两种方式:

from pyspark.sql import SparkSession #sparkSession为同统一入口
from pyspark.sql.types import *

#创建spakr对象
spark = SparkSession\
    .builder\
    .appName('byRdd')\
    .getOrCreate()

# 1 第一种方式利用指定列类型来创建
create_rdd = spark.sparkContext.parallelize([
    (1, "John", 25),
    (2, "Ray", 35),
    (3,"Mike", 24),
])

schema = StructType([StructField('emp_id',IntegerType(),True),
                    StructField('name',StringType(),True),
                    StructField('age',IntegerType(),True)])

df_rdd = spark.createDataFrame(create_rdd,schema)
df_rdd.show()

# 2 通过rdd转dataframe来创建(这种方式不建议采用,类型推断很容易出错)
create_rdd = spark.sparkContext.parallelize([
    (1, "John", 25),
    (2, "Ray", 35),
    (3,"Mike", 24),
])
df_rdd = create_rdd.toDF()
df_rdd.show()

2.dataframe操作

2.1 阅读数据操作

# 返回df的列名与数据类型
df.dtypes 
image.png
# 显示前20行df的内容
df.show()
image.png
# 返回前n行内容
df.head(2)
image.png
# 返回第一行数据
df.first()
image.png
# 返回df的列名
df.columns
image.png
# 返回df的行数
df.count()
image.png
# 返回df的schema
df.printSchema()
image.png
# 显示汇总统计数据
df.describe().show()
image.png

count:统计数量
mean:均值
stddev:标准差
min:最小值
max:最大值

2.2 过滤数据操作

  1. 单条件过滤
# filter过滤,用df['列名']或者df.列名均可
df1 = df.filter(df['name'] == 'John')
df2 = df.filter(df.name == 'John')
df1.show()
df2.show()
image.png

2 .多条件过滤

df3 = df.filter((df.name == 'John') | (df.name == 'Ray'))
df4 = df.filter((df.name == 'John') & (df.age == 25))

df3.show()
df4.show()
image.png

注意每一个条件用括号括起来,要不然你会收获这样的错误 -_-


image.png

2.3 查询数据操作

  1. 查询几列数据生成一个新的dataframe
df5 = df.select('emp_id','name')
df5.show()
image.png

2.带where条件查询

df6 = df.select('name').where((df.name == 'John') | (df.name == 'Ray'))
df6.show()
image.png

3.带when条件查询

from pyspark.sql import functions as F
#注意导入pyspark.sql的functions
df7 = df.select('name',F.when(df.age > 30,'壮年').otherwise('青年'))
df7.show()
image.png

4.模糊查询操作

df8 = df.select('name').where("name like 'M%'")
df9 = df.filter("name like 'M%'")
df8.show()
df9.show()
image.png

5.between操作

df10 = df.select('name','age').where('age between 25 and 30')
df11 = df.filter('age between 25 and 30')
df10.show()
df11.show()
image.png

2.4 列操作

  1. 修改列名
df12 = df.withColumnRenamed('emp_id','id')
df12.show()
image.png

image.png
  1. 一列数据统一操作
df13 = df.withColumn('age',df.age-10)
df13.show()
#这里的df.age-10操作可以支持各种数据类型的原生方法操作
#如:df.age*3,对name列截取字符sub
df14 = df.withColumn('name',df.name.substr(1,2))
d14.show()
image.png

image.png

3.生成新的列

df15 = df.withColumn('newColumn',F.lit('new'))
df15.show()

注意第二个参数必须为col也就是列对象
一般数据一样就用functions里面的lit方法生成一个,lit里可以是数字或字符串
建议采用别的列来生成新列或者用udf方式生成(下面介绍)

4.删除列

#生成一个新列
df15 = df.withColumn('newColumn',F.lit('new'))
#删除列
df16 = df.drop('newColumn')
df15.show()
df16.show()
image.png

image.png

2.5 分组及排序操作

1.分组统计

#根据年龄分组统计个数
# 类似于sql中select count(*) from df groupby age
df18 = df.groupBy('age').count()
df18.show()
image.png

2.排序操作

#按照年龄从大到小排序
# ascending:False正序排序,True倒序排序
df19 = df.orderBy(df.age,ascending=False)
df20 = df.sort(df.age,ascending=False)
df19.show()
df20.show()

#先按照年龄排序,年龄相同按照emp_id排序
df21 = df.orderBy([df.age,df.emp_id],ascending=False)
df21.show()
image.png

2.6 数据集合并操作

  1. union和unionAll(返回两个数据集的并集,列名不同不影响合并)
from pyspark.sql import SparkSession #sparkSession为同统一入口
from pyspark.sql.types import *
from pyspark.sql import functions as F

#创建spakr对象
spark = SparkSession\
    .builder\
    .appName('byRdd')\
    .getOrCreate()

employees = [(1, "John", 25), (2, "Ray", 35), (3,"Mike", 24), (4, "Jane", 28), (5, "Kevin", 26), 
             (6, "Vincent", 35), (7,"James", 38), (8, "Shane", 32), (9, "Larry", 29), (10, "Kimberly", 29),
             (11, "Alex", 28), (12, "Garry", 25), (13, "Max",31)]

employees1 = [(1, "小明", 15), (2, "小红", 14), (3,"小花", 13)]
schema = StructType([StructField('emp_id',IntegerType(),True),
                    StructField('name',StringType(),True),
                    StructField('age',IntegerType(),True)])

employees2 = [(1, "小明", 15), (2, "小红", 14), (3,"小花", 13),(4, "Jane", 28), (5, "Kevin", 26)]
schema = StructType([StructField('id',IntegerType(),True),
                    StructField('short_name',StringType(),True),
                    StructField('old',IntegerType(),True)])

df = spark.createDataFrame(employees,schema=schema)
col_df = spark.createDataFrame(employees2,schema=schema)

df24 = df.union(col_df)
df25 = df.unionAll(col_df)
df24.show()
df25.show()
image.png

2 .Intersect(求两个数据集的交集)

df26 = df.intersect(col_df)
df26.show()
image.png

3.minus(求两个数据集的差集)

# df对col_df求差集
df27 = df.subtract(col_df)
df27.show()
# col_df对df求差集
df28 = col_df.subtract(df)
df28.show()
image.png

2.7 数据集连接操作

join操作类似于sql中的表连接查询

#第一个参数为连接的dataframe
#第二个参数为连接条件,可以为多个
#第三个参数为连接规则
#支持的连接规则有以下
#'inner', 'outer', 'full', 'fullouter', 'full_outer', 'leftouter', 'left', 
#'left_outer', 'rightouter', 'right', 'right_outer',
#'leftsemi', 'left_semi', 'semi', 'leftanti', 'left_anti', 'anti', 'cross'
df29 = df.join(col_df,[df.emp_id == col_df.id],'left')
df29.show()
image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,254评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,875评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,682评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,896评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,015评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,152评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,208评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,962评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,388评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,700评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,867评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,551评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,186评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,901评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,689评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,757评论 2 351