协同程序和任务

本节概述了与协程和任务一起使用的高级异步 API。

Coroutines

使用 async/await 语法语句的Coroutines是编写 asyncio 应用程序的首选方法。例如,以下代码段(需要 Python 3.7)打印“ hello”,await1 秒钟,然后打印“ world”:

>>> import asyncio

>>> async def main():
...     print('hello')
...     await asyncio.sleep(1)
...     print('world')

>>> asyncio.run(main())
hello
world

请注意,仅调用协程不会调度它的执行:

>>> main()
<coroutine object main at 0x1053bb7c8>

为了实际运行协程,asyncio 提供了三种主要机制:

  • asyncio.run()函数用于运行顶级入口点“ main()”函数(请参见上面的示例.)

  • await 协程。以下代码段将在 await1 秒钟后显示“ hello”,然后在 await另一 2 秒后显示“ world”:

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

Expected output:

started at 17:13:52
hello
world
finished at 17:13:55

让我们修改上面的示例,并同时运行两个say_after协程:

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

请注意,现在的预期输出显示该代码段比以前快了 1 秒:

started at 17:14:32
hello
world
finished at 17:14:34

Awaitables

我们说如果可以在await表达式中使用对象,则该对象是“可 await”对象。许多异步 API 被设计为接受 await。

  • awaitable 对象有三种主要类型:协程 ,任务 和期货*。

Coroutines

Python 协程是* waitables *,因此可以从其他协程中 await:

import asyncio

async def nested():
    return 42

async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".

asyncio.run(main())

Important

在本文档中,术语“协程”可用于两个紧密相关的概念:

  • 协程函数async def函数;

  • 协程对象:pass调用协程函数返回的对象。

asyncio 还支持旧版generator-based协程。

Tasks

任务用于同时安排协程。

当协程被包装成具有诸如_之类的Function的* Task *时,协程将自动计划很快运行:

import asyncio

async def nested():
    return 42

async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task

asyncio.run(main())

Futures

Future是特殊的“低级别”await 对象,表示异步操作的“finally结果”。

一个 Future 对象await*时,这意味着协程将 await,直到在其他地方解析该 Future。

需要在 asyncio 中使用将来的对象,以允许将基于回调的代码与 async/await 一起使用。

通常,“不需要”在应用程序级代码中创建 Future 对象。

将来的对象(有时由库和一些 asyncio API 公开)可以 await:

async def main():
    await function_that_returns_a_future_object()

    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

一个返回 Future 对象的低级函数的一个好例子是loop.run_in_executor()

运行异步程序

  • asyncio. run(* coro **,* debug = False *)

此函数运行传递的协程,从而 Managementasyncio 事件循环并“finally确定异步生成器”。

当另一个异步事件循环在同一线程中运行时,无法调用此函数。

如果* debug *为True,则事件循环将在调试模式下运行。

此函数始终创建一个新的事件循环,并在最后将其关闭。它应该用作 asyncio 程序的主要入口,理想情况下应仅调用一次。

Example:

async def main():
    await asyncio.sleep(1)
    print('hello')

asyncio.run(main())

3.7 版中的新Function。

Note

asyncio.run()的源代码可以在Lib/asyncio/runners.py中找到。

Creating Tasks

  • asyncio. create_task(* coro **,* name = None *)
    • 将* coro * coroutine包装为Task并安排其执行时间。返回任务对象。

如果* name *不是None,则使用Task.set_name()将其设置为任务的名称。

get_running_loop()返回的循环中执行任务,如果当前线程中没有正在运行的循环,则引发RuntimeError

Python 3.7 中已添加此Function 。在 Python 3.7 之前,可以使用低级asyncio.ensure_future()函数:

async def coro():
    ...

# In Python 3.7+
task = asyncio.create_task(coro())
...

# This works in all Python versions but is less readable
task = asyncio.ensure_future(coro())
...

3.7 版中的新Function。

在 3.8 版中进行了更改:添加了name参数。

Sleeping

  • 协程 asyncio. sleep(延迟结果=无,***,循环=无)
    • 阻塞延迟秒。

如果提供了* result *,则协程完成后会将其返回给调用方。

sleep()始终挂起当前任务,从而允许其他任务运行。

从 3.8 版开始不推荐使用,将在 3.10 版中删除:* loop *参数。

协程示例在 5 秒钟内每秒显示当前日期:

import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

asyncio.run(display_date())

同时运行任务

    • awaitable * asyncio. gather(** aws loop = None return_exceptions = False *)

