多线程编程一向是难点,也容易出问题。之前c#中异步委托用的很爽,python中如何实现类似效果呢?
上面的流程图中,在接收数据之后,启动一个清洗数据的线程,然后不必等待清洗结果继续接收数据。同样,在清洗数据时,启动一个保存数据的线程,然后不必等待保存结果继续清洗数据。
由于python中父线程结束时,子线程也会跟着结束。因此我这里把接收数据,清洗数据,保存数据分别放入三个不同的线程池中,这三个线程池都是主线程创建。这样可以避免接收数据的线程结束时,清洗数据线程和保存数据线程也跟着结束。
示例代码如下
import threading
lock = threading.Lock()
from concurrent.futures import ThreadPoolExecutor
import pymysql
conn = pymysql.connect(**sqlDict)
cursor = conn.cursor()
sock_max_workers=64
sock_pool =ThreadPoolExecutor(max_workers=sock_max_workers)
sock_future = []
chgdata_max_workers=64
chgdata_pool = ThreadPoolExecutor(max_workers=chgdata_max_workers)
chgdata_future = []
savedata_max_workers=128
savedata_pool = ThreadPoolExecutor(max_workers=savedata_max_workers)
savedata_future = []
def sock(参数):
接收数据的代码
chgdata_future.append(chgdata_pool.submit(chgdata,参数) ) ##异步委托chgdata
其它代码
def chgdata(参数):
清洗数据的代码
savedata_future.append(savedata_pool.submit(savedata,参数)) ##异步委托savedata
其它代码
def savedata(参数):
准备保存数据
lock.acquire() #加个互斥锁
cursor.execute(command,data) #保存到数据库
lock.release() #释放锁
其它代码
if __name__ == '__main__':
准备工作
sock_future = [sock_pool.submit(sock,p) for p in iplist]
for f in sock_future:
f.result()
for f in chgdata_future:
f.result()
for f in saveda'ta_future:
f.result()
conn.commit() #视实际需求放在 savadata 中
conn.close()
这里解释一下。由于所有线程共用一个conn,而cursor.execute()是独占式的,因此需加锁互斥使用此连接,否则会出现错误提示:pymysql.err.InterfaceError: (0, '') ,关于这个错误提示,StackOverflow 的说法是因为conn已被关闭,这个说法不完整,准确的说法应该是:conn已被关闭或已被独占。
多线程保存数据到数据库时要小心,以保存到mysql为例,一般有如下几个方法:
1. 每个线程拥有独立的连接
2. 所有线程共用一个连接,则需加锁互斥使用此连接 (上述代码使用这种方式)
3. 所有线程共用一个连接池,需要考虑线程总数和连接池连接数上限的问题
彩蛋,多线程网络请求时容易出现错误提示Max retries exceeded with url:
requests.adapters.DEFAULT_RETRIES = 5
time.sleep(1) #减低网络请求的频率。如果实在不愿意,请使用代理。