- 实现连接池的好处
1.减少连接等待时间
- 实现连接池的思路
- 批量把实现的连接实例放在到有序队列
2.池子里的实例维护,如有效期,实例失效,新的实例加入,池子空,缓存等业务考虑。
mysql连接池的简易版
import MySQLdb
from queue import Queue
class ConnPool:
def __init__(self, max_conn_num, dbtype):
self._pool = Queue()
self.dbtype = dbtype
self.max_conn_num = max_conn_num
try:
for i in range(max_conn_num):
self.fill_conn(self.create_conn())
except Exception as e:
raise e
def fill_conn(self, conn):
try:
self._pool.put(conn)
except Exception as e:
raise "fillConnection error: {0}".format(str(e))
def create_conn(self):
if self.dbtype == 'xxx':
pass
elif self.dbtype == 'mysql':
try:
conndb = MySQLdb.connect(user="root", passwd="hugo9091", host="127.0.0.1",
port=3306, db="test");
conndb.clientinfo = 'datasync connection pool from datasync.py';
conndb.ping();
return conndb
except Exception as e:
raise 'conn targetdb datasource error: {0}'.format(str(e))
def return_conn(self, conn):
try:
self._pool.put(conn)
except Exception as e:
raise "returnConnection error: {0}".format(str(e))
def get_conn(self):
try:
return self._pool.get()
except Exception as e:
raise Exception("get_conn error: {0}".format(str(e)))
def close_conn(self, conn):
try:
self._pool.get().close()
self.fill_conn(self.create_conn(self.conn_name, self.dbtype))
except Exception as e:
raise "close_conn error: {0}".format(str(e))
mysql_pool = ConnPool(10, "mysql");
r = mysql_pool.get_conn()
print(r)
mysql连接池
class Pool(object):
def __init__(self, factory, options={}, initial_connections=0,
max_connections=200, reap_expired_connections=True,
reap_interval=180):
self._factory = factory
self._options = options
self._max_connections = max_connections
self._pool = collections.deque()
self._using = collections.deque()
assert initial_connections <= max_connections, "initial_connections must be less than max_connections"
for i in range(initial_connections):
self._pool.append(self._create_connection())
if reap_expired_connections:
self._reaper = ConnectionReaper(self, reap_interval)
self._reaper.start()
def __del__(self):
for conn in self._pool:
conn.close()
self._pool = None
for conn in self._using:
conn.close()
self._using = None
@contextlib.contextmanager
def connection(self):
conn = self.acquire()
try:
yield conn
finally:
self.release(conn)
@property
def size(self):
"""Returns the pool size."""
return len(self._pool) + len(self._using)
def acquire(self, retry=10, retried=0):
if len(self._pool):
conn = self._pool.popleft()
self._using.append(conn)
return conn
else:
if len(self._pool) + len(self._using) < self._max_connections:
conn = self._create_connection()
self._using.append(conn)
return conn
else:
if retried >= retry:
raise PoolExhaustedError()
retried += 1
gevent.sleep(0.1)
return self.acquire(retry=retry, retried=retried)
def release(self, conn):
if conn in self._using:
self._using.remove(conn)
self._pool.append(conn)
else:
raise ConnectionNotFoundError()
def drop(self, conn):
if conn in self._pool:
self._pool.remove(conn)
if conn.is_connected():
conn.close()
else:
raise ConnectionNotFoundError()
def drop_expired(self):
expired_conns = [conn for conn in self._pool if conn.is_expired()]
for conn in expired_conns:
self.drop(conn)
def _create_connection(self):
conn = self._factory(**self._options)
conn.open()
return conn
redis连接池
class ConnectionPool(object):
"Generic connection pool"
def __init__(self, connection_class=Connection, max_connections=None,
**connection_kwargs):
max_connections = max_connections or 2 ** 31
if not isinstance(max_connections, (int, long)) or max_connections < 0:
raise ValueError('"max_connections" must be a positive integer')
self.connection_class = connection_class
self.connection_kwargs = connection_kwargs
self.max_connections = max_connections
self.reset()
def __repr__(self):
return "%s<%s>" % (
type(self).__name__,
self.connection_class.description_format % self.connection_kwargs,
)
def reset(self):
self.pid = os.getpid()
self._created_connections = 0
self._available_connections = []
self._in_use_connections = set()
self._check_lock = threading.Lock()
def _checkpid(self):
if self.pid != os.getpid():
with self._check_lock:
if self.pid == os.getpid():
# another thread already did the work while we waited
# on the lock.
return
self.disconnect()
self.reset()
def get_connection(self, command_name, *keys, **options):
"Get a connection from the pool"
self._checkpid()
try:
connection = self._available_connections.pop()
except IndexError:
connection = self.make_connection()
self._in_use_connections.add(connection)
return connection
def make_connection(self):
"Create a new connection"
if self._created_connections >= self.max_connections:
raise ConnectionError("Too many connections")
self._created_connections += 1
return self.connection_class(**self.connection_kwargs)
def release(self, connection):
"Releases the connection back to the pool"
self._checkpid()
if connection.pid != self.pid:
return
self._in_use_connections.remove(connection)
self._available_connections.append(connection)
def disconnect(self):
"Disconnects all connections in the pool"
all_conns = chain(self._available_connections,
self._in_use_connections)
for connection in all_conns:
connection.disconnect()