python多线程编程之异步委托

  多线程编程一向是难点,也容易出问题。之前c#中异步委托用的很爽,python中如何实现类似效果呢?


异步流程图.png

  上面的流程图中,在接收数据之后,启动一个清洗数据的线程,然后不必等待清洗结果继续接收数据。同样,在清洗数据时,启动一个保存数据的线程,然后不必等待保存结果继续清洗数据。
  由于python中父线程结束时,子线程也会跟着结束。因此我这里把接收数据,清洗数据,保存数据分别放入三个不同的线程池中,这三个线程池都是主线程创建。这样可以避免接收数据的线程结束时,清洗数据线程和保存数据线程也跟着结束。
  示例代码如下

import threading 
lock = threading.Lock() 
from concurrent.futures import ThreadPoolExecutor 
import pymysql
conn = pymysql.connect(**sqlDict)
cursor = conn.cursor()

sock_max_workers=64 
sock_pool =ThreadPoolExecutor(max_workers=sock_max_workers) 
sock_future = []
chgdata_max_workers=64
chgdata_pool = ThreadPoolExecutor(max_workers=chgdata_max_workers) 
chgdata_future = [] 
savedata_max_workers=128
savedata_pool = ThreadPoolExecutor(max_workers=savedata_max_workers) 
savedata_future = [] 

def sock(参数): 
    接收数据的代码 
    chgdata_future.append(chgdata_pool.submit(chgdata,参数) )   ##异步委托chgdata 
    其它代码 

def chgdata(参数): 
    清洗数据的代码 
    savedata_future.append(savedata_pool.submit(savedata,参数)) ##异步委托savedata 
    其它代码 

def savedata(参数): 
    准备保存数据 
    lock.acquire()   #加个互斥锁 
    cursor.execute(command,data)  #保存到数据库 
    lock.release() #释放锁 
    其它代码 

if __name__ == '__main__': 
    准备工作
    sock_future = [sock_pool.submit(sock,p) for p in iplist] 
    for f in sock_future: 
        f.result() 
    for f in chgdata_future: 
        f.result() 
    for f in saveda'ta_future: 
        f.result() 
    conn.commit()   #视实际需求放在 savadata 中
    conn.close()

  这里解释一下。由于所有线程共用一个conn,而cursor.execute()是独占式的,因此需加锁互斥使用此连接,否则会出现错误提示:pymysql.err.InterfaceError: (0, '') ,关于这个错误提示,StackOverflow 的说法是因为conn已被关闭,这个说法不完整,准确的说法应该是:conn已被关闭或已被独占。
  多线程保存数据到数据库时要小心,以保存到mysql为例,一般有如下几个方法:
  1. 每个线程拥有独立的连接
  2. 所有线程共用一个连接,则需加锁互斥使用此连接 (上述代码使用这种方式)
  3. 所有线程共用一个连接池,需要考虑线程总数和连接池连接数上限的问题
  彩蛋,多线程网络请求时容易出现错误提示Max retries exceeded with url:

    requests.adapters.DEFAULT_RETRIES = 5 
    time.sleep(1)    #减低网络请求的频率。如果实在不愿意,请使用代理。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容