On this page
multiprocessing—基于进程的并行性
源代码: Lib/multiprocessing/
Introduction
multiprocessing是使用类似于threading模块的 API 支持生成程序的软件包。 multiprocessing包同时提供本地和远程并发,pass使用子进程而不是线程来有效地避开全局翻译锁。因此,multiprocessing模块允许程序员充分利用给定机器上的多个处理器。它可以在 Unix 和 Windows 上运行。
multiprocessing模块还引入了threading模块中没有类似物的 API。最好的例子是Pool对象,该对象提供了一种便利的方法,可以跨多个 Importing 值并行执行函数,从而跨进程分配 Importing 数据(数据并行性)。下面的示例演示了在模块中定义此类Function的常规做法,以便子进程可以成功导入该模块。这个使用Pool的数据并行性的基本示例,
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
将打印到标准输出
[1, 4, 9]
Process 类
在multiprocessing中,pass创建Process对象然后调用其start()方法来生成进程。 Process遵循threading.Thread的 API。多进程程序的一个简单示例是
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
为了显示涉及的各个进程 ID,下面是一个扩展的示例:
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
有关为何需要if __name__ == '__main__'
部分的说明,请参见Programming guidelines。
上下文和启动方法
取决于平台,multiprocessing支持三种启动过程的方式。这些启动方法是
Note
spawn
父进程开始一个全新的 python 解释器进程。子进程将仅继承运行进程对象run()方法所需的资源。特别是,父进程中不必要的文件 Descriptors 和句柄将不会被继承。与使用* fork 或 forkserver *相比,使用此方法启动进程的速度相当慢。
在 Unix 和 Windows 上可用。 Windows 和 macOS 上的默认设置。
fork
父进程使用os.fork()来派生 Python 解释器。子进程开始时实际上与父进程相同。父进程的所有资源均由子进程继承。请注意,安全地分叉多线程进程是有问题的。
仅在 Unix 上可用。 Unix 上的默认值。
forkserver
当程序启动并选择* forkserver *启动方法时,服务器进程将启动。从那时起,无论何时需要新进程,父进程都将连接到服务器并请求它派生一个新进程。 fork 服务器进程是单线程的,因此使用os.fork()是安全的。没有不必要的资源被继承。
在支持pass Unix 管道传递文件 Descriptors 的 Unix 平台上可用。
在版本 3.8 中更改:在 macOS 上,* spawn *启动方法现在是默认设置。 * fork *启动方法应该被认为是不安全的,因为它可能导致子进程崩溃。参见bpo-33725。
在版本 3.4 中进行了更改:在所有 unix 平台上添加了* spawn ,并在某些 unix 平台上添加了 forkserver *。子进程不再继承 Windows 上所有父级可继承句柄。
在 Unix 上,使用* spawn 或 forkserver 的启动方法还将启动一个 resource tracker *进程,该进程跟踪由程序进程创建的未链接的命名系统资源(例如,命名 signal 量或SharedMemory对象)。当所有进程都退出后,资源跟踪器将取消链接任何剩余的跟踪对象。通常不应该有任何资源,但是如果某个进程被 signal 杀死,则可能会有一些“泄漏”资源。 (泄漏的 signal 量和共享内存段都不会自动取消链接,直到下次重新启动为止.这对于两个对象都是有问题的,因为系统仅允许有限数量的命名 signal 量,并且共享内存段占据了主内存中的某些空间.)
要选择启动方法,请在主模块的if __name__ == '__main__'
子句中使用set_start_method()。例如:
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
set_start_method()不应在程序中多次使用。
或者,您可以使用get_context()获取上下文对象。上下文对象与 multiprocessing 模块具有相同的 API,并允许一个对象在同一程序中使用多个启动方法。
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
注意,与一个上下文有关的对象可能与另一上下文的过程不兼容。特别是,使用* fork 上下文创建的锁不能传递给使用 spawn 或 forkserver *启动方法启动的进程。
想要使用特定启动方法的库可能应该使用get_context()以避免干扰库用户的选择。
Warning
Unix 上的'spawn'
和'forkserver'
启动方法当前无法与“冻结”的可执行文件(即由 PyInstaller 和 cx_Freeze 之类的程序包生成的二进制文件)一起使用。 'fork'
start 方法起作用。
在流程之间交换对象
multiprocessing支持进程之间的两种通信通道:
Queues
Note
Queue类是queue.Queue的近克隆。例如:
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # prints "[42, None, 'hello']"
p.join()
队列是线程和进程安全的。
Pipes
Note
Pipe()函数返回Pairpass管道连接的连接对象,管道默认情况下为双工(双向)。例如:
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
p.join()
Pipe()返回的两个连接对象代表管道的两端。每个连接对象都有send()
和recv()
个方法(以及其他方法)。请注意,如果两个进程(或线程)try同时读取或写入管道的“相同”端,则管道中的数据可能会损坏。当然,不存在同时使用管道不同端的过程造成损坏的风险。
进程之间的同步
multiprocessing包含threading中所有同步 Primitives 的等效项。例如,可以使用锁来确保一次仅将一个进程打印到标准输出:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
如果不使用来自不同进程的锁输出,则很容易混淆所有信息。
进程之间的共享状态
如上所述,在进行并行编程时,通常最好尽可能避免使用共享状态。使用多个进程时尤其如此。
但是,如果您确实确实需要使用一些共享数据,则multiprocessing提供了几种方法。
Shared memory
Note
可以使用Value或Array将数据存储在共享内存 Map 中。例如下面的代码
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
will print
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
创建num
和arr
时使用的'd'
和'i'
参数是array模块使用的类型的类型代码:'d'
表示双精度浮点数,而'i'
表示带符号整数。这些共享对象将是进程和线程安全的。
为了更灵活地使用共享内存,可以使用multiprocessing.sharedctypes模块,该模块支持创建从共享内存分配的任意 ctypes 对象。
Server process
使用 Worker 池
Pool类表示辅助进程池。它具有允许以几种不同方式将任务卸载到工作进程的方法。
For example:
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
请注意,池的方法仅应由创建它的进程使用。
Note
此软件包中的Function要求__main__
模块可由子级导入。 Programming guidelines对此进行了介绍,但是这里值得指出。这意味着某些示例(例如multiprocessing.pool.Pool示例)在交互式解释器中将不起作用。例如:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> with p:
... p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
(如果try此操作,则实际上将以半随机方式输出交错的三个完整回溯,然后您可能不得不以某种方式停止父进程.)
Reference
multiprocessing软件包主要复制threading模块的 API。
流程和 exception
-
- class *
multiprocessing.
Process
(* group = None , target = None , name = None , args =(), kwargs ={} ,**,* daemon = None *)
- 流程对象表示在单独的流程中运行的活动。 Process类具有threading.Thread的所有方法的等效项。
- class *
构造函数应始终使用关键字参数进行调用。 * group *应始终为None
;它仅用于与threading.Thread兼容。 * target *是由run()方法调用的可调用对象。默认为None
,表示什么都不会被调用。 * name *是进程名称(有关更多详细信息,请参见name)。 * args *是目标调用的参数 Tuples。 * kwargs 是用于目标调用的关键字参数字典。如果提供的话,仅关键字 daemon *参数将进程daemon标志设置为True
或False
。如果为None
(默认值),则此标志将从创建过程中继承。
默认情况下,没有参数传递给* target *。
如果子类覆盖了构造函数,则必须确保在对进程进行其他任何操作之前,先调用 Base Class 构造函数(Process.__init__()
)。
在版本 3.3 中更改:添加了* daemon *参数。
run
( )- 表示流程活动的方法。
您可以在子类中重写此方法。标准run()方法调用传递给对象构造函数的可调用对象作为目标参数(如果有),并分别从* args 和 kwargs *参数中获取 Sequences 参数和关键字参数。
start
( )- 开始流程的活动。
每个过程对象最多只能调用一次。它安排在单独的过程中调用对象的run()方法。
join
([超时])
一个过程可以多次加入。
进程无法加入自身,因为这将导致死锁。在启动进程之前try加入该进程是错误的。
name
- 进程的名称。名称是仅用于标识目的的字符串。它没有语义。多个进程可以使用相同的名称。
初始名称由构造函数设置。如果没有为构造函数提供显式名称,则构造格式为“ Process-N1:N2:…:Nk”,其中每个 Nk 是其父级的第 N 个子级。
is_alive
( )- 返回该过程是否仍然存在。
大致来说,从start()方法返回到子进程终止之间,进程对象一直处于活动状态。
daemon
- 进程的守护程序标志,一个布尔值。必须在调用start()之前进行设置。
初始值是从创建过程继承的。
进程退出时,它将try终止其所有守护程序子进程。
请注意,不允许守护进程创建子进程。否则,如果守护进程在其父进程退出时被终止,它将使其子进程变成孤儿。另外,这些不是 Unix 守护程序或服务,它们是正常的进程,如果退出了非守护进程,则将终止(而不加入)。
除了threading.Thread API,Process对象还支持以下属性和方法:
pid
- 返回进程 ID。在生成该进程之前,它是
None
。
- 返回进程 ID。在生成该进程之前,它是
exitcode
- 孩子的退出代码。如果进程尚未终止,则为
None
。负值* -N 表示该子对象已被 signal N *终止。
- 孩子的退出代码。如果进程尚未终止,则为
authkey
- 进程的身份验证密钥(字节字符串)。
初始化multiprocessing时,使用os.urandom()为主进程分配一个随机字符串。
创建Process对象时,它将继承其父进程的身份验证密钥,尽管可以pass将authkey设置为另一个字节字符串来更改此密钥。
See Authentication keys.
sentinel
- 系统对象的数字句柄,该进程结束时将变为“就绪”状态。
如果要使用multiprocessing.connection.wait()一次 await 多个事件,则可以使用此值。否则,调用join()更简单。
在 Windows 上,这是可用于WaitForSingleObject
和WaitForMultipleObjects
系列 API 调用的 os 句柄。在 Unix 上,这是一个文件 Descriptors,可与select模块中的 Primitives 一起使用。
版本 3.3 中的新Function。
terminate
( )- 终止过程。在 Unix 上,这是使用
SIGTERM
signal 完成的;在 WindowsTerminateProcess()
上使用。请注意,将不执行 Export 处理程序和 finally 子句等。
- 终止过程。在 Unix 上,这是使用
请注意,该进程的后代进程将不会终止–它们将变得孤立。
Warning
如果在关联的进程正在使用管道或队列时使用此方法,则该管道或队列可能会损坏,并可能无法被其他进程使用。同样,如果进程已获取锁定或 signal 量等,则终止进程可能导致其他进程死锁。
kill
( )- 与terminate()相同,但在 Unix 上使用
SIGKILL
signal。
- 与terminate()相同,但在 Unix 上使用
3.7 版中的新Function。
close
( )- 关闭Process对象,释放与其关联的所有资源。如果基础进程仍在运行,则引发ValueError。一旦close()成功返回,Process对象的大多数其他方法和属性将提高ValueError。
3.7 版中的新Function。
请注意,start(),join(),is_alive(),terminate()和exitcode方法应仅由创建流程对象的流程调用。
Process的某些方法的用法示例:
>>> import multiprocessing, time, signal
>>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
>>> print(p, p.is_alive())
<Process ... initial> False
>>> p.start()
>>> print(p, p.is_alive())
<Process ... started> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print(p, p.is_alive())
<Process ... stopped exitcode=-SIGTERM> False
>>> p.exitcode == -signal.SIGTERM
True
exception
multiprocessing.
ProcessError
- 所有multiprocessing个异常的 Base Class。
exception
multiprocessing.
BufferTooShort
- 当提供的缓冲区对象太小而无法读取消息时,
Connection.recv_bytes_into()
引发异常。
- 当提供的缓冲区对象太小而无法读取消息时,
如果e
是BufferTooShort的实例,则e.args[0]
会将消息作为字节字符串给出。
exception
multiprocessing.
AuthenticationError
- 出现身份验证错误时引发。
exception
multiprocessing.
TimeoutError
- 超时到期时由带有超时的方法引发。
管道和队列
当使用多个进程时,通常使用消息传递进行进程之间的通信,并避免使用任何同步 Primitives(例如锁)。
为了传递消息,可以使用Pipe()(用于两个进程之间的连接)或队列(允许多个生产者和使用者)。
Queue,SimpleQueue和JoinableQueue类型是根据标准库中queue.Queue类建模的多生产者,多 ConsumerFIFO 队列。它们的区别在于Queue缺少 Python 2.5 的queue.Queue类中引入的task_done()和join()方法。
如果您使用JoinableQueue,那么您必须为从队列中删除的每个任务调用JoinableQueue.task_done(),否则用于计数未完成任务数量的 signal 量finally可能会溢出,从而引发异常。
请注意,也可以使用 Management 器对象来创建共享队列-参见Managers。
Note
multiprocessing使用通常的queue.Empty和queue.Full异常来表示超时。它们在multiprocessing名称空间中不可用,因此您需要从queue导入它们。
Note
将对象放入队列时,将对对象进行 Pickling,然后后台线程将 Pickling 的数据刷新到基础管道。这会带来一些令人惊讶的后果,但不会造成任何实际困难-如果它们确实困扰您,则可以改用manager创建的队列。
将对象放在空队列上之后,在队列的empty()方法返回False和get_nowait()可以返回而不提高queue.Empty之前,可能会有无穷的延迟。
如果多个进程正在排队对象,则可能在另一端无序接收对象。但是,pass相同过程排队的对象始终相对于彼此处于预期的 Sequences。
Warning
如果在try使用Queue时使用Process.terminate()或os.kill()将进程杀死,则队列中的数据可能会损坏。这可能会导致其他进程稍后try使用队列时获得异常。
Warning
如上所述,如果子进程已将项目放入队列中(并且未使用JoinableQueue.cancel_join_thread),则该进程将不会终止,直到所有缓冲的项目都已刷新到管道中为止。
这意味着,如果您try加入该进程,则除非您确定已耗尽所有已放入队列的项目,否则可能会陷入僵局。同样,如果子进程是非守护进程,则父进程在try加入其所有非守护进程子进程时可能会在退出时挂起。
请注意,使用 Management 器创建的队列不存在此问题。参见Programming guidelines。
有关使用队列进行进程间通信的示例,请参见Examples。
multiprocessing.
Pipe
([* duplex *])- 返回代表管道末端的
(conn1, conn2)
个对Connection个对象。
- 返回代表管道末端的
如果* duplex 为True
(默认值),则管道是双向的。如果 duplex *为False
,则管道是单向的:conn1
仅可用于接收消息,而conn2
仅可用于发送消息。
-
- class *
multiprocessing.
Queue
([* maxsize *])
- 返回使用管道和一些锁/signal 量实现的进程共享队列。当进程首先将项目放入队列时,将启动一个供料器线程,该线程将对象从缓冲区转移到管道中。
- class *
标准库的queue模块中常见的queue.Empty和queue.Full异常引发了超时。
Queue实现queue.Queue的所有方法,但task_done()和join()除外。
qsize
( )- 返回队列的大概大小。由于多线程/multiprocessing 语义,此数字不可靠。
请注意,这可能会在未实现sem_getvalue()
的 Unix 平台(如 Mac OS X)上引发NotImplementedError。
empty
( )- 如果队列为空,则返回
True
,否则返回False
。由于多线程/multiprocessing 语义,这是不可靠的。
- 如果队列为空,则返回
full
( )- 如果队列已满,则返回
True
,否则返回False
。由于多线程/multiprocessing 语义,这是不可靠的。
- 如果队列已满,则返回
put
(* obj * [,* block * [,* timeout *]])- 将 obj 放入队列。如果可选参数* block 是
True
(默认值),而 timeout 是None
(默认值),则如有必要,请阻塞直到有可用的插槽。如果 timeout 是一个正数,则它最多会阻塞 timeout 秒,并且如果在这段时间内没有可用的可用插槽,则会引发queue.Full异常。否则( block 为False
),如果有可用的空闲插槽,则将一个项目放在队列中,否则引发queue.Full异常(在这种情况下, timeout *被忽略)。
- 将 obj 放入队列。如果可选参数* block 是
在版本 3.8 中更改:如果队列关闭,则引发ValueError而不是AssertionError。
put_nowait
(* obj *)- 等效于
put(obj, False)
。
- 等效于
get
([[* block * [,* timeout *]])- 从队列中删除并返回一个项目。如果可选参数 args * block 是
True
(默认值),而 timeout 是None
(默认值),则如有必要,请阻塞直到有可用项为止。如果 timeout 是一个正数,则它最多会阻塞 timeout 秒,如果在该时间内没有可用项,则会引发queue.Empty异常。否则(块是False
),如果有一个立即可用,则返回一个项目,否则引发queue.Empty异常(在这种情况下, timeout *被忽略)。
- 从队列中删除并返回一个项目。如果可选参数 args * block 是
在版本 3.8 中更改:如果队列关闭,则引发ValueError而不是OSError。
get_nowait
( )- 等效于
get(False)
。
- 等效于
multiprocessing.Queue还有queue.Queue找不到的一些其他方法。这些方法对于大多数代码通常是不必要的:
close
( )- 指示当前进程不会再将更多数据放入此队列。后台线程将所有缓冲的数据刷新到管道后将退出。当队列被垃圾回收时,将自动调用此方法。
join_thread
( )- 加入后台线程。仅在调用close()后才能使用。它阻塞直到后台线程退出,以确保缓冲区中的所有数据都已刷新到管道中。
默认情况下,如果进程不是队列的创建者,则退出时它将try加入队列的后台线程。该过程可以调用cancel_join_thread()来使join_thread()什么也不做。
cancel_join_thread
( )- 防止join_thread()阻止。特别是,这可以防止后台线程在进程退出时自动加入–参见join_thread()。
此方法的更好称呼可能是allow_exit_without_flush()
。这很可能导致排队的数据丢失,并且您几乎可以肯定不需要使用它。仅当您需要当前进程立即退出而无需 await 将排队的数据刷新到基础管道并且您不关心丢失的数据时,它才 true 存在。
Note
此类的Function需要在主机 os 上正常运行的共享 signal 量实现。没有一个,该类的Function将被禁用,并且try实例化Queue将导致ImportError。有关其他信息,请参见bpo-3770。对于以下列出的任何专用队列类型,这同样适用。
类别
multiprocessing.
SimpleQueue
empty
( )- 如果队列为空,则返回
True
,否则返回False
。
- 如果队列为空,则返回
get
( )- 从队列中删除并返回一个项目。
put
(项目)- 将* item *放入队列。
-
- class *
multiprocessing.
JoinableQueue
([* maxsize *])
- JoinableQueue是Queue的子类,它是一个队列,它另外具有task_done()和join()方法。
- class *
task_done
( )- 表示先前排队的任务已完成。由队列使用者使用。对于用于提取任务的每个get(),随后对task_done()的调用将告诉队列该任务的处理已完成。
如果join()当前正在阻止,它将在处理完所有项目后恢复(这意味着已收到put()的每个项目都收到了task_done()调用)。
如果被调用的次数比队列中放置的项目多,则引发ValueError。
join
( )- 阻塞直到队列中的所有项目都已获得并处理。
每当将项目添加到队列时,未完成任务的数量就会增加。每当 Consumer 致电task_done()表示已检索到该物品并且该物品的所有工作已完成时,该计数就会减少。当未完成的任务计数降至零时,join()解除阻止。
Miscellaneous
multiprocessing.
active_children
( )- 返回当前进程的所有活动子级的列表。
调用它具有“加入”任何已经完成的进程的副作用。
multiprocessing.
cpu_count
( )- 返回系统中的 CPU 数量。
该数目不等于当前进程可以使用的 CPU 数目。可用的 CPU 数量可以passlen(os.sched_getaffinity(0))
获得
可能提高NotImplementedError。
See also
multiprocessing.
current_process
( )- 返回与当前进程对应的Process对象。
threading.current_thread()的类似物。
multiprocessing.
parent_process
( )- 返回与current_process()的父进程相对应的Process对象。对于主进程,
parent_process
将是None
。
- 返回与current_process()的父进程相对应的Process对象。对于主进程,
3.8 版的新Function。
multiprocessing.
freeze_support
( )- 添加对冻结使用multiprocessing的程序以生成 Windows 可执行文件的支持。 (已使用 py2exe ,PyInstaller 和 cx_Freeze 进行了测试.)
需要在主模块的if __name__ == '__main__'
行之后立即调用此函数。例如:
from multiprocessing import Process, freeze_support
def f():
print('hello world!')
if __name__ == '__main__':
freeze_support()
Process(target=f).start()
如果Ellipsisfreeze_support()
行,则try运行冻结的可执行文件将引发RuntimeError。
在 Windows 以外的任何其他 os 上调用freeze_support()
时都无效。此外,如果该模块正在 Windows 上的 Python 解释器正常运行(该程序尚未冻结),则freeze_support()
无效。
multiprocessing.
get_all_start_methods
( )- 返回受支持的启动方法的列表,第一个为默认方法。可能的启动方法是
'fork'
,'spawn'
和'forkserver'
。在 Windows 上,仅'spawn'
可用。在 Unix 上,始终支持'fork'
和'spawn'
,默认值为'fork'
。
- 返回受支持的启动方法的列表,第一个为默认方法。可能的启动方法是
3.4 版的新Function。
multiprocessing.
get_context
(* method = None *)- 返回一个上下文对象,该对象具有与multiprocessing模块相同的属性。
如果* method 是None
,则返回默认上下文。否则方法*应该是'fork'
,'spawn'
,'forkserver'
。如果指定的启动方法不可用,则引发ValueError。
3.4 版的新Function。
multiprocessing.
get_start_method
(* allow_none = False *)- 返回用于启动进程的启动方法的名称。
如果尚未固定启动方法,并且* allow_none 为 false,则将启动方法固定为默认值,并返回名称。如果启动方法尚未修复且 allow_none *为 true,则返回None
。
返回值可以是'fork'
,'spawn'
,'forkserver'
或None
。 'fork'
是 Unix 上的默认值,而'spawn'
是 Windows 上的默认值。
3.4 版的新Function。
multiprocessing.
set_executable
( )- 设置启动子进程时要使用的 Python 解释器的路径。 (默认情况下使用sys.executable)。嵌入者可能需要做一些类似的事情
set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
在他们可以创建子进程之前。
在版本 3.4 中更改:现在,当使用'spawn'
start 方法时,在 Unix 上受支持。
multiprocessing.
set_start_method
(方法)- 设置用于启动子进程的方法。 方法可以是
'fork'
,'spawn'
或'forkserver'
。
- 设置用于启动子进程的方法。 方法可以是
请注意,这最多应调用一次,并且应在主模块的if __name__ == '__main__'
子句中对其进行保护。
3.4 版的新Function。
Note
Connection Objects
连接对象允许发送和接收可腌制对象或字符串。可以将它们视为面向消息的连接套接字。
通常使用Pipe创建连接对象-另请参见Listener 和 Client。
- 类别
multiprocessing.connection.
Connection
-
send
(* obj *)- 将对象发送到连接的另一端,应使用recv()进行读取。
-
该对象必须是可腌制的。很大的 pickle(大约 32 MiB,尽管取决于 os)可能会引发ValueError异常。
recv
( )fileno
( )- 返回连接使用的文件 Descriptors 或句柄。
close
( )- 关闭连接。
连接被垃圾回收时,将自动调用此方法。
poll
([超时])- 返回是否有任何可读取的数据。
如果未指定* timeout ,它将立即返回。如果 timeout 是一个数字,则此参数指定要阻止的最长时间(以秒为单位)。如果 timeout *为None
,则使用无限超时。
请注意,可以使用multiprocessing.connection.wait()一次轮询多个连接对象。
send_bytes
(* buffer * [,* offset * [,* size *]])- 从bytes-like object发送字节数据作为完整消息。
如果给出* offset ,则从 buffer 中的该位置读取数据。如果给出 size *,那么将从缓冲区读取许多字节。很大的缓冲区(大约 32 MiB,虽然取决于 os)可能会引发ValueError异常
recv_bytes
([* maxlength *])- 以字符串形式返回从连接的另一端发送的字节数据的完整消息。阻塞直到有东西要接收为止。如果没有什么可接收的并且另一端已关闭,则引发EOFError。
如果指定了* maxlength 并且消息比 maxlength *长,那么将引发OSError并且连接将不再可读。
在版本 3.3 中更改:此函数曾经引发IOError,现在是OSError的别名。
recv_bytes_into
(* buffer * [,* offset *])- 将连接另一端发送的字节数据的完整消息读入* buffer *,并返回消息中的字节数。阻塞直到有东西要接收为止。如果没有什么可接收的,并且另一端已关闭,则引发EOFError。
- buffer 必须是可写的bytes-like object。如果给定 offset ,则消息将从该位置写入缓冲区。偏移量必须是小于 buffer *的长度(以字节为单位)的非负整数。
如果缓冲区太短,则会引发BufferTooShort
异常,并且完整的消息将以e.args[0]
的形式提供,其中e
是异常实例。
在版本 3.3 中进行了更改:现在可以使用Connection.send()和Connection.recv()在进程之间传输连接对象本身。
版本 3.3 中的新增Function:连接对象现在支持上下文 Management 协议–参见上下文 Management 器类型。 enter()返回连接对象,exit()调用close()。
For example:
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
Warning
Connection.recv()方法会自动释放收到的数据,这可能会带来安全风险,除非您可以信任发送消息的过程。
因此,除非使用Pipe()
生成连接对象,否则在执行某种身份验证后,应仅使用recv()和send()方法。参见Authentication keys。
Warning
如果某个进程在try读取或写入管道时被杀死,则该管道中的数据很可能会损坏,因为无法确定消息边界在哪里。
Synchronization primitives
通常,在多进程程序中,同步 Primitives 不是在多线程程序中所必需的。请参阅threading模块的文档。
请注意,还可以使用 Management 器对象来创建同步 Primitives–参见Managers。
-
- class *
multiprocessing.
Barrier
(* parties * [,* action * [,* timeout *]])
- 障碍物:threading.Barrier的副本。
- class *
版本 3.3 中的新Function。
- 类别
multiprocessing.
BoundedSemaphore
([值])- 有界 signal 量对象:threading.BoundedSemaphore的近似模拟。
与它的近似模拟存在一个单独的区别:其acquire
方法的第一个参数名为* block *,与Lock.acquire()一致。
Note
在 Mac OS X 上,这与Semaphore没有区别,因为sem_getvalue()
尚未在该平台上实现。
-
- class *
multiprocessing.
Condition
([* lock *])
- 条件变量:threading.Condition的别名。
- class *
如果指定了* lock *,则它应该是multiprocessing中的Lock或RLock对象。
在版本 3.3 中进行了更改:添加了wait_for()方法。
类别
multiprocessing.
Event
- threading.Event的副本。
类别
multiprocessing.
Lock
- 非递归锁定对象:threading.Lock的近似模拟。一旦进程或线程获取了锁,则随后从任何进程或线程获取锁的try都将阻塞,直到释放为止;否则,该锁将被释放。任何进程或线程都可能释放它。除非另有说明,否则threading.Lock应用于线程的概念和行为将在multiprocessing.Lock中复制,因为它适用于进程或线程。
请注意,Lock实际上是一个工厂函数,它返回使用默认上下文初始化的multiprocessing.synchronize.Lock
的实例。
Lock支持context manager协议,因此可以在with语句中使用。
acquire
(* block = True , timeout = None *)- 获取锁定,阻止或非阻止。
在* block *参数设置为True
(默认值)的情况下,方法调用将阻塞,直到锁定处于未锁定状态,然后将其设置为 locked 并返回True
。请注意,第一个参数的名称与threading.Lock.acquire()的名称不同。
将* block *参数设置为False
时,方法调用不会阻塞。如果锁当前处于锁定状态,则返回False
;否则,将锁设置为锁定状态并返回True
。
当使用* timeout 的正浮点值调用时,只要不能获取锁,最多阻塞 timeout *指定的秒数。 * timeout 为负值的调用等效于 timeout *为零。 * timeout 值为None
(默认值)的调用将超时期限设置为无限。请注意,对 timeout 的负值或None
值的处理与threading.Lock.acquire()中实现的行为不同。如果 block 参数设置为False
,则 timeout *参数没有实际意义。如果已获取锁,则返回True
;如果超时时间已过,则返回False
。
release
( )- 释放锁。可以从任何进程或线程调用此函数,而不仅是最初获取锁的进程或线程。
行为与threading.Lock.release()相同,除了在解锁的锁上调用时会引发ValueError。
- 类别
multiprocessing.
RLock
- 递归锁定对象:threading.RLock的近似模拟。递归锁必须由获取它的进程或线程释放。一旦进程或线程获得了递归锁,相同的进程或线程就可以再次获得它而不会阻塞。该进程或线程必须在每次获取时释放一次。
请注意,RLock实际上是一个工厂函数,它返回使用默认上下文初始化的multiprocessing.synchronize.RLock
的实例。
RLock支持context manager协议,因此可以在with语句中使用。
acquire
(* block = True , timeout = None *)- 获取锁定,阻止或非阻止。
在将* block *参数设置为True
的情况下调用时,除非锁已由当前进程或线程拥有,否则直到锁处于未锁定状态(不受任何进程或线程拥有)时,才进行阻塞。然后,当前进程或线程将获得该锁的所有权(如果尚未拥有该锁),并且该锁内的递归级别将递增 1,从而导致返回值True
。请注意,第一个自变量的行为与threading.RLock.acquire()的实现相比有几个不同之处,从自变量本身的名称开始。
在将* block *参数设置为False
的情况下调用时,请勿阻止。如果该锁已被另一个进程或线程获取(并因此而拥有),则当前进程或线程将不拥有所有权,并且锁内的递归级别也不会更改,从而导致返回值False
。如果锁处于未锁定状态,则当前进程或线程将拥有所有权,并且递归级别将增加,从而导致返回值True
。
- timeout 参数的用法和行为与Lock.acquire()中的相同。请注意, timeout *的某些行为与threading.RLock.acquire()中实现的行为不同。
release
( )- 释放一个锁,递归级别递减。如果递减级别递减后的级别为零,则将锁重置为未锁定状态(不属于任何进程或线程),并且如果其他任何进程或线程被阻塞,await 该锁被解锁,则允许其中一个 continue 进行。如果递减级别递减后仍不为零,则锁保持锁定状态,并由调用进程或线程拥有。
仅在调用进程或线程拥有锁时才调用此方法。如果此方法是由所有者以外的进程或线程调用的,或者锁处于未锁定(未拥有)状态,则引发AssertionError。请注意,在这种情况下引发的异常类型与threading.RLock.release()中实现的行为不同。
- 类别
multiprocessing.
Semaphore
([值])- signal 量对象:threading.Semaphore的近似模拟。
与它的近似模拟存在一个单独的区别:其acquire
方法的第一个参数名为* block *,与Lock.acquire()一致。
Note
在 Mac OS X 上,不支持sem_timedwait
,因此超时调用acquire()
将使用睡眠循环来模拟该函数的行为。
Note
如果在调用BoundedSemaphore.acquire()
,Lock.acquire(),RLock.acquire(),Semaphore.acquire()
,Condition.acquire()
或Condition.wait()
阻止主线程的同时到达 Ctrl-C 生成的 SIGINTsignal,则该调用将立即break,并引发KeyboardInterrupt。
这与threading的行为不同,后者在进行等效的阻塞调用时将忽略 SIGINT。
Note
此软件包的某些Function需要在主机 os 上正常运行的共享 signal 量实现。如果没有,则multiprocessing.synchronize
模块将被禁用,并且try导入它会导致ImportError。有关其他信息,请参见bpo-3770。
共享的 ctypes 对象
可以使用可由子进程继承的共享内存来创建共享对象。
multiprocessing.
Value
(* typecode_or_type ,* args , lock = True *)
- typecode_or_type *确定返回对象的类型:它是 ctypes 类型或array模块使用的一种字符类型代码。 args *传递给该类型的构造函数。
如果* lock 为True
(默认值),则将创建一个新的递归锁对象以同步对该值的访问。如果 lock 是Lock或RLock对象,则该对象将用于同步对该值的访问。如果 lock *为False
,则对返回的对象的访问将不会受到锁的自动保护,因此它不一定是“过程安全的”。
+=
之类的涉及读取和写入的操作不是原子的。因此,例如,如果您想原子地增加共享值,则仅做不到
counter.value += 1
假设关联的锁是递归的(默认情况下是递归的),您可以执行
with counter.get_lock():
counter.value += 1
请注意,* lock *是仅关键字的参数。
multiprocessing.
Array
(* typecode_or_type , size_or_initializer ,**,* lock = True *)- 返回从共享内存分配的 ctypes 数组。默认情况下,返回值实际上是该数组的同步包装器。
- typecode_or_type 确定返回数组的元素的类型:它是 ctypes 类型或array模块使用的单字符类型代码。如果 size_or_initializer 是整数,则它确定数组的长度,并且该数组最初将被清零。否则, size_or_initializer *是用于初始化数组的序列,其长度确定数组的长度。
如果* lock 为True
(默认值),则将创建一个新的锁对象以同步对该值的访问。如果 lock 是Lock或RLock对象,则该对象将用于同步对该值的访问。如果 lock *为False
,则对返回的对象的访问将不会受到锁的自动保护,因此它不一定是“过程安全的”。
请注意,* lock *是仅关键字参数。
multiprocessing.sharedctypes 模块
multiprocessing.sharedctypes模块提供了用于从共享内存中分配ctypes个对象的Function,这些对象可以由子进程继承。
Note
尽管可以将指针存储在共享内存中,但请记住,这将指向特定进程地址空间中的位置。但是,指针很可能在第二个进程的上下文中无效,并且try从第二个进程取消引用指针可能会导致崩溃。
multiprocessing.sharedctypes.
RawArray
(* typecode_or_type ,- 返回从共享内存分配的 ctypes 数组。
- typecode_or_type 确定返回数组的元素的类型:它是 ctypes 类型或array模块使用的单字符类型代码。如果 size_or_initializer 是整数,则它确定数组的长度,并且该数组最初将被清零。否则, size_or_initializer *是用于初始化数组的序列,其长度确定数组的长度。
请注意,设置和获取元素可能不是原子的–请使用Array()来确保使用锁自动同步访问。
multiprocessing.sharedctypes.
RawValue
(* typecode_or_type ,- 返回从共享内存分配的 ctypes 对象。
- typecode_or_type *确定返回对象的类型:它是 ctypes 类型或array模块使用的一种字符类型代码。 args *传递给该类型的构造函数。
请注意,设置和获取值可能不是原子性的,请改用Value()来确保使用锁自动同步访问。
请注意,ctypes.c_char数组具有value
和raw
属性,使它们可以使用它存储和检索字符串-参见ctypes的文档。
multiprocessing.sharedctypes.
Array
(* typecode_or_type , size_or_initializer ,- 与RawArray()相同,除了根据* lock *的值,可能会返回进程安全的同步包装程序,而不是原始的 ctypes 数组。
如果* lock 为True
(默认值),则将创建一个新的锁对象以同步对该值的访问。如果 lock 是Lock或RLock对象,则该对象将用于同步对该值的访问。如果 lock *为False
,则对返回的对象的访问将不会受到锁的自动保护,因此它不一定是“过程安全的”。
请注意,* lock *是仅关键字的参数。
multiprocessing.sharedctypes.
Value
(* typecode_or_type ,* args ,- 与RawValue()相同,除了根据* lock *的值,可以返回进程安全的同步包装而不是原始 ctypes 对象。
如果* lock 为True
(默认值),则将创建一个新的锁对象以同步对该值的访问。如果 lock 是Lock或RLock对象,则该对象将用于同步对该值的访问。如果 lock *为False
,则对返回的对象的访问将不会受到锁的自动保护,因此它不一定是“过程安全的”。
请注意,* lock *是仅关键字的参数。
multiprocessing.sharedctypes.
copy
- 返回从共享内存分配的 ctypes 对象,该对象是 ctypes 对象* obj *的副本。
multiprocessing.sharedctypes.
synchronized
(* obj * [,* lock *]- 返回使用* lock 同步访问的 ctypes 对象的进程安全包装对象。如果 lock *为
None
(默认值),则会自动创建multiprocessing.RLock对象。
- 返回使用* lock 同步访问的 ctypes 对象的进程安全包装对象。如果 lock *为
同步包装器除了要包装的对象之外,还有两种方法:get_obj()
返回被包装的对象,而get_lock()
返回用于同步的锁对象。
请注意,pass包装器访问 ctypes 对象可能比访问原始 ctypes 对象要慢得多。
在版本 3.5 中更改:同步对象支持context manager协议。
下表将用于从共享内存创建共享 ctypes 对象的语法与普通 ctypes 语法进行了比较。 (在表MyStruct
中是ctypes.Structure的某些子类。)
ctypes | 使用类型的 sharedctypes | 使用 typecode 的 sharedctypes |
---|---|---|
c_double(2.4) | RawValue(c_double, 2.4) | RawValue('d', 2.4) |
MyStruct(4, 6) | RawValue(MyStruct,4,6) | |
(c_short * 7)() | RawArray(c_short, 7) | RawArray('h', 7) |
(c_int * 3)(9,2,8) | RawArray(c_int,(9,2,8)) | RawArray('i',(9,2,8)) |
下面是一个由子进程修改许多 ctypes 对象的示例:
from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double
class Point(Structure):
_fields_ = [('x', c_double), ('y', c_double)]
def modify(n, x, s, A):
n.value **= 2
x.value **= 2
s.value = s.value.upper()
for a in A:
a.x **= 2
a.y **= 2
if __name__ == '__main__':
lock = Lock()
n = Value('i', 7)
x = Value(c_double, 1.0/3.0, lock=False)
s = Array('c', b'hello world', lock=lock)
A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)
p = Process(target=modify, args=(n, x, s, A))
p.start()
p.join()
print(n.value)
print(x.value)
print(s.value)
print([(a.x, a.y) for a in A])
打印的结果是
49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]
Managers
Management 器提供了一种创建可以在不同进程之间共享的数据的方法,包括pass网络在不同机器上运行的进程之间共享。Management 器对象控制 Management共享对象的服务器进程。其他进程可以使用代理访问共享对象。
一旦垃圾回收或退出父进程,Management 器进程将立即关闭。Management 器类在multiprocessing.managers模块中定义:
-
- class *
multiprocessing.managers.
BaseManager
([* address * [,* authkey *]])
- 创建一个 BaseManager 对象。
- class *
创建后,应调用start()或get_server().serve_forever()
以确保 manager 对象引用已启动的 manager 进程。
address 是 Management 器进程在其上侦听新连接的地址。如果 address *为
None
,则选择任意一个。authkey 是认证密钥,将用于检查到服务器进程的传入连接的有效性。如果 authkey 为
None
,则使用current_process().authkey
。否则,将使用 authkey *,并且它必须是字节字符串。
start
([[initializer * [,* initargs *]])- 启动一个子进程以启动 Management 器。如果* initializer *不是
None
,则子进程将在启动时调用initializer(*initargs)
。
- 启动一个子进程以启动 Management 器。如果* initializer *不是
get_server
( )- 返回一个
Server
对象,该对象代表 Management 器控制下的实际服务器。Server
对象支持serve_forever()
方法:
- 返回一个
>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey=b'abc')
>>> server = manager.get_server()
>>> server.serve_forever()
Server
另外具有address属性。
connect
( )- 将本地 Management 器对象连接到远程 Management 器进程:
>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc')
>>> m.connect()
shutdown
( )- 停止 Management 器使用的过程。仅当使用start()启动服务器进程时,此选项才可用。
这可以多次调用。
register
(* typeid * [,* callable * [,* proxytype * [,暴露 [,* method_to_typeid * [,* create_method *]]]]]))- 可用于注册类型或可pass Management 器类调用的类方法。
typeid *是一个“类型标识符”,用于标识共享对象的特定类型。这必须是一个字符串。
callable 是用于为此类型标识符创建对象的可调用对象。如果将使用connect()方法将 Management 器实例连接到服务器,或者如果 create_method *参数为
False
,则可以将其保留为None
。proxytype 是BaseProxy的子类,用于为具有此 typeid *的共享对象创建代理。如果
None
,则会自动创建一个代理类。exposed 用于指定方法名称的序列,应允许使用BaseProxy._callmethod()访问此 typeid 的代理。 (如果 exposed *为
None
,则使用proxytype._exposed_
,如果存在的话)。如果未指定暴露列表,则共享对象的所有“公共方法”将可访问。 (此处的“公共方法”是指具有call()方法且名称不以'_'
开头的任何属性。)method_to_typeid 是用于指定应该返回代理的那些公开方法的返回类型的 Map。它将方法名称 Map 到 typeid 字符串。 (如果 method_to_typeid *为
None
,则使用proxytype._method_to_typeid_
(如果存在)。)如果方法的名称不是此 Map 的键,或者该 Map 为None
,则该方法返回的对象将按值复制。create_method 确定是否应使用名称 typeid *创建方法,该方法可用于告诉服务器进程创建新的共享库并为其返回代理。默认情况下为
True
。
BaseManager个实例还具有一个只读属性:
address
- Manager 使用的地址。
在版本 3.3 中进行了更改:Manager 对象支持上下文 Management 协议–参见上下文 Management 器类型。 enter()启动服务器进程(如果尚未启动),然后返回 Management 器对象。 exit()调用shutdown()。
在以前的版本中,enter()如果尚未启动 Management 器的服务器进程,则不会启动它。
- 类别
multiprocessing.managers.
SyncManager
- BaseManager的子类,可用于进程同步。
multiprocessing.Manager()
返回此类型的对象。
- BaseManager的子类,可用于进程同步。
它的方法创建并返回Proxy Objects,以便在进程之间同步许多常用的数据类型。特别是,这包括共享列表和字典。
Barrier
(* parties * [,* action * [,* timeout *]])- 创建一个共享的threading.Barrier对象,并为其返回代理。
版本 3.3 中的新Function。
BoundedSemaphore
([值])- 创建一个共享的threading.BoundedSemaphore对象,并为其返回代理。
Condition
([* 锁])- 创建一个共享的threading.Condition对象,并为其返回代理。
如果提供了* lock *,则它应该是threading.Lock或threading.RLock对象的代理。
在版本 3.3 中进行了更改:添加了wait_for()方法。
Event
( )- 创建一个共享的threading.Event对象,并为其返回代理。
Lock
( )- 创建一个共享的threading.Lock对象,并为其返回代理。
Namespace
( )- 创建一个共享的Namespace对象,并为其返回代理。
Queue
([[* maxsize *])- 创建一个共享的queue.Queue对象,并为其返回代理。
RLock
( )- 创建一个共享的threading.RLock对象,并为其返回代理。
Semaphore
([值])- 创建一个共享的threading.Semaphore对象,并为其返回代理。
Array
(* typecode , sequence *)- 创建一个数组并为其返回代理。
Value
(* typecode , value *)- 创建一个具有可写
value
属性的对象,并为其返回代理。
- 创建一个具有可写
dict
( )dict
(Map)dict
(序列)- 创建一个共享的dict对象,并为其返回代理。
list
( )list
(序列)- 创建一个共享的list对象,并为其返回代理。
在版本 3.6 中更改:可以嵌套共享对象。例如,诸如共享列表之类的共享容器对象可以包含其他所有共享对象,这些共享对象都将由SyncManager进行 Management 和同步。
- 类别
multiprocessing.managers.
Namespace
- 可以向SyncManager注册的类型。
命名空间对象没有公共方法,但是具有可写属性。其表示形式显示其属性的值。
但是,当对名称空间对象使用代理时,以'_'
开头的属性将是代理的属性,而不是引用对象的属性:
>>> manager = multiprocessing.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3 # this is an attribute of the proxy
>>> print(Global)
Namespace(x=10, y='hello')
Customized managers
要创建自己的 Management 器,可以创建一个BaseManager的子类,并使用register()类方法向该 Management 器类注册新类型或可调用项。例如:
from multiprocessing.managers import BaseManager
class MathsClass:
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
with MyManager() as manager:
maths = manager.Maths()
print(maths.add(4, 3)) # prints 7
print(maths.mul(7, 8)) # prints 56
使用远程 Management 器
可以在一台计算机上运行 Management 器服务器,并让 Client 端从其他计算机上使用它(假设所涉及的防火墙允许它)。
运行以下命令将为单个共享队列创建服务器,远程 Client 端可以访问该服务器:
>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
一个 Client 端可以按以下方式访问服务器:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
另一个 Client 端也可以使用它:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
本地进程也可以使用 Client 端上的代码从远程访问该队列,以访问该队列:
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super(Worker, self).__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
Proxy Objects
代理是“引用”共享对象的对象,该共享对象(大概)生活在不同的进程中。共享对象被称为代理的“参考对象”。多个代理对象可能具有相同的引用对象。
代理对象具有调用其引用对象的相应方法的方法(尽管并非引用对象的每个方法都必须可以pass代理使用)。这样,可以像其引用对象一样使用代理:
>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
请注意,将str()应用于代理将返回所指对象的表示,而将repr()应用于将返回代理的表示。
代理对象的一个重要Function是可拾取的,因此可以在进程之间传递。这样,一个参照对象可以包含Proxy Objects。这允许嵌套这些托管列表,字典和其他Proxy Objects:
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']
同样,字典和列表代理可以相互嵌套:
>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}
如果引用中包含标准(非代理)的list或dict对象,则不会pass Management 器传播对这些可变值的修改,因为代理无法知道何时修改了其中包含的值。但是,将值存储在容器代理中(会触发代理对象上的__setitem__
)会pass Management 器传播,因此为了有效地修改此类项目,可以将修改后的值重新分配给容器代理:
# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d
对于大多数用例而言,此方法可能不如使用嵌套的Proxy Objects方便,但它还演示了对同步的控制级别。
Note
multiprocessing中的代理类型不支持按值进行比较。因此,例如,我们有:
>>> manager.list([1,2,3]) == [1,2,3]
False
进行比较时,应该只使用引用对象的副本。
类别
multiprocessing.managers.
BaseProxy
- 代理对象是BaseProxy的子类的实例。
_callmethod
(方法名 [,* args * [,* kwds *]])- 调用并返回代理引用对象的方法的结果。
如果proxy
是其引用对象为obj
的代理,则表达式
proxy._callmethod(methodname, args, kwds)
将评估表达式
getattr(obj, methodname)(*args, **kwds)
在 Manager 的过程中。
返回的值将是调用结果的副本或新共享对象的代理-请参阅BaseManager.register()的* method_to_typeid *参数的文档。
如果呼叫引发异常,则_callmethod()重新引发。如果在 Management 者的流程中引发了其他异常,则该异常将转换为RemoteError
异常,并由_callmethod()引发。
特别要注意的是,如果* methodname 没有被 expose *,则会引发一个异常。
_callmethod()的用法示例:
>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7]
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,)) # equivalent to l[20]
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue
( )- 返回引用对象的副本。
如果引用对象不可拾取,则将引发异常。
__repr__
( )- 返回代理对象的表示形式。
__str__
( )- 返回对象的表示形式。
Cleanup
代理对象使用 weakref 回调,以便在收集垃圾时将其自身从拥有其引用对象的 Management 器中注销。
当不再有代理引用共享对象时,该共享对象将从 Management 器进程中删除。
Process Pools
可以创建一个进程池,该池将执行passPool类提交给它的任务。
-
- class *
multiprocessing.pool.
Pool
([* processes * [,* initializer * [,* initargs * [,* maxtasksperchild * [,* context *]]]]])
- 进程池对象,用于控制可以向其提交作业的工作进程池。它支持带有超时和回调的异步结果,并具有并行 Map 实现。
- class *
- processes 是要使用的工作进程数。如果 processes *为
None
,则使用os.cpu_count()返回的数字。
如果* initializer *不是None
,则每个工作进程启动时都会调用initializer(*initargs)
。
maxtasksperchild 是工作进程在退出并被新的工作进程替换之前可以完成的任务数,以释放未使用的资源。缺省 maxtasksperchild *为
None
,这意味着工作进程将与池一样长。context 可用于指定用于启动工作进程的上下文。通常,使用上下文对象的函数
multiprocessing.Pool()
或Pool()方法创建池。在两种情况下, context *均已适当设置。
请注意,池对象的方法仅应由创建池的进程调用。
Warning
multiprocessing.pool对象具有内部资源(与任何其他资源一样),需要pass使用池作为上下文 Management 器或手动调用close()和terminate()来对其进行适当 Management。如果不这样做,可能会导致该过程无法完成。
请注意,依靠垃圾回收器破坏池是“不正确的”,因为 CPython 不能保证将调用池的终结器(有关更多信息,请参见object.del())。
版本 3.2 中的新Function:* maxtasksperchild *
3.4 版的新Function:上下文
Note
apply
(* func * [,* args * [,* kwds *]])- 使用参数* args 和关键字参数 kwds 调用 func 。它会阻塞直到结果准备就绪。在这种情况下,apply_async()更适合并行执行工作。此外, func *仅在池的工作程序之一中执行。
apply_async
(* func * [,* args * [,* kwds * [,* callback * [,* error_callback *]]]])- apply()方法的变体,返回AsyncResult对象。
如果指定了* callback ,那么它应该是可调用的,可以接受单个参数。当结果准备就绪时,将应用 callback ,除非调用失败,否则将应用 error_callback *。
如果指定了* error_callback ,那么它应该是可调用的,接受单个参数。如果目标函数失败,则使用异常实例调用 error_callback *。
回调应该立即完成,因为否则处理结果的线程将被阻塞。
map
(* func , iterable * [,* chunksize *])
此方法将可迭代项分为多个块,将其作为单独的任务提交给流程池。可以pass将* chunksize *设置为正整数来指定这些块的(大约)大小。
请注意,这可能会导致很长的迭代次数占用大量内存。考虑将imap()或imap_unordered()与显式的* chunksize *选项一起使用,以提高效率。
map_async
(* func , iterable * [,* chunksize * [,* callback * [,* error_callback *]]])- map()方法的变体,返回AsyncResult对象。
如果指定了* callback ,那么它应该是可调用的,可以接受单个参数。当结果准备就绪时,将应用 callback ,除非调用失败,否则将应用 error_callback *。
如果指定了* error_callback ,那么它应该是可调用的,接受单个参数。如果目标函数失败,则使用异常实例调用 error_callback *。
回调应该立即完成,因为否则处理结果的线程将被阻塞。
imap
(* func , iterable * [,* chunksize *])- map()的默认版本。
- chunksize 参数与map()方法使用的参数相同。与使用默认值
1
相比,使用 chunksize *较大的值进行迭代的时间很长,可以使工作完成 得多 。
同样,如果* chunksize 为1
,则imap()方法返回的迭代器的next()
方法具有可选的 timeout 参数:如果无法在 timeout *秒内返回结果,则next(timeout)
将引发multiprocessing.TimeoutError。
imap_unordered
(* func , iterable * [,* chunksize *])- 与imap()相同,除了返回的迭代器的结果 Sequences 应视为任意。 (只有在只有一个工作进程时,才能保证订单是“正确的”.)
starmap
(* func , iterable * [,* chunksize *])- 与map()相似,不同之处在于* iterable *的元素应为可分解为参数的可迭代对象。
因此,[(1,2), (3, 4)]
的可迭代导致[func(1,2), func(3,4)]
。
版本 3.3 中的新Function。
starmap_async
(* func , iterable * [,* chunksize * [,* callback * [,* error_callback *]]])- starmap()和map_async()的组合,用于对* iterable 的可迭代对象进行迭代,并在未包装可迭代对象的情况下调用 func *。返回结果对象。
版本 3.3 中的新Function。
close
( )- 阻止将更多任务提交给池。一旦完成所有任务,工作进程将退出。
terminate
( )- 没有完成出色的工作,立即停止工作进程。当池对象被垃圾回收时,terminate()将立即被调用。
join
( )- await 工作进程退出。在使用join()之前,您必须先致电close()或terminate()。
版本 3.3 中的新增Function:池对象现在支持上下文 Management 协议–参见上下文 Management 器类型。 enter()返回池对象,exit()调用terminate()。
类别
multiprocessing.pool.
AsyncResult
- Pool.apply_async()和Pool.map_async()返回的结果的类别。
get
([超时])- 到达时返回结果。如果* timeout 不是
None
并且结果没有在 timeout *秒内到达,则引发multiprocessing.TimeoutError。如果远程呼叫引发了异常,则该异常将被get()引发。
- 到达时返回结果。如果* timeout 不是
wait
([超时])- await 结果可用或直到* timeout *秒过去。
ready
( )- 返回呼叫是否完成。
successful
( )- 返回调用是否完成而没有引发异常。如果结果尚未准备好,将加ValueError。
在版本 3.7 中更改:如果结果尚未准备好,则引发ValueError而不是AssertionError。
以下示例演示了池的用法:
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
Listener 和 Client
通常,使用队列或使用Pipe()返回的Connection对象来完成进程之间的消息传递。
但是,multiprocessing.connection模块具有一些额外的灵 Active。它基本上提供了用于处理套接字或 Windows 命名管道的面向消息的高级 API。它还支持使用hmac模块进行摘要身份验证,并同时轮询多个连接。
multiprocessing.connection.
deliver_challenge
(* connection , authkey *)- 将随机生成的消息发送到连接的另一端,然后 await 答复。
如果答复与使用* authkey *作为密钥的消息摘要匹配,则将欢迎消息发送到连接的另一端。否则引发AuthenticationError。
multiprocessing.connection.
answer_challenge
(* connection , authkey *)- 接收一条消息,使用* authkey *作为密钥计算该消息的摘要,然后将摘要发送回去。
如果未收到欢迎消息,则引发AuthenticationError。
multiprocessing.connection.
Client
(* address * [,* family * [,* authkey *]])- try构建到使用地址* address *的监听器的连接,并返回Connection。
连接的类型由* family 参数确定,但通常可以Ellipsis,因为它通常可以从 address *的格式中推断出来。 (请参见Address Formats)
如果给出* authkey 而不是 None,则它应该是一个字节字符串,并将用作基于 HMAC 的身份验证质询的 Secret 密钥。如果 authkey *为 None,则不进行身份验证。如果身份验证失败,则引发AuthenticationError。参见Authentication keys。
-
- class *
multiprocessing.connection.
Listener
([地址 [,家庭 [,* backlog * [,* authkey *]]]])
- 绑定套接字或 Windows 命名管道的包装器,正在“监听”连接。
- class *
- address *是侦听器对象的绑定套接字或命名管道要使用的地址。
Note
如果使用的地址为“ 0.0.0.0”,则该地址在 Windows 上将不是可连接的端点。如果需要可连接的端点,则应使用“ 127.0.0.1”。
- family 是要使用的套接字(或命名管道)的类型。这可以是字符串
'AF_INET'
(对于 TCP 套接字),'AF_UNIX'
(对于 Unix 域套接字)或'AF_PIPE'
(对于 Windows 命名管道)之一。其中只有第一个保证可用。如果 family 为None
,则从 address 的格式推断出该家族。如果 address 也是None
,则选择默认值。该默认值是假定为可用最快的系列。参见Address Formats。请注意,如果 family *是'AF_UNIX'
且地址是None
,则将在使用tempfile.mkstemp()创建的私有临时目录中创建套接字。
如果侦听器对象使用套接字,则绑定它后,会将* backlog *(默认为 1)传递给套接字的listen()方法。
如果给出* authkey 而不是 None,则它应该是一个字节字符串,并将用作基于 HMAC 的身份验证质询的 Secret 密钥。如果 authkey *为 None,则不进行身份验证。如果身份验证失败,则引发AuthenticationError。参见Authentication keys。
accept
( )- 接受侦听器对象的绑定套接字或命名管道上的连接,并返回Connection对象。如果try进行身份验证并失败,则会引发AuthenticationError。
close
( )- 关闭绑定的套接字或侦听器对象的命名管道。当侦听器被垃圾回收时,将自动调用此方法。但是,建议显式调用它。
侦听器对象具有以下只读属性:
address
- 侦听器对象正在使用的地址。
last_accepted
- 最后接受的连接来自的地址。如果不可用,则为
None
。
- 最后接受的连接来自的地址。如果不可用,则为
版本 3.3 中的新增Function:侦听器对象现在支持上下文 Management 协议–参见上下文 Management 器类型。 enter()返回侦听器对象,exit()调用close()。
multiprocessing.connection.
wait
(* object_list , timeout = None *)- 等到* object_list 中的对象准备就绪。返回 object_list 中已准备好的那些对象的列表。如果 timeout 是 float,则调用最多会阻塞这么多秒。如果 timeout *为
None
,它将无限期阻塞。负超时等于零超时。
- 等到* object_list 中的对象准备就绪。返回 object_list 中已准备好的那些对象的列表。如果 timeout 是 float,则调用最多会阻塞这么多秒。如果 timeout *为
对于 Unix 和 Windows,如果一个对象可以出现在* object_list *中
可读的Connection对象;
连接且可读的socket.socket对象;要么
当可以从中读取连接数据或另一端已关闭时,连接或套接字对象已准备就绪。
Unix :wait(object_list, timeout)
几乎等于select.select(object_list, [], [], timeout)
。区别在于,如果select.select()被 signal break,则它可以引发OSError且错误号为EINTR
,而wait()不会。
Windows :* object_list *中的项必须是可 await 的整数句柄(根据 Win32 函数WaitForMultipleObjects()
的文档所使用的定义),也可以是带有fileno()
方法的对象,该方法返回一个套接字手柄或管道手柄。 (请注意,管道句柄和套接字句柄不是“可 await 的”句柄.)
版本 3.3 中的新Function。
Examples
以下服务器代码创建一个使用'secret password'
作为身份验证密钥的侦听器。然后,它 await 连接并将一些数据发送到 Client 端:
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connection accepted from', listener.last_accepted)
conn.send([2.25, None, 'junk', float])
conn.send_bytes(b'hello')
conn.send_bytes(array('i', [42, 1729]))
以下代码连接到服务器并从服务器接收一些数据:
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
print(conn.recv()) # => [2.25, None, 'junk', float]
print(conn.recv_bytes()) # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr)) # => 8
print(arr) # => array('i', [42, 1729, 0, 0, 0])
以下代码使用wait()一次 await 来自多个进程的消息:
import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait
def foo(w):
for i in range(10):
w.send((i, current_process().name))
w.close()
if __name__ == '__main__':
readers = []
for i in range(4):
r, w = Pipe(duplex=False)
readers.append(r)
p = Process(target=foo, args=(w,))
p.start()
# We close the writable end of the pipe now to be sure that
# p is the only process which owns a handle for it. This
# ensures that when p closes its handle for the writable end,
# wait() will promptly report the readable end as being ready.
w.close()
while readers:
for r in wait(readers):
try:
msg = r.recv()
except EOFError:
readers.remove(r)
else:
print(msg)
Address Formats
'AF_INET'
地址是(hostname, port)
形式的 Tuples,其中* hostname 是字符串,而 port *是整数。'AF_UNIX'
地址是代表文件系统上文件名的字符串。-
'AF_PIPE'
地址是以下形式的字符串:r'\.\pipe{PipeName}'
。要使用Client()连接到名为* ServerName *的远程计算机上的命名管道,应使用r'\ServerName\pipe{PipeName}'
形式的地址。
请注意,默认情况下,任何以两个反斜杠开头的字符串都假定为'AF_PIPE'
地址而不是'AF_UNIX'
地址。
Authentication keys
当人们使用Connection.recv时,接收到的数据将自动取消处理。不幸的是,从不受信任的来源中抽取数据会带来安全风险。因此,Listener和Client()使用hmac模块提供摘要身份验证。
身份验证密钥是一个字节字符串,可以将其视为密码:构建连接后,两端将要求证明对方知道身份验证密钥。 (证明两端使用相同的密钥“不”涉及pass连接发送密钥.)
如果请求身份验证但未指定身份验证密钥,则使用current_process().authkey
的返回值(请参见Process)。当前进程创建的任何Process对象都会自动继承此值。这意味着(默认情况下)多进程程序的所有进程将共享一个身份验证密钥,该身份验证密钥可在它们之间构建连接时使用。
也可以使用os.urandom()生成合适的身份验证密钥。
Logging
提供了一些日志记录支持。但是请注意,logging包不使用进程共享锁,因此(取决于处理程序类型)可能会混淆来自不同进程的消息。
multiprocessing.
get_logger
( )- 返回multiprocessing使用的 Logger。如有必要,将创建一个新的。
首次创建时,Logger 的级别为logging.NOTSET
,并且没有默认处理程序。默认情况下,发送到该 Logger 的消息不会传播到根 Logger。
请注意,在 Windows 上,子进程将仅继承父进程的 Logger 级别,而不会继承 Logger 的任何其他自定义设置。
multiprocessing.
log_to_stderr
( )- 该函数执行对get_logger()的调用,但是除了返回由 get_logger 创建的 Logger 之外,它还添加了一个处理程序,该处理程序使用
'[%(levelname)s/%(processName)s] %(message)s'
格式将输出发送到sys.stderr。
- 该函数执行对get_logger()的调用,但是除了返回由 get_logger 创建的 Logger 之外,它还添加了一个处理程序,该处理程序使用
以下是启用了日志记录的示例会话:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
有关日志记录级别的完整表,请参见logging模块。
multiprocessing.dummy 模块
multiprocessing.dummy复制multiprocessing的 API,但只不过是threading模块周围的包装器。
Programming guidelines
使用multiprocessing时,应遵循某些准则和习惯用法。
所有启动方法
以下内容适用于所有启动方法。
避免共享状态
Note
应当尽可能避免在进程之间转移大量数据。
最好坚持使用队列或管道在进程之间进行通信,而不是使用较低级别的同步 Primitives。
Picklability
Note
确保代理方法的参数可挑剔。
代理的线程安全
Note
除非使用锁保护它,否则不要从多个线程中使用代理对象。
(使用* same *代理使用不同的进程永远不会有问题.)
加入僵尸进程
Note
在 Unix 上,当进程完成但尚未加入时,它就变成了僵尸。永远不要有太多,因为每次新进程启动(或调用active_children())时,所有尚未加入的已完成进程都将加入。同时调用已完成的过程Process.is_alive将加入该过程。即便如此,将您启动的所有过程明确地加入也可能是一个好习惯。
继承比腌制/腌制更好
Note
当使用* spawn 或 forkserver *启动方法时,multiprocessing中的许多类型都需要可腌制,以便子进程可以使用它们。但是,通常应该避免使用管道或队列将共享对象发送到其他进程。相反,您应该安排程序,以便需要访问在其他位置创建的共享资源的进程可以从祖先进程继承该程序。
避免终止流程
Note
使用Process.terminate方法停止进程可能导致该进程当前正在使用的任何共享资源(例如锁,signal 灯,管道和队列)损坏或对其他进程不可用。
因此,最好只考虑对从未使用任何共享资源的进程使用Process.terminate。
加入使用队列的进程
Note
请记住,将项目放入队列的进程将在终止之前 await,直到所有缓冲的项目由“ feeder”线程馈送到基础管道为止。 (子进程可以调用队列的Queue.cancel_join_thread方法来避免这种行为。)
这意味着,无论何时使用队列,都需要确保在加入该流程之前,将finally删除队列中已放置的所有项目。否则,您无法确定将项目放入队列的进程将终止。还请记住,非守护进程将自动加入。
一个将导致死锁的示例如下:
from multiprocessing import Process, Queue
def f(q):
q.put('X' * 1000000)
if __name__ == '__main__':
queue = Queue()
p = Process(target=f, args=(queue,))
p.start()
p.join() # this deadlocks
obj = queue.get()
此处的解决方法是交换最后两行(或仅删除p.join()
行)。
明确地将资源传递给子进程
Note
在使用* fork * start 方法的 Unix 上,子进程可以利用在父进程中使用全局资源创建的共享资源。但是,最好将对象作为参数传递给子进程的构造函数。
除了使代码(可能)与 Windows 和其他启动方法兼容之外,这还确保只要子进程仍然存在,就不会在父进程中垃圾收集对象。如果在父进程中垃圾回收对象时释放了一些资源,这可能很重要。
例如
from multiprocessing import Process, Lock
def f():
... do something using "lock" ...
if __name__ == '__main__':
lock = Lock()
for i in range(10):
Process(target=f).start()
应该 Rewrite 为
from multiprocessing import Process, Lock
def f(l):
... do something using "l" ...
if __name__ == '__main__':
lock = Lock()
for i in range(10):
Process(target=f, args=(lock,)).start()
当心用“类似于文件的文件”替换sys.stdin
Note
multiprocessing最初是无条件调用的:
os.close(sys.stdin.fileno())
multiprocessing.Process._bootstrap()
方法中-这导致了流程中的问题。它已更改为:
sys.stdin.close()
sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)
它解决了进程相互冲突导致文件 Descriptors 错误的根本问题,但是对使用输出缓冲将“ _替换为“类文件对象”的应用程序带来了潜在的危险。危险是,如果多个进程在该文件状对象上调用close(),则可能导致同一数据多次刷新到该对象,从而导致损坏。
如果编写类似文件的对象并实现自己的缓存,则可以pass在每次添加到缓存时存储 pid 并在 pid 更改时丢弃缓存来使其成为分叉安全的。例如:
@property
def cache(self):
pid = os.getpid()
if pid != self._pid:
self._pid = pid
self._cache = []
return self._cache
spawn 和 forkserver 启动方法
有一些额外的限制不适用于* fork *启动方法。
More picklability
Note
确保Process.__init__()
的所有参数都是可拾取的。另外,如果您继承Process的子类,请确保在调用Process.start方法时实例是可腌制的。
Global variables
Note
请记住,如果在子进程中运行的代码try访问全局变量,则它看到的值(如果有)可能与调用Process.start时父进程中的值不同。
但是,只是模块级常量的全局变量不会引起任何问题。
安全导入主模块
Note
确保可以由新的 Python 解释器安全地导入主模块,而不会引起意外的副作用(例如,启动新进程)。
例如,使用* spawn 或 forkserver * start 方法运行以下模块将失败,并显示RuntimeError:
from multiprocessing import Process
def foo():
print('hello')
p = Process(target=foo)
p.start()
而是应该使用if __name__ == '__main__':
来保护程序的“入口点”,如下所示:
from multiprocessing import Process, freeze_support, set_start_method
def foo():
print('hello')
if __name__ == '__main__':
freeze_support()
set_start_method('spawn')
p = Process(target=foo)
p.start()
(如果程序将正常运行而不是冻结,则可以Ellipsisfreeze_support()
行.)
这使新产生的 Python 解释器可以安全地导入模块,然后运行模块的foo()
函数。
如果在主模块中创建了池或 Management 器,则适用类似的限制。
Examples
演示如何创建和使用定制的 Management 器和代理:
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo:
def f(self):
print('you called Foo.f()')
def g(self):
print('you called Foo.g()')
def _h(self):
print('you called Foo._h()')
# A simple generator function
def baz():
for i in range(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ['__next__']
def __iter__(self):
return self
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print('-' * 20)
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print('-' * 20)
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print('-' * 20)
it = manager.baz()
for i in it:
print('<%d>' % i, end=' ')
print()
print('-' * 20)
op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
print('op._exposed_ =', op._exposed_)
##
if __name__ == '__main__':
freeze_support()
test()
Using Pool:
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5 * random.random())
return a * b
def plus(a, b):
time.sleep(0.5 * random.random())
return a + b
def f(x):
return 1.0 / (x - 5.0)
def pow3(x):
return x ** 3
def noop(x):
pass
#
# Test code
#
def test():
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)
with multiprocessing.Pool(PROCESSES) as pool:
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print('Ordered results using pool.apply_async():')
for r in results:
print('\t', r.get())
print()
print('Ordered results using pool.imap():')
for x in imap_it:
print('\t', x)
print()
print('Unordered results using pool.imap_unordered():')
for x in imap_unordered_it:
print('\t', x)
print()
print('Ordered results using pool.map() --- will block till complete:')
for x in pool.map(calculatestar, TASKS):
print('\t', x)
print()
#
# Test error handling
#
print('Testing error handling:')
try:
print(pool.apply(f, (5,)))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.apply()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(pool.map(f, list(range(10))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.map()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(list(pool.imap(f, list(range(10)))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from list(pool.imap())')
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, list(range(10)))
for i in range(10):
try:
x = next(it)
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
print()
#
# Testing timeouts
#
print('Testing ApplyResult.get() with timeout:', end=' ')
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
print('Testing IMapIterator.next() with timeout:', end=' ')
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()
一个示例,显示如何使用队列将任务提供给工作进程集合并收集结果:
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print('Unordered results:')
for i in range(len(TASKS1)):
print('\t', done_queue.get())
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print('\t', done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()