时空数据实习报告-day3-

《时空数据处理和组织课程实习》实验报告

题目: 实验3 Spark SQL

日期:6.11

实验环境:python3.6,windows,wsl2(ubuntu 20.04)

写在前面

完全完整的代码已经附在了文末,因此不将代码作为附件提供。
建议使用简书查看本文,排版更佳本文简书地址
本次涉及到的代码文件是day3_t1.pyday3_t2.pyday3_t3.pyday3_t3_part2.py,与之对应的readme是day3.md
实习涉及到的全部代码都已储存到了github仓库,建议在线查看我的代码
亦可以使用git clone https://github.com/uiharuayako/geoDataWork.git实时获取我的最新进展!
所有代码均为本人原创或者来自老师给的资料,多点学习和交流思路,少点复制粘贴,谢谢!

实验内容与完成情况:

程序编程实现了实验内容的所有项目,为了保持代码的规整,我将每个问题的实现写成了不同的文件

其实代码里注释写得很清楚,建议直接看代码,在这里我主要写一下我的思路,以及代码片段的分析

题目1:Spark SQL 基本操作

1. Spark SQL 基本操作
将下列JSON 格式数据复制到Linux 系统中,并保存命名为employee.json。

{ "id":1 , "name":" Ella" , "age":36 }
{ "id":2, "name":"Bob","age":29 }
{ "id":3 , "name":"Jack","age":29 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":5 , "name":"Damon" }
{ "id":5 , "name":"Damon" }

为employee.json 创建DataFrame,并写出Python 语句完成下列操作:
(1) 查询所有数据;
(2) 查询所有数据,并去除重复的数据;
(3) 查询所有数据,打印时去除id 字段;
(4) 筛选出age>30 的记录;
(5) 将数据按age 分组;
(6) 将数据按name 升序排列;
(7) 取出前3 行数据;
(8) 查询所有记录的name 列,并为其取别名为username;
(9) 查询年龄age 的平均值;
(10) 查询年龄age 的最小值。

先用linux保存json文件,这里怎么用的linux下文会讲

保存json.png

第一题总体来说还是考察了一个spark sql的基础语句

import findspark

findspark.init()
from pyspark import SparkConf
from pyspark.sql import SparkSession

spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
# 创建一个dataframe对象
df_origin = spark.read.json("employee.json")
# (1) 查询所有数据;
print("(1) 查询所有数据")
# 其实本来不用select("*"),既然强调查询,就加上也行
df_origin.select("*").show()
# (2) 查询所有数据,并去除重复的数据;
print("(2) 查询所有数据,并去除重复的数据")
# 逻辑上,年龄,姓名有可能重复,id不能重复
df_unique = df_origin.drop_duplicates(subset=['id'])
df_unique.select("*").show()
# (3) 查询所有数据,打印时去除id 字段;
print("(3) 查询所有数据,打印时去除id 字段")
# 选取所有数据
df_1 = df_origin.select('*')
# 去除id字段
df_1.drop(df_1.id).show()
print("有一种更单纯的方法")
df_origin.select('name', 'age').show()
# (4) 筛选出age>30 的记录;
print("(4) 筛选出age>30 的记录")
df_origin.filter(df_origin['age'] > 30).show()
# (5) 将数据按age 分组;
print("(5) 将数据按age 分组")
df_origin.groupby('age').count().show()
# (6) 将数据按name 升序排列;
print("(6) 将数据按name 升序排列")
df_origin.sort(df_origin['name'].asc()).show()
# (7) 取出前3行数据;
print("(7) 取出前3行数据")
var = df_origin.head(3)
for i in var:
    print(i)
# (8) 查询所有记录的name 列,并为其取别名为username;
print("(8) 查询所有记录的name 列,并为其取别名为username")
name = df_origin.select('name')
# 把取别名理解成改名,在c或者java里面,“别名”可能会被理解为浅拷贝,或者说引用,在这里我不知道该怎么理解这个词,思前想后,我将其理解为重命名
name.withColumnRenamed('name', 'username').show()
# (9) 查询年龄age 的平均值;
print("(9) 查询年龄age 的平均值")
df_origin.agg({'age': 'mean'}).show()
# (10) 查询年龄age 的最小值。
print("(10) 查询年龄age 的最小值。")
df_origin.agg({'age': 'min'}).show()

这个spark sql给我的感觉就是...和一般的sql差不多呀。语法差不多,就是在python里调用的时候不能直接写SQL语句。毕竟这个dataframe也没存进硬盘,只是在内存里。

我觉得如果建一个临时表格或许能解决这个问题。

问题以及解决

我觉得代码上写的注释已经比较清除了,就是我做的时候有几个纠结的地方:

1.使用数据问题

最纠结的是在去重之和后面的数据应该用去重前的还是之后的。我思前想后,觉得还是用去重前的比较好,这是因为一个逻辑如果第三问用了第二问的结果,那第四问是不是要用第三问的结果?但是这样下去就没法做了。于是还是统一用了去重前的数据。

2.别名是什么

第二个疑点是,不理解这个别名是啥意思,我知道,可能在dataframe里有一种别名的概念,比如系统自动命名的最小值结果是这样

(10) 查询年龄age 的最小值。
+--------+
|min(age)|
+--------+
|      28|
+--------+

这时候能给这列取一个“别名”。但是这不是这种情况。
我最后处理的时候把别名理解成重命名了。这时候又遇到一个坑,网上的很多资料都是pandas dataframe的,但是我们用的是spark dataframe,这两种同名的数据结构,含有的方法却完全不一样,而且spark dataframe还不支持类似del这种操作,遇到了很多坑,最后找到了spark dataframe的列重命名函数withColumnRenamed

3.Spark UI端口占用问题

spark启动的时候,会强制启动一个能在浏览器里观察Spark RDD的UI工具Spark UI,这本来是个好事,但是他默认的端口是4040,如果被占用就尝试下一个。这也没什么问题,我调试的时候也没出现问题。
但是我重启电脑再用就出问题了,有些应用把4040附近的端口占用了,导致spark UI无法正常使用,Spark的逻辑是这个UI打不开,整个应用你也别想用喽。
屈从于Spark的淫威,我只能想到调参数:

conf.set("spark.port.maxRetries", "128")
conf.set("spark.ui.port", "12345")

然后就解决了!!在12345端口尝试128次...这段端口没人用。

命令行结果

命令行结果,在我看来还是比较正确的

D:\ProgramData\Anaconda3\envs\py36\python.exe D:\code\geoDataWork\day3_t1.py
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
(1) 查询所有数据
+----+---+-----+
| age| id| name|
+----+---+-----+
|  36|  1| Ella|
|  29|  2|  Bob|
|  29|  3| Jack|
|  28|  4|  Jim|
|  28|  4|  Jim|
|null|  5|Damon|
|null|  5|Damon|
+----+---+-----+

(2) 查询所有数据,并去除重复的数据
+----+---+-----+
| age| id| name|
+----+---+-----+
|null|  5|Damon|
|  36|  1| Ella|
|  29|  3| Jack|
|  29|  2|  Bob|
|  28|  4|  Jim|
+----+---+-----+

(3) 查询所有数据,打印时去除id 字段
+----+-----+
| age| name|
+----+-----+
|  36| Ella|
|  29|  Bob|
|  29| Jack|
|  28|  Jim|
|  28|  Jim|
|null|Damon|
|null|Damon|
+----+-----+

有一种更单纯的方法
+-----+----+
| name| age|
+-----+----+
| Ella|  36|
|  Bob|  29|
| Jack|  29|
|  Jim|  28|
|  Jim|  28|
|Damon|null|
|Damon|null|
+-----+----+

(4) 筛选出age>30 的记录
+---+---+-----+
|age| id| name|
+---+---+-----+
| 36|  1| Ella|
+---+---+-----+

(5) 将数据按age 分组
+----+-----+
| age|count|
+----+-----+
|  29|    2|
|null|    2|
|  28|    2|
|  36|    1|
+----+-----+

(6) 将数据按name 升序排列
+----+---+-----+
| age| id| name|
+----+---+-----+
|  36|  1| Ella|
|  29|  2|  Bob|
|null|  5|Damon|
|null|  5|Damon|
|  29|  3| Jack|
|  28|  4|  Jim|
|  28|  4|  Jim|
+----+---+-----+

(7) 取出前3行数据
Row(age=36, id=1, name=' Ella')
Row(age=29, id=2, name='Bob')
Row(age=29, id=3, name='Jack')
(8) 查询所有记录的name 列,并为其取别名为username
+--------+
|username|
+--------+
|    Ella|
|     Bob|
|    Jack|
|     Jim|
|     Jim|
|   Damon|
|   Damon|
+--------+

(9) 查询年龄age 的平均值
+--------+
|avg(age)|
+--------+
|    30.0|
+--------+

(10) 查询年龄age 的最小值。
+--------+
|min(age)|
+--------+
|      28|
+--------+


进程已结束,退出代码为 0

运行截图

day3t1.png

题目2:编程实现将RDD 转换为DataFrame

2. 编程实现将RDD 转换为DataFrame
源文件内容如下(包含id,name,age):
1,Ella,36
2,Bob,29
3,Jack,29
请将数据复制保存到Linux 系统中,命名为employee.txt,实现从RDD 转换得到
DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame 的所有数据。请写出程序代
码。

第一步,强调用linux,我感觉很奇怪,但是还是在linux里做了


保存txt.png

这里使用wsl2,在linux里打开gedit编辑文件。

wsl2的新特性非常棒。wsl本来就可以直接在windows下native的运行linux,得益于wsl2使用的hyper v,wsl2不仅性能更强,而且能直接用windows的显卡驱动调度资源,给CUDA带来了巨大的方便。

有点扯远了,这里wsl2能直接像打开windows应用一样,把一个linux GUI应用装到窗口里,这是十分令人惊喜的。之前wsl如果想用GUI需要用远程桌面,很不方便,效率也很低。在最新的windows insider dev版本中,这一特性得到实装,非常棒,如果有相关需要可以体验一下。

又有点扯远了,总之就是在linux里编辑了文件并保存。

import findspark

findspark.init()
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import Row

conf = SparkConf().setMaster("local").setAppName("rdd2df")
conf.set("spark.port.maxRetries", "128")
conf.set("spark.ui.port", "12345")
sc = SparkContext(conf=conf)  # 创建spark对象
line = sc.textFile("employee.txt").map(lambda x: x.split(",")).map(
    lambda y: Row(id=int(y[0]), name=y[1], age=int(y[2])))  # 读入文件
# 创建一个sql上下文
sql_context = SQLContext(sc)
schema_employee = sql_context.createDataFrame(line)
# 创建一个临时的表
schema_employee.createOrReplaceTempView('employee')
df_employee = sql_context.sql('select * from employee')
# 先用默认方式显示一下,哦,完美
df_employee.show()
# 再按照要求的方式输出,其实collect很浪费性能,但是数据量很小,就原谅一下吧
df_employee_alter = df_employee.collect()
for people in df_employee_alter:
    print("id:{0},name:{1},age:{2}".format(people.id, people.name, people.age))

这里关键就是,怎么让这些数据以指定的格式输出,就需要方法collect,在代码中对应line23。这一步是把dataframe转成python的list数据结构,就可以用python的for直接遍历。

方便是方便,但是资料显示这个collect效率很低,如果数据比较多的话可能需要考虑使用其他方式输出。比如直接在Spark UI中查看(?)。

还有一个关键点,就是RDD转能使用的dataframe的时候,要先变成一个类似“中间件”的dataframe,然后再用这个df创建一个临时的数据表,再从数据表执行一个sql语句读出来能用的dataframe。

我觉得这种方式十分反直觉,而且很麻烦。

命令行结果

D:\ProgramData\Anaconda3\envs\py36\python.exe D:/code/geoDataWork/day3_t2.py
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
+---+---+----+
|age| id|name|
+---+---+----+
| 36|  1|Ella|
| 29|  2| Bob|
| 29|  3|Jack|
+---+---+----+

id:1,name:Ella,age:36
id:2,name:Bob,age:29
id:3,name:Jack,age:29

进程已结束,退出代码为 0

运行截图

d3t2.png

题目3:编程实现利用DataFrame 读写MySQL 的数据

3. 编程实现利用DataFrame 读写MySQL 的数据
(1)在MySQL 数据库中新建数据库testspark,再创建表employee,包含如下表所示的两
行数据。
表1 employee 表原有数据
id name gender Age
1 Alice F 22
2 John M 25
(2)配置Spark 通过JDBC 连接数据库MySQL,编程实现利用DataFrame 插入如下表所示
的两行数据到MySQL 中,最后打印出age 的最大值和age 的总和。
表2employee 表新增数据
id name gender age
3 Mary F 26
4 Tom M 23

第一步:先得建立一个数据库,建立一个表,导入原始数据,这里我也是python做的,上代码:

import pymysql

conn = pymysql.connect(host='localhost', port=3306, user="root", passwd="Test123456")
# 获取游标
cursor = conn.cursor()
# 创建testspark数据库,并使用
cursor.execute('CREATE DATABASE IF NOT EXISTS testspark;')
cursor.execute('USE testspark;')
sql = "CREATE TABLE IF NOT EXISTS employee (id int(3) NOT NULL AUTO_INCREMENT,name varchar(255) NOT NULL,gender char(1) NOT NULL,Age int(3) NOT NULL,PRIMARY KEY (id))"
cursor.execute(sql)
cursor.execute("INSERT INTO employee (name,gender,Age) VALUES ('Alice','F',22)")
cursor.execute("INSERT INTO employee (name,gender,Age) VALUES ('John','M',25)")
# commit更改
conn.commit()
cursor.close()  # 先关闭游标
conn.close()  # 再关闭数据库连接

这主要是sql操作,没啥可说的。就是我这里,对于employee的id字段,用了一个自动增加的AUTO_INCREMENT关键字。这使得我在之后的所有写入,都不需要输入id,id会随着一条一条记录自动增加,非常方便。

第二步:写入新增数据并计算总和

import findspark

findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, Row

# spark 初始化
conf = SparkConf().setMaster("local").setAppName("sparksql")
conf.set("spark.port.maxRetries", "128")
conf.set("spark.ui.port", "12345")
sc = SparkContext(conf=conf)  # 创建spark对象
spark = SQLContext(sc)
# mysql 配置
prop = {'user': 'root',
        'password': 'Test123456',
        'driver': 'com.mysql.cj.jdbc.Driver'}
# database
url = 'jdbc:mysql://localhost:3306/testspark?serverTimezone=UTC'

# 读取表
employeeRDD = sc.parallelize(["Mary F 26", "Tom M 23"]).map(lambda x: x.split(" ")).map(
    lambda p: Row(name=p[0].strip(), gender=p[1].strip(), Age=int(p[2].strip())))
schema_employee = spark.createDataFrame(employeeRDD)
# 创建一个临时的表
schema_employee.createOrReplaceTempView('employee')
employeeDF = spark.sql('select * from employee')
employeeDF.show()
employeeDF.write.jdbc(url=url, table='employee', mode='append',
                      properties=prop)

age_sum = spark.read.format("jdbc").options(
    url='jdbc:mysql://localhost:3306/testspark?serverTimezone=UTC&user=root&password=Test123456',
    dbtable="(SELECT sum(Age) FROM employee) tmp",
    driver='com.mysql.cj.jdbc.Driver').load()
age_sum.show()
sc.stop()

这一步也没什么,就是综合应用之前的方法,运行后结果:


d3t3.png

确实是没啥问题。

再看看数据库:


d3t3db.png

结果符合预期

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,717评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,501评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,311评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,417评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,500评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,538评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,557评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,310评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,759评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,065评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,233评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,909评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,548评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,172评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,420评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,103评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,098评论 2 352