Queues

源代码: Lib/asyncio/queues.py


异步队列被设计为类似于queue模块的类。尽管异步队列不是线程安全的,但它们专门设计用于异步/await 代码。

请注意,异步队列的方法没有* timeout *参数。使用asyncio.wait_for()函数进行超时的队列操作。

另请参见下面的Examples部分。

Queue

    • class * asyncio. Queue(* maxsize = 0 **,* loop = None *)
    • 先进先出(FIFO)队列。

如果* maxsize 小于或等于零,则队列大小为无限。如果它是大于0的整数,则await put()会在队列达到 maxsize *时阻塞,直到由get()删除某项为止。

与标准库线程queue不同,队列的大小始终是已知的,可以pass调用qsize()方法返回。

从 3.8 版开始不推荐使用,将在 3.10 版中删除:* loop *参数。

此类为不是线程安全的

  • maxsize

    • 队列中允许的项目数。
  • empty ( )

    • 如果队列为空,则返回True,否则返回False
  • full ( )

    • 如果队列中有maxsize个项目,则返回True

如果队列是使用maxsize=0(默认值)初始化的,则full()从不返回True

  • 协程 get()

    • 从队列中删除并返回一个项目。如果队列为空,请 await 直到有一个项目可用。
  • get_nowait ( )

    • 如果有货马上return,否则returnQueueEmpty
  • 协程 join()

    • 阻塞直到队列中的所有项目都已接收并处理。

每当将项目添加到队列时,未完成任务的数量就会增加。每当 Consumer 协程呼叫task_done()表示已检索到该物品并且该物品的所有工作已完成时,该计数就会减少。当未完成的任务计数降至零时,join()解除阻止。

  • 协程 put(项目)

    • 将项目放入队列。如果队列已满,请等到空闲插槽可用后再添加项目。
  • put_nowait(项目)

    • 将项目放入队列而不会阻塞。

如果没有立即可用的空闲插槽,请升高QueueFull

  • qsize ( )

    • 返回队列中的项目数。
  • task_done ( )

    • 表示先前排队的任务已完成。

由队列使用者使用。对于用于提取任务的每个get(),随后对task_done()的调用将告诉队列该任务的处理已完成。

如果join()当前正在阻止,它将在处理完所有项目后恢复(意味着收到put()进入队列的每个项目都收到task_done()调用)。

如果被调用的次数比队列中放置的项目多,则引发ValueError

Priority Queue

  • 类别 asyncio. PriorityQueue
    • Queue的变体;以优先级 Sequences 检索条目(从低到低)。

条目通常是(priority_number, data)形式的 Tuples。

LIFO Queue

  • 类别 asyncio. LifoQueue
    • Queue的变体,首先检索最新添加的条目(后进先出)。

Exceptions

  • exception asyncio. QueueEmpty

    • 在空队列上调用get_nowait()方法时,会引发此异常。
  • exception asyncio. QueueFull

    • 在已达到其* maxsize *的队列上调用put_nowait()方法时引发异常。

Examples

队列可用于在多个并发任务之间分配工作负载:

import asyncio
import random
import time

async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')

async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')

asyncio.run(main())