current.futures —启动并行任务

3.2 版中的新Function。

源代码: Lib/concurrent/futures/thread.pyLib/concurrent/futures/process.py


concurrent.futures模块提供了用于异步执行可调用对象的高级接口。

异步执行可以使用ThreadPoolExecutor使用线程执行,也可以使用ProcessPoolExecutor使用单独的进程执行。两者都实现了相同的接口,该接口由抽象Executor类定义。

Executor Objects

  • 类别 concurrent.futures. Executor
    • 提供用于异步执行调用的方法的抽象类。它不应直接使用,而应pass其具体子类使用。

Note

  • submit(* fn *, *args * kwargs *)
  • 调度可调用对象* fn *以fn(*args **kwargs)的形式执行,并返回一个Future对象,该对象表示可调用对象的执行。
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(pow, 323, 1235)
print(future.result())
  • map(* func * iterables timeout = None chunksize = 1 *)

  • map(func, *iterables)类似,除了:

  • “可迭代项目”被立即收集而不是懒惰地收集;

    • func 是异步执行的,并且可以同时进行多个对 func *的调用。

如果调用了next(),则返回的迭代器将引发concurrent.futures.TimeoutError,并且从原始调用Executor.map()的* timeout *秒后,结果将不可用。 * timeout 可以是 int 或 float。如果未指定 timeout *或None,则 await 时间没有限制。

如果* func *调用引发异常,则从迭代器中检索其值时,将引发该异常。

使用ProcessPoolExecutor时,此方法将* iterables 切成许多块,作为单独的任务提交给池。可以pass将 chunksize 设置为正整数来指定这些块的(大约)大小。对于非常长的可迭代对象,与默认大小 1 相比,对 chunksize 使用较大的值可以显着提高性能。使用ThreadPoolExecutor时, chunksize *无效。

在版本 3.5 中更改:添加了* chunksize *参数。

  • shutdown(* wait = True *)
  • 通知执行者,当当前未决的期货执行完毕时,它应该释放其正在使用的任何资源。关机后对Executor.submit()Executor.map()的调用将引发RuntimeError

如果* wait True,则该方法将在所有未完成的期货执行完毕且与执行者相关的资源已被释放之前不返回。如果 wait False,则此方法将立即返回,并且当所有未决的期货执行完毕时,与执行者关联的资源将被释放。不管 wait *的值如何,直到所有未完成的期货执行完毕,整个 Python 程序才会退出。

如果使用with语句,则可以避免显式调用此方法,这将关闭Executor(await 时,就像在将* wait *设置为True的情况下调用Executor.shutdown()一样):

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

ThreadPoolExecutor

ThreadPoolExecutorExecutor子类,它使用线程池异步执行调用。

当与Future关联的可调用对象 await 另一个Future的结果时,就会发生死锁。例如:

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6

executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

And:

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
    • class * concurrent.futures. ThreadPoolExecutor(* max_workers = None thread_name_prefix ='' initializer = None initargs =()*)
    • 一个Executor子类,最多使用* max_workers *个线程池来异步执行调用。
  • initializer *是一个可选的可调用对象,它在每个工作线程的开始处被调用; * initargs 是传递给初始化程序的参数的 Tuples。如果 initializer *引发异常,则所有当前待处理的作业都会引发BrokenThreadPool,以及任何try向池中提交更多作业的try。

在版本 3.5 中进行了更改:如果* max_workers *为None或未提供None,它将默认为计算机上的处理器数量乘以5,假设ThreadPoolExecutor通常用于重叠 I/O 而不是 CPU 工作,并且 Worker 数应高于ProcessPoolExecutor的 Worker 数。

3.6 版中的新Function:添加了* thread_name_prefix *参数,以允许用户控制由池创建的工作线程的threading.Thread名称,以便于调试。

在 3.7 版中进行了更改:添加了* initializer initargs *参数。

