传输和协议

Preface

低级**事件循环 API(例如loop.create_connection())使用传输和协议。它们使用基于回调的编程风格,并实现网络或 IPC 协议(例如 HTTP)的高性能实现。

本质上,传输和协议仅应在库和框架中使用,而不应在高级异步应用程序中使用。

本文档页面涵盖TransportsProtocols

Introduction

在最高级别,传输与字节的传输方式有关,而协议则确定**字节*的传输(以及某种程度上的时间)。

说同一件事的另一种方式:从传输的角度来看,传输是套接字(或类似的 I/O 端点)的抽象,而协议是应用程序的抽象。

另一个观点是,传输接口和协议接口共同定义了用于使用网络 I/O 和进程间 I/O 的抽象接口。

传输对象和协议对象之间始终存在 1:1 的关系:协议调用传输方法来发送数据,而传输调用协议方法来传递已接收的数据。

大多数面向连接的事件循环方法(例如loop.create_connection())通常都接受一个* protocol_factory 参数,该参数用于为一个可接受的连接创建一个 Protocol 对象,该对象由一个 Transport *对象表示。此类方法通常返回(transport, protocol)的 Tuples。

Contents

本文档页面包含以下部分:

Transports

源代码: Lib/asyncio/transports.py


传输是asyncio提供的类,用于抽象各种通信 Channel。

传输对象始终由异步事件循环实例化。

asyncio 实现 TCP,UDP,SSL 和子进程管道的传输。运输工具可用的方法取决于运输工具的种类。

运输类别为不是线程安全的

Transports Hierarchy

  • 类别 asyncio. BaseTransport

    • 所有运输的 Base Class。包含所有异步传输共享的方法。
    • class * asyncio. WriteTransport(* BaseTransport *)
    • 只写连接的基本传输。
    • class * asyncio. ReadTransport(* BaseTransport *)
    • 只读连接的基本传输。
    • class * asyncio. Transport(* WriteTransport ReadTransport *)
    • 表示双向传输的接口,例如 TCP 连接。

用户不直接实例化传输。他们调用一个 Util 函数,将其传递给协议工厂以及创建传输和协议所需的其他信息。

    • class * asyncio. DatagramTransport(* BaseTransport *)
    • 数据报(UDP)连接的传输。
    • class * asyncio. SubprocessTransport(* BaseTransport *)
    • 表示父代与其子代 OS 进程之间的连接的抽象。

Base Transport

  • BaseTransport. close ( )
    • 关闭运输。

如果传输具有用于传出数据的缓冲区,则将异步刷新缓冲的数据。不会再收到任何数据。刷新所有缓冲的数据后,将以None作为其参数调用协议的protocol.connection_lost()方法。

  • BaseTransport. is_closing ( )

    • 如果运输正在关闭或已关闭,则返回True
  • BaseTransport. get_extra_info(* name default = None *)

    • 返回有关其使用的传输或基础资源的信息。
  • name *是一个字符串,代表要获取的特定于运输的信息。

  • default *是在信息不可用或传输不支持使用给定的第三方事件循环实现或在当前平台上查询时返回的值。

例如,以下代码try获取传输的基础套接字对象:

sock = transport.get_extra_info('socket')
if sock is not None:
    print(sock.getsockopt(...))

可以在某些传输中查询的信息类别:

仅在记录了两个协议都支持该交换的情况下,才应执行交换协议。

  • BaseTransport. get_protocol ( )
    • 返回当前协议。

Read-only Transports

  • ReadTransport. is_reading ( )
    • 如果传输正在接收新数据,则返回True

3.7 版中的新Function。

在版本 3.7 中更改:该方法是幂等的,即可以在传输已暂停或关闭时调用该方法。

  • ReadTransport. resume_reading ( )

在版本 3.7 中更改:该方法是幂等的,即可以在读取传输文件时调用它。

