#!/usr/bin/env python
#coding=utf-8
from DBUtils.PooledDB import PooledDB
import MySQLdb as mysql
from utils import util
import traceback
import config
import sys
class DB():
def __init__(self):
self.host = config.db_host
self.name = config.db_name
self.user = config.db_user
self.passwd = config.db_passwd
self.pool = PooledDB(mysql, mincached=4, maxcached=10, host=self.host,db=self.name,user=self.user,passwd=self.passwd,setsession=['SET AUTOCOMMIT = 1'])
#连接数据库
def connect_db(self):
self.db = self.pool.connection()
self.cur = self.db.cursor()
#关闭连接
def close_db(self):
self.cur.close()
self.db.close()
#执行sql
def execute(self, sql):
self.connect_db()
return self.cur.execute(sql)
#获取所有数据列表
def get_list(self,table,fields):
sql = "select %s from %s"% (",".join(fields),table)
try:
self.execute(sql)
result = self.cur.fetchall()
if result:
result = [dict((k,row[i]) for i, k in enumerate(fields)) for row in result]
else:
result = {}
return result;
except:
util.WriteLog('db').info("Execute '%s' error: %s" % (sql, traceback.format_exc()))
finally:
self.close_db()
#获取某一条数据,返回字典
def get_one(self,table,fields,where):
if isinstance(where, dict) and where:
conditions = []
for k,v in where.items():
conditions.append("%s='%s'" % (k, v))
sql = "select %s from %s where %s" % (",".join(fields),table,' AND '.join(conditions))
try:
self.execute(sql)
result = self.cur.fetchone()
if result:
result = dict((k, result[i])for i, k in enumerate(fields))
else:
result = {}
return result
except:
util.WriteLog('db').info("Execute '%s' error: %s" % (sql, traceback.format_exc()))
finally:
self.close_db()
#更新数据
def update(self,table,fields):
data = ",".join(["%s='%s'"%(k,v) for k,v in fields.items()])
sql = "update %s set %s where id=%s " % (table,data,fields["id"])
try:
return self.execute(sql)
except:
util.WriteLog('db').info("Execute '%s' error: %s" % (sql, traceback.format_exc()))
finally:
self.close_db()
#添加数据
def create(self,table,data):
fields,values = [],[]
for k, v in data.items():
fields.append(k)
values.append("'%s'" % v)
sql = "insert into %s (%s) values (%s)" % (table,",".join(fields),",".join(values))
try:
return self.execute(sql)
except:
util.WriteLog('db').info("Execute '%s' error: %s" % (sql, traceback.format_exc()))
finally:
self.close_db()
#删除数据
def delete(self,table,where):
if isinstance(where, dict) and where:
conditions = []
for k,v in where.items():
conditions.append("%s='%s'" % (k, v))
sql = "delete from %s where %s" % (table,' AND '.join(conditions))
try:
return self.execute(sql)
except:
util.WriteLog('db').info("Execute '%s' error: %s" % (sql, traceback.format_exc()))
finally:
self.close_db()
python mysql连接池 类
最后编辑于 :
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- Python实现Mysql数据库连接池 python连接Mysql数据库: python编程中可以使用MySQLd...
- 1.准备工作 要想实现python与MySQL的连接,首先需要引入pymysql,就这样来实现 2.开始啦 1.创...