简介
DataFrame 结构代表的是数据的一个不可变分布式集合,其数据都被组织到有名字的列中,就像关系型数据库中的表一样。DataFrame 的目的就是要让对大型数据集的处理变得更简单,它让开发者可以为分布式的数据集指定一个模式,进行更高层次的抽象。
本文将着重介绍PySpark中DataFrame的各种创建方式,以及与RDD、Pandas之间的转换。
DataFrame的创建
1. 从RDD中创建
为了从存在的RDD结构中创建出DataFrame,我们先定义一些测试数据,如下:
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")] #list中的每一个元素都是元祖
接着先创建一个SparkSession,并通过调用SparkContext的parallelize()函数构造出一个RDD对象,代码如下:
import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
rdd = spark.sparkContext.parallelize(data)
1.1 使用toDF()函数
RDD的toDF()方法是用来从一个存在的RDD结构中创建一个DataFrame对象,因为RDD是一个分布式的 Java对象的集合,故它没有包含列的信息,因此DataFrame采用的是默认的列。上面列举的测试数据一共有2列,分别用"_1"和“_2”来表示。
dfFromRDD1 = rdd.toDF()
dfFromRDD1.printSchema()
printSchema函数用于输出DataFrame的结构,即包含了哪些列,以及每一列的名称和类型等等,输出如下:如果想给DataFrame中的每一列指定名称的话,我们可以在toDF函数中传入列的名称,如下:
columns = ["language","users_count"]
dfFromRDD1 = rdd.toDF(columns)
dfFromRDD1.printSchema()
这样输出DataFrame的结构信息的时候,就会包含列名称以及类型了,如下:1.2 使用SparkSession中的createDataFrame()函数
我们可以直接使用createDataFrame函数来在一个原始list数据上创建一个DataFrame,并且叠加上toDF()操作,为每一列指定名称,代码如下:
dfFromRDD2 = spark.createDataFrame(rdd).toDF(*columns)
dfFromRDD2.printSchema()
输出与上图是一样的。
2. 从list对象中创建
2.1 使用createDataFrame函数并且指定行类型来创建
先将list中的每个元素都转换成一个PySpark中的row对象,接着使用createDataFrame函数来创建DataFram,代码如下:
rowData = map(lambda x: Row(*x), data)
dfFromData3 = spark.createDataFrame(rowData, columns)
dfFromData3.printSchema()
dfFromData3.show()
2.2 创建DataFrame时指定格式
如果在创建DataFrame的时候,同时想指定每一列的名称以及对应的类型,我们可以先创建一个StructType结构,然后再调用createDataFrame传入。
StructType会在下面的内容中讲述,这里就简单理解它指定了这两列的名称和类型,以及每个字段是否能为空。
schema = StructType([ \
StructField("language",StringType(),True), \
StructField("user_count",StringType(),True),
])
df = spark.createDataFrame(data, schema=schema)
df.printSchema()
df.show(truncate=False)
3. 从数据源文件中创建
大部分情况下,我们都是从CSV,文本,JSON,XML等数据源文件中实时创建DataFrame。PySpark默认就支持许多数据格式,因此并不需要再单独导入其他库,我们可以从DataFrameReader类中选择合适的方法来创建DataFrame。
3.1 从CSV文件中创建DataFrame
使用csv()方法从CSV文件中读取并创建一个DataFrame对象。(这里采用的是MovieLens数据集中的用户评分文件)。
df2 = spark.read.csv("/Downloads/ml-latest-small/ratings.csv")
df2.printSchema()
df2.show(truncate=False)
输出如下:同理,也可以使用text(),json()等方法来读取TXT、Json等文件。
4. 创建带格式的空的DataFrame
有的时候我们并不是直接打开文件来进行处理,而是从网络或者其他地方获取到数据流,那此时创建一个空的DataFrame就很有必要。
一般有两种方式来创建空的DataFrame:
- 通过空的RDD结构来创建
schema = StructType([
StructField('firstname', StringType(), True),
StructField('middlename', StringType(), True),
StructField('lastname', StringType(), True)
])
df = spark.createDataFrame(spark.sparkContext.emptyRDD(),schema)
df.printSchema()
- 种是通过空的list来创建
df1 = spark.sparkContext.parallelize([]).toDF(schema)
df1.printSchema()
df2 = spark.createDataFrame([], schema)
df2.printSchema()
输出均为:DataFrame与Pandas、RDD的转换
RDD转DataFrame
这个上文已经提及了,使用toDF()函数便可以完成。
dept = [("Finance",10),
("Marketing",20),
("Sales",30),
("IT",40)
]
rdd = spark.sparkContext.parallelize(dept)
deptColumns = ["dept_name","dept_id"]
df = rdd.toDF(deptColumns)
df.printSchema()
df.show(truncate=False)
DataFrame转RDD
最简单的可以直接使用rdd函数:
rdd1 = df.rdd
或者使用:
rdd2 = df.rdd.map(tuple)
DataFrame转Pandas
PySpark中的DataFrame可以通过toPandas()函数转换成Python的Pandas DataFrame结构。这两者的主要区别是,pandas的操作都是在单个结点上执行的,而PySpark运行在多台机器上,因此在处理大量数据时,PySpark会比Pandas快数倍以上。
df.show()
pandas = df.toPandas()
pandas
结果如下:注意,Pandas给数据添加了序号。
使用StructType和StructField来指定DataFrame的结构
在上面的例子中,其实我们已经使用过StructType和StructField了,这里再详细介绍一下。PySpark中的StructType和StructField是用来指定DataFrame的结构,并且可以用来创建一些复杂的列项,比如嵌套的结构体、数组等。 StructType是一系列StructField’s的集合,而StructField定义了列的名称,数据类型,以及通过布尔值来指定字段是否可以为空以及元数据等。
下面用一个例子来演示一下如何使用StructType和StructField来创建一个DataFrame。
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
spark = SparkSession.builder.master("local[1]") \
.appName('SparkByExamples.com') \
.getOrCreate()
data = [("James","","Smith","36636","M",3000),
("Michael","Rose","","40288","M",4000),
("Robert","","Williams","42114","M",4000),
("Maria","Anne","Jones","39192","F",4000),
("Jen","Mary","Brown","","F",-1)
]
schema = StructType([ \
StructField("firstname",StringType(),True), \
StructField("middlename",StringType(),True), \
StructField("lastname",StringType(),True), \
StructField("id", StringType(), True), \
StructField("gender", StringType(), True), \
StructField("salary", IntegerType(), True) \
])
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)
输出:定义嵌套结构
对于上面的DataFrame,其实我们很容易发现它有一些不合理的地方。比如前三列都是在表示名称,它们同属与一个名叫“name”的列才算是比较合理的。因此,我们可以重新定义一下结构,将"firstname"、“middlename”、“lastname”这三个字段合并为一个"name"字段,代码如下:
structureData = [
(("James","","Smith"),"36636","M",3100),
(("Michael","Rose",""),"40288","M",4300),
(("Robert","","Williams"),"42114","M",1400),
(("Maria","Anne","Jones"),"39192","F",5500),
(("Jen","Mary","Brown"),"","F",-1)
]
structureSchema = StructType([
StructField('name', StructType([
StructField('firstname', StringType(), True),
StructField('middlename', StringType(), True),
StructField('lastname', StringType(), True)
])),
StructField('id', StringType(), True),
StructField('gender', StringType(), True),
StructField('salary', IntegerType(), True)
])
df2 = spark.createDataFrame(data=structureData,schema=structureSchema)
df2.printSchema()
df2.show(truncate=False)
输出:这里要注意一下,如果修改了StructType的结构,那么原始的list中也需要做相应的修改。