管理并发任务池 concurrent.futures

concurrent.futures — Manage Pools of Concurrent Tasks

concurrent.futures - 管理并发任务池

Purpose: Easily manage tasks running concurrently and in parallel.
目的:很容易并行管理并发运行的任务

The concurrent.futures modules provides interfaces for running tasks using pools of thread or process workers. The APIs are the same, so applications can switch between threads and processes with minimal changes.
concurrent.futures 模块提供了用于使用线程池或者进程池运行任务的接口。API是相同的,因此应用程序可以在线程和进程之间以最小的代价切换。

The module provides two types of classes for interacting with the pools. Executors are used for managing pools of workers, and futures are used for managing results computed by the workers. To use a pool of workers, an application creates an instance of the appropriate executor class and then submits tasks for it to run. When each task is started, a Future instance is returned. When the result of the task is needed, an application can use the Future to block until the result is available. Various APIs are provided to make it convenient to wait for tasks to complete, so that the Future objects do not need to be managed directly.
模块提供了两种类型的类来与池交互,Executors用于管理工人池,future用于管理由工人计算得到的结果。想要使用工人池,程序应用就必须创建一个合适的执行类的实例,然后向其提交任务来运行。当每一个任务开始(也许是想说结束)时,就返回一个Future实例。当需要任务当结果,应用程序可以使用Future来阻塞,直到得到结果。对于等待任务运行结束,提供了大量的API,因此Future对象并不需要直接管理。

Using map() with a Basic Thread Pool

使用map()操作一个基本的线程池

The ThreadPoolExecutor manages a set of worker threads, passing tasks to them as they become available for more work. This example uses map() to concurrently produce a set of results from an input iterable. The task uses time.sleep() to pause a different amount of time to demonstrate that, regardless of the order of execution of concurrent tasks, map() always returns the values in order based on the inputs.
ThreadPoolExecutor管理了一个工人线程集合,一旦该集合能够接受任务,就向其传递更多的任务。本示例使用map()通过迭代的输入来并发的生成结果的结合。任务使用time.sleep()来暂停不同数量的时间来展示,与并发任务执行的顺序相比,map()总是返回基于输入顺序的值。

futures_thread_pool_map.py

from concurrent import futures
import threading
import time


def task(n):
    print('{}: sleeping {}'.format(
        threading.current_thread().name,
        n)
    )
    time.sleep(n / 10)
    print('{}: done with {}'.format(
        threading.current_thread().name,
        n)
    )
    return n / 10


ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
results = ex.map(task, range(5, 0, -1))
print('main: unprocessed results {}'.format(results))
print('main: waiting for real results')
real_results = list(results)
print('main: results: {}'.format(real_results))

The return value from map() is actually a special type of iterator that knows to wait for each response as the main program iterates over it.
来自于map()的返回值实际上是一个特殊类型的迭代器,该迭代器知道为每次响应等待,正如主程序迭代一样。

$ python3 futures_thread_pool_map.py

main: starting
Thread-1: sleeping 5
Thread-2: sleeping 4
main: unprocessed results <generator object
Executor.map.<locals>.result_iterator at 0x1013c80a0>
main: waiting for real results
Thread-2: done with 4
Thread-2: sleeping 3
Thread-1: done with 5
Thread-1: sleeping 2
Thread-1: done with 2
Thread-1: sleeping 1
Thread-2: done with 3
Thread-1: done with 1
main: results: [0.5, 0.4, 0.3, 0.2, 0.1]
Scheduling Individual Tasks

In addition to using map(), it is possible to schedule an individual task with an executor using submit(), and use the Future instance returned to wait for that task’s results.
想要额外的使用map()函数,可以调度一个独立的任务,使用submit()执行,然后使用Future示例返回等待任务的结果。

futures_thread_pool_submit.py

from concurrent import futures
import threading
import time


def task(n):
    print('{}: sleeping {}'.format(
        threading.current_thread().name,
        n)
    )
    time.sleep(n / 10)
    print('{}: done with {}'.format(
        threading.current_thread().name,
        n)
    )
    return n / 10


ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
f = ex.submit(task, 5)
print('main: future: {}'.format(f))
print('main: waiting for results')
result = f.result()
print('main: result: {}'.format(result))
print('main: future after result: {}'.format(f))

The status of the future changes after the tasks is completed and the result is made available.
future的状态在任务完成之后将会改变,结果就变的可见。

$ python3 futures_thread_pool_submit.py

main: starting
Thread-1: sleeping 5
main: future: <Future at 0x1010e6080 state=running>
main: waiting for results
Thread-1: done with 5
main: result: 0.5
main: future after result: <Future at 0x1010e6080 state=finished
returned float>
Waiting for Tasks in Any Order