Write-only Transports

  • WriteTransport. abort ( )

    • 立即关闭传输,而无需 await 未完成的操作完成。缓冲的数据将丢失。不会再收到任何数据。协议的protocol.connection_lost()方法finally将以None作为其参数来调用。
  • WriteTransport. can_write_eof ( )

  • WriteTransport. get_write_buffer_size ( )

    • 返回传输使用的输出缓冲区的当前大小。
  • WriteTransport. get_write_buffer_limits ( )

    • 获取水印以进行写流控制。返回一个 Tuples(low, high),其中* low high *是正字节数。

使用set_write_buffer_limits()设置限制。

版本 3.4.2 中的新Function。

  • WriteTransport. set_write_buffer_limits(* high = None low = None *)
    • 设置水印以进行写流控制。

这两个值(以字节数为单位)控制何时调用协议的protocol.pause_writing()protocol.resume_writing()方法。如果指定,则低水印必须小于或等于高水印。 都不能为负。

当缓冲区大小大于或等于* high 值时,将调用pause_writing()。如果写入已暂停,则当缓冲区大小小于或等于 low *值时,将调用resume_writing()

默认值是特定于实现的。如果仅给出高水印,则低水印默认为特定于实现的值,该值小于或等于高水印。将* high 设置为零也将 low 设置为零,并且只要缓冲区变为非空,就会调用pause_writing()。将 low *设置为零会使resume_writing()仅在缓冲区为空时被调用。对于任一限制使用零通常不是最佳选择,因为它减少了同时执行 I/O 和计算的机会。

使用get_write_buffer_limits()获取限制。

  • WriteTransport. write(* data *)
    • 向传输中写入一些* data *字节。

此方法不会阻止;它缓冲数据并安排将其异步发送出去。

  • WriteTransport. writelines(* list_of_data *)

    • 将数据字节列表(或任何可迭代的字节)写入传输。这在Function上等效于在由迭代器产生的每个元素上调用write(),但可以更有效地实现。
  • WriteTransport. write_eof ( )

    • 刷新所有缓冲的数据后,关闭传输的写端。数据仍可能被接收。

如果传输方式(例如 SSL)不支持半封闭连接,则此方法可以引发NotImplementedError

Datagram Transports

  • DatagramTransport. sendto(* data addr = None *)
    • 将* data 字节发送到 addr (与传输有关的目标地址)给定的远程对等方。如果 addr *为None,则数据将发送到创建传输时给定的目标地址。

此方法不会阻止;它缓冲数据并安排将其异步发送出去。

  • DatagramTransport. abort ( )
    • 立即关闭传输,而无需 await 未完成的操作完成。缓冲的数据将丢失。不会再收到任何数据。协议的protocol.connection_lost()方法finally将以None作为其参数来调用。

Subprocess Transports

  • SubprocessTransport. get_pid ( )

    • 以整数形式返回子流程的流程 ID。
  • SubprocessTransport. get_pipe_transport(* fd *)

    • 返回与整数文件 Descriptors* fd *对应的通信管道的传输:
  • 0:标准 Importing(* stdin *)的可读流传输,如果不是使用stdin=PIPE创建子进程,则返回None

  • 1:标准输出(* stdout *)的可写流传输,如果不是使用stdout=PIPE创建的子进程,则为None

  • 2:标准错误(* stderr *)的可写流传输,如果不是使用stderr=PIPE创建的子进程,则返回None

  • 其他* fd *:None

  • SubprocessTransport. get_returncode ( )

  • SubprocessTransport. kill ( )

    • 杀死子进程。

在 POSIX 系统上,该函数将 SIGKILL 发送到子进程。在 Windows 上,此方法是terminate()的别名。

另请参见subprocess.Popen.kill()

  • SubprocessTransport. send_signal(* signal *)

  • SubprocessTransport. terminate ( )

    • 停止子进程。

在 POSIX 系统上,此方法将 SIGTERM 发送到子进程。在 Windows 上,将调用 Windows API 函数 TerminateProcess()来停止子进程。

