`ThreadPoolExecutor` 是 `Executor` 的子类,它使用线程池来异步执行调用。其 `submit` 方法的源码(节选)如下:
class ThreadPoolExecutor(_base.Executor):
    """Executor subclass that runs submitted calls asynchronously on a thread pool.

    NOTE(review): this is an excerpt -- only ``_counter`` and ``submit`` are
    shown; attributes such as ``_shutdown_lock``, ``_shutdown`` and
    ``_work_queue`` and the method ``_adjust_thread_count`` are defined in
    code not visible here.
    """

    # Used to assign unique thread names when thread_name_prefix is not supplied.
    _counter = itertools.count().__next__

    def submit(self, fn, *args, **kwargs):
        # Schedule fn(*args, **kwargs) and return a Future for its result.
        #
        # submit() never iterates, copies or otherwise reads `args`/`kwargs`:
        # they are only stored by reference in a _WorkItem that is put on the
        # work queue. A lazy iterator/generator passed as an argument is
        # therefore NOT consumed here; it is read later, presumably by the
        # worker thread that takes the item off `_work_queue` (worker code is
        # not shown in this excerpt).
        with self._shutdown_lock:
            # Reject new work once the executor has been shut down; the flag
            # is checked while holding _shutdown_lock.
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')
            # Pair the Future returned to the caller with the call to run.
            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)
            # Enqueue the work item; _adjust_thread_count presumably starts
            # a worker thread if needed (implementation not shown).
            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
    # Reuse the base-class docstring for submit (so a docstring written on
    # submit itself would be overwritten here).
    submit.__doc__ = _base.Executor.submit.__doc__
