- Spark SQL基本操作
将下列JSON格式数据复制到系统中,并保存命名为employee.json。
{ "id":1 , "name":" Ella" , "age":36 }
{ "id":2, "name":"Bob","age":29 }
{ "id":3 , "name":"Jack","age":29 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":5 , "name":"Damon" }
{ "id":5 , "name":"Damon" }
为employee.json创建DataFrame,并写出Python语句完成下列操作:
(1) 查询所有数据;
(2) 查询所有数据,并去除重复的数据;
(3) 查询所有数据,打印时去除id字段;
(4) 筛选出age>30的记录;
(5) 将数据按age分组;
(6) 将数据按name升序排列;
(7) 取出前3行数据;
(8) 查询所有记录的name列,并为其取别名为username;
(9) 查询年龄age的平均值;
(10) 查询年龄age的最小值。
- 编程实现将RDD转换为DataFrame
源文件内容如下(包含id,name,age):
1,Ella,36
2,Bob,29
3,Jack,29
请将数据复制保存到系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。
- 编程实现利用DataFrame读写MySQL的数据
(1)在MySQL数据库中新建数据库testspark,再创建表employee,包含如下表所示的两行数据。
表1 employee表原有数据
id name gender Age
1 Alice F 22
2 John M 25
(2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如下表所示的两行数据到MySQL中,最后打印出age的最大值和age的总和。
表2employee表新增数据
id name gender age
3 Mary F 26
4 Tom M 23
代码:
头文件
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf, Row
from pyspark.sql import SparkSession
from pyspark.sql.types import *
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
任务1
#读入json文件
df = spark.read.json("C:\\Users\\98275\\Desktop\\时空实习\\任务\\employee.json")
# 查询全部
print("查询全部")
df.select("*").show()
# 去重复查询
print("去重复查询")
df=df.dropDuplicates()
df.show()
# 去除id字段
print("去除id字段")
df.drop("id").show()
# age>30的记录
print("age>30的记录")
df.filter(df["age"] > 30).show()
# 按age分组
print("按age分组")
df.groupBy("age").count().show()
# 按name升序排列
print("按name升序排列")
df.sort(df["name"].asc()).show()
# 前3行数据
print("前3行数据")
print(df.take(3))
# 查询所有记录的name列,并为其取别名为username
print("查询所有记录的name列,并为其取别名为username")
df.select(df["name"].alias("username")).show()
# 年龄age的平均值
print("年龄age的平均值")
df.agg({'age': 'avg'}).show()
# 年龄age的最小值
print("年龄age的最小值")
df.agg({'age': 'min'}).show()
return 0
任务2
people = spark.sparkContext. \
textFile(r"C:\Users\98275\Desktop\时空实习\任务\employee.txt"). \
map(lambda line: line.split(",")). \
map(lambda p: Row(id=p[0], name=p[1], age=p[2]))
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")
personsDF = spark.sql("select * from people")
# personsDF.show(
personsRDD = personsDF.rdd.map(lambda p: "id: " + p.id + "," + "name:" + p.name + "age: " + str(p.age))
print("DataFrame的所有数据")
def f(v): print(v)
personsRDD.foreach(f)
return 0
任务3
# 下面设置模式信息
schema = StructType([StructField("id", IntegerType(), True), \
StructField("name", StringType(), True), \
StructField("gender", StringType(), True), \
StructField("age", IntegerType(), True)])
# 下面设置两条数据,表示两个学生的信息
studentRDD = spark \
.sparkContext \
.parallelize(["3 Mary F 26", "4 Tom M 23"]) \
.map(lambda x: x.split(" "))
rowRDD = studentRDD. \
map(lambda p: Row(int(p[0].strip()), p[1].strip(), p[2].strip(), int(p[3].strip())))
studentDF = spark.createDataFrame(rowRDD, schema)
prop = {}
prop['user'] = 'root'
prop['password'] = ''
prop['driver'] = "com.mysql.jdbc.Driver"
#studentDF.write.jdbc("jdbc:mysql://localhost:3306/spark", 'student', 'append', prop)
#连接数据库
jdbcDF = spark.read \
.format("jdbc") \
.option("driver", "com.mysql.jdbc.Driver") \
.option("url", "jdbc:mysql://localhost:3306/spark") \
.option("dbtable", "student") \
.option("user", "root") \
.option("password", "") \
.load()
print("最大age")
jdbcDF.agg({'age': 'max'}).show()
print("age求和")
jdbcDF.agg({'age': 'sum'}).show()
return 0
在主函数中依次调用这三个函数即可