如果在* aws *中 await 的是协程,它将自动安排为任务。

如果所有 await 项都成功完成,则结果将是返回值的汇总列表。结果值的 Sequences 对应于* aws *中的 awaitSequences。

如果* return_exceptions *为False(默认值),则第一个引发的异常将立即传播到在gather()上 await 的任务。 * aws *序列中的其他可 await 对象 不会被取消 ,并将 continue 运行。

如果* return_exceptions *为True,则将异常与成功结果相同,并汇总到结果列表中。

如果gather()被“取消”,则所有已提交的 await(尚未完成)也被“取消”。

如果* aws 序列中的任何 Task 或 Future 被 cancelled *取消,则将其视为引发CancelledError –在这种情况下**不会取消gather()调用。这是为了防止取消一个已提交的任务/Future 导致其他任务/Future 被取消。

从 3.8 版开始不推荐使用,将在 3.10 版中删除:* loop *参数。

Example:

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    # Schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24

Note

如果* return_exceptions *为 False,则将其标记为已完成后取消 collect()不会取消任何已提交的 await。例如,可以在将异常传播给调用方之后将其标记为完成,因此,在从异常捕获到异常(由一个 await 对象引发)之后调用gather.cancel()不会取消任何其他 await 对象。

在版本 3.7 中进行了更改:如果* gather 本身被取消,则取消传播将与 return_exceptions *无关。

抵制取消

如果* aw *是协程,它将自动安排为任务。

The statement:

res = await shield(something())

等效于:

res = await something()

除非如果包含它的协程被取消,则在something()中运行的任务不会被取消。从something()的角度来看,没有发生取消。尽管其调用方仍被取消,所以“ await”表达式仍引发CancelledError

如果something()pass其他方式(即从自身内部)取消,那也将取消shield()

如果希望完全忽略取消操作(不建议这样做),则shield()函数应与 try/except 子句结合使用,如下所示:

try:
    res = await shield(something())
except CancelledError:
    res = None

从 3.8 版开始不推荐使用,将在 3.10 版中删除:* loop *参数。

Timeouts

  • 协程 asyncio. wait_for(* aw timeout **,* loop = None *)

如果* aw *是协程,它将自动安排为任务。

  • timeout 可以是None,也可以是 float 或 intawait 的秒数。如果 timeout *是None,则阻塞直到将来完成。

如果发生超时,它将取消任务并引发asyncio.TimeoutError

为避免任务cancellation,请将其包装在shield()中。

该函数将 await 直到实际上取消了将来,因此总 await 时间可能超过* timeout *。

如果 await 被取消,则将来的* aw *也将被取消。

从 3.8 版开始不推荐使用,将在 3.10 版中删除:* loop *参数。

Example:

async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())

# Expected output:
#
#     timeout!

在 3.7 版中进行了更改:当* aw 由于超时而被取消时,wait_forawait aw *被取消。以前,它立即引发asyncio.TimeoutError

Waiting Primitives

  • 协程 asyncio. wait(* aws **,* loop = None timeout = None return_when = ALL_COMPLETED *)
    • 同时在设置的* aws 中运行awaitable objects并阻塞直到 return_when *指定的条件。

返回两组任务/期货:(done, pending)

Usage:

done, pending = await asyncio.wait(aws)

如果指定,* timeout *(浮点数或整数)可用于控制返回之前 await 的最大秒数。

请注意,此函数不会引发asyncio.TimeoutError。发生超时时未完成的期货或任务仅在第二组中返回。

  • return_when *指示该函数何时应返回。它必须是以下常量之一:
ConstantDescription
FIRST_COMPLETED以后完成或取消操作时,该函数将返回。
FIRST_EXCEPTION当将来pass引发异常结束时,该函数将返回。如果没有将来引发异常,则它等效于ALL_COMPLETED
ALL_COMPLETED当所有期货结束或被取消时,该函数将返回。

wait_for()不同,wait()不会在发生超时时取消期货。

从 3.8 版开始不推荐使用:如果* aws *中的任何 await 程序都是协程,则它将自动安排为任务。不建议将协程对象直接传递到wait(),因为它会导致confusing behavior

从 3.8 版开始不推荐使用,将在 3.10 版中删除:* loop *参数。

Note

wait()自动将协程安排为 Tasks,然后在(done, pending)个集合中返回那些隐式创建的 Task 对象。因此,以下代码将无法正常工作:

async def foo():
return 42

coro = foo()
done, pending = await asyncio.wait({coro})