另请参见subprocess.Popen.terminate()

  • SubprocessTransport. close ( )
    • pass调用kill()方法终止子进程。

如果尚未返回子进程,则关闭* stdin stdout stderr *管道的传输。

Protocols

源代码: Lib/asyncio/protocols.py


asyncio 提供了一组抽象 Base Class,应该用于实现网络协议。这些类应与transports一起使用。

抽象基本协议类的子类可以实现某些或所有方法。所有这些方法都是回调:它们在某些事件(例如,接收到某些数据)时由传输调用。基本协议方法应由相应的传输程序调用。

Base Protocols

  • 类别 asyncio. BaseProtocol

    • 基本协议以及所有协议共享的方法。
    • class * asyncio. Protocol(* BaseProtocol *)
    • 实现流协议(TCP,Unix 套接字等)的 Base Class。
    • class * asyncio. BufferedProtocol(* BaseProtocol *)
    • Base Class,用于pass手动控制接收缓冲区来实现流协议。
    • class * asyncio. DatagramProtocol(* BaseProtocol *)
    • 实现数据报(UDP)协议的 Base Class。
    • class * asyncio. SubprocessProtocol(* BaseProtocol *)
    • 用于实现与子进程(单向管道)通信的协议的 Base Class。

Base Protocol

所有异步协议都可以实现基本协议回调。

Connection Callbacks

在所有协议上都将调用连接回调,每个成功连接仅调用一次。所有其他协议回调只能在这两种方法之间调用。

  • BaseProtocol. connection_made(* transport *)
    • 构建连接时调用。
  • transport *参数是表示连接的传输。该协议负责存储对其传输的引用。
  • BaseProtocol. connection_lost(* exc *)
    • 连接丢失或关闭时调用。

参数是异常对象或None。后者表示已收到常规 EOF,或者连接的这一侧已中止或关闭了连接。

流控制回调

传输可以调用流控制回调,以暂停或恢复协议执行的写入。

有关更多详细信息,请参见set_write_buffer_limits()方法的文档。

  • BaseProtocol. pause_writing ( )

    • 在传输缓冲区超过高水位线时调用。
  • BaseProtocol. resume_writing ( )

    • 当传输缓冲区的水位低于低水位线时调用。

如果缓冲区大小等于高水位线,则不会调用pause_writing():缓冲区大小必须严格超过。

相反,当缓冲区大小等于或小于低水位标记时,将调用resume_writing()。这些结束条件对于确保当任一标记为零时事情按预期进行很重要。

Streaming Protocols

诸如loop.create_server()loop.create_unix_server()loop.create_connection()loop.create_unix_connection()loop.connect_accepted_socket()loop.connect_read_pipe()loop.connect_write_pipe()之类的事件方法接受返回流协议的工厂。

  • Protocol. data_received(* data *)
    • 收到一些数据时调用。 * data *是一个包含 Importing 数据的非空字节对象。

数据是缓冲,分块还是重组取决于传输方式。通常,您不应该依赖于特定的语义,而应该使解析具有通用性和灵 Active。但是,始终以正确的 Sequences 接收数据。

连接打开时,可以多次调用该方法。

但是,protocol.eof_received()最多只能调用一次。一旦调用 eof_received(),就不再调用data_received()

  • Protocol. eof_received ( )
    • 在另一端发出 signal 将不再发送任何数据时调用(例如,如果另一端也使用 asyncio,则pass调用transport.write_eof())。

此方法可能返回错误的值(包括None),在这种情况下,传输将自行关闭。相反,如果此方法返回一个真值,则使用的协议将确定是否关闭传输。由于默认实现返回None,因此它隐式关闭了连接。

某些传输(包括 SSL)不支持半封闭连接,在这种情况下,从此方法返回 true 将导致连接被封闭。

State machine:

start -> connection_made
    [-> data_received]*
    [-> eof_received]?
-> connection_lost -> end

