python / 3.7.2rc1 / all / library-asyncio-eventloop.html

Event Loop

源代码: Lib/asyncio/events.pyLib/asyncio/base_events.py


Preface

事件循环是每个异步应用程序的核心。事件循环运行异步任务和回调,执行网络 IO 操作并运行子流程。

应用程序开发人员通常应该使用高级异步函数,例如asyncio.run(),并且几乎不需要引用循环对象或调用其方法。本部分主要面向需要更好地控制事件循环行为的较低级代码,库和框架的作者。

获取事件循环

以下低级函数可用于获取,设置或创建事件循环:

  • asyncio. get_running_loop ( )
    • 返回当前 OS 线程中的运行事件循环。

如果没有运行事件循环,则引发RuntimeError。只能从协程或回调中调用此函数。

3.7 版中的新Function。

  • asyncio. get_event_loop ( )
    • 获取当前事件循环。

如果在当前 OS 线程中没有设置当前事件循环,则 OS 线程为 main,并且尚未调用set_event_loop(),asyncio 将创建一个新的事件循环并将其设置为当前事件循环。

因为此函数具有相当复杂的行为(尤其是在使用自定义事件循环策略时),所以在协程和回调中,使用get_running_loop()函数优于get_event_loop()

还可以考虑使用asyncio.run()函数而不是使用较低级别的函数来手动创建和关闭事件循环。

  • asyncio. set_event_loop(* loop *)

    • 将* loop *设置为当前 OS 线程的当前事件循环。
  • asyncio. new_event_loop ( )

    • 创建一个新的事件循环对象。

请注意,设置自定义事件循环策略可以更改get_event_loop()set_event_loop()new_event_loop()函数的行为。

Contents

本文档页面包含以下部分:

事件循环方法

事件循环具有适用于以下情况的“低级” API:

运行和停止循环

  • loop. run_until_complete(* future *)
    • 运行直到* future *(Future的实例)完成。

如果参数是coroutine object,则将其隐式调度为asyncio.Task

返回 Future 的结果或引发异常。

  • loop. run_forever ( )
    • 运行事件循环,直到调用stop()

如果在调用stop()之前调用了stop(),则循环将以 0 的超时对 I/Oselectors 轮询一次,运行所有响应 I/O 事件(以及已调度的事件)而调度的回调,然后退出。

如果在run_forever()运行时调用stop(),则该循环将运行当前的一系列回调,然后退出。请注意,在这种情况下,由回调安排的新回调将不会运行。相反,它们将在下次调用run_forever()run_until_complete()时运行。

  • loop. stop ( )

    • 停止事件循环。
  • loop. is_running ( )

    • 如果事件循环当前正在运行,则返回True
  • loop. is_closed ( )

    • 如果事件循环已关闭,则返回True
  • loop. close ( )

    • 关闭事件循环。

调用此函数时,循环不得运行。任何未决的回调将被丢弃。

此方法清除所有队列并关闭执行程序,但不 await 执行程序完成。

该方法是幂等且不可逆的。关闭事件循环后,不应再调用其他方法。

  • 协程 loop. shutdown_asyncgens()
    • 安排所有当前打开的asynchronous generator对象passaclose()调用关闭。调用此方法后,如果迭代了新的异步生成器,则事件循环将发出警告。应该使用它来可靠地完成所有调度的异步生成器。

请注意,使用asyncio.run()时无需调用此函数。

Example:

try:
    loop.run_forever()
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()

3.6 版的新Function。

Scheduling callbacks

  • loop. call_soon(* callback * args context = None *)
    • 安排在事件循环的下一次迭代时使用* args 参数调用 callback * callback

回调按注册 Sequences 调用。每个回调将仅被调用一次。

可选的仅关键字* context 参数允许为运行的 callback 指定自定义contextvars.Context。当未提供 context *时,将使用当前上下文。

返回asyncio.Handle的实例,以后可用于取消回调。

此方法不是线程安全的。

  • loop. call_soon_threadsafe(* callback * args context = None *)
    • call_soon()的线程安全变体。必须用于调度来自另一个线程的回调。

请参阅文档的并发和多线程部分。

在 3.7 版中进行了更改:添加了* context *仅关键字参数。有关更多详细信息,请参见 PEP 567

Note

大多数asyncio调度Function都不允许传递关键字参数。为此,请使用functools.partial()

# will schedule "print("Hello", flush=True)"
loop.call_soon(
functools.partial(print, "Hello", flush=True))

通常,使用部分对象比使用 lambda 更方便,因为 asyncio 可以在调试和错误消息中更好地呈现部分对象。

安排延迟的回调

事件循环提供了安排计划在将来某个 Moment 调用的回调函数的机制。事件循环使用单调时钟来跟踪时间。

  • loop. call_later(* delay callback * args context = None *)
    • 将* callback 安排在给定的 delay *秒数之后调用(可以是 int 或 float)。

