《时空数据处理和组织课程实习》实验报告
题目: 实验3 Spark SQL
日期:6.11
实验环境:python3.6,windows,wsl2(ubuntu 20.04)
写在前面
完全完整的代码已经附在了文末,因此不将代码作为附件提供。
建议使用简书查看本文,排版更佳本文简书地址
本次涉及到的代码文件是day3_t1.py
、day3_t2.py
、day3_t3.py
、day3_t3_part2.py
,与之对应的readme是day3.md
实习涉及到的全部代码都已储存到了github仓库,建议在线查看我的代码
亦可以使用git clone https://github.com/uiharuayako/geoDataWork.git
实时获取我的最新进展!
所有代码均为本人原创或者来自老师给的资料,多点学习和交流思路,少点复制粘贴,谢谢!
实验内容与完成情况:
程序编程实现了实验内容的所有项目,为了保持代码的规整,我将每个问题的实现写成了不同的文件
其实代码里注释写得很清楚,建议直接看代码,在这里我主要写一下我的思路,以及代码片段的分析
题目1:Spark SQL 基本操作
1. Spark SQL 基本操作
将下列JSON 格式数据复制到Linux 系统中,并保存命名为employee.json。
{ "id":1 , "name":" Ella" , "age":36 }
{ "id":2, "name":"Bob","age":29 }
{ "id":3 , "name":"Jack","age":29 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":5 , "name":"Damon" }
{ "id":5 , "name":"Damon" }
为employee.json 创建DataFrame,并写出Python 语句完成下列操作:
(1) 查询所有数据;
(2) 查询所有数据,并去除重复的数据;
(3) 查询所有数据,打印时去除id 字段;
(4) 筛选出age>30 的记录;
(5) 将数据按age 分组;
(6) 将数据按name 升序排列;
(7) 取出前3 行数据;
(8) 查询所有记录的name 列,并为其取别名为username;
(9) 查询年龄age 的平均值;
(10) 查询年龄age 的最小值。
先用linux保存json文件,这里怎么用的linux下文会讲
第一题总体来说还是考察了一个spark sql的基础语句
import findspark
findspark.init()
from pyspark import SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
# 创建一个dataframe对象
df_origin = spark.read.json("employee.json")
# (1) 查询所有数据;
print("(1) 查询所有数据")
# 其实本来不用select("*"),既然强调查询,就加上也行
df_origin.select("*").show()
# (2) 查询所有数据,并去除重复的数据;
print("(2) 查询所有数据,并去除重复的数据")
# 逻辑上,年龄,姓名有可能重复,id不能重复
df_unique = df_origin.drop_duplicates(subset=['id'])
df_unique.select("*").show()
# (3) 查询所有数据,打印时去除id 字段;
print("(3) 查询所有数据,打印时去除id 字段")
# 选取所有数据
df_1 = df_origin.select('*')
# 去除id字段
df_1.drop(df_1.id).show()
print("有一种更单纯的方法")
df_origin.select('name', 'age').show()
# (4) 筛选出age>30 的记录;
print("(4) 筛选出age>30 的记录")
df_origin.filter(df_origin['age'] > 30).show()
# (5) 将数据按age 分组;
print("(5) 将数据按age 分组")
df_origin.groupby('age').count().show()
# (6) 将数据按name 升序排列;
print("(6) 将数据按name 升序排列")
df_origin.sort(df_origin['name'].asc()).show()
# (7) 取出前3行数据;
print("(7) 取出前3行数据")
var = df_origin.head(3)
for i in var:
print(i)
# (8) 查询所有记录的name 列,并为其取别名为username;
print("(8) 查询所有记录的name 列,并为其取别名为username")
name = df_origin.select('name')
# 把取别名理解成改名,在c或者java里面,“别名”可能会被理解为浅拷贝,或者说引用,在这里我不知道该怎么理解这个词,思前想后,我将其理解为重命名
name.withColumnRenamed('name', 'username').show()
# (9) 查询年龄age 的平均值;
print("(9) 查询年龄age 的平均值")
df_origin.agg({'age': 'mean'}).show()
# (10) 查询年龄age 的最小值。
print("(10) 查询年龄age 的最小值。")
df_origin.agg({'age': 'min'}).show()
这个spark sql给我的感觉就是...和一般的sql差不多呀。语法差不多,就是在python里调用的时候不能直接写SQL语句。毕竟这个dataframe也没存进硬盘,只是在内存里。
我觉得如果建一个临时表格或许能解决这个问题。
问题以及解决
我觉得代码上写的注释已经比较清除了,就是我做的时候有几个纠结的地方:
1.使用数据问题
最纠结的是在去重之和后面的数据应该用去重前的还是之后的。我思前想后,觉得还是用去重前的比较好,这是因为一个逻辑如果第三问用了第二问的结果,那第四问是不是要用第三问的结果?
但是这样下去就没法做了。于是还是统一用了去重前的数据。
2.别名是什么
第二个疑点是,不理解这个别名是啥意思,我知道,可能在dataframe里有一种别名的概念,比如系统自动命名的最小值结果是这样
(10) 查询年龄age 的最小值。
+--------+
|min(age)|
+--------+
| 28|
+--------+
这时候能给这列取一个“别名”。但是这不是这种情况。
我最后处理的时候把别名理解成重命名了。这时候又遇到一个坑,网上的很多资料都是pandas dataframe的,但是我们用的是spark dataframe,这两种同名的数据结构,含有的方法却完全不一样,而且spark dataframe还不支持类似del
这种操作,遇到了很多坑,最后找到了spark dataframe的列重命名函数withColumnRenamed
3.Spark UI端口占用问题
spark启动的时候,会强制启动一个能在浏览器里观察Spark RDD的UI工具Spark UI,这本来是个好事,但是他默认的端口是4040,如果被占用就尝试下一个。这也没什么问题,我调试的时候也没出现问题。
但是我重启电脑再用就出问题了,有些应用把4040附近的端口占用了,导致spark UI无法正常使用,Spark的逻辑是这个UI打不开,整个应用你也别想用喽。
屈从于Spark的淫威,我只能想到调参数:
conf.set("spark.port.maxRetries", "128")
conf.set("spark.ui.port", "12345")
然后就解决了!!在12345端口尝试128次...这段端口没人用。
命令行结果
命令行结果,在我看来还是比较正确的
D:\ProgramData\Anaconda3\envs\py36\python.exe D:\code\geoDataWork\day3_t1.py
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
(1) 查询所有数据
+----+---+-----+
| age| id| name|
+----+---+-----+
| 36| 1| Ella|
| 29| 2| Bob|
| 29| 3| Jack|
| 28| 4| Jim|
| 28| 4| Jim|
|null| 5|Damon|
|null| 5|Damon|
+----+---+-----+
(2) 查询所有数据,并去除重复的数据
+----+---+-----+
| age| id| name|
+----+---+-----+
|null| 5|Damon|
| 36| 1| Ella|
| 29| 3| Jack|
| 29| 2| Bob|
| 28| 4| Jim|
+----+---+-----+
(3) 查询所有数据,打印时去除id 字段
+----+-----+
| age| name|
+----+-----+
| 36| Ella|
| 29| Bob|
| 29| Jack|
| 28| Jim|
| 28| Jim|
|null|Damon|
|null|Damon|
+----+-----+
有一种更单纯的方法
+-----+----+
| name| age|
+-----+----+
| Ella| 36|
| Bob| 29|
| Jack| 29|
| Jim| 28|
| Jim| 28|
|Damon|null|
|Damon|null|
+-----+----+
(4) 筛选出age>30 的记录
+---+---+-----+
|age| id| name|
+---+---+-----+
| 36| 1| Ella|
+---+---+-----+
(5) 将数据按age 分组
+----+-----+
| age|count|
+----+-----+
| 29| 2|
|null| 2|
| 28| 2|
| 36| 1|
+----+-----+
(6) 将数据按name 升序排列
+----+---+-----+
| age| id| name|
+----+---+-----+
| 36| 1| Ella|
| 29| 2| Bob|
|null| 5|Damon|
|null| 5|Damon|
| 29| 3| Jack|
| 28| 4| Jim|
| 28| 4| Jim|
+----+---+-----+
(7) 取出前3行数据
Row(age=36, id=1, name=' Ella')
Row(age=29, id=2, name='Bob')
Row(age=29, id=3, name='Jack')
(8) 查询所有记录的name 列,并为其取别名为username
+--------+
|username|
+--------+
| Ella|
| Bob|
| Jack|
| Jim|
| Jim|
| Damon|
| Damon|
+--------+
(9) 查询年龄age 的平均值
+--------+
|avg(age)|
+--------+
| 30.0|
+--------+
(10) 查询年龄age 的最小值。
+--------+
|min(age)|
+--------+
| 28|
+--------+
进程已结束,退出代码为 0
运行截图
题目2:编程实现将RDD 转换为DataFrame
2. 编程实现将RDD 转换为DataFrame
源文件内容如下(包含id,name,age):
1,Ella,36
2,Bob,29
3,Jack,29
请将数据复制保存到Linux 系统中,命名为employee.txt,实现从RDD 转换得到
DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame 的所有数据。请写出程序代
码。
第一步,强调用linux,我感觉很奇怪,但是还是在linux里做了
这里使用wsl2,在linux里打开gedit编辑文件。
wsl2的新特性非常棒。wsl本来就可以直接在windows下native的运行linux,得益于wsl2使用的hyper v,wsl2不仅性能更强,而且能直接用windows的显卡驱动调度资源,给CUDA带来了巨大的方便。
有点扯远了,这里wsl2能直接像打开windows应用一样,把一个linux GUI应用装到窗口里,这是十分令人惊喜的。之前wsl如果想用GUI需要用远程桌面,很不方便,效率也很低。在最新的windows insider dev版本中,这一特性得到实装,非常棒,如果有相关需要可以体验一下。
又有点扯远了,总之就是在linux里编辑了文件并保存。
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import Row
conf = SparkConf().setMaster("local").setAppName("rdd2df")
conf.set("spark.port.maxRetries", "128")
conf.set("spark.ui.port", "12345")
sc = SparkContext(conf=conf) # 创建spark对象
line = sc.textFile("employee.txt").map(lambda x: x.split(",")).map(
lambda y: Row(id=int(y[0]), name=y[1], age=int(y[2]))) # 读入文件
# 创建一个sql上下文
sql_context = SQLContext(sc)
schema_employee = sql_context.createDataFrame(line)
# 创建一个临时的表
schema_employee.createOrReplaceTempView('employee')
df_employee = sql_context.sql('select * from employee')
# 先用默认方式显示一下,哦,完美
df_employee.show()
# 再按照要求的方式输出,其实collect很浪费性能,但是数据量很小,就原谅一下吧
df_employee_alter = df_employee.collect()
for people in df_employee_alter:
print("id:{0},name:{1},age:{2}".format(people.id, people.name, people.age))
这里关键就是,怎么让这些数据以指定的格式输出,就需要方法collect
,在代码中对应line23。这一步是把dataframe转成python的list数据结构,就可以用python的for直接遍历。
方便是方便,但是资料显示这个collect效率很低,如果数据比较多的话可能需要考虑使用其他方式输出。比如直接在Spark UI中查看(?)。
还有一个关键点,就是RDD转能使用的dataframe的时候,要先变成一个类似“中间件”的dataframe,然后再用这个df创建一个临时的数据表,再从数据表执行一个sql语句读出来能用的dataframe。
我觉得这种方式十分反直觉,而且很麻烦。
命令行结果
D:\ProgramData\Anaconda3\envs\py36\python.exe D:/code/geoDataWork/day3_t2.py
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
+---+---+----+
|age| id|name|
+---+---+----+
| 36| 1|Ella|
| 29| 2| Bob|
| 29| 3|Jack|
+---+---+----+
id:1,name:Ella,age:36
id:2,name:Bob,age:29
id:3,name:Jack,age:29
进程已结束,退出代码为 0
运行截图
题目3:编程实现利用DataFrame 读写MySQL 的数据
3. 编程实现利用DataFrame 读写MySQL 的数据
(1)在MySQL 数据库中新建数据库testspark,再创建表employee,包含如下表所示的两
行数据。
表1 employee 表原有数据
id name gender Age
1 Alice F 22
2 John M 25
(2)配置Spark 通过JDBC 连接数据库MySQL,编程实现利用DataFrame 插入如下表所示
的两行数据到MySQL 中,最后打印出age 的最大值和age 的总和。
表2employee 表新增数据
id name gender age
3 Mary F 26
4 Tom M 23
第一步:先得建立一个数据库,建立一个表,导入原始数据,这里我也是python做的,上代码:
import pymysql
conn = pymysql.connect(host='localhost', port=3306, user="root", passwd="Test123456")
# 获取游标
cursor = conn.cursor()
# 创建testspark数据库,并使用
cursor.execute('CREATE DATABASE IF NOT EXISTS testspark;')
cursor.execute('USE testspark;')
sql = "CREATE TABLE IF NOT EXISTS employee (id int(3) NOT NULL AUTO_INCREMENT,name varchar(255) NOT NULL,gender char(1) NOT NULL,Age int(3) NOT NULL,PRIMARY KEY (id))"
cursor.execute(sql)
cursor.execute("INSERT INTO employee (name,gender,Age) VALUES ('Alice','F',22)")
cursor.execute("INSERT INTO employee (name,gender,Age) VALUES ('John','M',25)")
# commit更改
conn.commit()
cursor.close() # 先关闭游标
conn.close() # 再关闭数据库连接
这主要是sql操作,没啥可说的。就是我这里,对于employee的id字段,用了一个自动增加的AUTO_INCREMENT关键字。这使得我在之后的所有写入,都不需要输入id,id会随着一条一条记录自动增加,非常方便。
第二步:写入新增数据并计算总和
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row
# spark 初始化
conf = SparkConf().setMaster("local").setAppName("sparksql")
conf.set("spark.port.maxRetries", "128")
conf.set("spark.ui.port", "12345")
sc = SparkContext(conf=conf) # 创建spark对象
spark = SQLContext(sc)
# mysql 配置
prop = {'user': 'root',
'password': 'Test123456',
'driver': 'com.mysql.cj.jdbc.Driver'}
# database
url = 'jdbc:mysql://localhost:3306/testspark?serverTimezone=UTC'
# 读取表
employeeRDD = sc.parallelize(["Mary F 26", "Tom M 23"]).map(lambda x: x.split(" ")).map(
lambda p: Row(name=p[0].strip(), gender=p[1].strip(), Age=int(p[2].strip())))
schema_employee = spark.createDataFrame(employeeRDD)
# 创建一个临时的表
schema_employee.createOrReplaceTempView('employee')
employeeDF = spark.sql('select * from employee')
employeeDF.show()
employeeDF.write.jdbc(url=url, table='employee', mode='append',
properties=prop)
age_sum = spark.read.format("jdbc").options(
url='jdbc:mysql://localhost:3306/testspark?serverTimezone=UTC&user=root&password=Test123456',
dbtable="(SELECT sum(Age) FROM employee) tmp",
driver='com.mysql.cj.jdbc.Driver').load()
age_sum.show()
sc.stop()
这一步也没什么,就是综合应用之前的方法,运行后结果:
确实是没啥问题。
再看看数据库:
结果符合预期