Streams

源代码: Lib/asyncio/streams.py


流是用于网络连接的高级异步/await 就绪 Primitives。流允许在不使用回调或底层协议和传输的情况下发送和接收数据。

这是使用异步流编写的 TCP 回显 Client 端的示例:

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

另请参见下面的Examples部分。

Stream Functions

以下顶级 asyncio 函数可用于创建和使用流:

返回的* reader writer *对象是StreamReaderStreamWriter类的实例。

其余参数直接传递给loop.create_connection()

3.7 版中的新Function:* ssl_handshake_timeout *参数。

每当构建新的 Client 端连接时,都会调用* client_connected_cb *回调。它接收(reader, writer)对作为两个参数,即StreamReaderStreamWriter类的实例。

其余参数直接传递给loop.create_server()

3.7 版中的新Function:* ssl_handshake_timeout start_serving *参数。

Unix Sockets

open_connection()类似,但在 Unix 套接字上运行。

另请参见loop.create_unix_connection()的文档。

Availability: Unix.

3.7 版中的新Function:* ssl_handshake_timeout *参数。

在 3.7 版中进行了更改:* path *参数现在可以是path-like object

start_server()类似,但可用于 Unix 套接字。

另请参见loop.create_unix_server()的文档。

Availability: Unix.

3.7 版中的新Function:* ssl_handshake_timeout start_serving *参数。

在版本 3.7 中更改:* path *参数现在可以是path-like object

StreamReader

不建议直接实例化* StreamReader *对象。请改用open_connection()start_server()

如果收到 EOF 并且内部缓冲区为空,则返回一个空的bytes对象。

如果收到 EOF 且未找到\n,则该方法返回部分读取的数据。

如果收到 EOF 并且内部缓冲区为空,则返回一个空的bytes对象。

如果在读取* n *之前达到 EOF,请引发IncompleteReadError。使用IncompleteReadError.partial属性获取部分读取的数据。

成功后,数据和分隔符将从内部缓冲区(已消耗)中删除。返回的数据将在末尾包含分隔符。

如果读取的数据量超过配置的流限制,则会引发LimitOverrunError异常,并且数据将保留在内部缓冲区中并可以再次读取。

如果在找到完整的分隔符之前已到达 EOF,则会引发IncompleteReadError异常,并重置内部缓冲区。 IncompleteReadError.partial属性可能包含分隔符的一部分。

版本 3.5.2 中的新Function。

StreamWriter

不建议直接实例化* StreamWriter *对象。请改用open_connection()start_server()

该方法应与drain()方法一起使用:

stream.write(data)
await stream.drain()

该方法应与drain()方法一起使用:

stream.writelines(lines)
await stream.drain()

该方法应与wait_closed()方法一起使用:

stream.close()
await stream.wait_closed()
writer.write(data)
await writer.drain()

这是一种与基础 IO 写缓冲区交互的流控制方法。当缓冲区的大小达到高水位线时,* drain()*会阻塞,直到缓冲区的大小被排空到低水位线为止,然后才能恢复写入。当没有什么可 await 的时,drain()立即返回。

3.7 版中的新Function。

应该在close()之后调用,以 await 基础连接关闭。

3.7 版中的新Function。

Examples

TCP 回显 Client 端使用流

使用asyncio.open_connection()函数的 TCP 回显 Client 端:

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()

asyncio.run(tcp_echo_client('Hello World!'))

See also

TCP 回显 Client 端协议示例使用低级loop.create_connection()方法。

TCP 回显服务器使用流

使用asyncio.start_server()Function的 TCP 回显服务器:

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')

    async with server:
        await server.serve_forever()

asyncio.run(main())

See also

TCP 回显服务器协议示例使用loop.create_server()方法。

获取 HTTPHeaders

查询命令行中传递的 URL 的 HTTPHeaders 的简单示例:

import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

Usage:

python example.py http://example.com/path/page.html

或使用 HTTPS:

python example.py https://example.com/path/page.html

注册一个开放的套接字以使用流 await 数据

协程 await,直到套接字使用open_connection()函数接收数据:

import asyncio
import socket

async def wait_for_data():
    # Get a reference to the current event loop because
    # we want to access low-level APIs.
    loop = asyncio.get_running_loop()

    # Create a pair of connected sockets.
    rsock, wsock = socket.socketpair()

    # Register the open socket to wait for data.
    reader, writer = await asyncio.open_connection(sock=rsock)

    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())

    # Wait for data
    data = await reader.read(100)

    # Got data, we are done: close the socket
    print("Received:", data.decode())
    writer.close()

    # Close the second socket
    wsock.close()

asyncio.run(wait_for_data())

See also

使用协议注册一个开放的套接字以 await 数据示例使用底层协议和loop.create_connection()方法。

观看文件 Descriptors 中的读取事件示例使用低级loop.add_reader()方法监视文件 Descriptors。

首页