16.6. multiprocessing-基于进程的“线程”界面
2.6 版的新Function。
16.6.1. Introduction
multiprocessing是使用类似于threading模块的 API 支持生成程序的软件包。 multiprocessing包同时提供本地和远程并发,pass使用子进程而不是线程来有效地避开全局翻译锁。因此,multiprocessing模块允许程序员充分利用给定机器上的多个处理器。它可以在 Unix 和 Windows 上运行。
multiprocessing模块还引入了threading模块中没有类似物的 API。最好的例子是Pool
对象,该对象提供了一种便利的方法,可以跨多个 Importing 值并行执行函数,从而跨进程分配 Importing 数据(数据并行性)。下面的示例演示了在模块中定义此类Function的常规做法,以便子进程可以成功导入该模块。这个使用Pool
的数据并行性的基本示例,
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
p = Pool(5)
print(p.map(f, [1, 2, 3]))
将打印到标准输出
[1, 4, 9]
16.6.1.1. 流程类
在multiprocessing中,pass创建Process对象然后调用其start()方法来生成进程。 Process遵循threading.Thread的 API。多进程程序的一个简单示例是
from multiprocessing import Process
def f(name):
print 'hello', name
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
为了显示涉及的各个进程 ID,下面是一个扩展的示例:
from multiprocessing import Process
import os
def info(title):
print title
print 'module name:', __name__
if hasattr(os, 'getppid'): # only available on Unix
print 'parent process:', os.getppid()
print 'process id:', os.getpid()
def f(name):
info('function f')
print 'hello', name
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
有关为什么(在 Windows 上)需要if __name__ == '__main__'
部分的说明,请参见Programming guidelines。
16.6.1.2. 在流程之间交换对象
multiprocessing支持进程之间的两种通信通道:
Queues
Note
Queue类是Queue.Queue的近克隆。例如:
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print q.get() # prints "[42, None, 'hello']"
p.join()
队列是线程和进程安全的。
Pipes
Note
Pipe()函数返回Pairpass管道连接的连接对象,管道默认情况下为双工(双向)。例如:
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print parent_conn.recv() # prints "[42, None, 'hello']"
p.join()
Pipe()返回的两个连接对象代表管道的两端。每个连接对象都有send()和recv()个方法(以及其他方法)。请注意,如果两个进程(或线程)try同时读取或写入管道的“相同”端,则管道中的数据可能会损坏。当然,不存在同时使用管道不同端的过程造成损坏的风险。
16.6.1.3. 流程之间的同步
multiprocessing包含threading中所有同步 Primitives 的等效项。例如,可以使用锁来确保一次仅将一个进程打印到标准输出:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
print 'hello world', i
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
如果不使用来自不同进程的锁输出,则很容易混淆所有信息。
16.6.1.4. 进程之间共享状态
如上所述,在进行并行编程时,通常最好尽可能避免使用共享状态。使用多个进程时尤其如此。
但是,如果您确实确实需要使用一些共享数据,则multiprocessing提供了几种方法。
Shared memory
Note
可以使用Value或Array将数据存储在共享内存 Map 中。例如下面的代码
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print num.value
print arr[:]
will print
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
创建num
和arr
时使用的'd'
和'i'
参数是array模块使用的类型的类型代码:'d'
表示双精度浮点数,而'i'
表示带符号整数。这些共享对象将是进程和线程安全的。
为了更灵活地使用共享内存,可以使用multiprocessing.sharedctypes模块,该模块支持创建从共享内存分配的任意 ctypes 对象。
Server process
Note
Manager()
返回的 Management 器对象控制着一个服务器进程,该进程持有 Python 对象,并允许其他进程使用代理对其进行操作。
Manager()
返回的 Manager 将支持list
,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array类型。例如,
from multiprocessing import Process, Manager
def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()
if __name__ == '__main__':
manager = Manager()
d = manager.dict()
l = manager.list(range(10))
p = Process(target=f, args=(d, l))
p.start()
p.join()
print d
print l
will print
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
服务器进程 Management 器比使用共享内存对象更灵活,因为可以使它们支持任意对象类型。同样,单个 Management 器可以由网络上不同计算机上的进程共享。但是,它们比使用共享内存慢。
16.6.1.5. 雇用 Worker
Pool
类表示辅助进程池。它具有允许以几种不同方式将任务卸载到工作进程的方法。
For example:
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
# print "[0, 1, 4,..., 81]"
print pool.map(f, range(10))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print i
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print res.get(timeout=1) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print res.get(timeout=1) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print [res.get(timeout=1) for res in multiple_results]
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print res.get(timeout=1)
except TimeoutError:
print "We lacked patience and got a multiprocessing.TimeoutError"
请注意,池的方法仅应由创建它的进程使用。
Note
此软件包中的Function要求__main__
模块可由子级导入。 Programming guidelines对此进行了介绍,但是这里值得指出。这意味着某些示例(例如Pool
示例)在交互式解释器中将不起作用。例如:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
(如果try此操作,则实际上将以半随机方式输出交错的三个完整回溯,然后您可能不得不以某种方式停止主进程.)
16.6.2. Reference
multiprocessing软件包主要复制threading模块的 API。
16.6.2.1. 流程和 exception
-
- class *
multiprocessing.
Process
(* group = None , target = None , name = None , args =(), kwargs ={} *)
- 流程对象表示在单独的流程中运行的活动。 Process类具有threading.Thread的所有方法的等效项。
- class *
构造函数应始终使用关键字参数进行调用。 * group *应始终为None
;它仅用于与threading.Thread兼容。 * target *是由run()方法调用的可调用对象。默认为None
,表示什么都不会被调用。 * name 是进程名称。默认情况下,唯一名称以'Process-N1:N2:...:Nk'的形式构造,其中 N1,N2,...,Nk 是一个整数序列,其长度由进程的 generation *决定。 * args *是目标调用的参数 Tuples。 * kwargs 是用于目标调用的关键字参数字典。默认情况下,没有参数传递给 target *。
如果子类覆盖了构造函数,则必须确保在对进程进行其他任何操作之前,先调用 Base Class 构造函数(Process.__init__()
)。
run
( )- 表示流程活动的方法。
您可以在子类中重写此方法。标准run()方法调用传递给对象构造函数的可调用对象作为目标参数(如果有),并分别从* args 和 kwargs *参数中获取 Sequences 参数和关键字参数。
start
( )- 开始流程的活动。
每个过程对象最多只能调用一次。它安排在单独的过程中调用对象的run()方法。
join
([超时])- 阻塞调用线程,直到被调用join()方法的进程终止或直到发生可选的超时为止。
如果* timeout *为None
,则没有超时。
一个过程可以多次加入。
进程无法加入自身,因为这将导致死锁。在启动进程之前try加入该进程是错误的。
name
- 进程的名称。
名称是仅用于标识目的的字符串。它没有语义。多个进程可以使用相同的名称。初始名称由构造函数设置。
is_alive
( )- 返回该过程是否仍然存在。
大致来说,从start()方法返回到子进程终止之间,进程对象一直处于活动状态。
daemon
- 进程的守护程序标志,一个布尔值。必须在调用start()之前进行设置。
初始值是从创建过程继承的。
进程退出时,它将try终止其所有守护程序子进程。
请注意,不允许守护进程创建子进程。否则,如果守护进程在其父进程退出时被终止,它将使其子进程变成孤儿。另外,这些不是 Unix 守护程序或服务,它们是正常的进程,如果退出了非守护进程,则将终止(而不加入)。
除了threading.Thread API,Process对象还支持以下属性和方法:
-
pid
- 返回进程 ID。在生成该进程之前,它是
None
。
- 返回进程 ID。在生成该进程之前,它是
-
exitcode
- 孩子的退出代码。如果进程尚未终止,则为
None
。负值* -N 表示该子对象已被 signal N *终止。
- 孩子的退出代码。如果进程尚未终止,则为
-
authkey
- 进程的身份验证密钥(字节字符串)。
初始化multiprocessing时,使用os.urandom()为主进程分配一个随机字符串。
创建Process对象时,它将继承其父进程的身份验证密钥,尽管可以pass将authkey设置为另一个字节字符串来更改此密钥。
See Authentication keys.
terminate
( )- 终止过程。在 Unix 上,这是使用
SIGTERM
signal 完成的;在 WindowsTerminateProcess()
上使用。请注意,将不执行 Export 处理程序和 finally 子句等。
- 终止过程。在 Unix 上,这是使用
请注意,该进程的后代进程将不会终止–它们将变得孤立。
Warning
如果在关联的进程正在使用管道或队列时使用此方法,则该管道或队列可能会损坏,并可能无法被其他进程使用。同样,如果进程已获取锁定或 signal 量等,则终止进程可能导致其他进程死锁。
请注意,start(),join(),is_alive(),terminate()和exitcode方法应仅由创建流程对象的流程调用。
Process的某些方法的用法示例:
>>> import multiprocessing, time, signal
>>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
>>> print p, p.is_alive()
<Process(Process-1, initial)> False
>>> p.start()
>>> print p, p.is_alive()
<Process(Process-1, started)> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print p, p.is_alive()
<Process(Process-1, stopped[SIGTERM])> False
>>> p.exitcode == -signal.SIGTERM
True
- exception
multiprocessing.
BufferTooShort
- 当提供的缓冲区对象太小而无法读取消息时,Connection.recv_bytes_into()引发异常。
如果e
是BufferTooShort的实例,则e.args[0]
会将消息作为字节字符串给出。
16.6.2.2. 管道和队列
当使用多个进程时,通常使用消息传递进行进程之间的通信,并避免使用任何同步 Primitives(例如锁)。
为了传递消息,可以使用Pipe()(用于两个进程之间的连接)或队列(允许多个生产者和使用者)。
Queue,multiprocessing.queues.SimpleQueue和JoinableQueue类型是根据标准库中Queue.Queue类建模的多生产者,多 ConsumerFIFO 队列。它们的区别在于Queue缺少 Python 2.5 的Queue.Queue类中引入的task_done()和join()方法。
如果您使用JoinableQueue,那么您必须为从队列中删除的每个任务调用JoinableQueue.task_done(),否则用于计数未完成任务数量的 signal 量finally可能会溢出,从而引发异常。
请注意,也可以使用 Management 器对象来创建共享队列-参见Managers。
Note
multiprocessing使用通常的Queue.Empty和Queue.Full异常来表示超时。它们在multiprocessing名称空间中不可用,因此您需要从Queue导入它们。
Note
将对象放入队列时,将对对象进行 Pickling,然后后台线程将 Pickling 的数据刷新到基础管道。这会带来一些令人惊讶的后果,但不会造成任何实际困难-如果它们确实困扰您,则可以改用manager创建的队列。
-
将对象放在空队列上之后,在队列的empty()方法返回False和get_nowait()可以返回而不提高Queue.Empty之前,可能会有无穷的延迟。
-
如果多个进程正在排队对象,则可能在另一端无序接收对象。但是,pass相同过程排队的对象始终相对于彼此处于预期的 Sequences。
Warning
如果在try使用Queue时使用Process.terminate()或os.kill()将进程杀死,则队列中的数据可能会损坏。这可能会导致其他进程稍后try使用队列时获得异常。
Warning
如上所述,如果子进程已将项目放入队列中(并且未使用JoinableQueue.cancel_join_thread),则该进程将不会终止,直到所有缓冲的项目都已刷新到管道中为止。
这意味着,如果您try加入该进程,则除非您确定已耗尽所有已放入队列的项目,否则可能会陷入僵局。同样,如果子进程是非守护进程,则父进程在try加入其所有非守护进程子进程时可能会在退出时挂起。
请注意,使用 Management 器创建的队列不存在此问题。参见Programming guidelines。
有关使用队列进行进程间通信的示例,请参见Examples。
multiprocessing.
Pipe
([* duplex *])- 返回代表管道末端的
(conn1, conn2)
个对Connection个对象。
- 返回代表管道末端的
如果* duplex 为True
(默认值),则管道是双向的。如果 duplex *为False
,则管道是单向的:conn1
仅可用于接收消息,而conn2
仅可用于发送消息。
-
- class *
multiprocessing.
Queue
([* maxsize *])
- 返回使用管道和一些锁/signal 量实现的进程共享队列。当进程首先将项目放入队列时,将启动一个供料器线程,该线程将对象从缓冲区转移到管道中。
- class *
标准库的Queue模块中常见的Queue.Empty和Queue.Full异常引发了超时。
Queue实现Queue.Queue的所有方法,但task_done()和join()除外。
qsize
( )- 返回队列的大概大小。由于多线程/multiprocessing 语义,此数字不可靠。
请注意,这可能会在未实现sem_getvalue()
的 Unix 平台(如 Mac OS X)上引发NotImplementedError。
-
empty
( )- 如果队列为空,则返回
True
,否则返回False
。由于多线程/multiprocessing 语义,这是不可靠的。
- 如果队列为空,则返回
-
full
( )- 如果队列已满,则返回
True
,否则返回False
。由于多线程/multiprocessing 语义,这是不可靠的。
- 如果队列已满,则返回
-
put
(* obj * [,* block * [,* timeout *]])- 将 obj 放入队列。如果可选参数* block 是
True
(默认值),而 timeout 是None
(默认值),则如有必要,请阻塞直到有可用的插槽。如果 timeout 是一个正数,则它最多会阻塞 timeout 秒,并且如果在这段时间内没有可用的可用插槽,则会引发Queue.Full异常。否则( block 为False
),如果有可用的空闲插槽,则将一个项目放在队列中,否则引发Queue.Full异常(在这种情况下, timeout *被忽略)。
- 将 obj 放入队列。如果可选参数* block 是
-
put_nowait
(* obj *)- 等效于
put(obj, False)
。
- 等效于
-
get
([[* block * [,* timeout *]])- 从队列中删除并返回一个项目。如果可选参数 args * block 是
True
(默认值),而 timeout 是None
(默认值),则如有必要,请阻塞直到有可用项为止。如果 timeout 是一个正数,则它最多会阻塞 timeout 秒,如果在该时间内没有可用项,则会引发Queue.Empty异常。否则(块是False
),如果有一个立即可用,则返回一个项目,否则引发Queue.Empty异常(在这种情况下, timeout *被忽略)。
- 从队列中删除并返回一个项目。如果可选参数 args * block 是
-
get_nowait
( )- 等效于
get(False)
。
- 等效于
Queue还有Queue.Queue找不到的一些其他方法。这些方法对于大多数代码通常是不必要的:
-
close
( )- 指示当前进程不会再将更多数据放入此队列。后台线程将所有缓冲的数据刷新到管道后将退出。当队列被垃圾回收时,将自动调用此方法。
-
join_thread
( )- 加入后台线程。仅在调用close()后才能使用。它阻塞直到后台线程退出,以确保缓冲区中的所有数据都已刷新到管道中。
默认情况下,如果进程不是队列的创建者,则退出时它将try加入队列的后台线程。该过程可以调用cancel_join_thread()来使join_thread()什么也不做。
cancel_join_thread
( )- 防止join_thread()阻止。特别是,这可以防止后台线程在进程退出时自动加入–参见join_thread()。
此方法的更好称呼可能是allow_exit_without_flush()
。这很可能导致排队的数据丢失,并且您几乎可以肯定不需要使用它。仅当您需要当前进程立即退出而无需 await 将排队的数据刷新到基础管道并且您不关心丢失的数据时,它才 true 存在。
Note
此类的Function需要在主机 os 上正常运行的共享 signal 量实现。没有一个,该类的Function将被禁用,并且try实例化Queue将导致ImportError。有关其他信息,请参见bpo-3770。对于以下列出的任何专用队列类型,这同样适用。
-
类别
multiprocessing.queues.
SimpleQueue
-
empty
( )- 如果队列为空,则返回
True
,否则返回False
。
- 如果队列为空,则返回
-
get
( )- 从队列中删除并返回一个项目。
-
put
(项目)- 将* item *放入队列。
-
- class *
multiprocessing.
JoinableQueue
([* maxsize *])
- JoinableQueue是Queue的子类,它是一个队列,它另外具有task_done()和join()方法。
- class *
-
task_done
( )- 表示先前排队的任务已完成。由队列使用者线程使用。对于用于提取任务的每个get(),随后对task_done()的调用将告诉队列该任务的处理已完成。
如果join()当前正在阻止,它将在处理完所有项目后恢复(这意味着已收到put()的每个项目都收到了task_done()调用)。
如果被调用的次数比队列中放置的项目多,则引发ValueError。
join
( )- 阻塞直到队列中的所有项目都已获得并处理。
每当将项目添加到队列时,未完成任务的数量就会增加。每当使用者线程调用task_done()表示已检索到该项目并且该项目的所有工作已完成时,该计数就会减少。当未完成的任务计数降至零时,join()解除阻止。
16.6.2.3. Miscellaneous
multiprocessing.
active_children
( )- 返回当前进程的所有活动子级的列表。
调用它具有“加入”任何已经完成的进程的副作用。
-
multiprocessing.
cpu_count
( )- 返回系统中的 CPU 数量。可能提高NotImplementedError。
-
multiprocessing.
current_process
( )- 返回与当前进程对应的Process对象。
threading.current_thread()的类似物。
multiprocessing.
freeze_support
( )- 添加对冻结使用multiprocessing的程序以生成 Windows 可执行文件的支持。 (已使用 py2exe ,PyInstaller 和 cx_Freeze 进行了测试.)
需要在主模块的if __name__ == '__main__'
行之后立即调用此函数。例如:
from multiprocessing import Process, freeze_support
def f():
print 'hello world!'
if __name__ == '__main__':
freeze_support()
Process(target=f).start()
如果Ellipsisfreeze_support()
行,则try运行冻结的可执行文件将引发RuntimeError。
在 Windows 以外的任何其他 os 上调用freeze_support()
时都无效。此外,如果该模块正在 Windows 上的 Python 解释器正常运行(该程序尚未冻结),则freeze_support()
无效。
multiprocessing.
set_executable
( )- 设置启动子进程时要使用的 Python 解释器的路径。 (默认情况下使用sys.executable)。嵌入者可能需要做一些类似的事情
set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
在他们可以创建子进程之前。 (仅 Windows)
Note
16.6.2.4. 连接对象
连接对象允许发送和接收可腌制对象或字符串。可以将它们视为面向消息的连接套接字。
通常使用Pipe创建连接对象-另请参见Listener 和 Client。
- 类
Connection
-
send
(* obj *)- 将对象发送到连接的另一端,应使用recv()进行读取。
-
该对象必须是可腌制的。较大的 pickle(大约 32 MB,尽管取决于 os)可能会引发ValueError异常。
-
recv
( ) -
fileno
( )- 返回连接使用的文件 Descriptors 或句柄。
-
close
( )- 关闭连接。
连接被垃圾回收时,将自动调用此方法。
poll
([超时])- 返回是否有任何可读取的数据。
如果未指定* timeout ,它将立即返回。如果 timeout 是一个数字,则此参数指定要阻止的最长时间(以秒为单位)。如果 timeout *为None
,则使用无限超时。
send_bytes
(* buffer * [,* offset * [,* size *]])- 从支持缓冲区接口的对象发送字节数据作为完整消息。
如果给出* offset ,则从 buffer 中的该位置读取数据。如果给出 size *,那么将从缓冲区读取许多字节。很大的缓冲区(大约 32 MB,虽然取决于 os)可能会引发ValueError异常
recv_bytes
([* maxlength *])- 以字符串形式返回从连接的另一端发送的字节数据的完整消息。阻塞直到有东西要接收为止。如果没有什么可接收的并且另一端已关闭,则引发EOFError。
如果指定了* maxlength 并且消息比 maxlength *长,那么将引发IOError并且连接将不再可读。
recv_bytes_into
(* buffer * [,* offset *])- 将连接另一端发送的字节数据的完整消息读入* buffer *,并返回消息中的字节数。阻塞直到有东西要接收为止。如果没有什么可接收的,并且另一端已关闭,则引发EOFError。
- buffer 必须是满足可写缓冲区接口的对象。如果给定 offset ,则消息将从该位置写入缓冲区。偏移量必须是小于 buffer *的长度(以字节为单位)的非负整数。
如果缓冲区太短,则会引发BufferTooShort
异常,并且完整的消息将以e.args[0]
的形式提供,其中e
是异常实例。
For example:
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes('thank you')
>>> a.recv_bytes()
'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
Warning
Connection.recv()方法会自动释放收到的数据,这可能会带来安全风险,除非您可以信任发送消息的过程。
因此,除非使用Pipe()
生成连接对象,否则在执行某种身份验证后,应仅使用recv()和send()方法。参见Authentication keys。
Warning
如果某个进程在try读取或写入管道时被杀死,则该管道中的数据很可能会损坏,因为无法确定消息边界在哪里。
16.6.2.5. 同步 Primitives
通常,在多进程程序中,同步 Primitives 不是在多线程程序中所必需的。请参阅threading模块的文档。
请注意,还可以使用 Management 器对象来创建同步 Primitives–参见Managers。
- 类别
multiprocessing.
BoundedSemaphore
([值])- 有界 signal 量对象:threading.BoundedSemaphore的近似模拟。
与它的近似模拟存在一个单独的区别:其acquire
方法的第一个参数名为* block ,并且它支持可选的第二个参数 timeout *,与Lock.acquire()一致。
Note
在 Mac OS X 上,这与Semaphore没有区别,因为sem_getvalue()
尚未在该平台上实现。
-
- class *
multiprocessing.
Condition
([* lock *])
- 条件变量:threading.Condition的克隆。
- class *
如果指定了* lock *,则它应该是multiprocessing中的Lock或RLock对象。
- 类别
multiprocessing.
Event
- threading.Event的副本。此方法在退出时返回内部 signal 量的状态,因此,除非给出超时且操作超时,否则它将始终返回
True
。
- threading.Event的副本。此方法在退出时返回内部 signal 量的状态,因此,除非给出超时且操作超时,否则它将始终返回
在 2.7 版中进行了更改:以前,该方法始终返回None
。
- 类别
multiprocessing.
Lock
- 非递归锁定对象:threading.Lock的近似模拟。一旦进程或线程获取了锁,则随后从任何进程或线程获取锁的try都将阻塞,直到释放为止;否则,该锁将被释放。任何进程或线程都可能释放它。除非另有说明,否则threading.Lock应用于线程的概念和行为将在multiprocessing.Lock中复制,因为它适用于进程或线程。
请注意,Lock实际上是一个工厂函数,它返回使用默认上下文初始化的multiprocessing.synchronize.Lock
的实例。
Lock支持context manager协议,因此可以在with语句中使用。
acquire
(* block = True , timeout = None *)- 获取锁定,阻止或非阻止。
在* block *参数设置为True
(默认值)的情况下,方法调用将阻塞,直到锁定处于未锁定状态,然后将其设置为 locked 并返回True
。请注意,第一个参数的名称与threading.Lock.acquire()的名称不同。
将* block *参数设置为False
时,方法调用不会阻塞。如果锁当前处于锁定状态,则返回False
;否则,将锁设置为锁定状态并返回True
。
当使用* timeout 的正浮点值调用时,只要不能获取锁,最多阻塞 timeout *指定的秒数。 * timeout 为负值的调用等效于 timeout *为零。 * timeout 值为None
(默认值)的调用将超时期限设置为无限。如果 block 参数设置为False
,则 timeout 参数没有实际意义。如果已获取锁,则返回True
,如果超时时间已过,则返回False
。请注意, timeout *参数在此方法的模拟threading.Lock.acquire()中不存在。
release
( )- 释放锁。可以从任何进程或线程调用此函数,而不仅是最初获取锁的进程或线程。
行为与threading.Lock.release()相同,除了在解锁的锁上调用时会引发ValueError。
- 类别
multiprocessing.
RLock
- 递归锁定对象:threading.RLock的近似模拟。递归锁必须由获取它的进程或线程释放。一旦进程或线程获得了递归锁,相同的进程或线程就可以再次获得它而不会阻塞。该进程或线程必须在每次获取时释放一次。
请注意,RLock实际上是一个工厂函数,它返回使用默认上下文初始化的multiprocessing.synchronize.RLock
的实例。
RLock支持context manager协议,因此可以在with语句中使用。
acquire
(* block = True , timeout = None *)- 获取锁定,阻止或非阻止。
在将* block *参数设置为True
的情况下调用时,除非锁已由当前进程或线程拥有,否则直到锁处于未锁定状态(不受任何进程或线程拥有)时,才进行阻塞。然后,当前进程或线程将获得该锁的所有权(如果尚未拥有该锁),并且该锁内的递归级别将递增 1,从而导致返回值True
。请注意,第一个自变量的行为与threading.RLock.acquire()的实现相比有几个不同之处,从自变量本身的名称开始。
在将* block *参数设置为False
的情况下调用时,请勿阻止。如果该锁已被另一个进程或线程获取(并因此而拥有),则当前进程或线程将不拥有所有权,并且锁内的递归级别也不会更改,从而导致返回值False
。如果锁处于未锁定状态,则当前进程或线程将拥有所有权,并且递归级别将增加,从而导致返回值True
。
- timeout 参数的用法和行为与Lock.acquire()中的相同。请注意, timeout *参数在此方法的类似物threading.RLock.acquire()中不存在。
release
( )- 释放一个锁,递归级别递减。如果递减级别递减后的级别为零,则将锁重置为未锁定状态(不属于任何进程或线程),并且如果其他任何进程或线程被阻塞,await 该锁被解锁,则允许其中一个 continue 进行。如果递减级别递减后仍不为零,则锁保持锁定状态,并由调用进程或线程拥有。
仅在调用进程或线程拥有锁时才调用此方法。如果此方法是由所有者以外的进程或线程调用的,或者锁处于未锁定(未拥有)状态,则引发AssertionError。请注意,在这种情况下引发的异常类型与threading.RLock.release()中实现的行为不同。
- 类别
multiprocessing.
Semaphore
([值])- signal 量对象:threading.Semaphore的近似模拟。
与它的近似模拟存在一个单独的区别:其acquire
方法的第一个参数名为* block ,并且它支持可选的第二个参数 timeout *,与Lock.acquire()一致。
Note
BoundedSemaphore,Lock,RLock和Semaphore的acquire()
方法具有threading中的等效项不支持的超时参数。签名为acquire(block=True, timeout=None)
,可接受关键字参数。如果* block 是True
,而 timeout 不是None
,则它指定以秒为单位的超时。如果 block 为False
,则 timeout *被忽略。
在 Mac OS X 上,不支持sem_timedwait
,因此超时调用acquire()
将使用睡眠循环来模拟该函数的行为。
Note
如果在调用BoundedSemaphore.acquire()
,Lock.acquire(),RLock.acquire(),Semaphore.acquire()
,Condition.acquire()
或Condition.wait()
阻止主线程的同时到达 Ctrl-C 生成的 SIGINTsignal,则该调用将立即break,并引发KeyboardInterrupt。
这与threading的行为不同,后者在进行等效的阻塞调用时将忽略 SIGINT。
Note
此软件包的某些Function需要在主机 os 上正常运行的共享 signal 量实现。如果没有,则multiprocessing.synchronize
模块将被禁用,并且try导入它会导致ImportError。有关其他信息,请参见bpo-3770。
16.6.2.6. 共享的 ctypes 对象
可以使用可由子进程继承的共享内存来创建共享对象。
multiprocessing.
Value
(* typecode_or_type ,* args * [,* lock *])- 返回从共享内存分配的ctypes对象。默认情况下,返回值实际上是对象的同步包装器。
- typecode_or_type *确定返回对象的类型:它是 ctypes 类型或array模块使用的一种字符类型代码。 args *传递给该类型的构造函数。
如果* lock 为True
(默认值),则将创建一个新的递归锁对象以同步对该值的访问。如果 lock 是Lock或RLock对象,则该对象将用于同步对该值的访问。如果 lock *为False
,则对返回的对象的访问将不会受到锁的自动保护,因此它不一定是“过程安全的”。
+=
之类的涉及读取和写入的操作不是原子的。因此,例如,如果您想原子地增加共享值,则仅做不到
counter.value += 1
假设关联的锁是递归的(默认情况下是递归的),您可以执行
with counter.get_lock():
counter.value += 1
请注意,* lock *是仅关键字的参数。
multiprocessing.
Array
(* typecode_or_type , size_or_initializer ,**,* lock = True *)- 返回从共享内存分配的 ctypes 数组。默认情况下,返回值实际上是该数组的同步包装器。
- typecode_or_type 确定返回数组的元素的类型:它是 ctypes 类型或array模块使用的单字符类型代码。如果 size_or_initializer 是整数,则它确定数组的长度,并且该数组最初将被清零。否则, size_or_initializer *是用于初始化数组的序列,其长度确定数组的长度。
如果* lock 为True
(默认值),则将创建一个新的锁对象以同步对该值的访问。如果 lock 是Lock或RLock对象,则该对象将用于同步对该值的访问。如果 lock *为False
,则对返回的对象的访问将不会受到锁的自动保护,因此它不一定是“过程安全的”。
请注意,* lock *是仅关键字参数。
16.6.2.6.1. multiprocessing.sharedctypes 模块
multiprocessing.sharedctypes模块提供了用于从共享内存中分配ctypes个对象的Function,这些对象可以由子进程继承。
Note
尽管可以将指针存储在共享内存中,但请记住,这将指向特定进程地址空间中的位置。但是,指针很可能在第二个进程的上下文中无效,并且try从第二个进程取消引用指针可能会导致崩溃。
multiprocessing.sharedctypes.
RawArray
(* typecode_or_type ,- 返回从共享内存分配的 ctypes 数组。
- typecode_or_type 确定返回数组的元素的类型:它是 ctypes 类型或array模块使用的单字符类型代码。如果 size_or_initializer 是整数,则它确定数组的长度,并且该数组最初将被清零。否则, size_or_initializer *是用于初始化数组的序列,其长度确定数组的长度。
请注意,设置和获取元素可能不是原子的–请使用Array()来确保使用锁自动同步访问。
multiprocessing.sharedctypes.
RawValue
(* typecode_or_type ,- 返回从共享内存分配的 ctypes 对象。
- typecode_or_type *确定返回对象的类型:它是 ctypes 类型或array模块使用的一种字符类型代码。 args *传递给该类型的构造函数。
请注意,设置和获取值可能不是原子性的,请改用Value()来确保使用锁自动同步访问。
请注意,ctypes.c_char数组具有value
和raw
属性,使它们可以使用它存储和检索字符串-参见ctypes的文档。
multiprocessing.sharedctypes.
Array
(* typecode_or_type , size_or_initializer ,* args * [,* lock *]- 与RawArray()相同,除了根据* lock *的值,可能会返回进程安全的同步包装程序,而不是原始的 ctypes 数组。
如果* lock 为True
(默认值),则将创建一个新的锁对象以同步对该值的访问。如果 lock 是Lock或RLock对象,则该对象将用于同步对该值的访问。如果 lock *为False
,则对返回的对象的访问将不会受到锁的自动保护,因此它不一定是“过程安全的”。
请注意,* lock *是仅关键字的参数。
multiprocessing.sharedctypes.
Value
(* typecode_or_type ,* args * [,* lock *]- 与RawValue()相同,除了根据* lock *的值,可以返回进程安全的同步包装而不是原始 ctypes 对象。
如果* lock 为True
(默认值),则将创建一个新的锁对象以同步对该值的访问。如果 lock 是Lock或RLock对象,则该对象将用于同步对该值的访问。如果 lock *为False
,则对返回的对象的访问将不会受到锁的自动保护,因此它不一定是“过程安全的”。
请注意,* lock *是仅关键字的参数。
-
multiprocessing.sharedctypes.
copy
- 返回从共享内存分配的 ctypes 对象,该对象是 ctypes 对象* obj *的副本。
-
multiprocessing.sharedctypes.
synchronized
(* obj * [,* lock *]- 返回使用* lock 同步访问的 ctypes 对象的进程安全包装对象。如果 lock *为
None
(默认值),则会自动创建multiprocessing.RLock对象。
- 返回使用* lock 同步访问的 ctypes 对象的进程安全包装对象。如果 lock *为
同步包装器除了要包装的对象之外,还有两种方法:get_obj()
返回被包装的对象,而get_lock()
返回用于同步的锁对象。
请注意,pass包装器访问 ctypes 对象可能比访问原始 ctypes 对象要慢得多。
下表将用于从共享内存创建共享 ctypes 对象的语法与普通 ctypes 语法进行了比较。 (在表MyStruct
中是ctypes.Structure的某些子类。)
ctypes | 使用类型的 sharedctypes | 使用 typecode 的 sharedctypes |
---|---|---|
c_double(2.4) | RawValue(c_double, 2.4) | RawValue('d', 2.4) |
MyStruct(4, 6) | RawValue(MyStruct,4,6) | |
(c_short * 7)() | RawArray(c_short, 7) | RawArray('h', 7) |
(c_int * 3)(9,2,8) | RawArray(c_int,(9,2,8)) | RawArray('i',(9,2,8)) |
下面是一个由子进程修改许多 ctypes 对象的示例:
from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double
class Point(Structure):
_fields_ = [('x', c_double), ('y', c_double)]
def modify(n, x, s, A):
n.value **= 2
x.value **= 2
s.value = s.value.upper()
for a in A:
a.x **= 2
a.y **= 2
if __name__ == '__main__':
lock = Lock()
n = Value('i', 7)
x = Value(c_double, 1.0/3.0, lock=False)
s = Array('c', 'hello world', lock=lock)
A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)
p = Process(target=modify, args=(n, x, s, A))
p.start()
p.join()
print n.value
print x.value
print s.value
print [(a.x, a.y) for a in A]
打印的结果是
49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]
16.6.2.7. Managers
Management 器提供了一种创建可以在不同流程之间共享的数据的方法。Management 器对象控制 Management共享对象的服务器进程。其他进程可以使用代理访问共享对象。
一旦垃圾回收或退出父进程,Management 器进程将立即关闭。Management 器类在multiprocessing.managers模块中定义:
-
- class *
multiprocessing.managers.
BaseManager
([* address * [,* authkey *]])
- 创建一个 BaseManager 对象。
- class *
创建后,应调用start()或get_server().serve_forever()
以确保 manager 对象引用已启动的 manager 进程。
-
address 是 Management 器进程在其上侦听新连接的地址。如果 address *为
None
,则选择任意一个。 -
authkey 是认证密钥,将用于检查到服务器进程的传入连接的有效性。如果 authkey 是
None
,则current_process().authkey
。否则,将使用 authkey *,并且它必须是字符串。
-
start
([[initializer * [,* initargs *]])- 启动一个子进程以启动 Management 器。如果* initializer *不是
None
,则子进程将在启动时调用initializer(*initargs)
。
- 启动一个子进程以启动 Management 器。如果* initializer *不是
-
get_server
( )- 返回一个
Server
对象,该对象代表 Management 器控制下的实际服务器。Server
对象支持serve_forever()
方法:
- 返回一个
>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey='abc')
>>> server = manager.get_server()
>>> server.serve_forever()
Server
另外具有address属性。
connect
( )- 将本地 Management 器对象连接到远程 Management 器进程:
>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 5000), authkey='abc')
>>> m.connect()
shutdown
( )- 停止 Management 器使用的过程。仅当使用start()启动服务器进程时,此选项才可用。
这可以多次调用。
register
(* typeid * [,* callable * [,* proxytype * [,暴露 [,* method_to_typeid * [,* create_method *]]]]]))- 可用于注册类型或可pass Management 器类调用的类方法。
-
typeid *是一个“类型标识符”,用于标识共享对象的特定类型。这必须是一个字符串。
-
callable 是用于为此类型标识符创建对象的可调用对象。如果将使用
from_address()
类方法创建 Management 器实例,或者如果 create_method *参数为False
,则可以将其保留为None
。 -
proxytype 是BaseProxy的子类,用于为具有此 typeid *的共享对象创建代理。如果
None
,则会自动创建一个代理类。 -
exposed 用于指定方法名称的序列,应允许使用BaseProxy._callmethod()访问此 typeid 的代理。 (如果 exposed *为
None
,则使用proxytype._exposed_
,如果存在的话)。如果未指定暴露列表,则共享对象的所有“公共方法”将可访问。 (此处的“公共方法”是指具有call()方法且名称不以'_'
开头的任何属性。) -
method_to_typeid 是用于指定应该返回代理的那些公开方法的返回类型的 Map。它将方法名称 Map 到 typeid 字符串。 (如果 method_to_typeid *为
None
,则使用proxytype._method_to_typeid_
(如果存在)。)如果方法的名称不是此 Map 的键,或者该 Map 为None
,则该方法返回的对象将按值复制。 -
create_method 确定是否应使用名称 typeid *创建方法,该方法可用于告诉服务器进程创建新的共享库并为其返回代理。默认情况下为
True
。
BaseManager个实例还具有一个只读属性:
-
address
- Manager 使用的地址。
-
类别
multiprocessing.managers.
SyncManager
- BaseManager的子类,可用于进程同步。
multiprocessing.Manager()
返回此类型的对象。
- BaseManager的子类,可用于进程同步。
它还支持创建共享列表和字典。
-
BoundedSemaphore
([值])- 创建一个共享的threading.BoundedSemaphore对象,并为其返回代理。
-
Condition
([* 锁])- 创建一个共享的threading.Condition对象,并为其返回代理。
如果提供了* lock *,则它应该是threading.Lock或threading.RLock对象的代理。
-
Event
( )- 创建一个共享的threading.Event对象,并为其返回代理。
-
Lock
( )- 创建一个共享的threading.Lock对象,并为其返回代理。
-
Namespace
( )- 创建一个共享的Namespace对象,并为其返回代理。
-
Queue
([[* maxsize *])- 创建一个共享的Queue.Queue对象,并为其返回代理。
-
RLock
( )- 创建一个共享的threading.RLock对象,并为其返回代理。
-
Semaphore
([值])- 创建一个共享的threading.Semaphore对象,并为其返回代理。
-
Array
(* typecode , sequence *)- 创建一个数组并为其返回代理。
-
Value
(* typecode , value *)- 创建一个具有可写
value
属性的对象,并为其返回代理。
- 创建一个具有可写
-
dict
( )-
dict
(Map) -
dict
(序列)- 创建一个共享的
dict
对象,并为其返回代理。
- 创建一个共享的
-
-
list
( )-
list
(序列)- 创建一个共享的
list
对象,并为其返回代理。
- 创建一个共享的
-
Note
对 dict 和 list 代理中的可变值或可变项的修改不会pass Management 器传播,因为代理无法知道何时修改其值或可变项。要修改此类项目,可以将修改后的对象重新分配给容器代理:
# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# reassigning the dictionary, the proxy is notified of the change
lproxy[0] = d
- 类别
multiprocessing.managers.
Namespace
- 可以向SyncManager注册的类型。
命名空间对象没有公共方法,但是具有可写属性。其表示形式显示其属性的值。
但是,当对名称空间对象使用代理时,以'_'
开头的属性将是代理的属性,而不是引用对象的属性:
>>> manager = multiprocessing.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3 # this is an attribute of the proxy
>>> print Global
Namespace(x=10, y='hello')
16.6.2.7.1. 定制 Manager
要创建自己的 Management 器,可以创建一个BaseManager的子类,并使用register()类方法向该 Management 器类注册新类型或可调用项。例如:
from multiprocessing.managers import BaseManager
class MathsClass(object):
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
manager = MyManager()
manager.start()
maths = manager.Maths()
print maths.add(4, 3) # prints 7
print maths.mul(7, 8) # prints 56
16.6.2.7.2. 使用远程 Management 器
可以在一台计算机上运行 Management 器服务器,并让 Client 端从其他计算机上使用它(假设所涉及的防火墙允许它)。
运行以下命令将为单个共享队列创建服务器,远程 Client 端可以访问该服务器:
>>> from multiprocessing.managers import BaseManager
>>> import Queue
>>> queue = Queue.Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey='abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
一个 Client 端可以按以下方式访问服务器:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
另一个 Client 端也可以使用它:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
本地进程也可以使用 Client 端上的代码从远程访问该队列,以访问该队列:
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super(Worker, self).__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey='abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
16.6.2.8. 代理对象
代理是“引用”共享对象的对象,该共享对象(大概)生活在不同的进程中。共享对象被称为代理的“参考对象”。多个代理对象可能具有相同的引用对象。
代理对象具有调用其引用对象的相应方法的方法(尽管并非引用对象的每个方法都必须可以pass代理使用)。通常,可以使用与代理对象相同的大多数方式使用代理:
>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print l
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print repr(l)
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
请注意,将str()应用于代理将返回所指对象的表示,而将repr()应用于将返回代理的表示。
代理对象的一个重要Function是可拾取的,因此可以在进程之间传递。但是请注意,如果将代理发送到相应 Management 者的进程,则取消选择代理将自己生成引用对象。例如,这意味着一个共享对象可以包含第二个:
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print a, b
[[]] []
>>> b.append('hello')
>>> print a, b
[['hello']] ['hello']
Note
multiprocessing中的代理类型不支持按值进行比较。因此,例如,我们有:
>>> manager.list([1,2,3]) == [1,2,3]
False
进行比较时,应该只使用引用对象的副本。
-
类别
multiprocessing.managers.
BaseProxy
- 代理对象是BaseProxy的子类的实例。
-
_callmethod
(方法名 [,* args * [,* kwds *]])- 调用并返回代理引用对象的方法的结果。
如果proxy
是其引用对象为obj
的代理,则表达式
proxy._callmethod(methodname, args, kwds)
将评估表达式
getattr(obj, methodname)(*args, **kwds)
在 Manager 的过程中。
返回的值将是调用结果的副本或新共享对象的代理-请参阅BaseManager.register()的* method_to_typeid *参数的文档。
如果呼叫引发异常,则_callmethod()重新引发。如果在 Management 者的流程中引发了其他异常,则该异常将转换为RemoteError
异常,并由_callmethod()引发。
特别要注意的是,如果* methodname 没有被 expose *,则会引发一个异常。
_callmethod()的用法示例:
>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getslice__', (2, 7)) # equiv to `l[2:7]`
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,)) # equiv to `l[20]`
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue
( )- 返回引用对象的副本。
如果引用对象不可拾取,则将引发异常。
-
__repr__
( )- 返回代理对象的表示形式。
-
__str__
( )- 返回对象的表示形式。
16.6.2.8.1. Cleanup
代理对象使用 weakref 回调,以便在收集垃圾时将其自身从拥有其引用对象的 Management 器中注销。
当不再有代理引用共享对象时,该共享对象将从 Management 器进程中删除。
16.6.2.9. Process池
可以创建一个进程池,该池将执行passPool
类提交给它的任务。
-
- class *
multiprocessing.
Pool
([* processes * [,* initializer * [,* initargs * [,* maxtasksperchild *]]]])
- 进程池对象,用于控制可以向其提交作业的工作进程池。它支持带有超时和回调的异步结果,并具有并行 Map 实现。
- class *
- processes 是要使用的工作进程数。如果 processes 为
None
,则使用cpu_count()
返回的数字。如果 initializer *不是None
,则每个工作进程启动时都会调用initializer(*initargs)
。
请注意,池对象的方法仅应由创建池的进程调用。
2.7 版中的新Function:* maxtasksperchild 是工作进程可以退出并被新的工作进程替换之前可以完成的任务数,以释放未使用的资源。缺省 maxtasksperchild *为None
,这意味着工作进程将与池一样长。
Note
Pool
内的工作进程通常在 Pool 工作队列的整个持续时间内都处于活动状态。在其他系统(例如 Apache,mod_wsgi 等)中发现的释放 Worker 资源的常见模式是允许池中的 Worker 在退出,清理和产生新进程之前仅完成一定数量的工作。取代旧的。 Pool
的* maxtasksperchild *参数将这种Function展示给finally用户。
-
apply
(* func * [,* args * [,* kwds *]])- 等效于apply()内置函数。它会阻塞直到结果准备就绪,因此apply_async()更适合并行执行工作。此外,* func *仅在池的工作程序之一中执行。
-
apply_async
(* func * [,* args * [,* kwds * [,* callback *]]])- apply()方法的一种变体,它返回一个结果对象。
如果指定了* callback ,那么它应该是可调用的,可以接受单个参数。结果准备就绪后,将对其应用回调*(除非调用失败)。 * callback *应该立即完成,因为否则处理结果的线程将被阻塞。
map
(* func , iterable * [,* chunksize *])- 与map()内置函数的并行等效项(尽管它仅支持一个* iterable *参数)。它会阻塞直到结果准备就绪。
此方法将可迭代项分为多个块,将其作为单独的任务提交给流程池。可以pass将* chunksize *设置为正整数来指定这些块的(大约)大小。
map_async
(* func , iterable * [,* chunksize * [,* callback *]])- map()方法的一种变体,它返回一个结果对象。
如果指定了* callback ,那么它应该是可调用的,可以接受单个参数。结果准备就绪后,将对其应用回调*(除非调用失败)。 * callback *应该立即完成,因为否则处理结果的线程将被阻塞。
imap
(* func , iterable * [,* chunksize *])- 等效于itertools.imap()。
- chunksize 参数与map()方法使用的参数相同。与使用默认值
1
相比,使用 chunksize *较大的值进行迭代的时间很长,可以使工作完成 得多 。
同样,如果* chunksize 为1
,则imap()方法返回的迭代器的next()
方法具有可选的 timeout 参数:如果无法在 timeout *秒内返回结果,则next(timeout)
将提高multiprocessing.TimeoutError
。
-
imap_unordered
(* func , iterable * [,* chunksize *])- 与imap()相同,除了返回的迭代器的结果 Sequences 应视为任意。 (只有在只有一个工作进程时,才能保证订单是“正确的”.)
-
close
( )- 阻止将更多任务提交给池。一旦完成所有任务,工作进程将退出。
-
terminate
( )- 没有完成出色的工作,立即停止工作进程。当池对象被垃圾回收时,terminate()将立即被调用。
-
join
( )- await 工作进程退出。在使用join()之前,您必须先致电close()或terminate()。
-
类别
multiprocessing.pool.
AsyncResult
Pool.apply_async()
和Pool.map_async()
返回的结果的类别。
-
get
([超时])- 到达时返回结果。如果* timeout 不是
None
并且结果没有在 timeout *秒之内到达,则引发multiprocessing.TimeoutError
。如果远程呼叫引发了异常,则该异常将被get()引发。
- 到达时返回结果。如果* timeout 不是
-
wait
([超时])- await 结果可用或直到* timeout *秒过去。
-
ready
( )- 返回呼叫是否完成。
-
successful
( )- 返回调用是否完成而没有引发异常。如果结果尚未准备好,将加AssertionError。
以下示例演示了池的用法:
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print result.get(timeout=1) # prints "100" unless your computer is *very* slow
print pool.map(f, range(10)) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print it.next() # prints "0"
print it.next() # prints "1"
print it.next(timeout=1) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print result.get(timeout=1) # raises multiprocessing.TimeoutError
16.6.2.10. Listener 和 Client
通常,使用队列或使用Pipe()返回的Connection对象来完成进程之间的消息传递。
但是,multiprocessing.connection模块具有一些额外的灵 Active。它基本上提供了用于处理套接字或 Windows 命名管道的面向消息的高级 API,并且还支持使用hmac模块进行“摘要式身份验证”。
multiprocessing.connection.
deliver_challenge
(* connection , authkey *)- 将随机生成的消息发送到连接的另一端,然后 await 答复。
如果答复与使用* authkey *作为密钥的消息摘要匹配,则将欢迎消息发送到连接的另一端。否则引发AuthenticationError。
multiprocessing.connection.
answer_challenge
(* connection , authkey *)- 接收一条消息,使用* authkey *作为密钥计算该消息的摘要,然后将摘要发送回去。
如果未收到欢迎消息,则引发AuthenticationError。
multiprocessing.connection.
Client
(* address * [,* family * [,* authenticate * [,* authkey *]]])- try构建到使用地址* address *的监听器的连接,并返回Connection。
连接的类型由* family 参数确定,但通常可以Ellipsis,因为它通常可以从 address *的格式中推断出来。 (请参见Address Formats)
如果* authenticate 为True
或 authkey 为字符串,则使用摘要式身份验证。如果 authkey 为None
,则用于身份验证的密钥将为 authkey *或current_process().authkey)
。如果认证失败,则引发AuthenticationError。参见Authentication keys。
-
- class *
multiprocessing.connection.
Listener
([地址 [,家庭 [,待办事项 [,认证 [,认证密钥]]]]]]))
- 绑定套接字或 Windows 命名管道的包装器,正在“监听”连接。
- class *
- address *是侦听器对象的绑定套接字或命名管道要使用的地址。
Note
如果使用的地址为“ 0.0.0.0”,则该地址在 Windows 上将不是可连接的端点。如果需要可连接的端点,则应使用“ 127.0.0.1”。
- family 是要使用的套接字(或命名管道)的类型。这可以是字符串
'AF_INET'
(对于 TCP 套接字),'AF_UNIX'
(对于 Unix 域套接字)或'AF_PIPE'
(对于 Windows 命名管道)之一。其中只有第一个保证可用。如果 family 为None
,则从 address 的格式推断出该家族。如果 address 也是None
,则选择默认值。该默认值是假定为可用最快的系列。参见Address Formats。请注意,如果 family *是'AF_UNIX'
且地址是None
,则将在使用tempfile.mkstemp()创建的私有临时目录中创建套接字。
如果侦听器对象使用套接字,则绑定它后,会将* backlog *(默认为 1)传递给套接字的listen()方法。
如果* authenticate 是True
(默认为False
)或 authkey *不是None
,则使用摘要式身份验证。
如果* authkey *是字符串,则它将用作认证密钥;否则,它将用作认证密钥。否则必须为None
。
如果* authkey 为None
且 authenticate 为True
,则current_process().authkey
用作身份验证密钥。如果 authkey 为None
, authenticate *为False
,则不进行任何身份验证。如果认证失败,则引发AuthenticationError。参见Authentication keys。
-
accept
( )- 接受侦听器对象的绑定套接字或命名管道上的连接,并返回Connection对象。如果try进行身份验证并失败,则会引发
AuthenticationError
。
- 接受侦听器对象的绑定套接字或命名管道上的连接,并返回Connection对象。如果try进行身份验证并失败,则会引发
-
close
( )- 关闭绑定的套接字或侦听器对象的命名管道。当侦听器被垃圾回收时,将自动调用此方法。但是,建议显式调用它。
侦听器对象具有以下只读属性:
-
address
- 侦听器对象正在使用的地址。
-
last_accepted
- 最后接受的连接来自的地址。如果不可用,则为
None
。
- 最后接受的连接来自的地址。如果不可用,则为
该模块定义了以下异常:
-
exception
multiprocessing.connection.
ProcessError
- 所有multiprocessing个异常的 Base Class。
-
exception
multiprocessing.connection.
BufferTooShort
- 当提供的缓冲区对象太小而无法读取消息时,Connection.recv_bytes_into()引发异常。
-
exception
multiprocessing.connection.
AuthenticationError
- 出现身份验证错误时引发。
-
exception
multiprocessing.connection.
TimeoutError
- 超时到期时由带有超时的方法引发。
Examples
以下服务器代码创建一个使用'secret password'
作为身份验证密钥的侦听器。然后,它 await 连接并将一些数据发送到 Client 端:
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
listener = Listener(address, authkey='secret password')
conn = listener.accept()
print 'connection accepted from', listener.last_accepted
conn.send([2.25, None, 'junk', float])
conn.send_bytes('hello')
conn.send_bytes(array('i', [42, 1729]))
conn.close()
listener.close()
以下代码连接到服务器并从服务器接收一些数据:
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
conn = Client(address, authkey='secret password')
print conn.recv() # => [2.25, None, 'junk', float]
print conn.recv_bytes() # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print conn.recv_bytes_into(arr) # => 8
print arr # => array('i', [42, 1729, 0, 0, 0])
conn.close()
16.6.2.10.1. 地址格式
-
'AF_INET'
地址是(hostname, port)
形式的 Tuples,其中* hostname 是字符串,而 port *是整数。 -
'AF_UNIX'
地址是代表文件系统上文件名的字符串。 -
-
'AF_PIPE'
地址是以下形式的字符串: -
r'\.\pipe{PipeName}'
。要使用Client()连接到名为* ServerName *的远程计算机上的命名管道,应使用r'\ServerName\pipe{PipeName}'
形式的地址。
-
请注意,默认情况下,任何以两个反斜杠开头的字符串都假定为'AF_PIPE'
地址而不是'AF_UNIX'
地址。
16.6.2.11. 认证密钥
当人们使用Connection.recv()时,接收到的数据将自动取消处理。不幸的是,从不受信任的来源中抽取数据会带来安全风险。因此,Listener和Client()使用hmac模块提供摘要身份验证。
验证密钥是一个字符串,可以看作是密码:构建连接后,两端将要求证明对方知道该认证密钥。 (证明两端使用相同的密钥“不”涉及pass连接发送密钥.)
如果请求身份验证但未指定身份验证密钥,则使用current_process().authkey
的返回值(请参见Process)。当前进程创建的任何Process对象都会自动继承此值。这意味着(默认情况下)多进程程序的所有进程将共享一个身份验证密钥,该身份验证密钥可在它们之间构建连接时使用。
也可以使用os.urandom()生成合适的身份验证密钥。
16.6.2.12. Logging
提供了一些日志记录支持。但是请注意,logging包不使用进程共享锁,因此(取决于处理程序类型)可能会混淆来自不同进程的消息。
multiprocessing.
get_logger
( )- 返回multiprocessing使用的 Logger。如有必要,将创建一个新的。
首次创建时,Logger 的级别为logging.NOTSET
,并且没有默认处理程序。默认情况下,发送到该 Logger 的消息不会传播到根 Logger。
请注意,在 Windows 上,子进程将仅继承父进程的 Logger 级别,而不会继承 Logger 的任何其他自定义设置。
multiprocessing.
log_to_stderr
( )- 该函数执行对get_logger()的调用,但是除了返回由 get_logger 创建的 Logger 之外,它还添加了一个处理程序,该处理程序使用
'[%(levelname)s/%(processName)s] %(message)s'
格式将输出发送到sys.stderr。
- 该函数执行对get_logger()的调用,但是除了返回由 get_logger 创建的 Logger 之外,它还添加了一个处理程序,该处理程序使用
以下是启用了日志记录的示例会话:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
除了具有这两个日志记录Function外,multiprocessing 还公开了两个附加的日志记录级别属性。它们是SUBWARNING
和SUBDEBUG
。下表说明了这些内容在普通级别层次结构中的适合位置。
Level | Numeric value |
---|---|
SUBWARNING |
25 |
SUBDEBUG |
5 |
有关日志记录级别的完整表,请参见logging模块。
这些附加的日志记录级别主要用于 multiprocessing 模块中的某些调试消息。除了启用了SUBDEBUG
之外,以下示例与上述示例相同:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(multiprocessing.SUBDEBUG)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../pymp-djGBXN/listener-...'
>>> del m
[SUBDEBUG/MainProcess] finalizer calling ...
[INFO/MainProcess] sending shutdown message to manager
[DEBUG/SyncManager-...] manager received shutdown message
[SUBDEBUG/SyncManager-...] calling <Finalize object, callback=unlink, ...
[SUBDEBUG/SyncManager-...] finalizer calling <built-in function unlink> ...
[SUBDEBUG/SyncManager-...] calling <Finalize object, dead>
[SUBDEBUG/SyncManager-...] finalizer calling <function rmtree at 0x5aa730> ...
[INFO/SyncManager-...] manager exiting with exitcode 0
16.6.2.13. multiprocessing.dummy 模块
multiprocessing.dummy复制multiprocessing的 API,但只不过是threading模块周围的包装器。
16.6.3. 编程准则
使用multiprocessing时,应遵循某些准则和习惯用法。
16.6.3.1. 所有平台
避免共享状态
Note
应当尽可能避免在进程之间转移大量数据。
最好坚持使用队列或管道在进程之间进行通信,而不是使用threading模块中的较低级别的同步 Primitives。
Picklability
Note
确保代理方法的参数可挑剔。
代理的线程安全
Note
除非使用锁保护它,否则不要从多个线程中使用代理对象。
(使用* same *代理使用不同的进程永远不会有问题.)
加入僵尸进程
Note
在 Unix 上,当进程完成但尚未加入时,它就变成了僵尸。永远不要有太多,因为每次新进程启动(或调用active_children())时,所有尚未加入的已完成进程都将加入。同时调用已完成的过程Process.is_alive将加入该过程。即便如此,将您启动的所有过程明确地加入也可能是一个好习惯。
继承比腌制/腌制更好
Note
在 Windows 上,multiprocessing中的许多类型都需要可腌制,以便子进程可以使用它们。但是,通常应该避免使用管道或队列将共享对象发送到其他进程。相反,您应该安排程序,以便需要访问在其他位置创建的共享资源的进程可以从祖先进程继承该程序。
避免终止流程
Note
使用Process.terminate方法停止进程可能导致该进程当前正在使用的任何共享资源(例如锁,signal 灯,管道和队列)损坏或对其他进程不可用。
因此,最好只考虑对从未使用任何共享资源的进程使用Process.terminate。
加入使用队列的进程
Note
请记住,将项目放入队列的进程将在终止之前 await,直到所有缓冲的项目由“ feeder”线程馈送到基础管道为止。 (子进程可以调用队列的cancel_join_thread()方法来避免这种行为。)
这意味着,无论何时使用队列,都需要确保在加入该流程之前,将finally删除队列中已放置的所有项目。否则,您无法确定将项目放入队列的进程将终止。还请记住,非守护进程将自动加入。
一个将导致死锁的示例如下:
from multiprocessing import Process, Queue
def f(q):
q.put('X' * 1000000)
if __name__ == '__main__':
queue = Queue()
p = Process(target=f, args=(queue,))
p.start()
p.join() # this deadlocks
obj = queue.get()
此处的解决方法是交换最后两行(或仅删除p.join()
行)。
明确地将资源传递给子进程
Note
在 Unix 上,子进程可以利用在父进程中使用全局资源创建的共享资源。但是,最好将对象作为参数传递给子进程的构造函数。
除了使代码(可能)与 Windows 兼容之外,这还确保只要子进程仍然存在,就不会在父进程中垃圾收集对象。如果在父进程中垃圾回收对象时释放了一些资源,这可能很重要。
例如
from multiprocessing import Process, Lock
def f():
... do something using "lock" ...
if __name__ == '__main__':
lock = Lock()
for i in range(10):
Process(target=f).start()
应该 Rewrite 为
from multiprocessing import Process, Lock
def f(l):
... do something using "l" ...
if __name__ == '__main__':
lock = Lock()
for i in range(10):
Process(target=f, args=(lock,)).start()
当心用“类似于文件的文件”替换sys.stdin
Note
multiprocessing最初是无条件调用的:
os.close(sys.stdin.fileno())
multiprocessing.Process._bootstrap()
方法中-这导致了流程中的问题。它已更改为:
sys.stdin.close()
sys.stdin = open(os.devnull)
它解决了进程相互冲突导致文件 Descriptors 错误的根本问题,但是对使用输出缓冲将“ _替换为“类文件对象”的应用程序带来了潜在的危险。危险是,如果多个进程在该文件状对象上调用close(),则可能导致同一数据多次刷新到该对象,从而导致损坏。
如果编写类似文件的对象并实现自己的缓存,则可以pass在每次添加到缓存时存储 pid 并在 pid 更改时丢弃缓存来使其成为分叉安全的。例如:
@property
def cache(self):
pid = os.getpid()
if pid != self._pid:
self._pid = pid
self._cache = []
return self._cache
16.6.3.2. Windows
由于 Windows 缺少os.fork(),因此有一些额外的限制:
More picklability
Note
确保Process.__init__()
的所有参数都是可挑剔的。特别是,这意味着绑定或未绑定的方法不能直接用作 Windows 上的target
参数-只需定义一个函数并使用它即可。
另外,如果您继承Process的子类,请确保在调用Process.start方法时实例是可腌制的。
Global variables
Note
请记住,如果在子进程中运行的代码try访问全局变量,则它看到的值(如果有)可能与调用Process.start时父进程中的值不同。
但是,只是模块级常量的全局变量不会引起任何问题。
安全导入主模块
Note
确保可以由新的 Python 解释器安全地导入主模块,而不会引起意外的副作用(例如,启动新进程)。
例如,在 Windows 下,运行以下模块会失败,并显示RuntimeError:
from multiprocessing import Process
def foo():
print 'hello'
p = Process(target=foo)
p.start()
而是应该使用if __name__ == '__main__':
来保护程序的“入口点”,如下所示:
from multiprocessing import Process, freeze_support
def foo():
print 'hello'
if __name__ == '__main__':
freeze_support()
p = Process(target=foo)
p.start()
(如果程序将正常运行而不是冻结,则可以Ellipsisfreeze_support()
行.)
这使新产生的 Python 解释器可以安全地导入模块,然后运行模块的foo()
函数。
如果在主模块中创建了池或 Management 器,则适用类似的限制。
16.6.4. Examples
演示如何创建和使用定制的 Management 器和代理:
#
# This module shows how to use arbitrary callables with a subclass of
# `BaseManager`.
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo(object):
def f(self):
print 'you called Foo.f()'
def g(self):
print 'you called Foo.g()'
def _h(self):
print 'you called Foo._h()'
# A simple generator function
def baz():
for i in xrange(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ('next', '__next__')
def __iter__(self):
return self
def next(self):
return self._callmethod('next')
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print '-' * 20
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print '-' * 20
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print '-' * 20
it = manager.baz()
for i in it:
print '<%d>' % i,
print
print '-' * 20
op = manager.operator()
print 'op.add(23, 45) =', op.add(23, 45)
print 'op.pow(2, 94) =', op.pow(2, 94)
print 'op.getslice(range(10), 2, 6) =', op.getslice(range(10), 2, 6)
print 'op.repeat(range(5), 3) =', op.repeat(range(5), 3)
print 'op._exposed_ =', op._exposed_
##
if __name__ == '__main__':
freeze_support()
test()
使用Pool
:
#
# A test of `multiprocessing.Pool` class
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
def f(x):
return 1.0 / (x-5.0)
def pow3(x):
return x**3
def noop(x):
pass
#
# Test code
#
def test():
print 'cpu_count() = %d\n' % multiprocessing.cpu_count()
#
# Create pool
#
PROCESSES = 4
print 'Creating pool with %d processes\n' % PROCESSES
pool = multiprocessing.Pool(PROCESSES)
print 'pool = %s' % pool
print
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print 'Ordered results using pool.apply_async():'
for r in results:
print '\t', r.get()
print
print 'Ordered results using pool.imap():'
for x in imap_it:
print '\t', x
print
print 'Unordered results using pool.imap_unordered():'
for x in imap_unordered_it:
print '\t', x
print
print 'Ordered results using pool.map() --- will block till complete:'
for x in pool.map(calculatestar, TASKS):
print '\t', x
print
#
# Simple benchmarks
#
N = 100000
print 'def pow3(x): return x**3'
t = time.time()
A = map(pow3, xrange(N))
print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \
(N, time.time() - t)
t = time.time()
B = pool.map(pow3, xrange(N))
print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \
(N, time.time() - t)
t = time.time()
C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \
' seconds' % (N, N//8, time.time() - t)
assert A == B == C, (len(A), len(B), len(C))
print
L = [None] * 1000000
print 'def noop(x): pass'
print 'L = [None] * 1000000'
t = time.time()
A = map(noop, L)
print '\tmap(noop, L):\n\t\t%s seconds' % \
(time.time() - t)
t = time.time()
B = pool.map(noop, L)
print '\tpool.map(noop, L):\n\t\t%s seconds' % \
(time.time() - t)
t = time.time()
C = list(pool.imap(noop, L, chunksize=len(L)//8))
print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
(len(L)//8, time.time() - t)
assert A == B == C, (len(A), len(B), len(C))
print
del A, B, C, L
#
# Test error handling
#
print 'Testing error handling:'
try:
print pool.apply(f, (5,))
except ZeroDivisionError:
print '\tGot ZeroDivisionError as expected from pool.apply()'
else:
raise AssertionError('expected ZeroDivisionError')
try:
print pool.map(f, range(10))
except ZeroDivisionError:
print '\tGot ZeroDivisionError as expected from pool.map()'
else:
raise AssertionError('expected ZeroDivisionError')
try:
print list(pool.imap(f, range(10)))
except ZeroDivisionError:
print '\tGot ZeroDivisionError as expected from list(pool.imap())'
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, range(10))
for i in range(10):
try:
x = it.next()
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print '\tGot ZeroDivisionError as expected from IMapIterator.next()'
print
#
# Testing timeouts
#
print 'Testing ApplyResult.get() with timeout:',
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print
print
print 'Testing IMapIterator.next() with timeout:',
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print
print
#
# Testing callback
#
print 'Testing callback:'
A = []
B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
r = pool.apply_async(mul, (7, 8), callback=A.append)
r.wait()
r = pool.map_async(pow3, range(10), callback=A.extend)
r.wait()
if A == B:
print '\tcallbacks succeeded\n'
else:
print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B)
#
# Check there are no outstanding tasks
#
assert not pool._cache, 'cache = %r' % pool._cache
#
# Check close() methods
#
print 'Testing close():'
for worker in pool._pool:
assert worker.is_alive()
result = pool.apply_async(time.sleep, [0.5])
pool.close()
pool.join()
assert result.get() is None
for worker in pool._pool:
assert not worker.is_alive()
print '\tclose() succeeded\n'
#
# Check terminate() method
#
print 'Testing terminate():'
pool = multiprocessing.Pool(2)
DELTA = 0.1
ignore = pool.apply(pow3, [2])
results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
pool.terminate()
pool.join()
for worker in pool._pool:
assert not worker.is_alive()
print '\tterminate() succeeded\n'
#
# Check garbage collection
#
print 'Testing garbage collection:'
pool = multiprocessing.Pool(2)
DELTA = 0.1
processes = pool._pool
ignore = pool.apply(pow3, [2])
results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
results = pool = None
time.sleep(DELTA * 2)
for worker in processes:
assert not worker.is_alive()
print '\tgarbage collection succeeded\n'
if __name__ == '__main__':
multiprocessing.freeze_support()
assert len(sys.argv) in (1, 2)
if len(sys.argv) == 1 or sys.argv[1] == 'processes':
print ' Using processes '.center(79, '-')
elif sys.argv[1] == 'threads':
print ' Using threads '.center(79, '-')
import multiprocessing.dummy as multiprocessing
else:
print 'Usage:\n\t%s [processes | threads]' % sys.argv[0]
raise SystemExit(2)
test()
同步类型,如锁,条件和队列:
#
# A test file for the `multiprocessing` package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import time, sys, random
from Queue import Empty
import multiprocessing # may get overwritten
#### TEST_VALUE
def value_func(running, mutex):
random.seed()
time.sleep(random.random()*4)
mutex.acquire()
print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished'
running.value -= 1
mutex.release()
def test_value():
TASKS = 10
running = multiprocessing.Value('i', TASKS)
mutex = multiprocessing.Lock()
for i in range(TASKS):
p = multiprocessing.Process(target=value_func, args=(running, mutex))
p.start()
while running.value > 0:
time.sleep(0.08)
mutex.acquire()
print running.value,
sys.stdout.flush()
mutex.release()
print
print 'No more running processes'
#### TEST_QUEUE
def queue_func(queue):
for i in range(30):
time.sleep(0.5 * random.random())
queue.put(i*i)
queue.put('STOP')
def test_queue():
q = multiprocessing.Queue()
p = multiprocessing.Process(target=queue_func, args=(q,))
p.start()
o = None
while o != 'STOP':
try:
o = q.get(timeout=0.3)
print o,
sys.stdout.flush()
except Empty:
print 'TIMEOUT'
print
#### TEST_CONDITION
def condition_func(cond):
cond.acquire()
print '\t' + str(cond)
time.sleep(2)
print '\tchild is notifying'
print '\t' + str(cond)
cond.notify()
cond.release()
def test_condition():
cond = multiprocessing.Condition()
p = multiprocessing.Process(target=condition_func, args=(cond,))
print cond
cond.acquire()
print cond
cond.acquire()
print cond
p.start()
print 'main is waiting'
cond.wait()
print 'main has woken up'
print cond
cond.release()
print cond
cond.release()
p.join()
print cond
#### TEST_SEMAPHORE
def semaphore_func(sema, mutex, running):
sema.acquire()
mutex.acquire()
running.value += 1
print running.value, 'tasks are running'
mutex.release()
random.seed()
time.sleep(random.random()*2)
mutex.acquire()
running.value -= 1
print '%s has finished' % multiprocessing.current_process()
mutex.release()
sema.release()
def test_semaphore():
sema = multiprocessing.Semaphore(3)
mutex = multiprocessing.RLock()
running = multiprocessing.Value('i', 0)
processes = [
multiprocessing.Process(target=semaphore_func,
args=(sema, mutex, running))
for i in range(10)
]
for p in processes:
p.start()
for p in processes:
p.join()
#### TEST_JOIN_TIMEOUT
def join_timeout_func():
print '\tchild sleeping'
time.sleep(5.5)
print '\n\tchild terminating'
def test_join_timeout():
p = multiprocessing.Process(target=join_timeout_func)
p.start()
print 'waiting for process to finish'
while 1:
p.join(timeout=1)
if not p.is_alive():
break
print '.',
sys.stdout.flush()
#### TEST_EVENT
def event_func(event):
print '\t%r is waiting' % multiprocessing.current_process()
event.wait()
print '\t%r has woken up' % multiprocessing.current_process()
def test_event():
event = multiprocessing.Event()
processes = [multiprocessing.Process(target=event_func, args=(event,))
for i in range(5)]
for p in processes:
p.start()
print 'main is sleeping'
time.sleep(2)
print 'main is setting event'
event.set()
for p in processes:
p.join()
#### TEST_SHAREDVALUES
def sharedvalues_func(values, arrays, shared_values, shared_arrays):
for i in range(len(values)):
v = values[i][1]
sv = shared_values[i].value
assert v == sv
for i in range(len(values)):
a = arrays[i][1]
sa = list(shared_arrays[i][:])
assert a == sa
print 'Tests passed'
def test_sharedvalues():
values = [
('i', 10),
('h', -2),
('d', 1.25)
]
arrays = [
('i', range(100)),
('d', [0.25 * i for i in range(100)]),
('H', range(1000))
]
shared_values = [multiprocessing.Value(id, v) for id, v in values]
shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]
p = multiprocessing.Process(
target=sharedvalues_func,
args=(values, arrays, shared_values, shared_arrays)
)
p.start()
p.join()
assert p.exitcode == 0
####
def test(namespace=multiprocessing):
global multiprocessing
multiprocessing = namespace
for func in [ test_value, test_queue, test_condition,
test_semaphore, test_join_timeout, test_event,
test_sharedvalues ]:
print '\n\t######## %s\n' % func.__name__
func()
ignore = multiprocessing.active_children() # cleanup any old processes
if hasattr(multiprocessing, '_debug_info'):
info = multiprocessing._debug_info()
if info:
print info
raise ValueError('there should be no positive refcounts left')
if __name__ == '__main__':
multiprocessing.freeze_support()
assert len(sys.argv) in (1, 2)
if len(sys.argv) == 1 or sys.argv[1] == 'processes':
print ' Using processes '.center(79, '-')
namespace = multiprocessing
elif sys.argv[1] == 'manager':
print ' Using processes and a manager '.center(79, '-')
namespace = multiprocessing.Manager()
namespace.Process = multiprocessing.Process
namespace.current_process = multiprocessing.current_process
namespace.active_children = multiprocessing.active_children
elif sys.argv[1] == 'threads':
print ' Using threads '.center(79, '-')
import multiprocessing.dummy as namespace
else:
print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]
raise SystemExit(2)
test(namespace)
一个示例,显示如何使用队列将任务提供给工作进程集合并收集结果:
#
# Simple example which uses a pool of workers to carry out some tasks.
#
# Notice that the results will probably not come out of the output
# queue in the same in the same order as the corresponding tasks were
# put on the input queue. If it is important to get the results back
# in the original order then consider using `Pool.map()` or
# `Pool.imap()` (which will save on the amount of code needed anyway).
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print 'Unordered results:'
for i in range(len(TASKS1)):
print '\t', done_queue.get()
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print '\t', done_queue.get()
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()
一个工作进程池如何在共享一个侦听套接字的同时每个进程可以运行SimpleHTTPServer.HttpServer
实例的示例。
#
# Example where a pool of http servers share a single listening socket
#
# On Windows this module depends on the ability to pickle a socket
# object so that the worker processes can inherit a copy of the server
# object. (We import `multiprocessing.reduction` to enable this pickling.)
#
# Not sure if we should synchronize access to `socket.accept()` method by
# using a process-shared lock -- does not seem to be necessary.
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import os
import sys
from multiprocessing import Process, current_process, freeze_support
from BaseHTTPServer import HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler
if sys.platform == 'win32':
import multiprocessing.reduction # make sockets pickable/inheritable
def note(format, *args):
sys.stderr.write('[%s]\t%s\n' % (current_process().name, format%args))
class RequestHandler(SimpleHTTPRequestHandler):
# we override log_message() to show which process is handling the request
def log_message(self, format, *args):
note(format, *args)
def serve_forever(server):
note('starting server')
try:
server.serve_forever()
except KeyboardInterrupt:
pass
def runpool(address, number_of_processes):
# create a single server object -- children will each inherit a copy
server = HTTPServer(address, RequestHandler)
# create child processes to act as workers
for i in range(number_of_processes-1):
Process(target=serve_forever, args=(server,)).start()
# main process also acts as a worker
serve_forever(server)
def test():
DIR = os.path.join(os.path.dirname(__file__), '..')
ADDRESS = ('localhost', 8000)
NUMBER_OF_PROCESSES = 4
print 'Serving at http://%s:%d using %d worker processes' % \
(ADDRESS[0], ADDRESS[1], NUMBER_OF_PROCESSES)
print 'To exit press Ctrl-' + ['C', 'Break'][sys.platform=='win32']
os.chdir(DIR)
runpool(ADDRESS, NUMBER_OF_PROCESSES)
if __name__ == '__main__':
freeze_support()
test()
一些简单的基准比较multiprocessing和threading:
#
# Simple benchmarks for the multiprocessing package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import time, sys, multiprocessing, threading, Queue, gc
if sys.platform == 'win32':
_timer = time.clock
else:
_timer = time.time
delta = 1
#### TEST_QUEUESPEED
def queuespeed_func(q, c, iterations):
a = '0' * 256
c.acquire()
c.notify()
c.release()
for i in xrange(iterations):
q.put(a)
q.put('STOP')
def test_queuespeed(Process, q, c):
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
p = Process(target=queuespeed_func, args=(q, c, iterations))
c.acquire()
p.start()
c.wait()
c.release()
result = None
t = _timer()
while result != 'STOP':
result = q.get()
elapsed = _timer() - t
p.join()
print iterations, 'objects passed through the queue in', elapsed, 'seconds'
print 'average number/sec:', iterations/elapsed
#### TEST_PIPESPEED
def pipe_func(c, cond, iterations):
a = '0' * 256
cond.acquire()
cond.notify()
cond.release()
for i in xrange(iterations):
c.send(a)
c.send('STOP')
def test_pipespeed():
c, d = multiprocessing.Pipe()
cond = multiprocessing.Condition()
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
p = multiprocessing.Process(target=pipe_func,
args=(d, cond, iterations))
cond.acquire()
p.start()
cond.wait()
cond.release()
result = None
t = _timer()
while result != 'STOP':
result = c.recv()
elapsed = _timer() - t
p.join()
print iterations, 'objects passed through connection in',elapsed,'seconds'
print 'average number/sec:', iterations/elapsed
#### TEST_SEQSPEED
def test_seqspeed(seq):
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
t = _timer()
for i in xrange(iterations):
a = seq[5]
elapsed = _timer()-t
print iterations, 'iterations in', elapsed, 'seconds'
print 'average number/sec:', iterations/elapsed
#### TEST_LOCK
def test_lockspeed(l):
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
t = _timer()
for i in xrange(iterations):
l.acquire()
l.release()
elapsed = _timer()-t
print iterations, 'iterations in', elapsed, 'seconds'
print 'average number/sec:', iterations/elapsed
#### TEST_CONDITION
def conditionspeed_func(c, N):
c.acquire()
c.notify()
for i in xrange(N):
c.wait()
c.notify()
c.release()
def test_conditionspeed(Process, c):
elapsed = 0
iterations = 1
while elapsed < delta:
iterations *= 2
c.acquire()
p = Process(target=conditionspeed_func, args=(c, iterations))
p.start()
c.wait()
t = _timer()
for i in xrange(iterations):
c.notify()
c.wait()
elapsed = _timer()-t
c.release()
p.join()
print iterations * 2, 'waits in', elapsed, 'seconds'
print 'average number/sec:', iterations * 2 / elapsed
####
def test():
manager = multiprocessing.Manager()
gc.disable()
print '\n\t######## testing Queue.Queue\n'
test_queuespeed(threading.Thread, Queue.Queue(),
threading.Condition())
print '\n\t######## testing multiprocessing.Queue\n'
test_queuespeed(multiprocessing.Process, multiprocessing.Queue(),
multiprocessing.Condition())
print '\n\t######## testing Queue managed by server process\n'
test_queuespeed(multiprocessing.Process, manager.Queue(),
manager.Condition())
print '\n\t######## testing multiprocessing.Pipe\n'
test_pipespeed()
print
print '\n\t######## testing list\n'
test_seqspeed(range(10))
print '\n\t######## testing list managed by server process\n'
test_seqspeed(manager.list(range(10)))
print '\n\t######## testing Array("i", ..., lock=False)\n'
test_seqspeed(multiprocessing.Array('i', range(10), lock=False))
print '\n\t######## testing Array("i", ..., lock=True)\n'
test_seqspeed(multiprocessing.Array('i', range(10), lock=True))
print
print '\n\t######## testing threading.Lock\n'
test_lockspeed(threading.Lock())
print '\n\t######## testing threading.RLock\n'
test_lockspeed(threading.RLock())
print '\n\t######## testing multiprocessing.Lock\n'
test_lockspeed(multiprocessing.Lock())
print '\n\t######## testing multiprocessing.RLock\n'
test_lockspeed(multiprocessing.RLock())
print '\n\t######## testing lock managed by server process\n'
test_lockspeed(manager.Lock())
print '\n\t######## testing rlock managed by server process\n'
test_lockspeed(manager.RLock())
print
print '\n\t######## testing threading.Condition\n'
test_conditionspeed(threading.Thread, threading.Condition())
print '\n\t######## testing multiprocessing.Condition\n'
test_conditionspeed(multiprocessing.Process, multiprocessing.Condition())
print '\n\t######## testing condition managed by a server process\n'
test_conditionspeed(multiprocessing.Process, manager.Condition())
gc.enable()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()