On this page
Queues
异步队列被设计为类似于queue模块的类。尽管异步队列不是线程安全的,但它们专门设计用于异步/await 代码。
请注意,异步队列的方法没有* timeout *参数。使用asyncio.wait_for()函数进行超时的队列操作。
另请参见下面的Examples部分。
Queue
-
- class *
asyncio.
Queue
(* maxsize = 0 ,**,* loop = None *)
- 先进先出(FIFO)队列。
- class *
如果* maxsize 小于或等于零,则队列大小为无限。如果它是大于0
的整数,则await put()
会在队列达到 maxsize *时阻塞,直到由get()删除某项为止。
与标准库线程queue不同,队列的大小始终是已知的,可以pass调用qsize()方法返回。
从 3.8 版开始不推荐使用,将在 3.10 版中删除:* loop *参数。
此类为不是线程安全的。
maxsize
- 队列中允许的项目数。
empty
( )- 如果队列为空,则返回
True
,否则返回False
。
- 如果队列为空,则返回
full
( )- 如果队列中有maxsize个项目,则返回
True
。
- 如果队列中有maxsize个项目,则返回
如果队列是使用maxsize=0
(默认值)初始化的,则full()从不返回True
。
协程
get
()- 从队列中删除并返回一个项目。如果队列为空,请 await 直到有一个项目可用。
get_nowait
( )- 如果有货马上return,否则returnQueueEmpty。
协程
join
()- 阻塞直到队列中的所有项目都已接收并处理。
每当将项目添加到队列时,未完成任务的数量就会增加。每当 Consumer 协程呼叫task_done()表示已检索到该物品并且该物品的所有工作已完成时,该计数就会减少。当未完成的任务计数降至零时,join()解除阻止。
协程
put
(项目)- 将项目放入队列。如果队列已满,请等到空闲插槽可用后再添加项目。
put_nowait
(项目)- 将项目放入队列而不会阻塞。
如果没有立即可用的空闲插槽,请升高QueueFull。
qsize
( )- 返回队列中的项目数。
task_done
( )- 表示先前排队的任务已完成。
由队列使用者使用。对于用于提取任务的每个get(),随后对task_done()的调用将告诉队列该任务的处理已完成。
如果join()当前正在阻止,它将在处理完所有项目后恢复(意味着收到put()进入队列的每个项目都收到task_done()调用)。
如果被调用的次数比队列中放置的项目多,则引发ValueError。
Priority Queue
- 类别
asyncio.
PriorityQueue
- Queue的变体;以优先级 Sequences 检索条目(从低到低)。
条目通常是(priority_number, data)
形式的 Tuples。
LIFO Queue
- 类别
asyncio.
LifoQueue
- Queue的变体,首先检索最新添加的条目(后进先出)。
Exceptions
exception
asyncio.
QueueEmpty
- 在空队列上调用get_nowait()方法时,会引发此异常。
exception
asyncio.
QueueFull
- 在已达到其* maxsize *的队列上调用put_nowait()方法时引发异常。
Examples
队列可用于在多个并发任务之间分配工作负载:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())