缓冲的流协议

3.7 版中的新Function: 重要: 已在 Python 3.7 中临时添加到 asyncio 中!这是一个实验性 API,可能会在 Python 3.8 中完全更改或删除。

缓冲协议可以与任何支持Streaming Protocols的事件循环方法一起使用。

BufferedProtocol实现允许显式手动分配和控制接收缓冲区。然后,事件循环可以使用协议提供的缓冲区来避免不必要的数据复制。对于接收大量数据的协议,这可以显着提高性能。复杂的协议实现可以显着减少缓冲区分配的数量。

BufferedProtocol个实例上调用以下回调:

  • BufferedProtocol. get_buffer(* sizehint *)
    • 调用以分配新的接收缓冲区。
  • sizehint 是建议的返回缓冲区的最小大小。返回小于或等于 sizehint *建议值的缓冲区是可以接受的。设置为-1 时,缓冲区大小可以是任意的。返回大小为零的缓冲区是错误的。

get_buffer()必须返回实现buffer protocol的对象。

  • BufferedProtocol. buffer_updated(* nbytes *)
    • 用接收到的数据更新缓冲区时调用。
  • nbytes *是写入缓冲区的字节总数。

get_buffer()在连接期间可以被任意调用。但是,protocol.eof_received()最多被调用一次,如果被调用,get_buffer()buffer_updated()将不会在其后被调用。

State machine:

start -> connection_made
    [-> get_buffer
        [-> buffer_updated]?
    ]*
    [-> eof_received]?
-> connection_lost -> end

Datagram Protocols

数据报协议实例应由传递给loop.create_datagram_endpoint()方法的协议工厂构造。

  • DatagramProtocol. datagram_received(* data addr *)

    • 收到数据报时调用。 * data *是包含 Importing 数据的字节对象。 * addr *是发送数据的对等方的地址;确切的格式取决于运输方式。
  • DatagramProtocol. error_received(* exc *)

    • 在先前的发送或接收操作引发OSError时调用。 * exc *是OSError实例。

当传输(例如 UDP)检测到数据报无法传递给接收方时,在极少数情况下会调用此方法。但是,在许多情况下,无法交付的数据报将被静默丢弃。

Note

在 BSD 系统(macOS,FreeBSD 等)上,数据报协议不支持流控制,因为没有可靠的方法来检测由于写入过多数据包而导致的发送失败。

套接字始终显示为“就绪”,多余的数据包将被丢弃。 errno设置为errno.ENOBUFSOSError可能会引发,也可能不会引发;如果它提高了,它将被报告给DatagramProtocol.error_received(),否则将被忽略。

Subprocess Protocols

数据报协议实例应由传递给loop.subprocess_exec()loop.subprocess_shell()方法的协议工厂构造。

  • SubprocessProtocol. pipe_data_received(* fd data *)
    • 当子进程将数据写入其 stdout 或 stderr 管道时调用。
  • fd *是管道的整数文件 Descriptors。

  • data *是一个包含接收到的数据的非空字节对象。

  • SubprocessProtocol. pipe_connection_lost(* fd exc *)
    • 当与子进程进行通信的管道之一关闭时调用。
  • fd *是已关闭的整数文件 Descriptors。
  • SubprocessProtocol. process_exited ( )
    • 子进程退出时调用。

Examples

TCP 回显服务器

使用loop.create_server()方法创建 TCP 回显服务器,发回接收到的数据,并关闭连接:

import asyncio

class EchoServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()

async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    server = await loop.create_server(
        lambda: EchoServerProtocol(),
        '127.0.0.1', 8888)

    async with server:
        await server.serve_forever()

asyncio.run(main())

See also

TCP EchoClient 端

使用loop.create_connection()方法的 TCP 回显 Client 端,发送数据,并 await 直到连接关闭:

import asyncio

class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        self.on_con_lost.set_result(True)

