queue —同步队列类

源代码: Lib/queue.py


queue模块实现多生产者,多 Consumer 队列。当必须在多个线程之间安全地交换信息时,它在线程编程中特别有用。此模块中的Queue类实现所有必需的锁定语义。

该模块实现了三种类型的队列,它们的区别仅在于检索条目的 Sequences 不同。在 FIFO 队列中,首先检索到添加的第一个任务。在 LIFO 队列中,最近添加的条目是第一个检索到的条目(操作类似于堆栈)。使用优先级队列,条目将保持排序(使用heapq模块),并首先检索值最低的条目。

在内部,这三种类型的队列使用锁来临时阻止竞争线程。但是,它们并非旨在处理线程内的重入。

另外,该模块实现了“简单” FIFO 队列类型SimpleQueue,其特定实现为交换较小的Function提供了额外的保证。

queue模块定义以下类和异常:

最低值的条目首先被检索(最低值的条目是sorted(list(entries))[0]返回的条目)。条目的典型模式是(priority_number, data)形式的 Tuples。

如果* data *元素不可比较,则可以将数据包装在忽略数据项并且仅比较优先级数字的类中:

from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedItem:
    priority: int
    item: Any=field(compare=False)

3.7 版中的新Function。

Queue Objects

队列对象(QueueLifoQueuePriorityQueue)提供下面描述的公共方法。

在 POSIX 系统上的 3.0 之前的版本以及 Windows 上的所有版本中,如果* block 为 true 且 timeout *为None,则此操作将在基础锁上进行不间断的 await。这意味着不会发生任何异常,尤其是 SIGINT 不会触发KeyboardInterrupt

提供两种方法来支持跟踪排队的任务是否已由守护程序使用者线程完全处理。

如果join()当前正在阻止,它将在处理完所有项目后恢复(意味着收到put()进入队列的每个项目都收到task_done()调用)。

如果被调用的次数比队列中放置的项目多,则引发ValueError

每当将项目添加到队列时,未完成任务的数量就会增加。每当使用者线程调用task_done()表示已检索到该项目并且该项目的所有工作已完成时,该计数就会减少。当未完成的任务计数降至零时,join()解除阻止。

如何 await 排队的任务完成的示例:

import threading, queue

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        print(f'Working on {item}')
        print(f'Finished {item}')
        q.task_done()

# turn-on the worker thread
threading.Thread(target=worker, daemon=True).start()

# send thirty task requests to the worker
for item in range(30):
    q.put(item)
print('All task requests sent\n', end='')

# block until all tasks are done
q.join()
print('All work completed')

SimpleQueue Objects

SimpleQueue对象提供了下面描述的公共方法。

CPython 实现细节: 此方法具有可重入的 C 实现。也就是说,一个put()get()调用可以被同一线程中的另一个put()调用break,而不会死锁或破坏队列内部的内部状态。这使其适合在析构函数(例如__del__方法或weakref回调)中使用。

See also

collections.deque是具有快速原子append()popleft()操作的无界队列的替代实现,这些操作不需要锁定并且还支持索引。

首页