DataFrame

DataFrame是一种不可变的分布式数据集,这种数据集被组织成指定的列,类似于关系数据库中的表。

通过在分布式数据集中施加结构,让Spark用户利用Spark SQL来查询结构化的数据或使用Spark表达式方法(而不是lambda)

利用DataFrame加速PySpark

DataFrame和Catalyst优化器的意义是在和非优化的RDD查询比较时增加PySpark查询的性能。引入DataFrame之前,Python查询速度普遍比使用RDD的Scala查询慢。这种查询性能的降低源于Python和JVM之间的通信开销。

创建DataFrame

生成json数据 RDD


import findspark

findspark.init()

from pyspark import SparkContext, SparkConf

from pyspark.sql.session import SparkSession

from pyspark.sql.types import StructField, StructType, StringType, IntegerType

conf = SparkConf().setAppName("wordcount")

sc =SparkContext(conf=conf)

spark = SparkSession.builder.master("local").appName("SparkOnHive").enableHiveSupport().getOrCreate()

people_json = [{"name":"Michael","age":22},{"name":"Andy", "age":30},{"name":"Justin", "age":19}]

schema = StructType([

        StructField("name", StringType(), True),

        StructField("age", IntegerType(), True)

    ])

stringJSONRDD = sc.parallelize(people_json)

people_dataframe = spark.createDataFrame(stringJSONRDD,schema)

people_dataframe.createOrReplaceTempView("table1")

print(spark.sql("select * from table1").collect())

输出:[Row(name='Michael', age=None), Row(name='Andy', age=30), Row(name='Justin', age=19)]

RDD的交互操作

有两种从RDD变换到DataFrame的不同方法:使用反射推断模式或以编程方式指定模式。

上面使用的就是编程方式指定模式

利用DataFrame API查询

people_dataframe.count()

返回DataFrame中的行数

运行筛选语句

people_dataframe.select("name","age").filter("age=19").show()

输出如下:

+------+---+

|  name|age|

+------+---+

|Justin| 19|

+------+---+

利用SQL查询

我们执行了.createOrReplaceTempView方法,可以使用SQL查询。

行数

spark.sql("select count(1) from table1").show()

利用where子句运行筛选语句

spark.sql("select name,age from table1 where age=19").show()

spark.sql("select name,age from table1 where name like 'a%'").show()

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容