访问数据库的线程安全等级在下表中定义:
Integer constant stating the level of thread safety the interface supports. Possible values are:
threadsafety | Meaning |
---|---|
0 | Threads may not share the module. |
1 | Threads may share the module, but not connections. |
2 | Threads may share the module and connections. |
3 | Threads may share the module, connections and cursors. |
Sharing in the above context means that two threads may use a resource without wrapping it using a mutex semaphore to implement resource locking. Note that you cannot always make external resources thread safe by managing access using a mutex: the resource may rely on global variables or other external sources that are beyond your control.
pymysql的线程安全等级为1:
from ._compat import PY2
from .constants import FIELD_TYPE
from .converters import escape_dict, escape_sequence, escape_string
from .err import (
Warning, Error, InterfaceError, DataError,
DatabaseError, OperationalError, IntegrityError, InternalError,
NotSupportedError, ProgrammingError, MySQLError)
from .times import (
Date, Time, Timestamp,
DateFromTicks, TimeFromTicks, TimestampFromTicks)
VERSION = (0, 7, 11, None)
threadsafety = 1
apilevel = "2.0"
paramstyle = "pyformat"
在实际的项目中,一种经典的错误的使用方式就是全局使用一个conntction,这样如果有多个线程,同时使用一个connection来查询或操作数据库,数据库就会出现如下问题:
mysql error sql: Packet sequence number wrong - got 1 expected 2
for this sql query:
一种解决方法是使用线程池,每个线程采用独立的connection,但是这种方式对于并发量特别大的情况下,会造成利用效率比较低的问题。
另外的一种方法是,使用多线程同步方法,加入锁控制信号量。
具体来讲,一个简单的demo,但是要留心死锁问题:
#set up a mutex
mutex = 0
connection = pymysql.connect(host='xxx.xxx.xxx',
user='xxx',
password='xxxx',
db='xxx',
charset='utf8',
cursorclass=pymysql.cursors.DictCursor)
# query for a fetchall
def qurey_all_sql(sql):
while mutex == 1:
time.sleep(500)
mutex = 1
cur = connection.cursor()
cur.execute(sql)
result = cur.fetchall()
connection.commit()
cur.close()
mutex = 0
return result
# query for a result
def query_one_sql(sql):
while mutex == 1:
time.sleep(500)
mutex = 1
cur = connection.cursor()
cur.execute(sql)
result = cur.fetchone()
connection.commit()
cur.close()
mutex = 0
return result
原作者在github上给出的解决方案如下:
I use SQLAlchemy's pool.
Another way is threadpool. Each worker thread can have own connection.
更高级的方法有《基于gevent和pymysql实现mysql读写的异步非堵塞方案》,具体见链接。