返回asyncio.TimerHandle的实例,该实例可用于取消回调。

  • callback *将只被调用一次。如果两个回调完全安排在同一时间,则调用它们的 Sequences 是不确定的。

可选的位置* args *在调用时将传递给回调。如果要使用关键字参数调用回调,请使用functools.partial()

可选的仅关键字* context 参数允许为运行的 callback 指定自定义contextvars.Context。当未提供 context *时,将使用当前上下文。

在 3.7 版中进行了更改:添加了* context *仅关键字参数。有关更多详细信息,请参见 PEP 567

在 3.8 版中进行了更改:在具有默认事件循环实现的 Python 3.7 和更早版本中,* delay *不能超过一天。此问题已在 Python 3.8 中修复。

  • loop. call_at(* when callback * args context = None *)
    • 安排* callback 在给定的绝对时间戳 when *(一个 int 或 float)时使用,该时间与loop.time()相同。

此方法的行为与call_later()相同。

返回asyncio.TimerHandle的实例,该实例可用于取消回调。

在 3.7 版中进行了更改:添加了* context *仅关键字参数。有关更多详细信息,请参见 PEP 567

在 3.8 版中进行了更改:在具有默认事件循环实现的 Python 3.7 和更早版本中,* when *和当前时间之间的差不能超过一天。此问题已在 Python 3.8 中修复。

  • loop. time ( )
    • 根据事件循环的内部单调时钟返回当前时间,作为float值。

Note

在 3.8 版中进行了更改:在 Python 3.7 和更早版本中,超时(相对* delay 或绝对 when *)不应超过一天。此问题已在 Python 3.8 中修复。

See also

asyncio.sleep()Function。

创建期货和任务

  • loop. create_future ( )

这是在 asyncio 中创建 Future 的首选方式。这样,第三方事件循环就可以提供 Future 对象的替代实现(具有更好的性能或手段)。

版本 3.5.2 中的新Function。

  • loop. create_task(* coro **,* name = None *)

第三方事件循环可以使用自己的子类Task来实现互操作性。在这种情况下,结果类型是Task的子类。

如果提供了* name *参数而不是None,则使用Task.set_name()将其设置为任务的名称。

在 3.8 版中进行了更改:添加了name参数。

如果* factory None,则将设置默认任务工厂。否则, factory 必须是 callable 且签名匹配(loop, coro),其中 loop 是对活动事件循环的引用,而 coro *是协程对象。可调用对象必须返回asyncio.Future兼容的对象。

  • loop. get_task_factory ( )
    • 返回任务工厂;如果使用默认任务,则返回None

打开网络连接

  • 协程 loop. create_connection(* protocol_factory host = None port = None **,* ssl = None family = 0 proto = 0 flags = 0 *, * sock = None local_addr = None server_hostname = None ssl_handshake_timeout = None happy_eyeballs_delay = None interleave = None *)
    • 打开到* host port *指定的给定地址的流传输连接。

套接字系列可以是AF_INETAF_INET6,具体取决于* host (或 family *参数,如果提供)。

套接字类型将为SOCK_STREAM

此方法将try在后台构建连接。成功时,它将返回(transport, protocol)对。

基本操作的时间 Sequences 摘要如下:

  • 构建连接,并为此创建一个transport

    • protocol_factory *不带参数地被调用,并且期望返回一个protocol实例。
  • 协议实例pass调用其connection_made()方法与传输耦合。

  • 成功返回(transport, protocol)Tuples。

创建的传输是与实现有关的双向流。

Other arguments:

    • ssl :如果给定且不为 false,则将创建 SSL/TLS 传输(默认情况下将创建纯 TCP 传输)。如果 ssl ssl.SSLContext对象,则此上下文用于创建传输;如果 ssl *为True,则使用ssl.create_default_context()返回的默认上下文。
    • server_hostname 设置或覆盖将与目标服务器的证书匹配的主机名。仅当 ssl 不是None时才应pass。默认情况下,使用 host 参数的值。如果 host 为空,则没有默认值,您必须传递 server_hostname 的值。如果 server_hostname *为空字符串,则禁用主机名匹配(这是严重的安全风险,可能会导致中间人攻击)。
    • family proto flags 是可选的地址族,协议和标志,将传递给 getaddrinfo()以进行 host *解析。如果给定的话,这些都应该是来自相应的socket模块常量的整数。
    • happy_eyeballs_delay *(如果指定),将为此连接启用“欢乐眼球”。它应该是一个浮点数,表示在并行开始下一次try之前 await 连接try完成的时间(以秒为单位)。这是 RFC 8305中定义的“连接try延迟”。 RFC 建议的明智的默认值为0.25(250 毫秒)。
  • 当主机名解析为多个 IP 地址时,* interleave 控制地址的重新排序。如果0或未指定,则不进行任何重新排序,并且按照getaddrinfo()返回的 Sequences try地址。如果指定了正整数,则地址按地址族交织,并且给定的整数将解释为 RFC 8305中定义的“第一地址族计数”。如果未指定 happy_eyeballs_delay *,则默认值为0;如果未指定,则默认值为1

    • sock (如果提供)应该是传输要使用的现有的,已经连接的socket.socket对象。如果给定了 sock ,则不应指定 host port family proto flags happy_eyeballs_delay interleave local_addr *。
    • local_addr *(如果有的话)是一个(local_host, local_port)Tuples,用于将套接字绑定到本地。 * local_host local_port 使用getaddrinfo()查找,类似于 host port *。
    • ssl_handshake_timeout *(对于 TLS 连接)是在终止连接之前 awaitTLS 握手完成的时间(以秒为单位)。 60.0秒(如果是None)(默认)。