if coro in done:
# This branch will never be run!

修复以上代码段的方法如下:

async def foo():
return 42

task = asyncio.create_task(foo())
done, pending = await asyncio.wait({task})

if task in done:
# Everything will work as expected now.

从 3.8 版开始不推荐使用:不建议将协程对象直接传递给wait()

  • asyncio. as_completed(* aws **,* loop = None timeout = None *)
    • 同时在“ aws”集中运行awaitable objects。返回协程的迭代器。可以 await 返回的每个协程,以从其余的 await 组中获得最早的下一个结果。

如果在所有期货完成之前发生超时,则引发asyncio.TimeoutError

从 3.8 版开始不推荐使用,将在 3.10 版中删除:* loop *参数。

Example:

for coro in as_completed(aws):
    earliest_result = await coro
    # ...

从其他线程调度

  • asyncio. run_coroutine_threadsafe(* coro loop *)
    • 将协程提交到给定的事件循环。线程安全的。

返回concurrent.futures.Future以 await 另一个 OS 线程的结果。

该函数应从不同于运行事件循环的 OS 线程中调用。例:

# Create a coroutine
coro = asyncio.sleep(1, result=3)

# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3

如果协程中出现异常,则将通知返回的 Future。它也可以用于取消事件循环中的任务:

try:
    result = future.result(timeout)
except asyncio.TimeoutError:
    print('The coroutine took too long, cancelling the task...')
    future.cancel()
except Exception as exc:
    print(f'The coroutine raised an exception: {exc!r}')
else:
    print(f'The coroutine returned: {result!r}')

请参阅文档的并发和多线程部分。

与其他异步函数不同,此函数需要显式传递* loop *参数。

版本 3.5.1 中的新Function。

Introspection

  • asyncio. current_task(* loop = None *)
    • 返回当前正在运行的Task实例,如果没有任务正在运行,则返回None

如果* loop *为None get_running_loop()用于获取当前循环。

3.7 版中的新Function。

  • asyncio. all_tasks(* loop = None *)
    • 返回由循环运行的一组尚未完成的Task对象。

如果* loop *为None,则get_running_loop()用于获取当前循环。

3.7 版中的新Function。

Task Object

    • class * asyncio. Task(* coro **,* loop = None name = None *)

任务用于在事件循环中运行协程。如果协程 awaitFuture,则 Task 会暂停协程的执行并 awaitFuture 的完成。当 Future 是* done *时,包装的协程将 continue 执行。

事件循环使用协作调度:事件循环一次运行一个 Task。当任务 awaitFuture 完成时,事件循环将运行其他任务,回调或执行 IO 操作。

使用高级asyncio.create_task()函数创建任务,或使用低级loop.create_task()ensure_future()函数。不建议手动实例化 Task。

要取消正在运行的任务,请使用cancel()方法。调用它会导致 Task 向包装好的协程中抛出CancelledError异常。如果协程在取消过程中正在 awaitFuture 对象,则 Future 对象将被取消。

cancelled()可用于检查任务是否已取消。如果包装的协程没有抑制CancelledError异常并且实际上已取消,则该方法返回True

asyncio.TaskFuture继承除Future.set_result()Future.set_exception()之外的所有 API。

任务支持contextvars模块。创建 Task 时,它将复制当前上下文,然后在复制的上下文中运行其协程。

在 3.7 版中进行了更改:添加了对contextvars模块的支持。

在 3.8 版中进行了更改:添加了name参数。

从 3.8 版开始不推荐使用,将在 3.10 版中删除:* loop *参数。

  • cancel ( )
    • 要求取消任务。

这样可以安排在事件循环的下一个周期将CancelledError异常抛出到包装的协程中。

然后,协程有机会pass使用try…_5…finally块抑制异常来清理甚至拒绝该请求。因此,与Future.cancel()不同,Task.cancel()不能保证任务将被取消,尽管完全抑制取消并不常见,并且积极劝阻。

以下示例说明了协程如何拦截取消请求:

async def cancel_me():
    print('cancel_me(): before sleep')

    try:
        # Wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())

    # Wait for 1 second
    await asyncio.sleep(1)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

# Expected output:
#
#     cancel_me(): before sleep
#     cancel_me(): cancel sleep
#     cancel_me(): after sleep
#     main(): cancel_me is cancelled now
  • cancelled ( )
    • 如果任务被“取消”,则返回True