在 3.8 版中更改:* max_workers *的默认值更改为min(32, os.cpu_count() + 4)。此默认值为 I/O 绑定任务保留至少 5 个工作线程。它最多使用 32 个 CPU 内核来执行释放 GIL 的 CPU 绑定任务。而且,它避免在多核计算机上隐式使用非常大的资源。

ThreadPoolExecutor 现在也可以在启动* max_workers *工作线程之前重用空闲的工作线程。

ThreadPoolExecutor Example

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

ProcessPoolExecutor类是Executor子类,它使用进程池来异步执行调用。 ProcessPoolExecutor使用multiprocessing模块,这使其可以避开全局翻译锁,但也意味着只能执行和返回可拾取对象。

__main__模块必须可由工作程序子进程导入。这意味着ProcessPoolExecutor在交互式解释器中将不起作用。

从已提交给ProcessPoolExecutor的可调用对象中调用ExecutorFuture方法将导致死锁。

    • class * concurrent.futures. ProcessPoolExecutor(* max_workers = None mp_context = None initializer = None initargs =()*)
    • Executor子类使用最多* max_workers 个进程的池异步执行调用。如果 max_workers None或未提供,则默认为机器上的处理器数量。如果 max_workers 小于或等于0,则将引发ValueError。在 Windows 上, max_workers 必须等于或小于61。如果不是,则将引发ValueError。如果 max_workers *为None,则即使有更 multiprocessing 器可用,默认选择最多为61。 * mp_context 可以是 multiprocessing 上下文,也可以是 None。它将用来发动 Worker。如果 mp_context *为None或未提供,则使用默认的 multiprocessing 上下文。
  • initializer *是一个可选的可调用对象,它在每个工作进程开始时被调用; * initargs 是传递给初始化程序的参数的 Tuples。如果 initializer *引发异常,则所有当前暂挂的作业都将引发BrokenProcessPool,以及任何try向池中提交更多作业的try。

在版本 3.3 中进行了更改:当一个工作进程突然终止时,现在引发BrokenProcessPool错误。以前,行为是不确定的,但是对 Actuator 或其期货的操作通常会冻结或死锁。

在版本 3.7 中进行了更改:添加了* mp_context *参数,以允许用户控制由池创建的工作进程的 start_method。

添加了* initializer initargs *参数。

ProcessPoolExecutor Example

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

Future Objects

Future类封装了可调用对象的异步执行。 Future实例由Executor.submit()创建。

  • 类别 concurrent.futures. Future
    • 封装可调用对象的异步执行。 Future实例是由Executor.submit()创建的,除测试外,不应直接创建。

Note

  • cancel ( )

  • try取消呼叫。如果该调用当前正在执行或已结束运行,并且无法取消,则该方法将返回False,否则该调用将被取消并且该方法将返回True

  • cancelled ( )

  • 如果呼叫成功取消,则返回True

  • running ( )

  • 如果当前正在执行呼叫且无法取消,则返回True

  • done ( )

  • 如果呼叫已成功取消或结束运行,则返回True

  • result(* timeout = None *)

  • 返回调用返回的值。如果呼叫尚未完成,则此方法将 await超时秒。如果通话未在超时秒内完成,则将引发concurrent.futures.TimeoutError。 * timeout 可以是 int 或 float。如果未指定 timeout *或None,则 await 时间没有限制。

如果在完成之前取消了 Future,则将提高CancelledError

如果调用引发,则此方法将引发相同的异常。

  • exception(* timeout = None *)
  • 返回调用引发的异常。如果呼叫尚未完成,则此方法将 await超时秒。如果通话未在超时秒内完成,则将引发concurrent.futures.TimeoutError。 * timeout 可以是 int 或 float。如果未指定 timeout *或None,则 await 时间没有限制。

如果在完成之前取消了 Future,则将提高CancelledError

如果呼叫完成而没有加注,则返回None

  • add_done_callback(* fn *)
  • 将可调用的* fn 附加到将来。当取消或结束运行时,将调用 fn *,并将 future 作为其唯一参数。

