python3 线程池和异常处理

  1. 引用
  2. 线程池的基本使用
    as_completed
    wait
    map
  3. 线程池的异常处理
  4. 进程池用法

引用

Python中已经有了threading模块,为什么还需要线程池呢,线程池又是什么东西呢?举个爬虫的例子 需要控制同时爬取的线程数,例子中创建了20个线程,而同时只允许3个线程在运行,但是20个线程都需要创建和销毁,线程的创建是需要消耗系统资源的,有没有更好的方案呢?其实只需要三个线程就行了,每个线程各分配一个任务,剩下的任务排队等待,当某个线程完成了任务的时候,排队任务就可以安排给这个线程继续执行。

这就是最简单的线程池思想,但是自己编写线程池很难写的比较完美,还需要考虑复杂情况下的线程同步,很容易发生死锁。从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutorProcessPoolExecutor两个类,实现了对threadingmultiprocessing的进一步抽象(这里主要关注线程池),不仅可以帮我们自动调度线程,还可以做到:

  1. 主线程可以获取某一个线程(或者任务的)的状态,以及返回值。
  2. 当一个线程完成的时候,主线程能够立即知道。
  3. 让多线程和多进程的编码接口一致。

python 线程池的使用

在python3以上版本中, python线程|进程池的使用进行了改进, 其中封装度较高的方法就是concurrent.futures模块提供的接口, 下面主要使用的就是concurrent模块的使用.

简单使用
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor
import time

# 参数times用来模拟网络请求的时间
def time_block(times):
    '''执行阻塞'''
    print("time sleep {0} sec".format(times))
    time.sleep(times)
    return times

executor = ThreadPoolExecutor(max_workers=2)
# 通过submit函数提交执行的函数到线程池中,submit函数立即返回,不阻塞
task1 = executor.submit(time_block, (3))
task2 = executor.submit(time_block, (2))
# done方法用于判定某个任务是否完成
print(task1.done()) # True | False
# cancel方法用于取消某个任务,该任务没有放入线程池中才能取消成功
print(task2.cancel())
time.sleep(2)
print(task1.done())
# result方法可以获取task的执行结果
print(task1.result())

ThreadPoolExecutor构造实例的时候,传入max_workers参数来设置线程池中最多能同时运行的线程数目。
使用submit函数来提交线程需要执行的任务(函数名和参数)到线程池中,并返回该任务的句柄(类似于文件、画图)submit()不是阻塞的,而是立即返回.
通过submit函数返回的任务句柄,能够使用done()方法判断该任务是否结束。上面的例子可以看出,由于任务有2s的延时,在task1提交后立刻判断,task1还未完成,而在延时4s之后判断,task1就完成了。
使用cancel()方法可以取消提交的任务,如果任务已经在线程池中运行了,就取消不了。这个例子中,线程池的大小设置为2,任务已经在运行了,所以取消失败。如果改变线程池的大小为1,那么先提交的是task1,task2还在排队等候,这是时候就可以成功取消。
使用result()方法可以获取任务的返回值。查看内部代码,发现这个方法是阻塞的。

线程池实现阻塞的三种方法 (join方法)

推荐使用 as_completed 方法, 可对返回状态结果灵活操作

as_completed

上面虽然提供了判断任务是否结束的方法,但是不能在主线程中一直判断啊。有时候我们是得知某个任务结束了,就去获取结果,而不是一直判断每个任务有没有结束。这是就可以使用as_completed方法一次取出所有任务的结果。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
import time

# 参数times用来模拟网络请求的时间
def time_block(times):
    '''执行阻塞'''
    print("time sleep {0} sec".format(times))
    time.sleep(times)
    return times

executor = ThreadPoolExecutor(max_workers=2)
task_list = [executor.submit(time_block, i) for i in range(5)]
for task in as_completed(task_list):
    data = task.result()
    print('thread data: {0}'.format(data))
print('任务完成')
wait

wait 方法可以让主线程阻塞,直到满足设定的要求。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
import time

# 参数times用来模拟网络请求的时间
def time_block(times):
    '''执行阻塞'''
    print("time sleep {0} sec".format(times))
    time.sleep(times)
    return times

executor = ThreadPoolExecutor(max_workers=2)
task_list = [executor.submit(time_block, i) for i in range(5)]
wait(task_list)
print('任务完成')
map

executor.map 与 内置函数 map 操作方法一致

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
import time

# 参数times用来模拟网络请求的时间
def time_block(times):
    '''执行阻塞'''
    print("time sleep {0} sec".format(times))
    time.sleep(times)
    return times

executor = ThreadPoolExecutor(max_workers=2)
time_list = [1,5,5,6,2,2]
for result in executor.map(time_block, time_list):
    print(result)
print('任务完成')

线程池的异常处理

在使用python3的线程池过程中, 发现抛出的异常并不能在主线程中捕获输出, 在发现bug之前一直以为线程模块正常
原因: 查看源码可以输出的异常在返回主线程之前已经被捕获处理(异常的子线程会被内部结束运行),并不会抛出(raise), 所以需要我们手动捕获抛出异常

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
import time
import traceback

# 参数times用来模拟网络请求的时间
def time_block(times):
   '''执行阻塞'''
   print("time sleep {0} sec".format(times))
   time.sleep(times)
   if times // 2:
       return times / 0
   return times

executor = ThreadPoolExecutor(max_workers=2)
time_list = [1,2,5,6,2,2,3]
task_list = [executor.submit(time_block, i) for i in time_list]
for task in as_completed(task_list):
   try:
       print(task.result())
   except Exception as e:
       print(traceback.print_exc(e))
       # print('捕获异常: {0}'.format(e))
print('任务完成')

进程池用法

ThreadPoolExecutor 进程池接口封装与线程池用法一致
把ThreadPoolExecutor换成ProcessPoolExecutor其余用法全部相同

相关引用

https://docs.python.org/3/library/concurrent.futures.html
https://blog.csdn.net/makinglj/article/details/98084973
https://www.jianshu.com/p/b9b3d66aa0be

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351