Invoking the result() method of a Future blocks until the task completes (either by returning a value or raising an exception), or is canceled. The results of multiple tasks can be accessed in the order the tasks were scheduled using map(). If it does not matter what order the results should be processed, use as_completed() to process them as each task finishes.
调用Future语句块的result()方法,直到任务完成(要么通过返回一个值,或者抛出异常),或者取消任务。多个任务的结果能够以使用map()函数调度的顺序访问。如果无所谓以什么样子的顺序处理结果,使用as_completed()函数处理它们,作为任务已经结束。

futures_as_completed.py

from concurrent import futures
import random
import time


def task(n):
    time.sleep(random.random())
    return (n, n / 10)


ex = futures.ThreadPoolExecutor(max_workers=5)
print('main: starting')

wait_for = [
    ex.submit(task, i)
    for i in range(5, 0, -1)
]

for f in futures.as_completed(wait_for):
    print('main: result: {}'.format(f.result()))

Because the pool has as many workers as tasks, all of the tasks can be started. They finish in a random order so the values generated by as_completed() are different each time the example runs.
由于pool中拥有和任务一样多的workers,所有的入围都能够开始。而且以随机的顺序结束,因此,在每次运行as_completed()生成的值都是不同的。

$ python3 futures_as_completed.py

main: starting
main: result: (3, 0.3)
main: result: (5, 0.5)
main: result: (4, 0.4)
main: result: (2, 0.2)
main: result: (1, 0.1)

Future Callbacks

Future回调

To take some action when a task completed, without explicitly waiting for the result, use add_done_callback() to specify a new function to call when the Future is done. The callback should be a callable taking a single argument, the Future instance.
当任务完成后,想要执行一些操作,而不必显式的等待结果,可以使用add_done_callback()在Future完成后,来指定调用一个新的函数。callback将是一个可调用的参数,Future的实例。

futures_future_callback.py

from concurrent import futures
import time


def task(n):
    print('{}: sleeping'.format(n))
    time.sleep(0.5)
    print('{}: done'.format(n))
    return n / 10


def done(fn):
    if fn.cancelled():
        print('{}: canceled'.format(fn.arg))
    elif fn.done():
        error = fn.exception()
        if error:
            print('{}: error returned: {}'.format(
                fn.arg, error))
        else:
            result = fn.result()
            print('{}: value returned: {}'.format(
                fn.arg, result))


if __name__ == '__main__':
    ex = futures.ThreadPoolExecutor(max_workers=2)
    print('main: starting')
    f = ex.submit(task, 5)
    f.arg = 5
    f.add_done_callback(done)
    result = f.result()

The callback is invoked regardless of the reason the Future is considered “done,” so it is necessary to check the status of the object passed in to the callback before using it in any way.
无论Future以任何原因被认定为完成,回调函数都将会被调用。因此,在任何时候,使用之前都有必要检查对象是否传入回调函数。

$ python3 futures_future_callback.py

main: starting
5: sleeping
5: done
5: value returned: 0.5
Canceling Tasks

A Future can be canceled, if it has been submitted but not started, by calling its cancel() method.
Future可以取消,如果已经提交,但并未启动,就可以通过调用cancel()方法。

futures_future_callback_cancel.py

from concurrent import futures
import time


def task(n):
    print('{}: sleeping'.format(n))
    time.sleep(0.5)
    print('{}: done'.format(n))
    return n / 10


def done(fn):
    if fn.cancelled():
        print('{}: canceled'.format(fn.arg))
    elif fn.done():
        print('{}: not canceled'.format(fn.arg))


if __name__ == '__main__':
    ex = futures.ThreadPoolExecutor(max_workers=2)
    print('main: starting')
    tasks = []

    for i in range(10, 0, -1):
        print('main: submitting {}'.format(i))
        f = ex.submit(task, i)
        f.arg = i
        f.add_done_callback(done)
        tasks.append((i, f))

    for i, t in reversed(tasks):
        if not t.cancel():
            print('main: did not cancel {}'.format(i))

    ex.shutdown()

cancel() returns a Boolean indicating whether or not the task was able to be canceled.
cancel()返回一个布尔值,表明该任务是否能被取消。

$ python3 futures_future_callback_cancel.py

main: starting
main: submitting 10
10: sleeping
main: submitting 9
9: sleeping
main: submitting 8
main: submitting 7
main: submitting 6
main: submitting 5
main: submitting 4
main: submitting 3
main: submitting 2
main: submitting 1
1: canceled
2: canceled
3: canceled
4: canceled
5: canceled
6: canceled
7: canceled
8: canceled
main: did not cancel 9
main: did not cancel 10
10: done
10: not canceled
9: done
9: not canceled
Exceptions in Tasks

If a task raises an unhandled exception, it is saved to the Future for the task and made available through the result() or exception() methods.
如果任务抛出一个未处理的异常,对于当前task自函数,该异常就被存入Future,然后使其对于result()或者exception()方法均可见。

futures_future_exception.py

from concurrent import futures


def task(n):
    print('{}: starting'.format(n))
    raise ValueError('the value {} is no good'.format(n))


ex = futures.ThreadPoolExecutor(max_workers=2)
print('main: starting')
f = ex.submit(task, 5)

