PySpark笔记(三):DataFrame

DataFrame是在Spark 1.3中正式引入的一种以RDD为基础的不可变的分布式数据集,类似于传统数据库的二维表格,数据在其中以列的形式被组织存储。如果熟悉Pandas,其与Pandas DataFrame是非常类似的东西。

DataFrame API受到R和Python(Pandas)中的数据框架的启发,但是从底层开始设计以支持现代大数据和数据科学应用程序。作为现有RDD API的扩展,DataFrame具有以下功能:

  • 能够从单台笔记本电脑上的千字节数据扩展到大型群集上的PB级数据
  • 支持各种数据格式和存储系统
  • 通过Spark SQL Catalyst优化器实现最先进的优化和代码生成
  • 通过Spark无缝集成所有大数据工具和基础架构
  • Python,Java,Scala和R的API(通过SparkR开发)
  • 对于熟悉其他编程语言数据框架的新用户,此API应该让他们感到宾至如归。对于现有的Spark用户,此扩展API将使Spark更易于编程,同时通过智能优化和代码生成来提高性能。

通过DataFrame与Catalyst优化器,现有的Spark程序迁移到DataFrame时性能得到改善。由于优化器生成用于执行的JVM字节码,因此Python用户将体验到与Scala和Java用户相同的高性能。


performance.png

创建DataFrame

Spark中有两种方式可以将数据从RDD转化为DataFrame:反射推断或者编程指定。反射推断是Spark应用程序自动识列的类型,然后通过Spark SQL将行对象的RDD转换为DataFrame。编程指定则是在运行之前,人工从Spark SQL中引入数据类型分配给不同的列。

使用数据结构:

data

普通读取csv为DataFrames数据。

# 读取csv为DataFrame
traffic = spark.read.csv('E:\Documents\Desktop\data.csv', header='true')
# 创建临时表
traffic.createOrReplaceTempView("traffic")
# 显示前10行
traffic.show(10)
show

打印表结构,可以看出Spark自动将所有列推断为string,这不是我们想要的类型。

traffic.printSchema()
schema

通过pandas辅助读取csv。

import pandas as pd 

df = pd.read_csv('E:\Documents\Desktop\data.csv') 
traffic = spark.createDataFrame(df)
traffic.createOrReplaceTempView("traffic")
traffic.printSchema()
schema

反射推断

traffic = spark.read.csv('E:\Documents\Desktop\data.csv', header='true', inferSchema='true')
traffic.createOrReplaceTempView("traffic")
traffic.show(10)
traffic.printSchema()

inferSchema属性用来指示是否使用自动推断,默认为False。

schema

编程指定

尽管自动推断比较方便,如果启用了inferSchema,则函数将数据全部读入以确定输入模式。要避免遍历整个数据一次,应该使用模式明确指定模式。

StructField(field, data_type=None, nullable=True, metadata=None)

  • field – Either the name of the field or a StructField object
  • data_type – If present, the DataType of the StructField to create
  • nullable – Whether the field to add should be nullable (default True)
  • metadata – Any additional metadata (default None)
from pyspark.sql.types import *

# 指定DataFrame每个列的模式
schema = StructType([
... StructField("detectorid", IntegerType()),
... StructField("starttime",StringType()),
... StructField("volume", IntegerType()),
... StructField("speed", FloatType()),
... StructField("occupancy", FloatType())])

# 使用指定模式读入
traffic = spark.read.csv('E:\Documents\Desktop\data.csv', header='true', schema=schema)
traffic.createOrReplaceTempView("traffic")
traffic.show(10)
traffic.printSchema()
schema

DataFrame查询

常用API

select()
投影一组表达式并返回一个新的DataFrame。
参数:cols - 列名称(字符串)或表达式(列)的列表。 如果其中一个列名是'*',则该列将展开以包含当前DataFrame中的所有列。

>>> traffic.select("speed").show(5)
+-----+
|speed|
+-----+
|56.52|
|53.54|
|54.64|
|54.94|
|51.65|
+-----+
only showing top 5 rows

filter()
使用给定的条件过滤行。where()是filter()的别名。
参数:condition - 类型的一列.BooleanType或一个SQL表达式的字符串。