3.8 版的新Function:添加了* happy_eyeballs_delay interleave *参数。

快乐的眼球算法:双栈主机的成功。当服务器的 IPv4 路径和协议正常工作,但服务器的 IPv6 路径和协议无效时,与仅使用 IPv4 的 Client 端相比,双栈 Client 端应用程序会遇到明显的连接延迟。这是不希望的,因为它会使双堆栈 Client 端的用户体验变差。本文档规定了减少该用户可见延迟的算法要求,并提供了一种算法。

有关更多信息:https://tools.ietf.org/html/rfc6555

3.7 版中的新Function:* ssl_handshake_timeout *参数。

在版本 3.6 中更改:默认情况下,所有 TCP 连接都设置了套接字选项TCP_NODELAY

在版本 3.5 中进行了更改:在ProactorEventLoop中添加了对 SSL/TLS 的支持。

See also

open_connection()函数是高级替代 API。它返回Pair(StreamReaderStreamWriter),可以直接在异步/await 代码中使用。

协程 loop. create_datagram_endpoint((protocol_factory local_addr = None remote_addr = None **,* family = 0 proto = 0 flags = 0 reuse_address = None *, * reuse_port = None allow_broadcast = None sock = None *)

Note

不再支持参数* reuse_address *,因为使用SO_REUSEADDR会对 UDP 造成严重的安全问题。显式传递reuse_address=True将引发异常。

当具有不同 UID 的多个进程将套接字分配给具有SO_REUSEADDR的相同 UDP 套接字地址时,传入的数据包可能会随机分布在套接字之间。

对于受支持的平台,* reuse_port 可以替代类似Function。与 reuse_port *一起使用SO_REUSEPORT,它特别防止具有不同 UID 的进程将套接字分配给相同的套接字地址。

创建一个数据报连接。

套接字家族可以是AF_INETAF_INET6AF_UNIX,具体取决于* host (或 family *参数,如果提供)。

套接字类型将为SOCK_DGRAM

  • protocol_factory *必须是可调用的,返回protocol实现。

成功返回(transport, protocol)Tuples。

Other arguments:

    • local_addr (如果有的话)是一个(local_host, local_port)Tuples,用于将套接字绑定到本地。使用getaddrinfo()查找 local_host local_port *。
    • remote_addr (如果提供)是一个(remote_host, remote_port)Tuples,用于将套接字连接到远程地址。使用getaddrinfo()查找 remote_host remote_port *。
    • family proto flags 是可选的地址族,协议和标志,将传递给getaddrinfo()以实现 host *解析。如果给出的话,这些都应该是来自相应的socket模块常数的整数。
    • reuse_port *告诉内核允许将此端点绑定到与其他现有端点绑定到的端口相同的端口,只要它们在创建时都设置了此标志即可。 Windows 和某些 Unix 不支持此选项。如果未定义SO_REUSEPORT常量,则不支持此Function。
    • allow_broadcast *告诉内核允许该端点将消息发送到 Broadcast 地址。
  • 可以选择指定* sock ,以使用传输使用的预先存在的,已连接的socket.socket对象。如果指定,则应Ellipsis local_addr remote_addr *(必须为None)。

请参阅UDP 回显 Client 端协议UDP 回显服务器协议示例。

在版本 3.4.4 中更改:添加了* family proto flags reuse_address reuse_port,* allow_broadcast sock *参数。

在版本 3.8.1 中更改:由于安全方面的考虑,不再支持* reuse_address *参数。

在 3.8 版中进行了更改:添加了对 Windows 的支持。

  • 协程 loop. create_unix_connection(* protocol_factory path = None **,* ssl = None sock = None server_hostname = None ssl_handshake_timeout = None *)
    • 创建一个 Unix 连接。

套接字系列将为AF_UNIX;套接字类型将为SOCK_STREAM

成功返回(transport, protocol)Tuples。

  • path 是 Unix 域套接字的名称,并且是必需的,除非指定了 sock *参数。支持抽象 Unix 套接字strbytesPath路径。

有关此方法的参数的信息,请参见loop.create_connection()方法的文档。

Availability: Unix.

3.7 版中的新Function:* ssl_handshake_timeout *参数。

在版本 3.7 中更改:* path *参数现在可以是path-like object

创建 Web Service 器

  • 协程 loop. create_server(* protocol_factory host = None port = None **,* family = socket.AF_UNSPEC flags = socket.AI_PASSIVE sock = None backlog = 100 ssl = None reuse_address = None reuse_port = None ssl_handshake_timeout = None start_serving = True *)
    • 创建一个侦听* host 地址的 port *的 TCP 服务器(套接字类型SOCK_STREAM)。

返回一个Server对象。

Arguments:

    • protocol_factory *必须是可调用的,返回protocol实现。
  • 可以将* host *参数设置为几种类型,这些类型确定服务器将在哪里侦听:

  • 如果* host 是字符串,则将 TCP 服务器绑定到 host *指定的单个网络接口。

    • 如果* host *是字符串序列,则 TCP 服务器绑定到该序列指定的所有网络接口。

    • 如果* host *为空字符串或None,则假定所有接口都将返回多个套接字的列表(最有可能的一个用于 IPv4,另一个用于 IPv6)。

  • 可以将* family 设置为socket.AF_INETAF_INET6,以强制套接字使用 IPv4 或 IPv6.如果未设置,则 family *将根据主机名确定(默认为AF_UNSPEC)。

  • 可以选择指定* sock 以便使用预先存在的套接字对象。如果指定,则不得指定 host port *。

    • backlog *是传递到listen()的最大排队连接数(默认为 100)。
  • 可以将* ssl *设置为SSLContext实例,以在接受的连接上启用 TLS。

    • reuse_address *告诉内核重用处于TIME_WAIT状态的本地套接字,而不必 await 其自然超时到期。如果未指定,将在 Unix 上自动设置为True
    • reuse_port *告诉内核允许将此端点绑定到与其他现有端点绑定到的端口相同的端口,只要它们在创建时都设置了此标志即可。 Windows 不支持此选项。
    • ssl_handshake_timeout *是(对于 TLS 服务器)以秒为单位的时间(以秒为单位),以 awaitTLS 握手完成,然后中止连接。 60.0秒(如果是None)(默认)。

3.7 版中的新Function:添加了* ssl_handshake_timeout start_serving *参数。

在版本 3.6 中更改:默认情况下,所有 TCP 连接都设置了套接字选项TCP_NODELAY

在版本 3.5 中进行了更改:在ProactorEventLoop中添加了对 SSL/TLS 的支持。

在版本 3.5.1 中更改:* host *参数可以是字符串序列。

See also

start_server()函数是一个较高级别的替代 API,它返回PairStreamReaderStreamWriter,可以在异步/await 代码中使用它们。

  • 协程 loop. create_unix_server(* protocol_factory path = None **,* sock = None backlog = 100 ssl = None ssl_handshake_timeout = None start_serving = True *)
  • path 是 Unix 域套接字的名称,并且是必需的,除非提供了 sock *参数。支持抽象 Unix 套接字strbytesPath路径。

有关此方法的参数的信息,请参见loop.create_server()方法的文档。

Availability: Unix.

3.7 版中的新Function:* ssl_handshake_timeout start_serving *参数。

在版本 3.7 中更改:* path *参数现在可以是Path对象。

  • 协程 loop. connect_accepted_socket(协议协议袜子,** ssl =无*,* ssl_handshake_timeout =无*)
    • 将已经接受的连接包装为传输/协议对。

接受 asyncio 外部的连接但使用 asyncio 进行处理的服务器可以使用此方法。

Parameters:

    • protocol_factory *必须是可调用的,返回protocol实现。
    • sock *是从socket.accept返回的预先存在的套接字对象。
  • 可以将* ssl *设置为SSLContext,以在接受的连接上启用 SSL。

    • ssl_handshake_timeout *是(对于 SSL 连接)以秒为单位的时间(以秒为单位),以 awaitSSL 握手完成,然后中止连接。 60.0秒(如果是None)(默认)。

返回Pair(transport, protocol)

3.7 版中的新Function:* ssl_handshake_timeout *参数。

版本 3.5.3 中的新Function。

Transferring files

  • 协程 loop. sendfile(传输文件,*偏移量= 0 计数=无**,后备=真)
    • pass* transport 发送 file *。返回发送的字节总数。

该方法使用高性能os.sendfile()(如果有)。

  • file *必须是以二进制模式打开的常规文件对象。

  • offset 指示从哪里开始读取文件。如果指定, count *是要发送的字节总数,而不是在达到 EOF 之前发送文件。即使此方法出现错误,文件位置也会始终更新,并且file.tell()可用于获取发送的实际字节数。

  • fallback *设置为True可使 asyncio 在平台不支持 sendfile 系统调用(例如 Windows 或 Unix 上的 SSL 套接字)时手动读取和发送文件。

如果系统不支持* sendfile * syscall 和* fallback *为False,则提高SendfileNotAvailableError

3.7 版中的新Function。

TLS Upgrade

  • 协程 loop. start_tls(传输协议,* sslcontext **,* server_side = False server_hostname = None ssl_handshake_timeout = None *)
    • 将现有的基于传输的连接升级到 TLS。

返回一个新的传输实例,该协议必须在 await 之后立即开始使用。传递给* start_tls 方法的 transport *实例永远不要再使用。

Parameters:

  • 诸如create_server()create_connection()之类的方法返回的* transport protocol *实例。

  • 升级服务器端连接(例如create_server()创建的连接)时,* server_side *passTrue

    • server_hostname *:设置或覆盖将与目标服务器的证书匹配的主机名。
    • ssl_handshake_timeout *(对于 TLS 连接)是在终止连接之前 awaitTLS 握手完成的时间(以秒为单位)。 60.0秒(如果是None)(默认)。

3.7 版中的新Function。

观看文件 Descriptors

  • loop. add_reader(* fd callback * args *)

    • 开始监视* fd 文件 Descriptors 的读取可用性,并在 fd 可读取后使用指定的参数调用 callback *。
  • loop. remove_reader(* fd *)

    • 停止监视* fd *文件 Descriptors 的读取可用性。
  • loop. add_writer(* fd callback * args *)

    • 开始监视* fd 文件 Descriptors 的写可用性,并在 fd 可用于写操作时使用指定的参数调用 callback *。

使用functools.partial() 传递关键字参数进行回调

  • loop. remove_writer(* fd *)
    • 停止监视* fd *文件 Descriptors 的写可用性。

有关这些方法的一些限制,另请参见Platform Support部分。

直接使用套接字对象

通常,使用基于传输的 API(例如loop.create_connection()loop.create_server())的协议实现比直接使用套接字的实现更快。但是,在某些用例中,性能并不是很关键,直接使用socket对象更加方便。

  • 协程 loop. sock_recv(袜子,* nbytes *)

将接收到的数据作为字节对象返回。

  • sock *必须是非阻塞套接字。

在 3.7 版中进行了更改:尽管此方法始终被记录为协程方法,但在 Python 3.7 返回Future之前已发布。从 Python 3.7 开始,这是async def方法。

  • 协程 loop. sock_recv_into(袜子,* buf *)
    • 从* sock 接收数据到 buf *缓冲区。在阻塞socket.recv_into()方法之后建模。

返回写入缓冲区的字节数。

  • sock *必须是非阻塞套接字。

3.7 版中的新Function。

  • 协程 loop. sock_sendall(袜子数据)

此方法 continue 发送到套接字,直到* data *中的所有数据都已发送或发生错误为止。成功返回None。如果出错,则会引发异常。此外,无法确定连接的接收端已成功处理了多少数据(如果有)。

  • sock *必须是非阻塞套接字。

在 3.7 版中进行了更改:即使该方法始终被记录为协程方法,但在 Python 3.7 之前,它仍返回Future。从 Python 3.7 开始,这是async def方法。

  • 协程 loop. sock_connect(袜子地址)
    • 将* sock 连接到位于 address *的远程套接字。

socket.connect()的异步版本。

  • sock *必须是非阻塞套接字。

在版本 3.5.2 中更改:address不再需要解决。 sock_connect将trypass调用socket.inet_pton()来检查* address 是否已经解析。否则,loop.getaddrinfo()将用于解析 address *。

  • 协程 loop. sock_accept(袜子)

套接字必须绑定到一个地址并监听连接。返回值是Pair(conn, address),其中* conn 是可用于在连接上发送和接收数据的 new 套接字对象, address *是在连接另一端绑定到套接字的地址。

  • sock *必须是非阻塞套接字。

在 3.7 版中进行了更改:即使该方法始终被记录为协程方法,但在 Python 3.7 之前,它仍返回Future。从 Python 3.7 开始,这是async def方法。

  • 协程 loop. sock_sendfile(袜子文件,*偏移量= 0 计数=无**,后备=真)
    • 如果可能,使用高性能os.sendfile发送文件。返回发送的字节总数。

socket.sendfile()的异步版本。

  • sock *必须是非阻塞的socket.SOCK_STREAM socket

  • file *必须是以二进制模式打开的常规文件对象。

  • offset 指示从哪里开始读取文件。如果指定, count *是要发送的字节总数,而不是在达到 EOF 之前发送文件。即使此方法出现错误,文件位置也会始终更新,并且file.tell()可用于获取发送的实际字节数。

当设置为True时,* fallback *可以使 asyncio 在平台不支持 sendfile syscall(例如 Unix 上的 Windows 或 SSL 套接字)时手动读取并发送文件。

如果系统不支持* sendfile * syscall 和* fallback *为False,则提高SendfileNotAvailableError

  • sock *必须是非阻塞套接字。

3.7 版中的新Function。

DNS

  • 协程 loop. getaddrinfo(主机端口,***,*家族= 0 *,*类型= 0 *,*原型= 0 *,*标志= 0 *)

  • 协程 loop. getnameinfo(* sockaddr flags = 0 *)

在 3.7 版中进行了更改:始终记录* getaddrinfo getnameinfo *方法都返回协程,但是在 Python 3.7 之前,它们实际上都返回asyncio.Future对象。从 Python 3.7 开始,这两种方法都是协程。

使用管道

  • 协程 loop. connect_read_pipe(* protocol_factory pipe *)
    • 在事件循环中注册* pipe *的读取端。

返回对(transport, protocol),其中* transport 支持ReadTransport接口,而 protocol 是由 protocol_factory *实例化的对象。

passSelectorEventLoop事件循环,* pipe *设置为非阻塞模式。

  • 协程 loop. connect_write_pipe(* protocol_factory pipe *)
    • 在事件循环中注册* pipe *的写端。

管道file-like object

返回对(transport, protocol),其中* transport 支持WriteTransport接口,而 protocol 是由 protocol_factory *实例化的对象。

passSelectorEventLoop事件循环,* pipe *设置为非阻塞模式。

Note

SelectorEventLoop在 Windows 上不支持上述方法。对于 Windows,请改用ProactorEventLoop

Unix signals

  • loop. add_signal_handler(* signum callback * args *)
    • 将* callback 设置为 signum *signal 的处理程序。

该回调将由* loop *以及该事件循环的其他排队回调和可运行协程调用。与使用signal.signal()注册的 signal 处理程序不同,允许使用此函数注册的回调与事件循环进行交互。

如果 signal 编号无效或无法捕获,请提高ValueError。如果设置处理程序时遇到问题,请提出RuntimeError

使用functools.partial() 传递关键字参数进行回调

signal.signal()一样,必须在主线程中调用此函数。

  • loop. remove_signal_handler(* sig *)
    • 删除* sig *signal 的处理程序。

如果删除了 signal 处理程序,则返回True;如果没有为给定 signal 设置处理程序,则返回False

Availability: Unix.

See also

signal模块。

在线程或进程池中执行代码

    • awaitable * loop. run_in_executor(* executor func * args *)
    • 安排* func *在指定的执行程序中被调用。

Example:

import asyncio
import concurrent.futures

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open('/dev/urandom', 'rb') as f:
        return f.read(100)

def cpu_bound():
    # CPU-bound operations will block the event loop:
    # in general it is preferable to run them in a
    # process pool.
    return sum(i * i for i in range(10 ** 7))

async def main():
    loop = asyncio.get_running_loop()

    ## Options:

    # 1. Run in the default loop's executor:
    result = await loop.run_in_executor(
        None, blocking_io)
    print('default thread pool', result)

    # 2. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, blocking_io)
        print('custom thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            pool, cpu_bound)
        print('custom process pool', result)

asyncio.run(main())

此方法返回一个asyncio.Future对象。

使用functools.partial() 传递关键字参数来* func *。

在版本 3.5.3 中进行了更改:loop.run_in_executor()不再配置其创建的线程池执行程序的max_workers,而是将其留给线程池执行程序(ThreadPoolExecutor)来设置默认值。

自 3.8 版起不推荐使用:不建议使用不是ThreadPoolExecutor实例的执行程序,这将在 Python 3.9 中引发错误。

错误处理 API

允许自定义事件循环中异常的处理方式。

  • loop. set_exception_handler(* handler *)
    • 将* handler *设置为新的事件循环异常处理程序。

如果* handler None,则将设置默认的异常处理程序。否则, handler *必须是可调用的,签名匹配(loop, context),其中loop是对活动事件循环的引用,而context是包含异常详细信息的dict对象(有关上下文的详细信息,请参见call_exception_handler()文档)。

  • loop. get_exception_handler ( )
    • 返回当前的异常处理程序;如果未设置自定义异常处理程序,则返回None

版本 3.5.2 中的新Function。

  • loop. default_exception_handler(上下文)
    • 默认异常处理程序。

当发生异常且未设置任何异常处理程序时,将调用此方法。这可以由想要遵循默认处理程序行为的自定义异常处理程序调用。

  • loop. call_exception_handler(上下文)
    • 调用当前事件循环异常处理程序。
  • context *是包含以下键的dict对象(新的键可能在将来的 Python 版本中引入):
  • 'message':错误消息;

  • 'exception'(可选):异常对象;

  • 'future'(可选):asyncio.Future个实例;

  • 'handle'(可选):asyncio.Handle个实例;

  • 'protocol'(可选):Protocol个实例;

  • 'transport'(可选):Transport个实例;

  • 'socket'(可选):socket.socket个实例。

Note

此方法不应在子类事件循环中重载。对于自定义异常处理,请使用set_exception_handler()方法。

启用调试模式

  • loop. get_debug ( )
    • 获取事件循环的调试模式(bool)。

如果环境变量 PYTHONASYNCIODEBUG设置为非空字符串,则默认值为True,否则为False

  • loop. set_debug(* enabled:bool *)
    • 设置事件循环的调试模式。

在版本 3.7 中进行了更改:新的-X dev命令行选项现在也可以用于启用调试模式。

See also

Running Subprocesses

本小节中描述的方法是低级方法。在常规的异步/await 代码中,请考虑使用高级asyncio.create_subprocess_shell()asyncio.create_subprocess_exec()便捷Function。

Note

Windows **上的默认 asyncio 事件循环不支持子进程。有关详情,请参见Windows 上的子流程支持

  • 协程 loop. subprocess_exec(* protocol_factory *, *args stdin = subprocess.PIPE stdout = subprocess.PIPE stderr = subprocess.PIPE * kwargs *)
    • 根据* args *指定的一个或多个字符串参数创建一个子进程。
  • args *必须是由以下各项代表的字符串列表:

第一个字符串指定程序可执行文件,其余字符串指定参数。字符串参数一起构成程序的argv

这类似于用shell=False调用的标准库subprocess.Popen类以及作为第一个参数传递的字符串列表。但是,在Popen接受单个参数(它是字符串列表)的情况下,* subprocess_exec *接受多个字符串参数。

Other parameters:

    • stdin *可以是以下任意一种:
  • 表示要使用connect_write_pipe()连接到子流程的标准 Importing 流的管道的文件状对象

    • stdout *可以是以下任意一种:
  • 一个文件状对象,表示要使用connect_write_pipe()连接到子流程的标准输出流的管道

    • stderr *可以是以下任意一种:
  • 表示要使用connect_write_pipe()连接到子流程的标准错误流的管道的文件状对象

  • 除* bufsize universal_newlines shell text encoding errors *之外,所有其他关键字参数均不带解释地传递给subprocess.Popen,这些根本不应指定。

asyncio子流程 API 不支持将流解码为文本。 bytes.decode()可用于将流返回的字节转换为文本。

有关其他参数的文档,请参见subprocess.Popen类的构造函数。

返回Pair(transport, protocol),其中* transport 符合asyncio.SubprocessTransportBase Class,而 protocol 是由 protocol_factory *实例化的对象。

  • 协程 loop. subprocess_shell(* protocol_factory cmd *, * stdin = subprocess.PIPE stdout = subprocess.PIPE stderr = subprocess.PIPE * kwargs *)
    • 使用平台的“ shell”语法从* cmd *创建一个子进程,该子进程可以是编码为filesystem encodingstrbytes字符串。

这类似于用shell=True调用的标准库subprocess.Popen类。

有关其余参数的更多详细信息,请参见subprocess_exec()

返回Pair(transport, protocol),其中* transport 符合SubprocessTransportBase Class,而 protocol 是由 protocol_factory *实例化的对象。

Note

应用程序有责任确保所有空格和特殊字符都正确引用以避免shell injection漏洞。 shlex.quote()函数可用于适当地转义空格和将用于构造 Shell 命令的字符串中的特殊字符。

Callback Handles

  • 类别 asyncio. Handle

  • cancel ( )

    • 取消回调。如果回调已被取消或执行,则此方法无效。
  • cancelled ( )

    • 如果回调已取消,则返回True

3.7 版中的新Function。

此类是Handle的子类。

  • when ( )
    • 返回计划的回调时间为float秒。

时间是绝对时间戳,使用与loop.time()相同的时间参考。

3.7 版中的新Function。

Server Objects

服务器对象是passloop.create_server()loop.create_unix_server()start_server()start_unix_server()函数创建的。

不要直接实例化该类。

  • 类别 asyncio. Server
      • Server *对象是异步上下文 Management 器。在async with语句中使用时,可以确保async with语句完成后 Server 对象已关闭并且不接受新连接:
srv = await loop.create_server(...)

async with srv:
    # some code

# At this point, srv is closed and no longer accepts new connections.

在版本 3.7 中更改:服务器对象是自 Python 3.7 起的异步上下文 Management 器。

  • close ( )
    • 停止提供服务:关闭监听套接字,并将sockets属性设置为None

代表现有传入 Client 端连接的套接字保持打开状态。

服务器异步关闭,请使用wait_closed()协程 await 直到服务器关闭。

  • get_loop ( )
    • 返回与服务器对象关联的事件循环。

3.7 版中的新Function。

  • 协程 start_serving()
    • 开始接受连接。

此方法是幂等的,因此可以在服务器已经在服务时调用它。

loop.create_server()asyncio.start_server()的仅* start_serving *关键字参数允许创建最初不接受连接的 Server 对象。在这种情况下,Server.start_serving()Server.serve_forever()可用于使服务器开始接受连接。

3.7 版中的新Function。

  • 协程 serve_forever()
    • 开始接受连接,直到协程被取消。取消serve_forever任务将导致服务器关闭。

如果服务器已经接受连接,则可以调用此方法。每个* Server *对象只能存在一个serve_forever任务。

Example:

async def client_connected(reader, writer):
    # Communicate with the client with
    # reader/writer streams.  For example:
    await reader.readline()

async def main(host, port):
    srv = await asyncio.start_server(
        client_connected, host, port)
    await srv.serve_forever()

asyncio.run(main('127.0.0.1', 0))

3.7 版中的新Function。

  • is_serving ( )
    • 如果服务器接受新连接,则返回True

3.7 版中的新Function。

  • 协程 wait_closed()

  • sockets

在 3.7 版中进行了更改:在 Python 3.7 之前,Server.sockets用于直接返回服务器套接字的内部列表。在 3.7 中,将返回该列表的副本。

事件循环的实现

asyncio 附带两种不同的事件循环实现:SelectorEventLoopProactorEventLoop

默认情况下,asyncio 配置为在 Unix 上使用SelectorEventLoop,在 Windows 上使用ProactorEventLoop

  • 类别 asyncio. SelectorEventLoop

使用给定平台上最有效的selectors。也可以手动配置要使用的确切 selectors 实现:

import asyncio
import selectors

selector = selectors.SelectSelector()
loop = asyncio.SelectorEventLoop(selector)
asyncio.set_event_loop(loop)

Availability:Unix,Windows。

  • 类别 asyncio. ProactorEventLoop
    • Windows 的事件循环,使用“ I/O 完成端口”(IOCP)。

Availability: Windows.

  • 类别 asyncio. AbstractEventLoop
    • 异步兼容事件循环的抽象 Base Class。

事件循环方法部分列出了AbstractEventLoop的替代实现应定义的所有方法。

Examples

请注意,本节中的所有示例(有目的地)展示了如何使用低级事件循环 API,例如loop.run_forever()loop.call_soon()。现代异步应用程序很少需要以这种方式编写;考虑使用asyncio.run()之类的高级Function。

Hello 世界 with call_soon()

使用loop.call_soon()方法安排回调的示例。回调显示"Hello World",然后停止事件循环:

import asyncio

def hello_world(loop):
    """A callback to print 'Hello World' and stop the event loop"""
    print('Hello World')
    loop.stop()

loop = asyncio.get_event_loop()

# Schedule a call to hello_world()
loop.call_soon(hello_world, loop)

# Blocking call interrupted by loop.stop()
try:
    loop.run_forever()
finally:
    loop.close()

See also

使用协程和run()函数创建的类似Hello World示例。

使用 call_later()显示当前日期

每秒显示当前日期的回调示例。回调使用loop.call_later()方法在 5 秒后重新安排自身的时间,然后停止事件循环:

import asyncio
import datetime

def display_date(end_time, loop):
    print(datetime.datetime.now())
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, display_date, end_time, loop)
    else:
        loop.stop()

loop = asyncio.get_event_loop()

# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)

# Blocking call interrupted by loop.stop()
try:
    loop.run_forever()
finally:
    loop.close()

See also

使用协程和run()函数创建的类似current date示例。

观看文件 Descriptors 以了解读取事件

await 文件 Descriptors 使用loop.add_reader()方法接收到一些数据,然后关闭事件循环:

import asyncio
from socket import socketpair

# Create a pair of connected file descriptors
rsock, wsock = socketpair()

loop = asyncio.get_event_loop()

def reader():
    data = rsock.recv(100)
    print("Received:", data.decode())

    # We are done: unregister the file descriptor
    loop.remove_reader(rsock)

    # Stop the event loop
    loop.stop()

# Register the file descriptor for read event
loop.add_reader(rsock, reader)

# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())

try:
    # Run the event loop
    loop.run_forever()
finally:
    # We are done. Close sockets and the event loop.
    rsock.close()
    wsock.close()
    loop.close()

See also

为 SIGINT 和 SIGTERM 设置 signal 处理程序

(此signals示例仅适用于 Unix.)

使用loop.add_signal_handler()方法注册 signalSIGINTSIGTERM的处理程序:

import asyncio
import functools
import os
import signal

def ask_exit(signame, loop):
    print("got signal %s: exit" % signame)
    loop.stop()

async def main():
    loop = asyncio.get_running_loop()

    for signame in {'SIGINT', 'SIGTERM'}:
        loop.add_signal_handler(
            getattr(signal, signame),
            functools.partial(ask_exit, signame, loop))

    await asyncio.sleep(3600)

print("Event loop running for 1 hour, press Ctrl+C to interrupt.")
print(f"pid {os.getpid()}: send SIGINT or SIGTERM to exit.")

asyncio.run(main())