关于 `concurrent.futures` 模块中的 `ThreadPoolExecutor` 类:
在调用 `submit` 时,如果传入的参数是生成器(迭代器)对象,某些情况下该对象中的数据会丢失一部分甚至全部。
具体见下面的 demo:
#!/usr/bin/env python3
# -- coding: utf-8 --
"""
@version: v1.0
@author: huangyc
@file: test.py
@Description:
@time: 2021/6/3 11:19
"""
from concurrent import futures
from itertools import groupby, count, tee
from typing import Iterable
def iter_slice_tool(iterator: Iterable, batch_size: int = 5, *, materialize: bool = False):
    """Split *iterator* into consecutive batches of at most *batch_size* items.

    Yields ``(batch_index, batch)`` pairs with ``batch_index`` = 0, 1, 2, ...

    By default each ``batch`` is the lazy group iterator produced by
    ``itertools.groupby``. Such a group shares the underlying iterator with
    the groupby object: as soon as this generator is advanced to the next
    batch, the previous batch can no longer be read (it yields nothing, or
    only what was not yet skipped). Consume each batch before requesting the
    next one -- or pass ``materialize=True`` when batches must outlive the
    iteration step, e.g. when they are handed to worker threads.

    Args:
        iterator: any iterable to split.
        batch_size: maximum number of items per batch; must be >= 1.
        materialize: if True, each batch is copied into a ``list`` while it
            is still current, so it stays valid after iteration moves on.

    Raises:
        ValueError: if ``batch_size`` is less than 1 (raised on the first
            ``next()``, since this is a generator function).
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size!r}")
    # A fresh counter is bound as the lambda default on every call; the n-th
    # item gets key n // batch_size, so each run of batch_size consecutive
    # items shares one key and groupby emits it as one group.
    groups = groupby(iterator, key=lambda _, c=count(): next(c) // batch_size)
    if materialize:
        for key, group in groups:
            # Copy now: once groupby advances, `group` is no longer readable.
            yield key, list(group)
    else:
        yield from groups
def gen_datas():
    """Demo data source: lazily produce the integers 0 through 29."""
    for value in range(30):
        yield value
def single_task(samples):
    """Worker task: drain *samples* into a list and print that list."""
    batch = list(samples)
    print(batch)
if __name__ == '__main__':
    num = 4
    batch_size = 6

    # The context manager shuts the pool down (waiting for its threads) when
    # the demo finishes; the original never shut the executor down.
    with futures.ThreadPoolExecutor(max_workers=num) as executor:
        # Attempt 1: pass each groupby group straight to submit().
        #
        # Data goes missing, and what is lost changes with the data size,
        # batch_size and thread timing. One run printed (batch order can
        # also vary between runs):
        #   [0, 1, 2, 3, 4, 5]
        #   [6, 7, 8, 9, 10, 11]
        #   [12, 13, 14, 15, 16, 17]
        #   [18, 19, 20, 21, 22, 23]
        #   [29]
        #
        # Why: submit() only stores a reference to `sample`; it never reads
        # it. The group yielded by groupby shares the underlying iterator
        # with the groupby object, and as soon as this loop advances groupby
        # to the next batch, the previous group is no longer readable. Any
        # items a worker has not pulled from its group before the main
        # thread moves on are therefore skipped. (groupby and the generator
        # behind it are also being touched from two threads at once here,
        # which is not thread-safe in any case.)
        #
        # Wrapping the group in tee() makes no difference: tee() buffers
        # lazily, so neither copy has read anything when groupby advances.
        tasks = []
        for _key, sample in iter_slice_tool(gen_datas(), batch_size=batch_size):
            _unused_copy, sample = tee(sample, 2)
            tasks.append(executor.submit(single_task, samples=sample))
        for future in futures.as_completed(tasks):
            future.result()
        print()

        # Attempt 2: materialize each group with list() in the main thread
        # *before* submitting, so workers receive an independent list. Every
        # item is printed:
        #   [0, 1, 2, 3, 4, 5]
        #   [6, 7, 8, 9, 10, 11]
        #   [12, 13, 14, 15, 16, 17]
        #   [18, 19, 20, 21, 22, 23]
        #   [24, 25, 26, 27, 28, 29]
        tasks = []
        for _key, sample in iter_slice_tool(gen_datas(), batch_size=batch_size):
            sample = list(sample)
            tasks.append(executor.submit(single_task, samples=sample))
        for future in futures.as_completed(tasks):
            future.result()
        print()

        # Attempt 3: tee() the group and drain one copy with list().
        #
        # list(sample_copy) runs in the main thread and pulls the whole group
        # through tee() before groupby advances; tee() keeps those items in
        # its internal buffer for the other copy, so `sample` later replays
        # them without touching groupby. Without the list() call nothing is
        # buffered yet and the behavior is the same as attempt 1. Prints:
        #   [0, 1, 2, 3, 4, 5]
        #   [6, 7, 8, 9, 10, 11]
        #   [12, 13, 14, 15, 16, 17]
        #   [18, 19, 20, 21, 22, 23]
        #   [24, 25, 26, 27, 28, 29]
        tasks = []
        for _key, sample in iter_slice_tool(gen_datas(), batch_size=batch_size):
            sample_copy, sample = tee(sample, 2)
            list(sample_copy)
            tasks.append(executor.submit(single_task, samples=sample))
        for future in futures.as_completed(tasks):
            future.result()
以上示例中,尝试二的结果是正常的,可以保证数据不丢失;
而尝试一在 `submit` 之后会丢失一部分数据;
尝试三先用 `tee` 复制出两个副本,将其中一个转成 `list`,再把另一个传给 `submit`,这种情况下数据不会丢失:
# Key step of attempt 3: tee() the group, then drain one copy with list().
# The drain runs in the main thread and pulls the whole group into tee's
# internal buffer before groupby moves on to the next batch; the other copy
# (`sample`) later replays those buffered items, so the worker thread sees
# the complete batch. Without list(), tee has buffered nothing yet.
sample_copy, sample = tee(sample, 2)
list(sample_copy)
有两个问题:
问题一:为什么作为参数传给 `submit` 的生成器对象会丢失一部分数据?
问题二:尝试三中用 `tee` 复制出副本后,把其中一个转成 `list` 就不会丢失数据,而不转 `list` 仍然会丢失数据,这又是什么原理?
不知道有没有了解的朋友可以帮忙解答一下,不胜感激。