时空数据处理与组织-Spark SQL

  1. Spark SQL基本操作
    将下列JSON格式数据复制到系统中,并保存命名为employee.json。
{ "id":1 , "name":" Ella" , "age":36 }
{ "id":2, "name":"Bob","age":29 }
{ "id":3 , "name":"Jack","age":29 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":5 , "name":"Damon" }
{ "id":5 , "name":"Damon" }

为employee.json创建DataFrame,并写出Python语句完成下列操作:
(1) 查询所有数据;
(2) 查询所有数据,并去除重复的数据;
(3) 查询所有数据,打印时去除id字段;
(4) 筛选出age>30的记录;
(5) 将数据按age分组;
(6) 将数据按name升序排列;
(7) 取出前3行数据;
(8) 查询所有记录的name列,并为其取别名为username;
(9) 查询年龄age的平均值;
(10) 查询年龄age的最小值。

  1. 编程实现将RDD转换为DataFrame
    源文件内容如下(包含id,name,age):
1,Ella,36
2,Bob,29
3,Jack,29

请将数据复制保存到系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。

  1. 编程实现利用DataFrame读写MySQL的数据
    (1)在MySQL数据库中新建数据库testspark,再创建表employee,包含如下表所示的两行数据。
表1 employee表原有数据
id  name    gender  Age
1   Alice   F   22
2   John    M   25

(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如下表所示的两行数据到MySQL中,最后打印出age的最大值和age的总和。

表2employee表新增数据
id  name    gender  age
3   Mary    F   26
4   Tom M   23

代码:
头文件

import findspark
findspark.init()
from pyspark import SparkContext, SparkConf, Row
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()

任务1

#读入json文件
    df = spark.read.json("C:\\Users\\98275\\Desktop\\时空实习\\任务\\employee.json")
    # 查询全部
    print("查询全部")
    df.select("*").show()
    # 去重复查询
    print("去重复查询")
    df=df.dropDuplicates()
    df.show()
    # 去除id字段
    print("去除id字段")
    df.drop("id").show()
    # age>30的记录
    print("age>30的记录")
    df.filter(df["age"] > 30).show()
    # 按age分组
    print("按age分组")
    df.groupBy("age").count().show()
    # 按name升序排列
    print("按name升序排列")
    df.sort(df["name"].asc()).show()
    # 前3行数据
    print("前3行数据")
    print(df.take(3))
    # 查询所有记录的name列,并为其取别名为username
    print("查询所有记录的name列,并为其取别名为username")
    df.select(df["name"].alias("username")).show()
    # 年龄age的平均值
    print("年龄age的平均值")
    df.agg({'age': 'avg'}).show()
    # 年龄age的最小值
    print("年龄age的最小值")
    df.agg({'age': 'min'}).show()
    return 0

任务2

people = spark.sparkContext. \
        textFile(r"C:\Users\98275\Desktop\时空实习\任务\employee.txt"). \
        map(lambda line: line.split(",")). \
        map(lambda p: Row(id=p[0], name=p[1], age=p[2]))
    schemaPeople = spark.createDataFrame(people)
    schemaPeople.createOrReplaceTempView("people")
    personsDF = spark.sql("select * from people")
    # personsDF.show(
    personsRDD = personsDF.rdd.map(lambda p: "id: " + p.id + "," + "name:" + p.name + "age: " + str(p.age))
    print("DataFrame的所有数据")
    def f(v): print(v)
    personsRDD.foreach(f)
    return 0

任务3

# 下面设置模式信息
    schema = StructType([StructField("id", IntegerType(), True), \
                         StructField("name", StringType(), True), \
                         StructField("gender", StringType(), True), \
                         StructField("age", IntegerType(), True)])
    # 下面设置两条数据,表示两个学生的信息
    studentRDD = spark \
        .sparkContext \
        .parallelize(["3 Mary F 26", "4 Tom M 23"]) \
        .map(lambda x: x.split(" "))
    rowRDD = studentRDD. \
        map(lambda p: Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))
    studentDF = spark.createDataFrame(rowRDD, schema)
    prop = {}
    prop['user'] = 'root'
    prop['password'] = ''
    prop['driver'] = "com.mysql.jdbc.Driver"
    #studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark", 'student', 'append', prop)
    #连接数据库
    jdbcDF = spark.read \
        .format("jdbc") \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("url", "jdbc:mysql://localhost:3306/spark") \
        .option("dbtable", "student") \
        .option("user", "root") \
        .option("password", "") \
        .load()
    print("最大age")
    jdbcDF.agg({'age': 'max'}).show()
    print("age求和")
    jdbcDF.agg({'age': 'sum'}).show()
    return 0

在主函数中依次调用这三个函数即可

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容