error = f.exception()
print('main: error: {}'.format(error))

try:
    result = f.result()
except ValueError as e:
    print('main: saw error "{}" when accessing result'.format(e))

If result() is called after an unhandled exception is raised within a task function, the same exception is re-raised in the current context.
在task函数中,如果一个未处理的异常被抛出,调用result()函数,相同的异常就会在当前上下文中重新抛出。

$ python3 futures_future_exception.py

main: starting
5: starting
main: error: the value 5 is no good
main: saw error "the value 5 is no good" when accessing result

Context Manager

上下文管理器

Executors work as context managers, running tasks concurrently and waiting for them all to complete. When the context manager exits, the shutdown() method of the executor is called.
Executors以上下文管理器的方式工作,同步运行任务,然后等待这些任务全部完成。当上下文管理器退出后,executor的shutdown()方法就被调用。

futures_context_manager.py

from concurrent import futures

def task(n):
    print(n)


with futures.ThreadPoolExecutor(max_workers=2) as ex:
    print('main: starting')
    ex.submit(task, 1)
    ex.submit(task, 2)
    ex.submit(task, 3)
    ex.submit(task, 4)

print('main: done')

This mode of using the executor is useful when the thread or process resources should be cleaned up when execution leaves the current scope.
当执行退出当前作用域时,所有的线程和进程资源都被清理,这种使用executor的模式就非常有用。

$ python3 futures_context_manager.py

main: starting
1
2
3
4
main: done

Process Pools

进程池

The ProcessPoolExecutor works in the same way as ThreadPoolExecutor, but uses processes instead of threads. This allows CPU-intensive operations to use a separate CPU and not be blocked by the CPython interpreter’s global interpreter lock.
ProcessPoolExecutor的工作方式与ThreadPoolExecutor一致,但是使用进程替代线程。这允许CPU敏感的操作使用不同的CPU,而不被CPython解释器的全局线程锁阻塞。

futures_process_pool_map.py

from concurrent import futures
import os


def task(n):
    return (n, os.getpid())


ex = futures.ProcessPoolExecutor(max_workers=2)
results = ex.map(task, range(5, 0, -1))
for n, pid in results:
    print('ran task {} in process {}'.format(n, pid))

As with the thread pool, individual worker processes are reused for multiple tasks.
和线程池一样,独立的工作进程被用于多重任务。

$ python3 futures_process_pool_map.py

ran task 5 in process 60245
ran task 4 in process 60246
ran task 3 in process 60245
ran task 2 in process 60245
ran task 1 in process 60245

If something happens to one of the worker processes to cause it to exit unexpectedly, the ProcessPoolExecutor is considered “broken” and will no longer schedule tasks.
如果某个工作进程发生异常,导致意外的退出,ProcessPoolExecutor就被认为“被破坏”,从而不再由新的任务调度产生。

futures_process_pool_broken.py

from concurrent import futures
import os
import signal


with futures.ProcessPoolExecutor(max_workers=2) as ex:
    print('getting the pid for one worker')
    f1 = ex.submit(os.getpid)
    pid1 = f1.result()

    print('killing process {}'.format(pid1))
    os.kill(pid1, signal.SIGHUP)

    print('submitting another task')
    f2 = ex.submit(os.getpid)
    try:
        pid2 = f2.result()
    except futures.process.BrokenProcessPool as e:
        print('could not start new tasks: {}'.format(e))

The BrokenProcessPool exception is actually thrown when the results are processed, rather than when the new task is submitted.
当结果处理完成后,BrokenProcessPool 异常被抛出,而不是等到新的任务被提交。

$ python3 futures_process_pool_broken.py

getting the pid for one worker
killing process 62059
submitting another task
could not start new tasks: A process in the process pool was
terminated abruptly while the future was running or pending.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,133评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,682评论 3 390
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,784评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,508评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,603评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,607评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,604评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,359评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,805评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,121评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,280评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,959评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,588评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,206评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,193评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,144评论 2 352

推荐阅读更多精彩内容

  • **2014真题Directions:Read the following text. Choose the be...
    又是夜半惊坐起阅读 9,460评论 0 23
  • A医生,你无意杀我父亲,我父亲却因你而死. 在昨天下午之前,我一直都把你当做我的同事. 父亲第一次肠穿孔病危之时,...
    世涂花开阅读 326评论 2 1
  • 茶文化背景 中国是茶的故乡,也是茶文化的发源地。中国茶的发现和利用,在中国已有四五千年历史,且长盛不衰,传遍全球。...
    水泰水媒体阅读 936评论 0 0
  • 如云朵掠过天空整个世界装不满心灵投下一波秋色山在天际肃穆庄严 经过春秋编织的风景托钵而行一场跋涉如果卷帘就能看到谁...
    幽兰33阅读 206评论 -1 11
  • 最近工作上的一通乱忙,昨日终于告一段落,细细想起来好像不全是自己范围内的工作,别人放手不干的我都囫囵吞枣的拾掇...
    123生活阅读 157评论 0 0