Streams

源代码: Lib/asyncio/streams.py


流是用于网络连接的高级异步/await 就绪 Primitives。流允许在不使用回调或底层协议和传输的情况下发送和接收数据。

这是使用异步流编写的 TCP 回显 Client 端的示例:

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

另请参见下面的Examples部分。

Stream Functions

以下顶级 asyncio 函数可用于创建和使用流:

  • 协程 asyncio. open_connection(* host = None port = None **,* loop = None limit = None ssl = None family = 0 proto = 0 flags = 0 sock = None local_addr = None server_hostname = None ssl_handshake_timeout = None *)
    • 构建网络连接并返回Pair(reader, writer)对象。

返回的* reader writer *对象是StreamReaderStreamWriter类的实例。

  • loop *参数是可选的,并且在协程中 await 该函数时始终可以自动确定。

  • limit 确定返回的StreamReader实例使用的缓冲区大小限制。默认情况下, limit *设置为 64 KiB。

其余参数直接传递给loop.create_connection()

3.7 版中的新Function:* ssl_handshake_timeout *参数。

  • 协程 asyncio. start_server((client_connected_cb host = None port = None **,* loop = None limit = None family = socket.AF_UNSPEC flags = socket .AI_PASSIVE sock = None backlog = 100 ssl = None reuse_address = None reuse_port = None ssl_handshake_timeout = None start_serving = True *)
    • 启动套接字服务器。

每当构建新的 Client 端连接时,都会调用* client_connected_cb *回调。它接收(reader, writer)对作为两个参数,即StreamReaderStreamWriter类的实例。

  • client_connected_cb *可以是普通可调用项,也可以是coroutine function;如果它是协程函数,它将被自动调度为Task

  • loop *参数是可选的,并且在从协程 await 该方法时始终可以自动确定。

  • limit 确定返回的StreamReader实例使用的缓冲区大小限制。默认情况下, limit *设置为 64 KiB。

其余参数直接传递给loop.create_server()

3.7 版中的新Function:* ssl_handshake_timeout start_serving *参数。

Unix Sockets

  • 协程 asyncio. open_unix_connection(* path = None **,* loop = None limit = None ssl = None sock = None server_hostname = None ssl_handshake_timeout = None *)
    • 构建 Unix 套接字连接并返回Pair(reader, writer)

open_connection()类似,但在 Unix 套接字上运行。

另请参见loop.create_unix_connection()的文档。

Availability: Unix.

3.7 版中的新Function:* ssl_handshake_timeout *参数。

在 3.7 版中进行了更改:* path *参数现在可以是path-like object

  • 协程 asyncio. start_unix_server((client_connected_cb path = None **,* loop = None limit = None sock = None backlog = 100 ssl = None *, * ssl_handshake_timeout = None start_serving = True *)
    • 启动 Unix 套接字服务器。

start_server()类似,但可用于 Unix 套接字。

另请参见loop.create_unix_server()的文档。

Availability: Unix.

3.7 版中的新Function:* ssl_handshake_timeout start_serving *参数。

在版本 3.7 中更改:* path *参数现在可以是path-like object

StreamReader

  • 类别 asyncio. StreamReader
    • 表示一个提供 API 以便从 IO 流读取数据的 Reader 对象。

不建议直接实例化* StreamReader *对象。请改用open_connection()start_server()

  • 协程 read(* n = -1 *)
    • 最多读取* n 个字节。如果未提供 n *或设置为-1,请读取直到 EOF 并返回所有读取的字节。

如果收到 EOF 并且内部缓冲区为空,则返回一个空的bytes对象。

  • 协程 readline()
    • 读一行,其中“行”是以\n结尾的字节序列。

如果收到 EOF 且未找到\n,则该方法返回部分读取的数据。

如果收到 EOF 并且内部缓冲区为空,则返回一个空的bytes对象。

  • 协程 readexactly(* n *)
    • 精确读取* n *个字节。

如果在读取* n *之前达到 EOF,请引发IncompleteReadError。使用IncompleteReadError.partial属性获取部分读取的数据。

  • 协程 readuntil(分隔符= b' n')
    • 从流中读取数据,直到找到* separator *。

成功后,数据和分隔符将从内部缓冲区(已消耗)中删除。返回的数据将在末尾包含分隔符。

如果读取的数据量超过配置的流限制,则会引发LimitOverrunError异常,并且数据将保留在内部缓冲区中并可以再次读取。

如果在找到完整的分隔符之前已到达 EOF,则会引发IncompleteReadError异常,并重置内部缓冲区。 IncompleteReadError.partial属性可能包含分隔符的一部分。

版本 3.5.2 中的新Function。

  • at_eof ( )
    • 如果缓冲区为空并且调用了feed_eof(),则返回True

StreamWriter

  • 类别 asyncio. StreamWriter
    • 表示 writer 对象,该对象提供用于将数据写入 IO 流的 API。

不建议直接实例化* StreamWriter *对象。请改用open_connection()start_server()

  • write(* data *)
    • 该方法try立即将* data *写入基础套接字。如果失败,数据将在内部写缓冲区中排队,直到可以发送为止。

该方法应与drain()方法一起使用:

stream.write(data)
await stream.drain()
  • writelines(* data *)
    • 该方法立即将字节列表(或任何可迭代的字节)写入基础套接字。如果失败,数据将在内部写缓冲区中排队,直到可以发送为止。

该方法应与drain()方法一起使用:

stream.writelines(lines)
await stream.drain()
  • close ( )
    • 该方法关闭流和基础套接字。

该方法应与wait_closed()方法一起使用:

stream.close()
await stream.wait_closed()
  • can_write_eof ( )

    • 如果基础传输支持write_eof()方法,则返回True,否则返回False
  • write_eof ( )

    • 在刷新缓冲的写数据之后,关闭流的写端。
  • transport

    • 返回基础异步传输。
  • get_extra_info(* name default = None *)

  • 协程 drain()

    • await,直到适合 continue 写入流为止。例:
writer.write(data)
await writer.drain()

这是一种与基础 IO 写缓冲区交互的流控制方法。当缓冲区的大小达到高水位线时,* drain()*会阻塞,直到缓冲区的大小被排空到低水位线为止,然后才能恢复写入。当没有什么可 await 的时,drain()立即返回。

  • is_closing ( )
    • 如果流已关闭或正在关闭,则返回True

3.7 版中的新Function。

  • 协程 wait_closed()
    • await 流关闭。

应该在close()之后调用,以 await 基础连接关闭。

3.7 版中的新Function。

Examples

TCP 回显 Client 端使用流

使用asyncio.open_connection()函数的 TCP 回显 Client 端:

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()

asyncio.run(tcp_echo_client('Hello World!'))

See also

TCP 回显服务器使用流

使用asyncio.start_server()Function的 TCP 回显服务器:

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:
        await server.serve_forever()

asyncio.run(main())

See also

获取 HTTPHeaders

查询命令行中传递的 URL 的 HTTPHeaders 的简单示例:

import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

Usage:

python example.py http://example.com/path/page.html

或使用 HTTPS:

python example.py https://example.com/path/page.html

注册一个开放的套接字以使用流 await 数据

协程 await,直到套接字使用open_connection()函数接收数据:

import asyncio
import socket

async def wait_for_data():
    # Get a reference to the current event loop because
    # we want to access low-level APIs.
    loop = asyncio.get_running_loop()

    # Create a pair of connected sockets.
    rsock, wsock = socket.socketpair()

    # Register the open socket to wait for data.
    reader, writer = await asyncio.open_connection(sock=rsock)

    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())

    # Wait for data
    data = await reader.read(100)

    # Got data, we are done: close the socket
    print("Received:", data.decode())
    writer.close()

    # Close the second socket
    wsock.close()

asyncio.run(wait_for_data())

See also