分批读取
import pandas as pd
import pymysql
from sqlalchemy import create_engine
pymysql.install_as_MySQLdb()
第一种读取数据的方式
#建立数据库连接
con=pymysql.connect(
host="10.***.***.***", #ip地址
database="db1", #需读取的数据表所在数据库的库名
user="user", #mysql用户名
password="password", #密码
port=3306, #端口号
charset='utf8'
)
cur = con.cursor() #创建游标
#数据读取函数
def read_table(cur, sql):
try:
cur.execute(sql)
d = cur.fetchall()
df = pd.DataFrame(list(d))
except Exception as e:
df = pd.DataFrame()
print('read data from mysql failed:',sqli)
print(e)
return df
#设置分批的节点
def batch_generate(n, b): #n 数据条数,b分批数目
batch = []
for i in range(0, n, int(n/b)):
batch.append(i)
batch.append(n)
return batch
batch_i = batch_generate(n=10050, b=10)
#分批读取数据,保存成dataframe。示例中mysql数据表中的数据可根据id分批
dat = pd.DataFrame()
for i in range(1,len(batch_i)):
# print(batch_i[i-1],batch_i[i])
sqli = 'select * from db1.test where id>%d and id<=%d' % (batch_i[i-1], batch_i[i]) #sql语句
d = read_table(cur=cur, sql=sqli) #读取数据
dat = pd.concat([dat,d], axis=0) #按行合并数据
cur.close() #关闭游标
con.close() #关闭连接
第二种读取数据的方式
con_engine = create_engine('mysql://user:password@10.***.***.***:3306/db1?charset=utf8')
sql = 'select * from db1.test'
d = pd.read_sql(sql=sql, con=con_engine, chunksize=10) #分10批读取数据,语句返回的是生成器
dat = pd.DataFrame()
for i in d:
dat = pd.concat([dat, i], axis = 0)
dat.index = range(len(dat)) #上述方法输出的index会有重复,更新index
分批写入mysql
第一种方式数据写入数据库
#建立连接
con=pymysql.connect(
host="10.***.***.***", #ip地址
database="db1", #需读取的数据表所在数据库的库名
user="user", #mysql用户名
password="password", #密码
port=3306, #端口号
charset='utf8'
)
cur = con.cursor()
#写入数据
batch_insert_i = batch_generate(n=1000,b=10)
for i in range(1,len(batch_insert_i)):
sqli = 'insert into db1.test_py(id, w_no, c_code, s_code, c_dt, l_dt) values(%s,%s,%s,%s,%s,%s)'
dati = (dat.iloc[batch_insert_i[i-1]:batch_insert_i[i], :]).values.tolist()
#print(batch_insert_i[i-1], batch_insert_i[i])
try:
# 执行sql语句
cur.executemany(sqli,dati)
con.commit() # 提交到数据库执行
except Exception as e:
# 如果发生错误则退出,也可以不退出,回滚con.rollback()
print(e)
break #con.rollback()
cur.close() # 关闭游标
con.close() # 关闭数据库连接
第二种方式写入数据库
con_engine = create_engine('mysql://user:password@10.***.***.***:3306/db1?charset=utf8')
dat.to_sql(con=con_engine, name='test_py', if_exists='append', index=False, chunksize=10)
#name数据表名; if_exists='append' 若不存在test_py表则新建,若存在则追加写入。
注:
第二种读取和写入方式直接调用pandas连接mysql,在读写大规模数据时效率更高。