当使用cancel()请求取消并且封装的协程将CancelledError异常抛出时,“任务”被“取消”。

  • done ( )
    • 如果 Task 是* done *,则返回True

当包装好的协程返回值,引发异常或取消了任务时,“完成”任务。

  • result ( )
    • 返回任务的结果。

如果 Task 为* done *,则返回包装的协程的结果(或者如果协程引发异常,则重新引发该异常.)

如果 Task 已被* cancelled *取消,则此方法引发CancelledError异常。

如果“任务”的结果尚不可用,则此方法将引发InvalidStateError异常。

  • exception ( )
    • 返回任务的异常。

如果包装的协程引发了异常,则返回该异常。如果包装好的协程正常返回,则此方法返回None

如果 Task 已被* cancelled *取消,则此方法引发CancelledError异常。

如果尚未完成“任务”,则此方法将引发InvalidStateError异常。

  • add_done_callback(* callback **,* context = None *)
    • 添加一个在 Task 为* done *时运行的回调。

此方法仅应在基于低级回调的代码中使用。

有关更多详细信息,请参见Future.add_done_callback()的文档。

  • remove_done_callback(回叫)
    • 从回调列表中删除* callback *。

此方法仅应在基于低级回调的代码中使用。

有关更多详细信息,请参见Future.remove_done_callback()的文档。

  • get_stack(** limit = None *)
    • 返回此任务的堆栈帧列表。

如果包装好的协程未完成,则将堆栈返回挂起的位置。如果协程已成功完成或被取消,则将返回一个空列表。如果协程被异常终止,则将返回回溯帧列表。

框架始终按从旧到新的 Sequences 排列。

对于悬挂的协程,仅返回一个堆栈框架。

可选的* limit *参数设置要返回的最大帧数;默认情况下,返回所有可用的帧。返回列表的 Sequences 取决于是否返回堆栈或回溯:返回了堆栈的最新帧,但返回了回溯的最早帧。 (这与回溯模块的行为匹配.)

  • print_stack(** limit = None file = None *)
    • 打印此任务的堆栈或回溯。

对于get_stack()检索的帧,这产生与回溯模块相似的输出。

  • limit *参数直接传递给get_stack()

  • file *参数是将输出写入其中的 I/O 流。默认情况下,输出将写入sys.stderr

  • get_coro ( )
    • 返回由Task包裹的协程对象。

3.8 版的新Function。

  • get_name ( )
    • 返回任务的名称。

如果未为任务明确分配名称,则默认 asyncio Task 实现在实例化期间会生成默认名称。

3.8 版的新Function。

  • set_name(* value *)
    • 设置任务名称。
  • value *参数可以是任何对象,然后将其转换为字符串。

在默认的 Task 实现中,该名称将在 Task 对象的repr()输出中可见。

3.8 版的新Function。

    • classmethod * all_tasks(* loop = None *)
      • 返回事件循环的所有任务集。

默认情况下,返回当前事件循环的所有任务。如果* loop *为None,则使用get_event_loop()函数获取当前循环。

从 3.7 版开始不推荐使用,将在 3.9 版中删除:不要将此作为任务方法调用。请使用asyncio.all_tasks()函数。

    • classmethod * current_task(* loop = None *)
      • 返回当前正在运行的任务或None

如果* loop *为None,则使用get_event_loop()函数获取当前循环。

从 3.7 版开始不推荐使用,将在 3.9 版中删除:不要将此作为任务方法调用。请使用asyncio.current_task()函数。

Generator-based Coroutines

Note

不支持基于生成器的协程,并且计划在 Python 3.10 中删除。

基于生成器的协程早于异步/await 语法。它们是使用yield from表达式 awaitFuture 和其他协程的 Python 生成器。

基于生成器的协程应使用@asyncio.coroutine修饰,尽管这不是强制性的。

  • @ asyncio. coroutine
    • 装饰器标记基于生成器的协程。

此装饰器使基于遗留生成器的协程与异步/await 代码兼容:

@asyncio.coroutine
def old_style_coroutine():
    yield from asyncio.sleep(1)

async def main():
    await old_style_coroutine()

此装饰器不得用于async def个协程。

从 3.8 版开始不推荐使用,将在 3.10 版中删除:改用async def

  • asyncio. iscoroutine(* obj *)

此方法不同于inspect.iscoroutine(),因为它针对基于生成器的协程返回True

  • asyncio. iscoroutinefunction(* func *)

此方法与inspect.iscoroutinefunction()有所不同,因为它针对以@coroutine装饰的基于生成器的协程函数返回True