用异步开发

异步编程不同于经典的“Sequences”编程。

该页面列出了常见的错误和陷阱,并说明了如何避免它们。

Debug Mode

默认情况下,asyncio 在生产模式下运行。为了简化开发,asyncio 具有* debug 模式*。

有几种方法可以启用异步调试模式:

除了启用调试模式外,还请考虑:

  • asyncio logger的日志级别设置为logging.DEBUG,例如,可以在应用程序启动时运行以下代码段:
logging.basicConfig(level=logging.DEBUG)

启用调试模式后:

  • asyncio 检查尚未 await 的协程并记录它们;这减轻了“被遗忘的 await”陷阱。

  • 如果从错误的线程调用许多非线程安全的异步 API(例如loop.call_soon()loop.call_at()方法),则会引发异常。

  • 如果执行 I/O 操作花费的时间太长,则会记录 I/Oselectors 的执行时间。

  • 记录时间超过 100 毫秒的回调。 loop.slow_callback_duration属性可用于设置被视为“慢速”的最小执行持续时间(以秒为单位)。

并发和多线程

事件循环在线程(通常是主线程)中运行,并在其线程中执行所有回调和任务。当任务在事件循环中运行时,没有其他任务可以在同一线程中运行。当 Task 执行await表达式时,正在运行的 Task 将被挂起,事件循环将执行下一个 Task。

要调度另一个 os 线程中的callback,应使用loop.call_soon_threadsafe()方法。例:

loop.call_soon_threadsafe(callback, *args)

几乎所有异步对象都不是线程安全的,通常这不是问题,除非存在从任务或回调外部使用它们的代码。如果需要此类代码来调用低级异步 API,则应使用loop.call_soon_threadsafe()方法,例如:

loop.call_soon_threadsafe(fut.cancel)

要从其他 OS 线程安排协程对象,应使用run_coroutine_threadsafe()函数。它返回concurrent.futures.Future以访问结果:

async def coro_func():
     return await asyncio.sleep(1, 42)

# Later in another OS thread:

future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# Wait for the result:
result = future.result()

为了处理 signal 并执行子流程,事件循环必须在主线程中运行。

loop.run_in_executor()方法可与concurrent.futures.ThreadPoolExecutor一起使用,以在不同的 OS 线程中执行阻止代码,而不会阻塞事件循环运行所在的 OS 线程。

运行阻止代码

阻塞(CPU 绑定)代码不应直接调用。例如,如果一个函数执行 CPU 密集型计算 1 秒钟,则所有并发的异步任务和 IO 操作将延迟 1 秒钟。

执行程序可用于在不同线程甚至不同进程中运行任务,以避免事件循环阻塞 OS 线程。有关更多详细信息,请参见loop.run_in_executor()方法。

Logging

asyncio 使用logging模块,所有记录都pass"asyncio"Logger 执行。

默认日志级别为logging.INFO,可以轻松调整:

logging.getLogger("asyncio").setLevel(logging.WARNING)

检测未曾 await 的协程

当协程函数被调用但未 await 时(例如coro()而不是await coro())或协程未使用asyncio.create_task()进行调度时,asyncio 将发出RuntimeWarning

import asyncio

async def test():
    print("never scheduled")

async def main():
    test()

asyncio.run(main())

Output:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
  test()

在调试模式下输出:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
Coroutine created at (most recent call last)
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

  < .. >

  File "../t.py", line 7, in main
    test()
  test()

通常的解决方法是 await 协程或调用asyncio.create_task()函数:

async def main():
    await test()

检测从未检索到的异常

如果调用了Future.set_exception(),但从未 await 过 Future 对象,则永远不会将异常传播到用户代码。在这种情况下,当 Future 对象被垃圾回收时,asyncio 将发出一条日志消息。

未处理异常的示例:

import asyncio

async def bug():
    raise Exception("not consumed")

async def main():
    asyncio.create_task(bug())

asyncio.run(main())

Output:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
  exception=Exception('not consumed')>

Traceback (most recent call last):
  File "test.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed

启用调试模式获取创建任务的回溯:

asyncio.run(main(), debug=True)

在调试模式下输出:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
    exception=Exception('not consumed') created at asyncio/tasks.py:321>

source_traceback: Object created at (most recent call last):
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

< .. >

Traceback (most recent call last):
  File "../t.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed