On this page
协同程序和任务
本节概述了与协程和任务一起使用的高级异步 API。
Coroutines
使用 async/await 语法语句的Coroutines是编写 asyncio 应用程序的首选方法。例如,以下代码段(需要 Python 3.7)打印“ hello”,await1 秒钟,然后打印“ world”:
>>> import asyncio
>>> async def main():
... print('hello')
... await asyncio.sleep(1)
... print('world')
>>> asyncio.run(main())
hello
world
请注意,仅调用协程不会调度它的执行:
>>> main()
<coroutine object main at 0x1053bb7c8>
为了实际运行协程,asyncio 提供了三种主要机制:
asyncio.run()函数用于运行顶级入口点“ main()”函数(请参见上面的示例.)
await 协程。以下代码段将在 await1 秒钟后显示“ hello”,然后在 await另一 2 秒后显示“ world”:
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
print(f"started at {time.strftime('%X')}")
await say_after(1, 'hello')
await say_after(2, 'world')
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
Expected output:
started at 17:13:52
hello
world
finished at 17:13:55
- asyncio.create_task()函数可以作为异步Tasks同时运行协程。
让我们修改上面的示例,并同时运行两个say_after
协程:
async def main():
task1 = asyncio.create_task(
say_after(1, 'hello'))
task2 = asyncio.create_task(
say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")
# Wait until both tasks are completed (should take
# around 2 seconds.)
await task1
await task2
print(f"finished at {time.strftime('%X')}")
请注意,现在的预期输出显示该代码段比以前快了 1 秒:
started at 17:14:32
hello
world
finished at 17:14:34
Awaitables
我们说如果可以在await表达式中使用对象,则该对象是“可 await”对象。许多异步 API 被设计为接受 await。
- awaitable 对象有三种主要类型:协程 ,任务 和期货*。
Coroutines
Python 协程是* waitables *,因此可以从其他协程中 await:
import asyncio
async def nested():
return 42
async def main():
# Nothing happens if we just call "nested()".
# A coroutine object is created but not awaited,
# so it *won't run at all*.
nested()
# Let's do it differently now and await it:
print(await nested()) # will print "42".
asyncio.run(main())
Important
在本文档中,术语“协程”可用于两个紧密相关的概念:
协程函数:async def函数;
协程对象:pass调用协程函数返回的对象。
asyncio 还支持旧版generator-based协程。
Tasks
任务用于同时安排协程。
当协程被包装成具有诸如_之类的Function的* Task *时,协程将自动计划很快运行:
import asyncio
async def nested():
return 42
async def main():
# Schedule nested() to run soon concurrently
# with "main()".
task = asyncio.create_task(nested())
# "task" can now be used to cancel "nested()", or
# can simply be awaited to wait until it is complete:
await task
asyncio.run(main())
Futures
Future是特殊的“低级别”await 对象,表示异步操作的“finally结果”。
当一个 Future 对象await*时,这意味着协程将 await,直到在其他地方解析该 Future。
需要在 asyncio 中使用将来的对象,以允许将基于回调的代码与 async/await 一起使用。
通常,“不需要”在应用程序级代码中创建 Future 对象。
将来的对象(有时由库和一些 asyncio API 公开)可以 await:
async def main():
await function_that_returns_a_future_object()
# this is also valid:
await asyncio.gather(
function_that_returns_a_future_object(),
some_python_coroutine()
)
一个返回 Future 对象的低级函数的一个好例子是loop.run_in_executor()。
运行异步程序
asyncio.
run
(* coro ,**,* debug = False *)- 执行coroutine * coro *并返回结果。
此函数运行传递的协程,从而 Managementasyncio 事件循环并“finally确定异步生成器”。
当另一个异步事件循环在同一线程中运行时,无法调用此函数。
如果* debug *为True
,则事件循环将在调试模式下运行。
此函数始终创建一个新的事件循环,并在最后将其关闭。它应该用作 asyncio 程序的主要入口,理想情况下应仅调用一次。
Example:
async def main():
await asyncio.sleep(1)
print('hello')
asyncio.run(main())
3.7 版中的新Function。
Note
asyncio.run()
的源代码可以在Lib/asyncio/runners.py中找到。
Creating Tasks
如果* name *不是None
,则使用Task.set_name()将其设置为任务的名称。
在get_running_loop()返回的循环中执行任务,如果当前线程中没有正在运行的循环,则引发RuntimeError。
Python 3.7 中已添加此Function 。在 Python 3.7 之前,可以使用低级asyncio.ensure_future()函数:
async def coro():
...
# In Python 3.7+
task = asyncio.create_task(coro())
...
# This works in all Python versions but is less readable
task = asyncio.ensure_future(coro())
...
3.7 版中的新Function。
在 3.8 版中进行了更改:添加了name
参数。
Sleeping
- 协程
asyncio.
sleep
(延迟,结果=无,***,循环=无)- 阻塞延迟秒。
如果提供了* result *,则协程完成后会将其返回给调用方。
sleep()
始终挂起当前任务,从而允许其他任务运行。
从 3.8 版开始不推荐使用,将在 3.10 版中删除:* loop *参数。
协程示例在 5 秒钟内每秒显示当前日期:
import asyncio
import datetime
async def display_date():
loop = asyncio.get_running_loop()
end_time = loop.time() + 5.0
while True:
print(datetime.datetime.now())
if (loop.time() + 1.0) >= end_time:
break
await asyncio.sleep(1)
asyncio.run(display_date())
同时运行任务
-
- awaitable *
asyncio.
gather
(** aws , loop = None , return_exceptions = False *)
- 同时以* aws *序列运行awaitable objects。
- awaitable *
如果在* aws *中 await 的是协程,它将自动安排为任务。
如果所有 await 项都成功完成,则结果将是返回值的汇总列表。结果值的 Sequences 对应于* aws *中的 awaitSequences。
如果* return_exceptions *为False
(默认值),则第一个引发的异常将立即传播到在gather()
上 await 的任务。 * aws *序列中的其他可 await 对象 不会被取消 ,并将 continue 运行。
如果* return_exceptions *为True
,则将异常与成功结果相同,并汇总到结果列表中。
如果gather()
被“取消”,则所有已提交的 await(尚未完成)也被“取消”。
如果* aws 序列中的任何 Task 或 Future 被 cancelled *取消,则将其视为引发CancelledError –在这种情况下**不会取消gather()
调用。这是为了防止取消一个已提交的任务/Future 导致其他任务/Future 被取消。
从 3.8 版开始不推荐使用,将在 3.10 版中删除:* loop *参数。
Example:
import asyncio
async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"Task {name}: Compute factorial({i})...")
await asyncio.sleep(1)
f *= i
print(f"Task {name}: factorial({number}) = {f}")
async def main():
# Schedule three calls *concurrently*:
await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
)
asyncio.run(main())
# Expected output:
#
# Task A: Compute factorial(2)...
# Task B: Compute factorial(2)...
# Task C: Compute factorial(2)...
# Task A: factorial(2) = 2
# Task B: Compute factorial(3)...
# Task C: Compute factorial(3)...
# Task B: factorial(3) = 6
# Task C: Compute factorial(4)...
# Task C: factorial(4) = 24
Note
如果* return_exceptions *为 False,则将其标记为已完成后取消 collect()不会取消任何已提交的 await。例如,可以在将异常传播给调用方之后将其标记为完成,因此,在从异常捕获到异常(由一个 await 对象引发)之后调用gather.cancel()
不会取消任何其他 await 对象。
在版本 3.7 中进行了更改:如果* gather 本身被取消,则取消传播将与 return_exceptions *无关。
抵制取消
-
- awaitable *
asyncio.
shield
(* aw ,**,* loop = None *)
- awaitable *
如果* aw *是协程,它将自动安排为任务。
The statement:
res = await shield(something())
等效于:
res = await something()
除非如果包含它的协程被取消,则在something()
中运行的任务不会被取消。从something()
的角度来看,没有发生取消。尽管其调用方仍被取消,所以“ await”表达式仍引发CancelledError。
如果something()
pass其他方式(即从自身内部)取消,那也将取消shield()
。
如果希望完全忽略取消操作(不建议这样做),则shield()
函数应与 try/except 子句结合使用,如下所示:
try:
res = await shield(something())
except CancelledError:
res = None
从 3.8 版开始不推荐使用,将在 3.10 版中删除:* loop *参数。
Timeouts
- 协程
asyncio.
wait_for
(* aw , timeout ,**,* loop = None *)- await* aw * awaitable完成超时。
如果* aw *是协程,它将自动安排为任务。
- timeout 可以是
None
,也可以是 float 或 intawait 的秒数。如果 timeout *是None
,则阻塞直到将来完成。
如果发生超时,它将取消任务并引发asyncio.TimeoutError。
为避免任务cancellation,请将其包装在shield()中。
该函数将 await 直到实际上取消了将来,因此总 await 时间可能超过* timeout *。
如果 await 被取消,则将来的* aw *也将被取消。
从 3.8 版开始不推荐使用,将在 3.10 版中删除:* loop *参数。
Example:
async def eternity():
# Sleep for one hour
await asyncio.sleep(3600)
print('yay!')
async def main():
# Wait for at most 1 second
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
print('timeout!')
asyncio.run(main())
# Expected output:
#
# timeout!
在 3.7 版中进行了更改:当* aw 由于超时而被取消时,wait_for
await aw *被取消。以前,它立即引发asyncio.TimeoutError。
Waiting Primitives
- 协程
asyncio.
wait
(* aws ,**,* loop = None , timeout = None , return_when = ALL_COMPLETED *)- 同时在设置的* aws 中运行awaitable objects并阻塞直到 return_when *指定的条件。
返回两组任务/期货:(done, pending)
。
Usage:
done, pending = await asyncio.wait(aws)
如果指定,* timeout *(浮点数或整数)可用于控制返回之前 await 的最大秒数。
请注意,此函数不会引发asyncio.TimeoutError。发生超时时未完成的期货或任务仅在第二组中返回。
- return_when *指示该函数何时应返回。它必须是以下常量之一:
Constant | Description |
---|---|
FIRST_COMPLETED |
以后完成或取消操作时,该函数将返回。 |
FIRST_EXCEPTION |
当将来pass引发异常结束时,该函数将返回。如果没有将来引发异常,则它等效于ALL_COMPLETED 。 |
ALL_COMPLETED |
当所有期货结束或被取消时,该函数将返回。 |
与wait_for()不同,wait()
不会在发生超时时取消期货。
从 3.8 版开始不推荐使用:如果* aws *中的任何 await 程序都是协程,则它将自动安排为任务。不建议将协程对象直接传递到wait()
,因为它会导致confusing behavior。
从 3.8 版开始不推荐使用,将在 3.10 版中删除:* loop *参数。
Note
wait()
自动将协程安排为 Tasks,然后在(done, pending)
个集合中返回那些隐式创建的 Task 对象。因此,以下代码将无法正常工作:
async def foo():
return 42
coro = foo()
done, pending = await asyncio.wait({coro})
if coro in done:
# This branch will never be run!
修复以上代码段的方法如下:
async def foo():
return 42
task = asyncio.create_task(foo())
done, pending = await asyncio.wait({task})
if task in done:
# Everything will work as expected now.
从 3.8 版开始不推荐使用:不建议将协程对象直接传递给wait()
。
asyncio.
as_completed
(* aws ,**,* loop = None , timeout = None *)- 同时在“ aws”集中运行awaitable objects。返回协程的迭代器。可以 await 返回的每个协程,以从其余的 await 组中获得最早的下一个结果。
如果在所有期货完成之前发生超时,则引发asyncio.TimeoutError。
从 3.8 版开始不推荐使用,将在 3.10 版中删除:* loop *参数。
Example:
for coro in as_completed(aws):
earliest_result = await coro
# ...
从其他线程调度
asyncio.
run_coroutine_threadsafe
(* coro , loop *)- 将协程提交到给定的事件循环。线程安全的。
返回concurrent.futures.Future以 await 另一个 OS 线程的结果。
该函数应从不同于运行事件循环的 OS 线程中调用。例:
# Create a coroutine
coro = asyncio.sleep(1, result=3)
# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)
# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3
如果协程中出现异常,则将通知返回的 Future。它也可以用于取消事件循环中的任务:
try:
result = future.result(timeout)
except asyncio.TimeoutError:
print('The coroutine took too long, cancelling the task...')
future.cancel()
except Exception as exc:
print(f'The coroutine raised an exception: {exc!r}')
else:
print(f'The coroutine returned: {result!r}')
请参阅文档的并发和多线程部分。
与其他异步函数不同,此函数需要显式传递* loop *参数。
版本 3.5.1 中的新Function。
Introspection
asyncio.
current_task
(* loop = None *)- 返回当前正在运行的Task实例,如果没有任务正在运行,则返回
None
。
- 返回当前正在运行的Task实例,如果没有任务正在运行,则返回
如果* loop *为None
get_running_loop()用于获取当前循环。
3.7 版中的新Function。
asyncio.
all_tasks
(* loop = None *)- 返回由循环运行的一组尚未完成的Task对象。
如果* loop *为None
,则get_running_loop()用于获取当前循环。
3.7 版中的新Function。
Task Object
-
- class *
asyncio.
Task
(* coro ,**,* loop = None , name = None *)
- 运行 Python coroutine的Future-like对象。不是线程安全的。
- class *
任务用于在事件循环中运行协程。如果协程 awaitFuture,则 Task 会暂停协程的执行并 awaitFuture 的完成。当 Future 是* done *时,包装的协程将 continue 执行。
事件循环使用协作调度:事件循环一次运行一个 Task。当任务 awaitFuture 完成时,事件循环将运行其他任务,回调或执行 IO 操作。
使用高级asyncio.create_task()函数创建任务,或使用低级loop.create_task()或ensure_future()函数。不建议手动实例化 Task。
要取消正在运行的任务,请使用cancel()方法。调用它会导致 Task 向包装好的协程中抛出CancelledError异常。如果协程在取消过程中正在 awaitFuture 对象,则 Future 对象将被取消。
cancelled()可用于检查任务是否已取消。如果包装的协程没有抑制CancelledError异常并且实际上已取消,则该方法返回True
。
asyncio.Task从Future继承除Future.set_result()和Future.set_exception()之外的所有 API。
任务支持contextvars模块。创建 Task 时,它将复制当前上下文,然后在复制的上下文中运行其协程。
在 3.7 版中进行了更改:添加了对contextvars模块的支持。
在 3.8 版中进行了更改:添加了name
参数。
从 3.8 版开始不推荐使用,将在 3.10 版中删除:* loop *参数。
cancel
( )- 要求取消任务。
这样可以安排在事件循环的下一个周期将CancelledError异常抛出到包装的协程中。
然后,协程有机会pass使用try…_5…finally块抑制异常来清理甚至拒绝该请求。因此,与Future.cancel()不同,Task.cancel()不能保证任务将被取消,尽管完全抑制取消并不常见,并且积极劝阻。
以下示例说明了协程如何拦截取消请求:
async def cancel_me():
print('cancel_me(): before sleep')
try:
# Wait for 1 hour
await asyncio.sleep(3600)
except asyncio.CancelledError:
print('cancel_me(): cancel sleep')
raise
finally:
print('cancel_me(): after sleep')
async def main():
# Create a "cancel_me" Task
task = asyncio.create_task(cancel_me())
# Wait for 1 second
await asyncio.sleep(1)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("main(): cancel_me is cancelled now")
asyncio.run(main())
# Expected output:
#
# cancel_me(): before sleep
# cancel_me(): cancel sleep
# cancel_me(): after sleep
# main(): cancel_me is cancelled now
cancelled
( )- 如果任务被“取消”,则返回
True
。
- 如果任务被“取消”,则返回
当使用cancel()请求取消并且封装的协程将CancelledError异常抛出时,“任务”被“取消”。
done
( )- 如果 Task 是* done *,则返回
True
。
- 如果 Task 是* done *,则返回
当包装好的协程返回值,引发异常或取消了任务时,“完成”任务。
result
( )- 返回任务的结果。
如果 Task 为* done *,则返回包装的协程的结果(或者如果协程引发异常,则重新引发该异常.)
如果 Task 已被* cancelled *取消,则此方法引发CancelledError异常。
如果“任务”的结果尚不可用,则此方法将引发InvalidStateError异常。
exception
( )- 返回任务的异常。
如果包装的协程引发了异常,则返回该异常。如果包装好的协程正常返回,则此方法返回None
。
如果 Task 已被* cancelled *取消,则此方法引发CancelledError异常。
如果尚未完成“任务”,则此方法将引发InvalidStateError异常。
add_done_callback
(* callback ,**,* context = None *)- 添加一个在 Task 为* done *时运行的回调。
此方法仅应在基于低级回调的代码中使用。
有关更多详细信息,请参见Future.add_done_callback()的文档。
remove_done_callback
(回叫)- 从回调列表中删除* callback *。
此方法仅应在基于低级回调的代码中使用。
有关更多详细信息,请参见Future.remove_done_callback()的文档。
get_stack
(**, limit = None *)- 返回此任务的堆栈帧列表。
如果包装好的协程未完成,则将堆栈返回挂起的位置。如果协程已成功完成或被取消,则将返回一个空列表。如果协程被异常终止,则将返回回溯帧列表。
框架始终按从旧到新的 Sequences 排列。
对于悬挂的协程,仅返回一个堆栈框架。
可选的* limit *参数设置要返回的最大帧数;默认情况下,返回所有可用的帧。返回列表的 Sequences 取决于是否返回堆栈或回溯:返回了堆栈的最新帧,但返回了回溯的最早帧。 (这与回溯模块的行为匹配.)
print_stack
(**, limit = None , file = None *)- 打印此任务的堆栈或回溯。
对于get_stack()检索的帧,这产生与回溯模块相似的输出。
limit *参数直接传递给get_stack()。
file *参数是将输出写入其中的 I/O 流。默认情况下,输出将写入sys.stderr。
get_coro
( )- 返回由Task包裹的协程对象。
3.8 版的新Function。
get_name
( )- 返回任务的名称。
如果未为任务明确分配名称,则默认 asyncio Task 实现在实例化期间会生成默认名称。
3.8 版的新Function。
set_name
(* value *)- 设置任务名称。
- value *参数可以是任何对象,然后将其转换为字符串。
在默认的 Task 实现中,该名称将在 Task 对象的repr()输出中可见。
3.8 版的新Function。
-
- classmethod *
all_tasks
(* loop = None *)- 返回事件循环的所有任务集。
- classmethod *
默认情况下,返回当前事件循环的所有任务。如果* loop *为None
,则使用get_event_loop()函数获取当前循环。
从 3.7 版开始不推荐使用,将在 3.9 版中删除:不要将此作为任务方法调用。请使用asyncio.all_tasks()函数。
-
- classmethod *
current_task
(* loop = None *)- 返回当前正在运行的任务或
None
。
- 返回当前正在运行的任务或
- classmethod *
如果* loop *为None
,则使用get_event_loop()函数获取当前循环。
从 3.7 版开始不推荐使用,将在 3.9 版中删除:不要将此作为任务方法调用。请使用asyncio.current_task()函数。
Generator-based Coroutines
Note
不支持基于生成器的协程,并且计划在 Python 3.10 中删除。
基于生成器的协程早于异步/await 语法。它们是使用yield from
表达式 awaitFuture 和其他协程的 Python 生成器。
基于生成器的协程应使用@asyncio.coroutine修饰,尽管这不是强制性的。
@
asyncio.
coroutine
- 装饰器标记基于生成器的协程。
此装饰器使基于遗留生成器的协程与异步/await 代码兼容:
@asyncio.coroutine
def old_style_coroutine():
yield from asyncio.sleep(1)
async def main():
await old_style_coroutine()
此装饰器不得用于async def个协程。
从 3.8 版开始不推荐使用,将在 3.10 版中删除:改用async def。
asyncio.
iscoroutine
(* obj *)- 如果* obj *是coroutine object,则返回
True
。
- 如果* obj *是coroutine object,则返回
此方法不同于inspect.iscoroutine(),因为它针对基于生成器的协程返回True
。
asyncio.
iscoroutinefunction
(* func *)- 如果* func *是coroutine function,则返回
True
。
- 如果* func *是coroutine function,则返回
此方法与inspect.iscoroutinefunction()有所不同,因为它针对以@coroutine装饰的基于生成器的协程函数返回True
。