继上篇(二)利用json将excel内容生成json文件)后,我们已经实现了从excel中读取内容生成json文件,如果读出的data数组有中文,需要特别注意encoding的问题,并且通过json.dump()生成的json是标准的带双引号的json格式,这对后面解析json也提供了方便。在实际工作中,往往也会有很多场景是接收一些json再在数据库中进行持久化的,比如在编写写网络爬虫应用时,就会用到本章节所述的知识点。
sqlite是一个短小精干的功能强大的数据库,使用python操作sqlite的语法与操作oracle、mysql类似。为实现疫情数据的持久化,这里采用sqlite进行示意。
1,创建sqlite数据库文件
python中自带了sqlite,无需做pip install,可直接这样操作:
import sqlite3
conn = sqlite3.connect("system.db.sqlite")
会在当前py所在的文件夹下创建system.db.sqlite的文件,此文件既是sqlite3数据库的数据文件。
2,创建员工疫情数据库表
可以在sqlite admin工具创建,也可以用python创建:
# 创建一个游标 curson
cursor = conn.cursor()
# 执行一条语句,创建 day_temperature 表
sql = "create table day_temperature (id integer PRIMARY KEY autoincrement, input_date date," \
"dep varchar(60), name varchar(60), is_base varchar(60), city varchar(60), leave_date varchar(60), leave_date_v varchar(60)" \
", back_date varchar(60), address varchar(100), out varchar(60), isOK varchar(60), temperature float, flag1 varchar(60)" \
", flag2 varchar(60), flag3 varchar(60), flag4 varchar(60), flag4_v varchar(60)" \
", v_1 varchar(60), v_2 varchar(60), can_duty varchar(60), can_duty_v varchar(60)" \
", v_3 varchar(60))"
cursor.execute(sql)
3,创建完数据库表,然后通过read_json.py文件的data_dict()方法解析的json文件:
start = read_json.JSONReader('ALL2.19.json1')
data = start.data_dict()
得到data[]数据,遍历data[]形成insert的sql:
for user in data:
dep = user['所在科室']
name = user['姓 名']
is_base = user['是否在汉(填写是或否)']
city = user['目前所在城市(填写城市名)']
leave_date = user['离汉时间/同行人员(从1月22日0时后开始填写,如:1月23日/妻子、女儿)']
leave_date_v = user['离汉交通工具及目的地(填写类别/车次、航班号、私家车牌号)']
back_date = user['预计回汉时间/同行人员/交通工具(如:1月22日/妻子、女儿/飞机)']
address = user['当日所在具体地址(精确到小区/街道)']
out = user['当日是否外出/外出场所(填写是或否及详细地点)']
isOK = user['是否健康/当日体温(℃)(填写是或否及当日最高体温)']
temperature = str(re.findall(r"\d+\.?\d*", isOK)[0])
flag1 = user['有无接触确诊(疑似)病例史(填写有或无/接触人员情况/接触时间)']
flag2 = user['是否普通病例(填写是或否/症状)']
flag3 = user['是否疑似病例(填写是或否/症状)']
flag4 = user['是否确诊病例(填写是或否/症状)']
flag4_v = user['确诊时间/机构(填写确诊时间及诊断机构名称)']
v_1 = user['目前采取措施(详细填写采取措施、方法)']
v_2 = user['备 注(未尽事项,可备注进行说明)']
# can_duty = user['是否能正常到岗']
# can_duty_v = user['未能正常到岗原因']
# v_3 = user['到汉后是否需申请集中隔离']
print(temperature)
i = i+1
# 插入一条记录
# sql = "insert into day_temperature (input_date,dep, name, is_base,city,leave_date,leave_date_v,back_date," \
# "address,out,isOK,temperature,flag1,flag2,flag3,flag4,flag4_v,v_1,v_2," \
# "can_duty,can_duty_v,v_3) values (" + "\'"+now_date + "\',\'"+dep + "\',\'" + name + "\',\'" + is_base + "\',\'" + city + "\',\'" + leave_date + "\',\'" + leave_date_v + "\',\'" + back_date + "\',\'" + address + "\',\'" + out + "\',\'" + isOK + "\',\'" + temperature + "\',\'" + flag1 + "\',\'" + flag2 + "\',\'" + flag3 + "\',\'" + flag4 + "\',\'" + flag4_v + "\',\'" + v_1 + "\',\'" + v_2 + "\',\'" + can_duty + "\',\'" + can_duty_v + "\',\'" + v_3 + "\')"
# print(sql)
# cursor.execute(sql)
# 用参数也可以
line_paras = [now_date, dep, name, is_base, city, leave_date, leave_date_v, back_date, address, out, isOK,
temperature, flag1, flag2, flag3, flag4, flag4_v, v_1, v_2, 'can_duty', 'can_duty_v', 'v_3']
sql = "insert into day_temperature (input_date,dep, name, is_base,city,leave_date,leave_date_v,back_date,address,out,isOK,temperature,flag1,flag2,flag3,flag4,flag4_v,v_1,v_2,can_duty,can_duty_v,v_3) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
cursor.execute(sql,[now_date,dep, name, is_base,city,leave_date,leave_date_v,back_date,address,out,isOK,temperature,flag1,flag2,flag3,flag4,flag4_v,v_1,v_2,'can_duty','can_duty_v','v_3'])
此处,在形成sql时,可以用笨方法,一个参数一个参数的组装,也可以用cursor.execute方法中使用sql的参数数组:
sql_paras = []
在for循环中,形成参数数组:
# 用参数也可以
line_paras = [now_date, dep, name, is_base, city, leave_date, leave_date_v, back_date, address, out, isOK,
temperature, flag1, flag2, flag3, flag4, flag4_v, v_1, v_2, 'can_duty', 'can_duty_v', 'v_3']
sql = "insert into day_temperature (input_date,dep, name, is_base,city,leave_date,leave_date_v,back_date,address,out,isOK,temperature,flag1,flag2,flag3,flag4,flag4_v,v_1,v_2,can_duty,can_duty_v,v_3) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
cursor.execute(sql,[now_date,dep, name, is_base,city,leave_date,leave_date_v,back_date,address,out,isOK,temperature,flag1,flag2,flag3,flag4,flag4_v,v_1,v_2,'can_duty','can_duty_v','v_3'])
针对大量的insert操作,数据库会有吞吐和效率的问题,一般可以使用cursor.executemany(sql, sql_paras)方法——注意是在for循环外执行:
sql = "insert into day_temperature (input_date,dep, name, is_base,city,leave_date,leave_date_v,back_date,address,out,isOK,temperature,flag1,flag2,flag3,flag4,flag4_v,v_1,v_2,can_duty,can_duty_v,v_3) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
cursor.executemany(sql, sql_paras)
4,执行完建表和插数据后,记得关闭游标和数据库。
# 关闭游标:
cursor.close()
# 提交事物
conn.commit()
# 关闭连接
conn.close()
5,有关sqlite的基本操作介绍完毕,下面是sqlite文件夹下data_to_db.py文件的代码:
import re
import sqlite3
from datetime import datetime
import read_json
conn = sqlite3.connect("system.db.sqlite")
# 创建一个游标 curson
cursor = conn.cursor()
# 执行一条语句,创建 day_temperature 表
# sql = "create table day_temperature (id integer PRIMARY KEY autoincrement, input_date date," \
# "dep varchar(60), name varchar(60), is_base varchar(60), city varchar(60), leave_date varchar(60), leave_date_v varchar(60)" \
# ", back_date varchar(60), address varchar(100), out varchar(60), isOK varchar(60), temperature float, flag1 varchar(60)" \
# ", flag2 varchar(60), flag3 varchar(60), flag4 varchar(60), flag4_v varchar(60)" \
# ", v_1 varchar(60), v_2 varchar(60), can_duty varchar(60), can_duty_v varchar(60)" \
# ", v_3 varchar(60))"
# cursor.execute(sql)
now_date = datetime.now().strftime('%Y-%m-%d') # 格式为str
start = read_json.JSONReader('ALL2.19.json1')
data = start.data_dict()
sql_paras = []
i = 0
for user in data:
print(i)
dep = user['所在科室']
name = user['姓 名']
is_base = user['是否在汉(填写是或否)']
city = user['目前所在城市(填写城市名)']
leave_date = user['离汉时间/同行人员(从1月22日0时后开始填写,如:1月23日/妻子、女儿)']
leave_date_v = user['离汉交通工具及目的地(填写类别/车次、航班号、私家车牌号)']
back_date = user['预计回汉时间/同行人员/交通工具(如:1月22日/妻子、女儿/飞机)']
address = user['当日所在具体地址(精确到小区/街道)']
out = user['当日是否外出/外出场所(填写是或否及详细地点)']
isOK = user['是否健康/当日体温(℃)(填写是或否及当日最高体温)']
temperature = str(re.findall(r"\d+\.?\d*", isOK)[0])
flag1 = user['有无接触确诊(疑似)病例史(填写有或无/接触人员情况/接触时间)']
flag2 = user['是否普通病例(填写是或否/症状)']
flag3 = user['是否疑似病例(填写是或否/症状)']
flag4 = user['是否确诊病例(填写是或否/症状)']
flag4_v = user['确诊时间/机构(填写确诊时间及诊断机构名称)']
v_1 = user['目前采取措施(详细填写采取措施、方法)']
v_2 = user['备 注(未尽事项,可备注进行说明)']
# can_duty = user['是否能正常到岗']
# can_duty_v = user['未能正常到岗原因']
# v_3 = user['到汉后是否需申请集中隔离']
print(temperature)
i = i+1
# 插入一条记录
# sql = "insert into day_temperature (input_date,dep, name, is_base,city,leave_date,leave_date_v,back_date," \
# "address,out,isOK,temperature,flag1,flag2,flag3,flag4,flag4_v,v_1,v_2," \
# "can_duty,can_duty_v,v_3) values (" + "\'"+now_date + "\',\'"+dep + "\',\'" + name + "\',\'" + is_base + "\',\'" + city + "\',\'" + leave_date + "\',\'" + leave_date_v + "\',\'" + back_date + "\',\'" + address + "\',\'" + out + "\',\'" + isOK + "\',\'" + temperature + "\',\'" + flag1 + "\',\'" + flag2 + "\',\'" + flag3 + "\',\'" + flag4 + "\',\'" + flag4_v + "\',\'" + v_1 + "\',\'" + v_2 + "\',\'" + can_duty + "\',\'" + can_duty_v + "\',\'" + v_3 + "\')"
# print(sql)
# cursor.execute(sql)
# 用参数也可以
line_paras = [now_date, dep, name, is_base, city, leave_date, leave_date_v, back_date, address, out, isOK,
temperature, flag1, flag2, flag3, flag4, flag4_v, v_1, v_2, 'can_duty', 'can_duty_v', 'v_3']
# sql = "insert into day_temperature (input_date,dep, name, is_base,city,leave_date,leave_date_v,back_date,address,out,isOK,temperature,flag1,flag2,flag3,flag4,flag4_v,v_1,v_2,can_duty,can_duty_v,v_3) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
# cursor.execute(sql,[now_date,dep, name, is_base,city,leave_date,leave_date_v,back_date,address,out,isOK,temperature,flag1,flag2,flag3,flag4,flag4_v,v_1,v_2,can_duty,can_duty_v,v_3])
# 在该for循环外使用批量添加,但要先得到sql的参数值列表sql_paras
sql_paras.append(line_paras)
sql = "insert into day_temperature (input_date,dep, name, is_base,city,leave_date,leave_date_v,back_date,address,out,isOK,temperature,flag1,flag2,flag3,flag4,flag4_v,v_1,v_2,can_duty,can_duty_v,v_3) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
cursor.executemany(sql, sql_paras)
sql = 'select * from day_temperature'
cursor.execute(sql)
# 关闭游标:
cursor.close()
# 提交事物
conn.commit()
# 关闭连接
conn.close()
补充一下:为了获取“某个城市有多少员工”,单独写了一个测试文件,仅供参考(其实可以作为一个工具类的方法)
import sqlite3, json
conn = sqlite3.connect("system.db.sqlite")
def get_citys():
data = []
# 创建一个游标 curson
cursor = conn.cursor()
sql = 'select city,count(*),group_concat(name) from day_temperature group by city'
cityusers = cursor.execute(sql).fetchall()
citys = []
numbers = []
usernames = []
for cityuser in cityusers:
citys.append(cityuser[0])
numbers.append(cityuser[1])
usernames.append(cityuser[2])
data.append(citys)
data.append(numbers)
data.append(usernames)
print(data)
# 关闭游标:
cursor.close()
return data
# 提交事物
conn.commit()
get_citys()
# 关闭连接
conn.close()