脚本开发时经常需要访问数据库,目前业内已经有了大量成熟的访问方案,现将平时工作中用到的经典案例总结如下。
- 本文引用自以下链接,特此声明:
Python 中创建 PostgreSQL 数据库连接池 | 隔叶黄莺 Yanbin Blog - 软件编程实践
Python使用Mysql连接池_ButFlyzzZ的博客-CSDN博客_python连接池mysql
PostgreSQL 连接池:
import atexit
from contextlib import contextmanager
from typing import Iterator, Union

from psycopg2 import pool
from psycopg2.extensions import connection
from psycopg2.extras import RealDictCursor
class DBHelper:
    """Lazily created psycopg2 connection pool with a context-manager API.

    The pool is not opened until the first call to ``get_resource`` (or an
    explicit ``initialize_connection_pool``), so constructing a ``DBHelper``
    performs no I/O.
    """

    # Sample DSN used when the caller does not supply one.
    DEFAULT_DSN = 'postgresql://admin:password@localhost/testdb?connect_timeout=5'

    def __init__(self, dsn=None, minconn=1, maxconn=3):
        """Remember the pool settings without opening any connection.

        Args:
            dsn: Connection string handed to the pool; ``DEFAULT_DSN`` when
                omitted.
            minconn: Minimum number of connections for the pool.
            maxconn: Maximum number of connections for the pool.
        """
        self._dsn = self.DEFAULT_DSN if dsn is None else dsn
        self._minconn = minconn
        self._maxconn = maxconn
        self._connection_pool = None

    def initialize_connection_pool(self):
        """Create the ``ThreadedConnectionPool`` from the stored settings."""
        self._connection_pool = pool.ThreadedConnectionPool(
            self._minconn, self._maxconn, self._dsn
        )

    @contextmanager
    def get_resource(
        self, autocommit=True
    ) -> Iterator[Union[RealDictCursor, tuple[RealDictCursor, connection]]]:
        """Borrow a connection and yield a ``RealDictCursor`` on it.

        Args:
            autocommit: Value assigned to the connection's ``autocommit``
                attribute before the cursor is created.

        Yields:
            The cursor alone when ``autocommit`` is true; otherwise a
            ``(cursor, connection)`` tuple so the caller can commit or roll
            back explicitly.
        """
        if self._connection_pool is None:
            self.initialize_connection_pool()
        conn = self._connection_pool.getconn()
        # Outer try/finally: the connection must go back to the pool even if
        # setting autocommit or creating the cursor fails.
        try:
            conn.autocommit = autocommit
            cursor = conn.cursor(cursor_factory=RealDictCursor)
            try:
                if autocommit:
                    yield cursor
                else:
                    yield cursor, conn
            finally:
                cursor.close()
        finally:
            self._connection_pool.putconn(conn)

    def shutdown_connection_pool(self):
        """Close every pooled connection; safe to call more than once."""
        if self._connection_pool is not None:
            self._connection_pool.closeall()
            # Forget the closed pool so a repeated shutdown is a no-op and a
            # later get_resource() builds a fresh pool.
            self._connection_pool = None
# Module-level helper instance shared by everything that imports this module.
db_helper = DBHelper()


def shutdown_connection_pool():
    """Shut down the shared helper's connection pool."""
    db_helper.shutdown_connection_pool()


# Run the shutdown hook when the interpreter exits.
atexit.register(shutdown_connection_pool)
# Typical usage: autocommit is on by default, so only the cursor is yielded.
from db_helper import db_helper

with db_helper.get_resource() as cursor:
    cursor.execute('select * from users')
    for record in cursor.fetchall():
        ...  # process record, e.g. record['name']
# Transactional usage: with autocommit=False the context manager yields
# (cursor, connection) so the caller controls commit and rollback.
with db_helper.get_resource(autocommit=False) as (cursor, connection):
    try:
        cursor.execute('update users set name = %s where id = %s', ('new_name', 1))
        cursor.execute('delete from orders where user_id = %s', (1,))
        connection.commit()
    except Exception:
        # Undo both statements, then let the error propagate instead of
        # hiding it from the caller.
        connection.rollback()
        raise
MySQL 连接池:
import pymysql
from g_conf.config import config_template
from DBUtils.PooledDB import PooledDB
class MysqlPool:
    """Context manager that borrows a connection and cursor from a shared pool.

    On entry, ``self.conn`` and ``self.cursor`` are set; on exit both are
    closed. Committing or rolling back is left to the caller.
    """

    # Keyword arguments forwarded to PooledDB; connection settings are read
    # from the project's config_template.
    config = {
        'creator': pymysql,
        'host': config_template['MYSQL']['HOST'],
        'port': config_template['MYSQL']['PORT'],
        'user': config_template['MYSQL']['USER'],
        'password': config_template['MYSQL']['PASSWD'],
        'db': config_template['MYSQL']['DB'],
        'charset': config_template['MYSQL']['CHARSET'],
        'maxconnections': 70,  # maximum number of connections in the pool
        'cursorclass': pymysql.cursors.DictCursor,
    }
    # Built once when the class body executes and shared by every instance.
    pool = PooledDB(**config)

    def __enter__(self):
        """Acquire a pooled connection and open a cursor on it."""
        self.conn = MysqlPool.pool.connection()
        try:
            self.cursor = self.conn.cursor()
        except Exception:
            # __exit__ is not called when __enter__ raises, so release the
            # connection here rather than leaking it.
            self.conn.close()
            raise
        return self

    def __exit__(self, type, value, trace):
        """Close the cursor, then the connection, even if the former fails."""
        try:
            self.cursor.close()
        finally:
            # Per DBUtils PooledDB semantics, closing a pooled connection
            # hands it back to the pool.
            self.conn.close()
# Typical usage
def func(tar_id):
    """Run a single statement through a pooled connection and commit it."""
    with MysqlPool() as db:
        cursor = db.cursor
        cursor.execute('YOUR_SQL')
        connection = db.conn
        connection.commit()
# Decorator usage
def db_conn(func):
    """Decorate ``func`` so it receives a ``MysqlPool`` context as first argument.

    The wrapper opens a ``MysqlPool`` context, calls ``func(db, *args, **kw)``
    inside it, and returns whatever ``func`` returns.
    """
    from functools import wraps

    @wraps(func)  # keep the wrapped function's __name__ and __doc__
    def wrapper(*args, **kw):
        with MysqlPool() as db:
            return func(db, *args, **kw)

    return wrapper
@db_conn
def update_info(db, *args, **kw):
    """Execute a statement and commit it, rolling back on failure.

    Args:
        db: Object exposing ``cursor`` and ``conn`` attributes, supplied by
            the ``db_conn`` decorator.

    Returns:
        0 when the statement was executed and committed, 1 when an exception
        occurred and the transaction was rolled back.
    """
    import logging

    try:
        db.cursor.execute("YOUR_SQL")
        db.conn.commit()
    except Exception:
        # Record the failure before rolling back; the 0/1 return code alone
        # would hide the cause.
        logging.getLogger(__name__).exception("update_info failed; rolling back")
        db.conn.rollback()
        return 1
    return 0