>>> traffic.filter(traffic.speed > 50).show(5)
+----------+--------------+------+-----+---------+
|detectorid|     starttime|volume|speed|occupancy|
+----------+--------------+------+-----+---------+
|    100625|2015/12/1 0:00|    48|56.52|     1.29|
|    100625|2015/12/1 0:15|    50|53.54|     1.48|
|    100625|2015/12/1 0:30|    25|54.64|     0.62|
|    100625|2015/12/1 0:45|    34|54.94|     0.85|
|    100625|2015/12/1 1:00|    23|51.65|      0.6|
+----------+--------------+------+-----+---------+
only showing top 5 rows
>>> traffic.where(traffic.volume > 50).show(5)
+----------+--------------+------+-----+---------+
|detectorid|     starttime|volume|speed|occupancy|
+----------+--------------+------+-----+---------+
|    100625|2015/12/1 3:45|    61|57.62|     1.65|
|    100625|2015/12/1 4:00|    69| 56.7|     1.89|
|    100625|2015/12/1 4:15|    94|56.53|     2.69|
|    100625|2015/12/1 4:30|    87|55.53|     2.58|
|    100625|2015/12/1 4:45|   161|55.51|     4.62|
+----------+--------------+------+-----+---------+
only showing top 5 rows

drop()
返回删除指定列的新DataFrame。
参数:cols - 要删除的列的字符串名称,要删除的列或要删除的列的字符串名称的列表。

>>> traffic.drop("speed").show(5)
+----------+--------------+------+---------+
|detectorid|     starttime|volume|occupancy|
+----------+--------------+------+---------+
|    100625|2015/12/1 0:00|    48|     1.29|
|    100625|2015/12/1 0:15|    50|     1.48|
|    100625|2015/12/1 0:30|    25|     0.62|
|    100625|2015/12/1 0:45|    34|     0.85|
|    100625|2015/12/1 1:00|    23|      0.6|
+----------+--------------+------+---------+
only showing top 5 rows

cache()
使用默认存储级别(MEMORY_AND_DISK)持久保存DataFrame。

traffic.cache()

collect()
以Row列表形式返回所有记录。

traffic.collect()

show()
将前n行打印到控制台。
参数:
n - 要显示的行数。
truncate - 如果设置为True,则默认截断超过20个字符的字符串。 如果设置为大于1的数字,则截断长字符串以截断长度并将其右对齐。

>>> traffic.show(5)
+----------+--------------+------+-----+---------+
|detectorid|     starttime|volume|speed|occupancy|
+----------+--------------+------+-----+---------+
|    100625|2015/12/1 0:00|    48|56.52|     1.29|
|    100625|2015/12/1 0:15|    50|53.54|     1.48|
|    100625|2015/12/1 0:30|    25|54.64|     0.62|
|    100625|2015/12/1 0:45|    34|54.94|     0.85|
|    100625|2015/12/1 1:00|    23|51.65|      0.6|
+----------+--------------+------+-----+---------+
only showing top 5 rows

count()
返回此DataFrame中的行数。

>>> traffic.count()
17814

columns
以列表形式返回所有列名称。

>>> traffic.columns
['detectorid', 'starttime', 'volume', 'speed', 'occupancy']

dtypes
将所有列名称及其数据类型作为列表返回。

>>> traffic.dtypes
[('detectorid', 'int'), ('starttime', 'string'), ('volume', 'int'), ('speed', 'double'), ('occupancy', 'double')]

fillna()
替换的空值,别名na.fill()

参数:
value - int,long,float,string或dict。 用来替换空值的值。 如果值是字典,则子集将被忽略,并且值必须是从列名(字符串)到替换值的映射。 替换值必须是int,long,float,boolean或string。
子集 - 要考虑的列名称的可选列表。 子集中指定的不具有匹配数据类型的列将被忽略。 例如,如果value是一个字符串,并且子集包含一个非字符串列,则非字符串列将被忽略。

>>> traffic.na.fill(10)
>>> traffic.na.fill({'volume': 0, 'speed': '0'})

corr()
以双精度值计算DataFrame的两列的相关性。 目前只支持Pearson Correlation Coefficient。 DataFrame.corr()和DataFrameStatFunctions.corr()是彼此的别名。

参数:
col1 - 第一列的名称
col2 - 第二列的名称
方法 - 相关方法。 目前只支持“Pearson”

>>> traffic.corr("volume", "speed")
-0.588695158526705

cov()
计算给定列的样本协方差(由它们的名称指定)作为双精度值。 DataFrame.cov()和DataFrameStatFunctions.cov()是别名。

