PySpark之DataFrame的创建与转换

简介

DataFrame 结构代表的是数据的一个不可变分布式集合,其数据都被组织到有名字的列中,就像关系型数据库中的表一样。DataFrame 的目的就是要让对大型数据集的处理变得更简单,它让开发者可以为分布式的数据集指定一个模式,进行更高层次的抽象。
本文将着重介绍PySpark中DataFrame的各种创建方式,以及与RDD、Pandas之间的转换。


DataFrame的创建

1. 从RDD中创建

为了从存在的RDD结构中创建出DataFrame,我们先定义一些测试数据,如下:

data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")] #list中的每一个元素都是元祖

接着先创建一个SparkSession,并通过调用SparkContext的parallelize()函数构造出一个RDD对象,代码如下:

import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType,StructField, StringType, IntegerType

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
rdd = spark.sparkContext.parallelize(data)

1.1 使用toDF()函数

RDD的toDF()方法是用来从一个存在的RDD结构中创建一个DataFrame对象,因为RDD是一个分布式的 Java对象的集合,故它没有包含列的信息,因此DataFrame采用的是默认的列。上面列举的测试数据一共有2列,分别用"_1"和“_2”来表示。

dfFromRDD1 = rdd.toDF()
dfFromRDD1.printSchema()

printSchema函数用于输出DataFrame的结构,即包含了哪些列,以及每一列的名称和类型等等,输出如下:

如果想给DataFrame中的每一列指定名称的话,我们可以在toDF函数中传入列的名称,如下:

columns = ["language","users_count"]
dfFromRDD1 = rdd.toDF(columns)
dfFromRDD1.printSchema()

这样输出DataFrame的结构信息的时候,就会包含列名称以及类型了,如下:

1.2 使用SparkSession中的createDataFrame()函数

我们可以直接使用createDataFrame函数来在一个原始list数据上创建一个DataFrame,并且叠加上toDF()操作,为每一列指定名称,代码如下:

dfFromRDD2 = spark.createDataFrame(rdd).toDF(*columns)
dfFromRDD2.printSchema()

输出与上图是一样的。

2. 从list对象中创建

2.1 使用createDataFrame函数并且指定行类型来创建

先将list中的每个元素都转换成一个PySpark中的row对象,接着使用createDataFrame函数来创建DataFram,代码如下:

rowData = map(lambda x: Row(*x), data)
dfFromData3 = spark.createDataFrame(rowData, columns)
dfFromData3.printSchema()
dfFromData3.show()

2.2 创建DataFrame时指定格式

如果在创建DataFrame的时候,同时想指定每一列的名称以及对应的类型,我们可以先创建一个StructType结构,然后再调用createDataFrame传入。

StructType会在下面的内容中讲述,这里就简单理解它指定了这两列的名称和类型,以及每个字段是否能为空。

schema = StructType([ \
    StructField("language",StringType(),True), \
    StructField("user_count",StringType(),True),
  ])

df = spark.createDataFrame(data, schema=schema)
df.printSchema()
df.show(truncate=False)

3. 从数据源文件中创建

大部分情况下,我们都是从CSV,文本,JSON,XML等数据源文件中实时创建DataFrame。PySpark默认就支持许多数据格式,因此并不需要再单独导入其他库,我们可以从DataFrameReader类中选择合适的方法来创建DataFrame。

3.1 从CSV文件中创建DataFrame

使用csv()方法从CSV文件中读取并创建一个DataFrame对象。(这里采用的是MovieLens数据集中的用户评分文件)。

df2 = spark.read.csv("/Downloads/ml-latest-small/ratings.csv")
df2.printSchema()
df2.show(truncate=False)

输出如下:

同理,也可以使用text(),json()等方法来读取TXT、Json等文件。

4. 创建带格式的空的DataFrame

有的时候我们并不是直接打开文件来进行处理,而是从网络或者其他地方获取到数据流,那此时创建一个空的DataFrame就很有必要。
一般有两种方式来创建空的DataFrame:

  • 通过空的RDD结构来创建
schema = StructType([
  StructField('firstname', StringType(), True),
  StructField('middlename', StringType(), True),
  StructField('lastname', StringType(), True)
  ])

df = spark.createDataFrame(spark.sparkContext.emptyRDD(),schema)
df.printSchema()
  • 种是通过空的list来创建
df1 = spark.sparkContext.parallelize([]).toDF(schema)
df1.printSchema()

df2 = spark.createDataFrame([], schema)
df2.printSchema()

输出均为:

DataFrame与Pandas、RDD的转换

RDD转DataFrame

这个上文已经提及了,使用toDF()函数便可以完成。

dept = [("Finance",10), 
        ("Marketing",20), 
        ("Sales",30), 
        ("IT",40) 
      ]
rdd = spark.sparkContext.parallelize(dept)
deptColumns = ["dept_name","dept_id"]
df = rdd.toDF(deptColumns)
df.printSchema()
df.show(truncate=False)

DataFrame转RDD

最简单的可以直接使用rdd函数:

rdd1 = df.rdd

或者使用:

rdd2 = df.rdd.map(tuple)

DataFrame转Pandas

PySpark中的DataFrame可以通过toPandas()函数转换成Python的Pandas DataFrame结构。这两者的主要区别是,pandas的操作都是在单个结点上执行的,而PySpark运行在多台机器上,因此在处理大量数据时,PySpark会比Pandas快数倍以上。

df.show()
pandas = df.toPandas()
pandas

结果如下:

注意,Pandas给数据添加了序号。


使用StructType和StructField来指定DataFrame的结构

在上面的例子中,其实我们已经使用过StructType和StructField了,这里再详细介绍一下。PySpark中的StructType和StructField是用来指定DataFrame的结构,并且可以用来创建一些复杂的列项,比如嵌套的结构体、数组等。 StructType是一系列StructField’s的集合,而StructField定义了列的名称,数据类型,以及通过布尔值来指定字段是否可以为空以及元数据等。
下面用一个例子来演示一下如何使用StructType和StructField来创建一个DataFrame。

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType

spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()

data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
 
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)

输出:

定义嵌套结构

对于上面的DataFrame,其实我们很容易发现它有一些不合理的地方。比如前三列都是在表示名称,它们同属与一个名叫“name”的列才算是比较合理的。因此,我们可以重新定义一下结构,将"firstname"、“middlename”、“lastname”这三个字段合并为一个"name"字段,代码如下:

structureData = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
  ]
structureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

df2 = spark.createDataFrame(data=structureData,schema=structureSchema)
df2.printSchema()
df2.show(truncate=False)

输出:

这里要注意一下,如果修改了StructType的结构,那么原始的list中也需要做相应的修改。

参考

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,377评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,390评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,967评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,344评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,441评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,492评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,497评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,274评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,732评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,008评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,184评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,837评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,520评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,156评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,407评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,056评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,074评论 2 352

推荐阅读更多精彩内容