#config.py
rds_v1 = {
"host": "127.0.0.1",
"user": "root",
"password": "12345678",
"database": "users",
"port": 3306
}
#database.py
import pymysql
import pandas as pd
from config import *
class DB:
def __init__(self, engine):
self._engine = engine
self._conn = pymysql.connect(host=self._engine['host'],
user=self._engine['user'],
password=self._engine['password'],
port=self._engine['port'],
database=self._engine['database'] ,
charset='utf8')
def insert(self, dataframe, table):
"""插入dataframe类型数据到指定表"""
with self._conn.cursor() as cursor:
col = ','.join(dataframe.columns)
par = ','.join(['%s'] * dataframe.shape[1])
keys = ','.join([f'{i} = values({i})' for i in dataframe.columns])
sql = f'insert into {table} ({col}) values({par}) on duplicate key update {keys};'
cursor.executemany(sql, [tuple(i) for i in df.values])
try:
self._conn.commit()
msg = f"""数据写入成功:共{dataframe.shape[0]}条数据写入{table}表!!!"""
except Exception as err:
self._conn.rollback()
msg = f"数据写入失败!!!"
raise Exception(msg)
return msg
def select(self, sql, kind=True):
"""查询数据:kind为True表示需要输出数据"""
with self._conn.cursor() as cursor:
try:
cursor.execute(sql)
if kind:
result = cursor.fetchall()
cols = cursor.description
return pd.DataFrame(result, columns=[i[0] for i in cols])
else:
return self._conn.commit()
except Exception as err:
self._conn.rollback()
msg = f"ERROR - {self._engine['host']} session init failed: {err}" + "\n"
raise Exception(msg)
def tables(self):
"""查看表"""
sql = "show tables;"
return self.select(sql)
def drop(self, table):
"""删除表"""
sql = f"drop table {table};"
return self.select(sql, kind=False)
def delete(self, table, tag='1=1'):
"""删除数据,可以按条件删除"""
sql = f"delete from {table} where {tag};"
return self.select(sql, kind=False)
def desc(self, table):
"""查看表结构"""
sql = f"desc {table};"
return self.select(sql)
def process(self):
"""查看数据库进程"""
sql = r"show processlist;"
return self.select(sql)
def kill(self, process_id):
"""关闭数据库进程"""
sql = f"kill {process_id};"
return self.select(sql, kind=False)
def use(self, database):
"""切换数据库"""
sql = f"use {database};"
return self.select(sql, kind=False)
def close(self):
self._conn.close()
def create_sql(self, table):
sql = f"""show create table {table}"""
dataframe = self.select(sql)
return print(dataframe['Create Table'][0])
if __name__ == '__main__':
db = DB(rds_v1)
df = db.tables()