添加的可调用对象按添加 Sequences 被调用,并且始终在属于添加它们的进程的线程中调用。如果可调用对象引发Exception子类,它将被记录并忽略。如果可调用对象引发BaseException子类,则该行为未定义。

如果 Future 已经完成或被取消,* fn *将立即被调用。

以下Future方法适用于单元测试和Executor实现。

Note

  • set_running_or_notify_cancel ( )
  • 仅在执行与Future关联的工作之前,由Executor实现调用此方法,并由单元测试调用。

如果该方法返回False,则Future被取消,即Future.cancel()被调用并返回 True。awaitFuture完成(即passas_completed()wait())的所有线程都将被唤醒。

如果该方法返回True,则Future未被取消并已处于运行状态,即对Future.running()的调用将返回 True。

该方法只能被调用一次,并且不能在调用Future.set_result()Future.set_exception()之后调用。

  • set_result(结果)
  • 将与Future关联的工作结果设置为* result *。

此方法仅应由Executor实现和单元测试使用。

在版本 3.8 中更改:如果Future已经完成,则此方法引发concurrent.futures.InvalidStateError

  • set_exception(* exception *)
  • 将与Future关联的工作结果设置为Exception * exception *。

此方法仅应由Executor实现和单元测试使用。

在版本 3.8 中更改:如果Future已经完成,则此方法引发concurrent.futures.InvalidStateError

Module Functions

  • concurrent.futures. wait(* fs timeout = None return_when = ALL_COMPLETED *)
    • await* fs *给出的Future实例(可能是由不同的Executor实例创建)完成。返回一组命名的 2Tuples。第一组名为done,包含在 await 完成之前完成的期货(完成或取消的期货)。第二组名为not_done,包含未完成的期货(待定或正在运行的期货)。
  • timeout *可用于控制返回之前 await 的最大秒数。 * timeout 可以是 int 或 float。如果未指定 timeout *或None,则 await 时间没有限制。

  • return_when *指示该函数何时应返回。它必须是以下常量之一:

ConstantDescription
FIRST_COMPLETED以后完成或取消操作时,该函数将返回。
FIRST_EXCEPTION当将来pass引发异常结束时,该函数将返回。如果没有将来引发异常,则它等效于ALL_COMPLETED
ALL_COMPLETED当所有期货结束或被取消时,该函数将返回。
  • concurrent.futures. as_completed(* fs timeout = None *)
    • 返回由* fs *给出的Future实例(可能由不同的Executor实例创建)的迭代器,该迭代器在完成时生成期货(完成或取消的期货)。 * fs 给定的任何重复的期货将被退回一次。在as_completed()之前完成的所有期货都将首先产生。如果调用了next(),则返回的迭代器将引发concurrent.futures.TimeoutError,并且从原始调用as_completed() timeout *秒后,结果将不可用。 * timeout 可以是 int 或 float。如果未指定 timeout *或None,则 await 时间没有限制。

See also

  • PEP 3148 –期货-异步执行计算

  • 将该特性描述为包含在 Python 标准库中的提案。

Exception classes

  • exception concurrent.futures. CancelledError

    • 在取消 Future 时引发。
  • exception concurrent.futures. TimeoutError

    • 当将来的操作超过给定的超时时引发。
  • exception concurrent.futures. BrokenExecutor

    • RuntimeError派生,当执行程序由于某种原因而break时,将引发此异常类,并且该异常类不能用于提交或执行新任务。

3.7 版中的新Function。

  • exception concurrent.futures. InvalidStateError
    • 在当前状态下不允许在将来执行某项操作时引发。

3.8 版的新Function。

  • exception concurrent.futures.thread. BrokenThreadPool
    • BrokenExecutor派生,当ThreadPoolExecutor的工作程序之一初始化失败时,引发此异常类。

3.7 版中的新Function。

  • exception concurrent.futures.process. BrokenProcessPool
    • BrokenExecutor(以前是RuntimeError)派生而来,当ProcessPoolExecutor的一个工作线程以非干净的方式终止时(例如,如果它是从外部杀死的),则引发此异常类。

版本 3.3 中的新Function。