参数:
col1 - 第一列的名称
col2 - 第二列的名称

>>> traffic.cov("volume", "speed")
-1166.285227777989

describe()
计算数字和字符串列的统计信息。
这包括count,mean,stddev,min和max。 如果未给出列,则此函数将计算所有数字或字符串列的统计信息。

>>> df.describe().show()
+-------+--------------+--------------+------------------+------------------+------------------+
|summary|    detectorid|     starttime|            volume|             speed|         occupancy|
+-------+--------------+--------------+------------------+------------------+------------------+
|  count|         17814|         17814|             17814|             17737|             17814|
|   mean|      100627.5|          null|208.72779836083978| 45.94760105993146|13.775621421354007|
| stddev|1.707873064514|          null|  129.673023730382|15.010086497913619|13.391984211880049|
|    min|        100625|2015/12/1 0:00|                 0|              1.14|               0.0|
|    max|        100630|2015/12/9 9:45|               528|             69.33|             73.25|
+-------+--------------+--------------+------------------+------------------+------------------+
>>> traffic.describe(['speed']).show()
+-------+------------------+
|summary|             speed|
+-------+------------------+
|  count|             17737|
|   mean| 45.94760105993146|
| stddev|15.010086497913619|
|    min|              1.14|
|    max|             69.33|
+-------+------------------+

distinct()
返回包含此DataFrame中不同行的新DataFrame。

>>> traffic.distinct().count()
17814

createOrReplaceGlobalTempView()
使用给定名称创建或替换全局临时视图。
此临时视图的生命周期与此Spark应用程序相关联。

>>> traffic.createOrReplaceGlobalTempView("traffic")
>>> df = spark.sql("select * from traffic")
>>> df.count()
17814

createOrReplaceTempView()
使用此DataFrame创建或替换本地临时视图。
此临时表的生命周期与用于创建此DataFrame的SparkSession相关联。

>>> traffic.createOrReplaceTempView("traffic")
>>> df = spark.sql("select * from traffic")
>>> df.count()
17814

使用SQL查询

由于创建了临时表,我们可以对临时表执行sql操作。

>>> spark.sql("select * from traffic where volume > 50 and speed > 50").show()
+----------+---------------+------+-----+---------+
|detectorid|      starttime|volume|speed|occupancy|
+----------+---------------+------+-----+---------+
|    100625| 2015/12/1 3:45|    61|57.62|     1.65|
|    100625| 2015/12/1 4:00|    69| 56.7|     1.89|
|    100625| 2015/12/1 4:15|    94|56.53|     2.69|
|    100625| 2015/12/1 4:30|    87|55.53|     2.58|
|    100625| 2015/12/1 4:45|   161|55.51|     4.62|
|    100625| 2015/12/1 5:00|   203|55.41|     5.96|
|    100625| 2015/12/1 5:15|   185|55.14|     6.61|
|    100625| 2015/12/1 5:30|   308|52.39|     9.87|
|    100625| 2015/12/1 5:45|   343|51.01|    11.49|
|    100625|2015/12/1 10:15|   306| 50.6|    11.98|
|    100625|2015/12/1 10:30|   334|51.42|    11.53|
|    100625|2015/12/1 10:45|   349|52.67|    11.51|
|    100625|2015/12/1 11:00|   262|52.36|    10.54|
|    100625|2015/12/1 12:00|   255|52.47|     9.36|
|    100625|2015/12/1 12:15|   346|50.25|    13.44|
|    100625|2015/12/1 12:30|   367| 51.2|    12.47|
|    100625|2015/12/1 12:45|   330|52.78|    11.56|
|    100625|2015/12/1 13:00|   306|52.36|    12.01|
|    100625|2015/12/1 13:30|   371|50.28|    13.93|
|    100625|2015/12/1 13:45|   294|50.62|    12.92|
+----------+---------------+------+-----+---------+
only showing top 20 rows

Dataset

除了DataFrame,Spark 1.6中还引入了Dataset API,其提供了一种类型安全的面向对象的编程接口,但是其只能在Java与Scala中使用。Python不能使用该API的原因是因为其本身不是一种类型安全的语言。在Spark 2.0中DataFrame API被整合入如Dataset API,DataFrame是Dataset未类型化API的一个别名。

未类型化的API:DataFrame = Dataset[Row]
类型化的API:Dataset[T]
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容