async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = 'Hello World!'

    transport, protocol = await loop.create_connection(
        lambda: EchoClientProtocol(message, on_con_lost),
        '127.0.0.1', 8888)

    # Wait until the protocol signals that the connection
    # is lost and close the transport.
    try:
        await on_con_lost
    finally:
        transport.close()

asyncio.run(main())

UDP 回显服务器

UDP 回显服务器使用loop.create_datagram_endpoint()方法发送回接收到的数据:

import asyncio

class EchoServerProtocol:
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)

async def main():
    print("Starting UDP server")

    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    # One protocol instance will be created to serve all
    # client requests.
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoServerProtocol(),
        local_addr=('127.0.0.1', 9999))

    try:
        await asyncio.sleep(3600)  # Serve for 1 hour.
    finally:
        transport.close()

asyncio.run(main())

UDP EchoClient 端

UDP 回显 Client 端使用loop.create_datagram_endpoint()方法发送数据并在收到答案时关闭传输:

import asyncio

class EchoClientProtocol:
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        print('Send:', self.message)
        self.transport.sendto(self.message.encode())

    def datagram_received(self, data, addr):
        print("Received:", data.decode())

        print("Close the socket")
        self.transport.close()

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Connection closed")
        self.on_con_lost.set_result(True)

async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = "Hello World!"

    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoClientProtocol(message, on_con_lost),
        remote_addr=('127.0.0.1', 9999))

    try:
        await on_con_lost
    finally:
        transport.close()

asyncio.run(main())

连接现有的套接字

await 套接字使用带有协议的loop.create_connection()方法接收数据:

import asyncio
import socket

class MyProtocol(asyncio.Protocol):

    def __init__(self, on_con_lost):
        self.transport = None
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print("Received:", data.decode())

        # We are done: close the transport;
        # connection_lost() will be called automatically.
        self.transport.close()

    def connection_lost(self, exc):
        # The socket has been closed
        self.on_con_lost.set_result(True)

async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()
    on_con_lost = loop.create_future()

    # Create a pair of connected sockets
    rsock, wsock = socket.socketpair()

    # Register the socket to wait for data.
    transport, protocol = await loop.create_connection(
        lambda: MyProtocol(on_con_lost), sock=rsock)

    # Simulate the reception of data from the network.
    loop.call_soon(wsock.send, 'abc'.encode())

    try:
        await protocol.on_con_lost
    finally:
        transport.close()
        wsock.close()

asyncio.run(main())

See also

观看文件 Descriptors 中的读取事件示例使用低级loop.add_reader()方法注册 FD。

注册一个打开的套接字以使用流 await 数据示例在协程中使用open_connection()函数创建的高级流。

loop.subprocess_exec()和 SubprocessProtocol

子流程协议的示例,用于获取子流程的输出并 await 子流程退出。

子流程是passloop.subprocess_exec()方法创建的:

import asyncio
import sys

class DateProtocol(asyncio.SubprocessProtocol):
    def __init__(self, exit_future):
        self.exit_future = exit_future
        self.output = bytearray()

    def pipe_data_received(self, fd, data):
        self.output.extend(data)

    def process_exited(self):
        self.exit_future.set_result(True)

async def get_date():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    code = 'import datetime; print(datetime.datetime.now())'
    exit_future = asyncio.Future(loop=loop)

    # Create the subprocess controlled by DateProtocol;
    # redirect the standard output into a pipe.
    transport, protocol = await loop.subprocess_exec(
        lambda: DateProtocol(exit_future),
        sys.executable, '-c', code,
        stdin=None, stderr=None)

    # Wait for the subprocess exit using the process_exited()
    # method of the protocol.
    await exit_future

    # Close the stdout pipe.
    transport.close()

    # Read the output which was collected by the
    # pipe_data_received() method of the protocol.
    data = bytes(protocol.output)
    return data.decode('ascii').rstrip()

date = asyncio.run(get_date())
print(f"Current date: {date}")

另请参见使用高级 API 编写的same example