16.6. multiprocessing-基于进程的“线程”界面

2.6 版的新Function。

16.6.1. Introduction

multiprocessing是使用类似于threading模块的 API 支持生成程序的软件包。 multiprocessing包同时提供本地和远程并发,pass使用子进程而不是线程来有效地避开全局翻译锁。因此,multiprocessing模块允许程序员充分利用给定机器上的多个处理器。它可以在 Unix 和 Windows 上运行。

multiprocessing模块还引入了threading模块中没有类似物的 API。最好的例子是Pool对象,该对象提供了一种便利的方法,可以跨多个 Importing 值并行执行函数,从而跨进程分配 Importing 数据(数据并行性)。下面的示例演示了在模块中定义此类Function的常规做法,以便子进程可以成功导入该模块。这个使用Pool的数据并行性的基本示例,

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3]))

将打印到标准输出

[1, 4, 9]

16.6.1.1. 流程类

multiprocessing中,pass创建Process对象然后调用其start()方法来生成进程。 Process遵循threading.Thread的 API。多进程程序的一个简单示例是

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

为了显示涉及的各个进程 ID,下面是一个扩展的示例:

from multiprocessing import Process
import os

def info(title):
    print title
    print 'module name:', __name__
    if hasattr(os, 'getppid'):  # only available on Unix
        print 'parent process:', os.getppid()
    print 'process id:', os.getpid()

def f(name):
    info('function f')
    print 'hello', name

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

有关为什么(在 Windows 上)需要if __name__ == '__main__'部分的说明,请参见Programming guidelines

16.6.1.2. 在流程之间交换对象

multiprocessing支持进程之间的两种通信通道:

Queues

Note

Queue类是Queue.Queue的近克隆。例如:

from multiprocessing import Process, Queue

def f(q):
q.put([42, None, 'hello'])

if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print q.get()    # prints "[42, None, 'hello']"
p.join()

队列是线程和进程安全的。

Pipes

Note

Pipe()函数返回Pairpass管道连接的连接对象,管道默认情况下为双工(双向)。例如:

from multiprocessing import Process, Pipe

def f(conn):
conn.send([42, None, 'hello'])
conn.close()

if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print parent_conn.recv()   # prints "[42, None, 'hello']"
p.join()

Pipe()返回的两个连接对象代表管道的两端。每个连接对象都有send()recv()个方法(以及其他方法)。请注意,如果两个进程(或线程)try同时读取或写入管道的“相同”端,则管道中的数据可能会损坏。当然,不存在同时使用管道不同端的过程造成损坏的风险。

16.6.1.3. 流程之间的同步

multiprocessing包含threading中所有同步 Primitives 的等效项。例如,可以使用锁来确保一次仅将一个进程打印到标准输出:

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    print 'hello world', i
    l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

如果不使用来自不同进程的锁输出,则很容易混淆所有信息。

16.6.1.4. 进程之间共享状态

如上所述,在进行并行编程时,通常最好尽可能避免使用共享状态。使用多个进程时尤其如此。

但是,如果您确实确实需要使用一些共享数据,则multiprocessing提供了几种方法。

Shared memory

Note

可以使用ValueArray将数据存储在共享内存 Map 中。例如下面的代码

from multiprocessing import Process, Value, Array

def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]

if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))

p = Process(target=f, args=(num, arr))
p.start()
p.join()

print num.value
print arr[:]

will print

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

创建numarr时使用的'd''i'参数是array模块使用的类型的类型代码:'d'表示双精度浮点数,而'i'表示带符号整数。这些共享对象将是进程和线程安全的。

为了更灵活地使用共享内存,可以使用multiprocessing.sharedctypes模块,该模块支持创建从共享内存分配的任意 ctypes 对象。

Server process

Note

Manager()返回的 Management 器对象控制着一个服务器进程,该进程持有 Python 对象,并允许其他进程使用代理对其进行操作。

Manager()返回的 Manager 将支持listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventQueueValueArray类型。例如,

from multiprocessing import Process, Manager

def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()

if __name__ == '__main__':
manager = Manager()

d = manager.dict()
l = manager.list(range(10))

p = Process(target=f, args=(d, l))
p.start()
p.join()

print d
print l

will print

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

服务器进程 Management 器比使用共享内存对象更灵活,因为可以使它们支持任意对象类型。同样,单个 Management 器可以由网络上不同计算机上的进程共享。但是,它们比使用共享内存慢。

16.6.1.5. 雇用 Worker

Pool类表示辅助进程池。它具有允许以几种不同方式将任务卸载到工作进程的方法。

For example:

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes

    # print "[0, 1, 4,..., 81]"
    print pool.map(f, range(10))

    # print same numbers in arbitrary order
    for i in pool.imap_unordered(f, range(10)):
        print i

    # evaluate "f(20)" asynchronously
    res = pool.apply_async(f, (20,))      # runs in *only* one process
    print res.get(timeout=1)              # prints "400"

    # evaluate "os.getpid()" asynchronously
    res = pool.apply_async(os.getpid, ()) # runs in *only* one process
    print res.get(timeout=1)              # prints the PID of that process

    # launching multiple evaluations asynchronously *may* use more processes
    multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
    print [res.get(timeout=1) for res in multiple_results]

    # make a single worker sleep for 10 secs
    res = pool.apply_async(time.sleep, (10,))
    try:
        print res.get(timeout=1)
    except TimeoutError:
        print "We lacked patience and got a multiprocessing.TimeoutError"

请注意,池的方法仅应由创建它的进程使用。

Note

此软件包中的Function要求__main__模块可由子级导入。 Programming guidelines对此进行了介绍,但是这里值得指出。这意味着某些示例(例如Pool示例)在交互式解释器中将不起作用。例如:

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'

(如果try此操作,则实际上将以半随机方式输出交错的三个完整回溯,然后您可能不得不以某种方式停止主进程.)

16.6.2. Reference

multiprocessing软件包主要复制threading模块的 API。

16.6.2.1. 流程和 exception

构造函数应始终使用关键字参数进行调用。 * group *应始终为None;它仅用于与threading.Thread兼容。 * target *是由run()方法调用的可调用对象。默认为None,表示什么都不会被调用。 * name 是进程名称。默认情况下,唯一名称以'Process-N1:N2:...:Nk'的形式构造,其中 N1,N2,...,Nk 是一个整数序列,其长度由进程的 generation *决定。 * args *是目标调用的参数 Tuples。 * kwargs 是用于目标调用的关键字参数字典。默认情况下,没有参数传递给 target *。

如果子类覆盖了构造函数,则必须确保在对进程进行其他任何操作之前,先调用 Base Class 构造函数(Process.__init__())。

您可以在子类中重写此方法。标准run()方法调用传递给对象构造函数的可调用对象作为目标参数(如果有),并分别从* args kwargs *参数中获取 Sequences 参数和关键字参数。

每个过程对象最多只能调用一次。它安排在单独的过程中调用对象的run()方法。

如果* timeout *为None,则没有超时。

一个过程可以多次加入。

进程无法加入自身,因为这将导致死锁。在启动进程之前try加入该进程是错误的。

名称是仅用于标识目的的字符串。它没有语义。多个进程可以使用相同的名称。初始名称由构造函数设置。

大致来说,从start()方法返回到子进程终止之间,进程对象一直处于活动状态。

初始值是从创建过程继承的。

进程退出时,它将try终止其所有守护程序子进程。

请注意,不允许守护进程创建子进程。否则,如果守护进程在其父进程退出时被终止,它将使其子进程变成孤儿。另外,这些不是 Unix 守护程序或服务,它们是正常的进程,如果退出了非守护进程,则将终止(而不加入)。

除了threading.Thread API,Process对象还支持以下属性和方法:

初始化multiprocessing时,使用os.urandom()为主进程分配一个随机字符串。

创建Process对象时,它将继承其父进程的身份验证密钥,尽管可以pass将authkey设置为另一个字节字符串来更改此密钥。

See Authentication keys.

请注意,该进程的后代进程将不会终止–它们将变得孤立。

Warning

如果在关联的进程正在使用管道或队列时使用此方法,则该管道或队列可能会损坏,并可能无法被其他进程使用。同样,如果进程已获取锁定或 signal 量等,则终止进程可能导致其他进程死锁。

请注意,start()join()is_alive()terminate()exitcode方法应仅由创建流程对象的流程调用。

Process的某些方法的用法示例:

>>> import multiprocessing, time, signal
>>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
>>> print p, p.is_alive()
<Process(Process-1, initial)> False
>>> p.start()
>>> print p, p.is_alive()
<Process(Process-1, started)> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print p, p.is_alive()
<Process(Process-1, stopped[SIGTERM])> False
>>> p.exitcode == -signal.SIGTERM
True

如果eBufferTooShort的实例,则e.args[0]会将消息作为字节字符串给出。

16.6.2.2. 管道和队列

当使用多个进程时,通常使用消息传递进行进程之间的通信,并避免使用任何同步 Primitives(例如锁)。

为了传递消息,可以使用Pipe()(用于两个进程之间的连接)或队列(允许多个生产者和使用者)。

Queuemultiprocessing.queues.SimpleQueueJoinableQueue类型是根据标准库中Queue.Queue类建模的多生产者,多 ConsumerFIFO 队列。它们的区别在于Queue缺少 Python 2.5 的Queue.Queue类中引入的task_done()join()方法。

如果您使用JoinableQueue,那么您必须为从队列中删除的每个任务调用JoinableQueue.task_done(),否则用于计数未完成任务数量的 signal 量finally可能会溢出,从而引发异常。

请注意,也可以使用 Management 器对象来创建共享队列-参见Managers

Note

multiprocessing使用通常的Queue.EmptyQueue.Full异常来表示超时。它们在multiprocessing名称空间中不可用,因此您需要从Queue导入它们。

Note

将对象放入队列时,将对对象进行 Pickling,然后后台线程将 Pickling 的数据刷新到基础管道。这会带来一些令人惊讶的后果,但不会造成任何实际困难-如果它们确实困扰您,则可以改用manager创建的队列。

  • 将对象放在空队列上之后,在队列的empty()方法返回Falseget_nowait()可以返回而不提高Queue.Empty之前,可能会有无穷的延迟。

  • 如果多个进程正在排队对象,则可能在另一端无序接收对象。但是,pass相同过程排队的对象始终相对于彼此处于预期的 Sequences。

Warning

如果在try使用Queue时使用Process.terminate()os.kill()将进程杀死,则队列中的数据可能会损坏。这可能会导致其他进程稍后try使用队列时获得异常。

Warning

如上所述,如果子进程已将项目放入队列中(并且未使用JoinableQueue.cancel_join_thread),则该进程将不会终止,直到所有缓冲的项目都已刷新到管道中为止。

这意味着,如果您try加入该进程,则除非您确定已耗尽所有已放入队列的项目,否则可能会陷入僵局。同样,如果子进程是非守护进程,则父进程在try加入其所有非守护进程子进程时可能会在退出时挂起。

请注意,使用 Management 器创建的队列不存在此问题。参见Programming guidelines

有关使用队列进行进程间通信的示例,请参见Examples

如果* duplex True(默认值),则管道是双向的。如果 duplex *为False,则管道是单向的:conn1仅可用于接收消息,而conn2仅可用于发送消息。

标准库的Queue模块中常见的Queue.EmptyQueue.Full异常引发了超时。

Queue实现Queue.Queue的所有方法,但task_done()join()除外。

请注意,这可能会在未实现sem_getvalue()的 Unix 平台(如 Mac OS X)上引发NotImplementedError

Queue还有Queue.Queue找不到的一些其他方法。这些方法对于大多数代码通常是不必要的:

默认情况下,如果进程不是队列的创建者,则退出时它将try加入队列的后台线程。该过程可以调用cancel_join_thread()来使join_thread()什么也不做。

此方法的更好称呼可能是allow_exit_without_flush()。这很可能导致排队的数据丢失,并且您几乎可以肯定不需要使用它。仅当您需要当前进程立即退出而无需 await 将排队的数据刷新到基础管道并且您不关心丢失的数据时,它才 true 存在。

Note

此类的Function需要在主机 os 上正常运行的共享 signal 量实现。没有一个,该类的Function将被禁用,并且try实例化Queue将导致ImportError。有关其他信息,请参见bpo-3770。对于以下列出的任何专用队列类型,这同样适用。

如果join()当前正在阻止,它将在处理完所有项目后恢复(这意味着已收到put()的每个项目都收到了task_done()调用)。

如果被调用的次数比队列中放置的项目多,则引发ValueError

每当将项目添加到队列时,未完成任务的数量就会增加。每当使用者线程调用task_done()表示已检索到该项目并且该项目的所有工作已完成时,该计数就会减少。当未完成的任务计数降至零时,join()解除阻止。

16.6.2.3. Miscellaneous

调用它具有“加入”任何已经完成的进程的副作用。

threading.current_thread()的类似物。

需要在主模块的if __name__ == '__main__'行之后立即调用此函数。例如:

from multiprocessing import Process, freeze_support

def f():
    print 'hello world!'

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

如果Ellipsisfreeze_support()行,则try运行冻结的可执行文件将引发RuntimeError

在 Windows 以外的任何其他 os 上调用freeze_support()时都无效。此外,如果该模块正在 Windows 上的 Python 解释器正常运行(该程序尚未冻结),则freeze_support()无效。

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

在他们可以创建子进程之前。 (仅 Windows)

16.6.2.4. 连接对象

连接对象允许发送和接收可腌制对象或字符串。可以将它们视为面向消息的连接套接字。

通常使用Pipe创建连接对象-另请参见Listener 和 Client

该对象必须是可腌制的。较大的 pickle(大约 32 MB,尽管取决于 os)可能会引发ValueError异常。

连接被垃圾回收时,将自动调用此方法。

如果未指定* timeout ,它将立即返回。如果 timeout 是一个数字,则此参数指定要阻止的最长时间(以秒为单位)。如果 timeout *为None,则使用无限超时。

如果给出* offset ,则从 buffer 中的该位置读取数据。如果给出 size *,那么将从缓冲区读取许多字节。很大的缓冲区(大约 32 MB,虽然取决于 os)可能会引发ValueError异常

如果指定了* maxlength 并且消息比 maxlength *长,那么将引发IOError并且连接将不再可读。

如果缓冲区太短,则会引发BufferTooShort异常,并且完整的消息将以e.args[0]的形式提供,其中e是异常实例。

For example:

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes('thank you')
>>> a.recv_bytes()
'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

Warning

Connection.recv()方法会自动释放收到的数据,这可能会带来安全风险,除非您可以信任发送消息的过程。

因此,除非使用Pipe()生成连接对象,否则在执行某种身份验证后,应仅使用recv()send()方法。参见Authentication keys

Warning

如果某个进程在try读取或写入管道时被杀死,则该管道中的数据很可能会损坏,因为无法确定消息边界在哪里。

16.6.2.5. 同步 Primitives

通常,在多进程程序中,同步 Primitives 不是在多线程程序中所必需的。请参阅threading模块的文档。

请注意,还可以使用 Management 器对象来创建同步 Primitives–参见Managers

与它的近似模拟存在一个单独的区别:其acquire方法的第一个参数名为* block ,并且它支持可选的第二个参数 timeout *,与Lock.acquire()一致。

Note

在 Mac OS X 上,这与Semaphore没有区别,因为sem_getvalue()尚未在该平台上实现。

如果指定了* lock *,则它应该是multiprocessing中的LockRLock对象。

在 2.7 版中进行了更改:以前,该方法始终返回None

请注意,Lock实际上是一个工厂函数,它返回使用默认上下文初始化的multiprocessing.synchronize.Lock的实例。

Lock支持context manager协议,因此可以在with语句中使用。

在* block *参数设置为True(默认值)的情况下,方法调用将阻塞,直到锁定处于未锁定状态,然后将其设置为 locked 并返回True。请注意,第一个参数的名称与threading.Lock.acquire()的名称不同。

将* block *参数设置为False时,方法调用不会阻塞。如果锁当前处于锁定状态,则返回False;否则,将锁设置为锁定状态并返回True

当使用* timeout 的正浮点值调用时,只要不能获取锁,最多阻塞 timeout *指定的秒数。 * timeout 为负值的调用等效于 timeout *为零。 * timeout 值为None(默认值)的调用将超时期限设置为无限。如果 block 参数设置为False,则 timeout 参数没有实际意义。如果已获取锁,则返回True,如果超时时间已过,则返回False。请注意, timeout *参数在此方法的模拟threading.Lock.acquire()中不存在。

行为与threading.Lock.release()相同,除了在解锁的锁上调用时会引发ValueError

请注意,RLock实际上是一个工厂函数,它返回使用默认上下文初始化的multiprocessing.synchronize.RLock的实例。

RLock支持context manager协议,因此可以在with语句中使用。

在将* block *参数设置为True的情况下调用时,除非锁已由当前进程或线程拥有,否则直到锁处于未锁定状态(不受任何进程或线程拥有)时,才进行阻塞。然后,当前进程或线程将获得该锁的所有权(如果尚未拥有该锁),并且该锁内的递归级别将递增 1,从而导致返回值True。请注意,第一个自变量的行为与threading.RLock.acquire()的实现相比有几个不同之处,从自变量本身的名称开始。

在将* block *参数设置为False的情况下调用时,请勿阻止。如果该锁已被另一个进程或线程获取(并因此而拥有),则当前进程或线程将不拥有所有权,并且锁内的递归级别也不会更改,从而导致返回值False。如果锁处于未锁定状态,则当前进程或线程将拥有所有权,并且递归级别将增加,从而导致返回值True

仅在调用进程或线程拥有锁时才调用此方法。如果此方法是由所有者以外的进程或线程调用的,或者锁处于未锁定(未拥有)状态,则引发AssertionError。请注意,在这种情况下引发的异常类型与threading.RLock.release()中实现的行为不同。

与它的近似模拟存在一个单独的区别:其acquire方法的第一个参数名为* block ,并且它支持可选的第二个参数 timeout *,与Lock.acquire()一致。

Note

BoundedSemaphoreLockRLockSemaphoreacquire()方法具有threading中的等效项不支持的超时参数。签名为acquire(block=True, timeout=None),可接受关键字参数。如果* block True,而 timeout 不是None,则它指定以秒为单位的超时。如果 block False,则 timeout *被忽略。

在 Mac OS X 上,不支持sem_timedwait,因此超时调用acquire()将使用睡眠循环来模拟该函数的行为。

Note

如果在调用BoundedSemaphore.acquire()Lock.acquire()RLock.acquire()Semaphore.acquire()Condition.acquire()Condition.wait()阻止主线程的同时到达 Ctrl-C 生成的 SIGINTsignal,则该调用将立即break,并引发KeyboardInterrupt

这与threading的行为不同,后者在进行等效的阻塞调用时将忽略 SIGINT。

Note

此软件包的某些Function需要在主机 os 上正常运行的共享 signal 量实现。如果没有,则multiprocessing.synchronize模块将被禁用,并且try导入它会导致ImportError。有关其他信息,请参见bpo-3770

16.6.2.6. 共享的 ctypes 对象

可以使用可由子进程继承的共享内存来创建共享对象。

如果* lock True(默认值),则将创建一个新的递归锁对象以同步对该值的访问。如果 lock LockRLock对象,则该对象将用于同步对该值的访问。如果 lock *为False,则对返回的对象的访问将不会受到锁的自动保护,因此它不一定是“过程安全的”。

+=之类的涉及读取和写入的操作不是原子的。因此,例如,如果您想原子地增加共享值,则仅做不到

counter.value += 1

假设关联的锁是递归的(默认情况下是递归的),您可以执行

with counter.get_lock():
    counter.value += 1

请注意,* lock *是仅关键字的参数。

如果* lock True(默认值),则将创建一个新的锁对象以同步对该值的访问。如果 lock LockRLock对象,则该对象将用于同步对该值的访问。如果 lock *为False,则对返回的对象的访问将不会受到锁的自动保护,因此它不一定是“过程安全的”。

请注意,* lock *是仅关键字参数。

请注意,ctypes.c_char数组具有* value raw *属性,这使它们可以使用它来存储和检索字符串。

16.6.2.6.1. multiprocessing.sharedctypes 模块

multiprocessing.sharedctypes模块提供了用于从共享内存中分配ctypes个对象的Function,这些对象可以由子进程继承。

Note

尽管可以将指针存储在共享内存中,但请记住,这将指向特定进程地址空间中的位置。但是,指针很可能在第二个进程的上下文中无效,并且try从第二个进程取消引用指针可能会导致崩溃。

请注意,设置和获取元素可能不是原子的–请使用Array()来确保使用锁自动同步访问。

请注意,设置和获取值可能不是原子性的,请改用Value()来确保使用锁自动同步访问。

请注意,ctypes.c_char数组具有valueraw属性,使它们可以使用它存储和检索字符串-参见ctypes的文档。

如果* lock True(默认值),则将创建一个新的锁对象以同步对该值的访问。如果 lock LockRLock对象,则该对象将用于同步对该值的访问。如果 lock *为False,则对返回的对象的访问将不会受到锁的自动保护,因此它不一定是“过程安全的”。

请注意,* lock *是仅关键字的参数。

如果* lock True(默认值),则将创建一个新的锁对象以同步对该值的访问。如果 lock LockRLock对象,则该对象将用于同步对该值的访问。如果 lock *为False,则对返回的对象的访问将不会受到锁的自动保护,因此它不一定是“过程安全的”。

请注意,* lock *是仅关键字的参数。

同步包装器除了要包装的对象之外,还有两种方法:get_obj()返回被包装的对象,而get_lock()返回用于同步的锁对象。

请注意,pass包装器访问 ctypes 对象可能比访问原始 ctypes 对象要慢得多。

下表将用于从共享内存创建共享 ctypes 对象的语法与普通 ctypes 语法进行了比较。 (在表MyStruct中是ctypes.Structure的某些子类。)

ctypes 使用类型的 sharedctypes 使用 typecode 的 sharedctypes
c_double(2.4) RawValue(c_double, 2.4) RawValue('d', 2.4)
MyStruct(4, 6) RawValue(MyStruct,4,6)
(c_short * 7)() RawArray(c_short, 7) RawArray('h', 7)
(c_int * 3)(9,2,8) RawArray(c_int,(9,2,8)) RawArray('i',(9,2,8))

下面是一个由子进程修改许多 ctypes 对象的示例:

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', 'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print n.value
    print x.value
    print s.value
    print [(a.x, a.y) for a in A]

打印的结果是

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

16.6.2.7. Managers

Management 器提供了一种创建可以在不同流程之间共享的数据的方法。Management 器对象控制 Management共享对象的服务器进程。其他进程可以使用代理访问共享对象。

一旦垃圾回收或退出父进程,Management 器进程将立即关闭。Management 器类在multiprocessing.managers模块中定义:

创建后,应调用start()get_server().serve_forever()以确保 manager 对象引用已启动的 manager 进程。

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey='abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server另外具有address属性。

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 5000), authkey='abc')
>>> m.connect()

这可以多次调用。

BaseManager个实例还具有一个只读属性:

它还支持创建共享列表和字典。

如果提供了* lock *,则它应该是threading.Lockthreading.RLock对象的代理。

Note

对 dict 和 list 代理中的可变值或可变项的修改不会pass Management 器传播,因为代理无法知道何时修改其值或可变项。要修改此类项目,可以将修改后的对象重新分配给容器代理:

# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# reassigning the dictionary, the proxy is notified of the change
lproxy[0] = d

命名空间对象没有公共方法,但是具有可写属性。其表示形式显示其属性的值。

但是,当对名称空间对象使用代理时,以'_'开头的属性将是代理的属性,而不是引用对象的属性:

>>> manager = multiprocessing.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # this is an attribute of the proxy
>>> print Global
Namespace(x=10, y='hello')

16.6.2.7.1. 定制 Manager

要创建自己的 Management 器,可以创建一个BaseManager的子类,并使用register()类方法向该 Management 器类注册新类型或可调用项。例如:

from multiprocessing.managers import BaseManager

class MathsClass(object):
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    manager = MyManager()
    manager.start()
    maths = manager.Maths()
    print maths.add(4, 3)         # prints 7
    print maths.mul(7, 8)         # prints 56

16.6.2.7.2. 使用远程 Management 器

可以在一台计算机上运行 Management 器服务器,并让 Client 端从其他计算机上使用它(假设所涉及的防火墙允许它)。

运行以下命令将为单个共享队列创建服务器,远程 Client 端可以访问该服务器:

>>> from multiprocessing.managers import BaseManager
>>> import Queue
>>> queue = Queue.Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey='abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

一个 Client 端可以按以下方式访问服务器:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

另一个 Client 端也可以使用它:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey='abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

本地进程也可以使用 Client 端上的代码从远程访问该队列,以访问该队列:

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super(Worker, self).__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey='abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

16.6.2.8. 代理对象

代理是“引用”共享对象的对象,该共享对象(大概)生活在不同的进程中。共享对象被称为代理的“参考对象”。多个代理对象可能具有相同的引用对象。

代理对象具有调用其引用对象的相应方法的方法(尽管并非引用对象的每个方法都必须可以pass代理使用)。通常,可以使用与代理对象相同的大多数方式使用代理:

>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print l
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print repr(l)
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

请注意,将str()应用于代理将返回所指对象的表示,而将repr()应用于将返回代理的表示。

代理对象的一个重要Function是可拾取的,因此可以在进程之间传递。但是请注意,如果将代理发送到相应 Management 者的进程,则取消选择代理将自己生成引用对象。例如,这意味着一个共享对象可以包含第二个:

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print a, b
[[]] []
>>> b.append('hello')
>>> print a, b
[['hello']] ['hello']

Note

multiprocessing中的代理类型不支持按值进行比较。因此,例如,我们有:

>>> manager.list([1,2,3]) == [1,2,3]
False

进行比较时,应该只使用引用对象的副本。

如果proxy是其引用对象为obj的代理,则表达式

proxy._callmethod(methodname, args, kwds)

将评估表达式

getattr(obj, methodname)(*args, **kwds)

在 Manager 的过程中。

返回的值将是调用结果的副本或新共享对象的代理-请参阅BaseManager.register()的* method_to_typeid *参数的文档。

如果呼叫引发异常,则_callmethod()重新引发。如果在 Management 者的流程中引发了其他异常,则该异常将转换为RemoteError异常,并由_callmethod()引发。

特别要注意的是,如果* methodname 没有被 expose *,则会引发一个异常。

_callmethod()的用法示例:

>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getslice__', (2, 7))   # equiv to `l[2:7]`
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))     # equiv to `l[20]`
Traceback (most recent call last):
...
IndexError: list index out of range

如果引用对象不可拾取,则将引发异常。

16.6.2.8.1. Cleanup

代理对象使用 weakref 回调,以便在收集垃圾时将其自身从拥有其引用对象的 Management 器中注销。

当不再有代理引用共享对象时,该共享对象将从 Management 器进程中删除。

16.6.2.9. Process池

可以创建一个进程池,该池将执行passPool类提交给它的任务。

请注意,池对象的方法仅应由创建池的进程调用。

2.7 版中的新Function:* maxtasksperchild 是工作进程可以退出并被新的工作进程替换之前可以完成的任务数,以释放未使用的资源。缺省 maxtasksperchild *为None,这意味着工作进程将与池一样长。

Note

Pool内的工作进程通常在 Pool 工作队列的整个持续时间内都处于活动状态。在其他系统(例如 Apache,mod_wsgi 等)中发现的释放 Worker 资源的常见模式是允许池中的 Worker 在退出,清理和产生新进程之前仅完成一定数量的工作。取代旧的。 Pool的* maxtasksperchild *参数将这种Function展示给finally用户。

如果指定了* callback ,那么它应该是可调用的,可以接受单个参数。结果准备就绪后,将对其应用回调*(除非调用失败)。 * callback *应该立即完成,因为否则处理结果的线程将被阻塞。

此方法将可迭代项分为多个块,将其作为单独的任务提交给流程池。可以pass将* chunksize *设置为正整数来指定这些块的(大约)大小。

如果指定了* callback ,那么它应该是可调用的,可以接受单个参数。结果准备就绪后,将对其应用回调*(除非调用失败)。 * callback *应该立即完成,因为否则处理结果的线程将被阻塞。

同样,如果* chunksize 1,则imap()方法返回的迭代器的next()方法具有可选的 timeout 参数:如果无法在 timeout *秒内返回结果,则next(timeout)将提高multiprocessing.TimeoutError

以下示例演示了池的用法:

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes

    result = pool.apply_async(f, (10,))   # evaluate "f(10)" asynchronously in a single process
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow

    print pool.map(f, range(10))          # prints "[0, 1, 4,..., 81]"

    it = pool.imap(f, range(10))
    print it.next()                       # prints "0"
    print it.next()                       # prints "1"
    print it.next(timeout=1)              # prints "4" unless your computer is *very* slow

    result = pool.apply_async(time.sleep, (10,))
    print result.get(timeout=1)           # raises multiprocessing.TimeoutError

16.6.2.10. Listener 和 Client

通常,使用队列或使用Pipe()返回的Connection对象来完成进程之间的消息传递。

但是,multiprocessing.connection模块具有一些额外的灵 Active。它基本上提供了用于处理套接字或 Windows 命名管道的面向消息的高级 API,并且还支持使用hmac模块进行“摘要式身份验证”。

如果答复与使用* authkey *作为密钥的消息摘要匹配,则将欢迎消息发送到连接的另一端。否则引发AuthenticationError

如果未收到欢迎消息,则引发AuthenticationError

连接的类型由* family 参数确定,但通常可以Ellipsis,因为它通常可以从 address *的格式中推断出来。 (请参见Address Formats)

如果* authenticate True authkey 为字符串,则使用摘要式身份验证。如果 authkey None,则用于身份验证的密钥将为 authkey *或current_process().authkey)。如果认证失败,则引发AuthenticationError。参见Authentication keys

Note

如果使用的地址为“ 0.0.0.0”,则该地址在 Windows 上将不是可连接的端点。如果需要可连接的端点,则应使用“ 127.0.0.1”。

如果侦听器对象使用套接字,则绑定它后,会将* backlog *(默认为 1)传递给套接字的listen()方法。

如果* authenticate True(默认为False)或 authkey *不是None,则使用摘要式身份验证。

如果* authkey *是字符串,则它将用作认证密钥;否则,它将用作认证密钥。否则必须为None

如果* authkey None authenticate True,则current_process().authkey用作身份验证密钥。如果 authkey None authenticate *为False,则不进行任何身份验证。如果认证失败,则引发AuthenticationError。参见Authentication keys

侦听器对象具有以下只读属性:

该模块定义了以下异常:

Examples

以下服务器代码创建一个使用'secret password'作为身份验证密钥的侦听器。然后,它 await 连接并将一些数据发送到 Client 端:

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'
listener = Listener(address, authkey='secret password')

conn = listener.accept()
print 'connection accepted from', listener.last_accepted

conn.send([2.25, None, 'junk', float])

conn.send_bytes('hello')

conn.send_bytes(array('i', [42, 1729]))

conn.close()
listener.close()

以下代码连接到服务器并从服务器接收一些数据:

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)
conn = Client(address, authkey='secret password')

print conn.recv()                 # => [2.25, None, 'junk', float]

print conn.recv_bytes()            # => 'hello'

arr = array('i', [0, 0, 0, 0, 0])
print conn.recv_bytes_into(arr)     # => 8
print arr                         # => array('i', [42, 1729, 0, 0, 0])

conn.close()

16.6.2.10.1. 地址格式

请注意,默认情况下,任何以两个反斜杠开头的字符串都假定为'AF_PIPE'地址而不是'AF_UNIX'地址。

16.6.2.11. 认证密钥

当人们使用Connection.recv()时,接收到的数据将自动取消处理。不幸的是,从不受信任的来源中抽取数据会带来安全风险。因此,ListenerClient()使用hmac模块提供摘要身份验证。

验证密钥是一个字符串,可以看作是密码:构建连接后,两端将要求证明对方知道该认证密钥。 (证明两端使用相同的密钥“不”涉及pass连接发送密钥.)

如果请求身份验证但未指定身份验证密钥,则使用current_process().authkey的返回值(请参见Process)。当前进程创建的任何Process对象都会自动继承此值。这意味着(默认情况下)多进程程序的所有进程将共享一个身份验证密钥,该身份验证密钥可在它们之间构建连接时使用。

也可以使用os.urandom()生成合适的身份验证密钥。

16.6.2.12. Logging

提供了一些日志记录支持。但是请注意,logging包不使用进程共享锁,因此(取决于处理程序类型)可能会混淆来自不同进程的消息。

首次创建时,Logger 的级别为logging.NOTSET,并且没有默认处理程序。默认情况下,发送到该 Logger 的消息不会传播到根 Logger。

请注意,在 Windows 上,子进程将仅继承父进程的 Logger 级别,而不会继承 Logger 的任何其他自定义设置。

以下是启用了日志记录的示例会话:

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

除了具有这两个日志记录Function外,multiprocessing 还公开了两个附加的日志记录级别属性。它们是SUBWARNINGSUBDEBUG。下表说明了这些内容在普通级别层次结构中的适合位置。

Level Numeric value
SUBWARNING 25
SUBDEBUG 5

有关日志记录级别的完整表,请参见logging模块。

这些附加的日志记录级别主要用于 multiprocessing 模块中的某些调试消息。除了启用了SUBDEBUG之外,以下示例与上述示例相同:

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(multiprocessing.SUBDEBUG)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../pymp-djGBXN/listener-...'
>>> del m
[SUBDEBUG/MainProcess] finalizer calling ...
[INFO/MainProcess] sending shutdown message to manager
[DEBUG/SyncManager-...] manager received shutdown message
[SUBDEBUG/SyncManager-...] calling <Finalize object, callback=unlink, ...
[SUBDEBUG/SyncManager-...] finalizer calling <built-in function unlink> ...
[SUBDEBUG/SyncManager-...] calling <Finalize object, dead>
[SUBDEBUG/SyncManager-...] finalizer calling <function rmtree at 0x5aa730> ...
[INFO/SyncManager-...] manager exiting with exitcode 0

16.6.2.13. multiprocessing.dummy 模块

multiprocessing.dummy复制multiprocessing的 API,但只不过是threading模块周围的包装器。

16.6.3. 编程准则

使用multiprocessing时,应遵循某些准则和习惯用法。

16.6.3.1. 所有平台

避免共享状态

Note

应当尽可能避免在进程之间转移大量数据。

最好坚持使用队列或管道在进程之间进行通信,而不是使用threading模块中的较低级别的同步 Primitives。

Picklability

Note

确保代理方法的参数可挑剔。

代理的线程安全

Note

除非使用锁保护它,否则不要从多个线程中使用代理对象。

(使用* same *代理使用不同的进程永远不会有问题.)

加入僵尸进程

Note

在 Unix 上,当进程完成但尚未加入时,它就变成了僵尸。永远不要有太多,因为每次新进程启动(或调用active_children())时,所有尚未加入的已完成进程都将加入。同时调用已完成的过程Process.is_alive将加入该过程。即便如此,将您启动的所有过程明确地加入也可能是一个好习惯。

继承比腌制/腌制更好

Note

在 Windows 上,multiprocessing中的许多类型都需要可腌制,以便子进程可以使用它们。但是,通常应该避免使用管道或队列将共享对象发送到其他进程。相反,您应该安排程序,以便需要访问在其他位置创建的共享资源的进程可以从祖先进程继承该程序。

避免终止流程

Note

使用Process.terminate方法停止进程可能导致该进程当前正在使用的任何共享资源(例如锁,signal 灯,管道和队列)损坏或对其他进程不可用。

因此,最好只考虑对从未使用任何共享资源的进程使用Process.terminate

加入使用队列的进程

Note

请记住,将项目放入队列的进程将在终止之前 await,直到所有缓冲的项目由“ feeder”线程馈送到基础管道为止。 (子进程可以调用队列的cancel_join_thread()方法来避免这种行为。)

这意味着,无论何时使用队列,都需要确保在加入该流程之前,将finally删除队列中已放置的所有项目。否则,您无法确定将项目放入队列的进程将终止。还请记住,非守护进程将自动加入。

一个将导致死锁的示例如下:

from multiprocessing import Process, Queue

def f(q):
q.put('X' * 1000000)

if __name__ == '__main__':
queue = Queue()
p = Process(target=f, args=(queue,))
p.start()
p.join()                    # this deadlocks
obj = queue.get()

此处的解决方法是交换最后两行(或仅删除p.join()行)。

明确地将资源传递给子进程

Note

在 Unix 上,子进程可以利用在父进程中使用全局资源创建的共享资源。但是,最好将对象作为参数传递给子进程的构造函数。

除了使代码(可能)与 Windows 兼容之外,这还确保只要子进程仍然存在,就不会在父进程中垃圾收集对象。如果在父进程中垃圾回收对象时释放了一些资源,这可能很重要。

例如

from multiprocessing import Process, Lock

def f():
... do something using "lock" ...

if __name__ == '__main__':
lock = Lock()
for i in range(10):
Process(target=f).start()

应该 Rewrite 为

from multiprocessing import Process, Lock

def f(l):
... do something using "l" ...

if __name__ == '__main__':
lock = Lock()
for i in range(10):
Process(target=f, args=(lock,)).start()

当心用“类似于文件的文件”替换sys.stdin

Note

multiprocessing最初是无条件调用的:

os.close(sys.stdin.fileno())

multiprocessing.Process._bootstrap()方法中-这导致了流程中的问题。它已更改为:

sys.stdin.close()
sys.stdin = open(os.devnull)

它解决了进程相互冲突导致文件 Descriptors 错误的根本问题,但是对使用输出缓冲将“ _替换为“类文件对象”的应用程序带来了潜在的危险。危险是,如果多个进程在该文件状对象上调用close(),则可能导致同一数据多次刷新到该对象,从而导致损坏。

如果编写类似文件的对象并实现自己的缓存,则可以pass在每次添加到缓存时存储 pid 并在 pid 更改时丢弃缓存来使其成为分叉安全的。例如:

@property
def cache(self):
pid = os.getpid()
if pid != self._pid:
self._pid = pid
self._cache = []
return self._cache

有关更多信息,请参见bpo-5155bpo-5313bpo-5331

16.6.3.2. Windows

由于 Windows 缺少os.fork(),因此有一些额外的限制:

More picklability

Note

确保Process.__init__()的所有参数都是可挑剔的。特别是,这意味着绑定或未绑定的方法不能直接用作 Windows 上的target参数-只需定义一个函数并使用它即可。

另外,如果您继承Process的子类,请确保在调用Process.start方法时实例是可腌制的。

Global variables

Note

请记住,如果在子进程中运行的代码try访问全局变量,则它看到的值(如果有)可能与调用Process.start时父进程中的值不同。

但是,只是模块级常量的全局变量不会引起任何问题。

安全导入主模块

Note

确保可以由新的 Python 解释器安全地导入主模块,而不会引起意外的副作用(例如,启动新进程)。

例如,在 Windows 下,运行以下模块会失败,并显示RuntimeError

from multiprocessing import Process

def foo():
print 'hello'

p = Process(target=foo)
p.start()

而是应该使用if __name__ == '__main__':来保护程序的“入口点”,如下所示:

from multiprocessing import Process, freeze_support

def foo():
print 'hello'

if __name__ == '__main__':
freeze_support()
p = Process(target=foo)
p.start()

(如果程序将正常运行而不是冻结,则可以Ellipsisfreeze_support()行.)

这使新产生的 Python 解释器可以安全地导入模块,然后运行模块的foo()函数。

如果在主模块中创建了池或 Management 器,则适用类似的限制。

16.6.4. Examples

演示如何创建和使用定制的 Management 器和代理:

#
# This module shows how to use arbitrary callables with a subclass of
# `BaseManager`.
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo(object):
    def f(self):
        print 'you called Foo.f()'
    def g(self):
        print 'you called Foo.g()'
    def _h(self):
        print 'you called Foo._h()'

# A simple generator function
def baz():
    for i in xrange(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ('next', '__next__')
    def __iter__(self):
        return self
    def next(self):
        return self._callmethod('next')
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print '-' * 20

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print '-' * 20

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print '-' * 20

    it = manager.baz()
    for i in it:
        print '<%d>' % i,
    print

    print '-' * 20

    op = manager.operator()
    print 'op.add(23, 45) =', op.add(23, 45)
    print 'op.pow(2, 94) =', op.pow(2, 94)
    print 'op.getslice(range(10), 2, 6) =', op.getslice(range(10), 2, 6)
    print 'op.repeat(range(5), 3) =', op.repeat(range(5), 3)
    print 'op._exposed_ =', op._exposed_

##

if __name__ == '__main__':
    freeze_support()
    test()

使用Pool

#
# A test of `multiprocessing.Pool` class
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

def f(x):
    return 1.0 / (x-5.0)

def pow3(x):
    return x**3

def noop(x):
    pass

#
# Test code
#

def test():
    print 'cpu_count() = %d\n' % multiprocessing.cpu_count()

    #
    # Create pool
    #

    PROCESSES = 4
    print 'Creating pool with %d processes\n' % PROCESSES
    pool = multiprocessing.Pool(PROCESSES)
    print 'pool = %s' % pool
    print

    #
    # Tests
    #

    TASKS = [(mul, (i, 7)) for i in range(10)] + \
            [(plus, (i, 8)) for i in range(10)]

    results = [pool.apply_async(calculate, t) for t in TASKS]
    imap_it = pool.imap(calculatestar, TASKS)
    imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

    print 'Ordered results using pool.apply_async():'
    for r in results:
        print '\t', r.get()
    print

    print 'Ordered results using pool.imap():'
    for x in imap_it:
        print '\t', x
    print

    print 'Unordered results using pool.imap_unordered():'
    for x in imap_unordered_it:
        print '\t', x
    print

    print 'Ordered results using pool.map() --- will block till complete:'
    for x in pool.map(calculatestar, TASKS):
        print '\t', x
    print

    #
    # Simple benchmarks
    #

    N = 100000
    print 'def pow3(x): return x**3'

    t = time.time()
    A = map(pow3, xrange(N))
    print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \
          (N, time.time() - t)

    t = time.time()
    B = pool.map(pow3, xrange(N))
    print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \
          (N, time.time() - t)

    t = time.time()
    C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
    print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \
          ' seconds' % (N, N//8, time.time() - t)

    assert A == B == C, (len(A), len(B), len(C))
    print

    L = [None] * 1000000
    print 'def noop(x): pass'
    print 'L = [None] * 1000000'

    t = time.time()
    A = map(noop, L)
    print '\tmap(noop, L):\n\t\t%s seconds' % \
          (time.time() - t)

    t = time.time()
    B = pool.map(noop, L)
    print '\tpool.map(noop, L):\n\t\t%s seconds' % \
          (time.time() - t)

    t = time.time()
    C = list(pool.imap(noop, L, chunksize=len(L)//8))
    print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
          (len(L)//8, time.time() - t)

    assert A == B == C, (len(A), len(B), len(C))
    print

    del A, B, C, L

    #
    # Test error handling
    #

    print 'Testing error handling:'

    try:
        print pool.apply(f, (5,))
    except ZeroDivisionError:
        print '\tGot ZeroDivisionError as expected from pool.apply()'
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print pool.map(f, range(10))
    except ZeroDivisionError:
        print '\tGot ZeroDivisionError as expected from pool.map()'
    else:
        raise AssertionError('expected ZeroDivisionError')

    try:
        print list(pool.imap(f, range(10)))
    except ZeroDivisionError:
        print '\tGot ZeroDivisionError as expected from list(pool.imap())'
    else:
        raise AssertionError('expected ZeroDivisionError')

    it = pool.imap(f, range(10))
    for i in range(10):
        try:
            x = it.next()
        except ZeroDivisionError:
            if i == 5:
                pass
        except StopIteration:
            break
        else:
            if i == 5:
                raise AssertionError('expected ZeroDivisionError')

    assert i == 9
    print '\tGot ZeroDivisionError as expected from IMapIterator.next()'
    print

    #
    # Testing timeouts
    #

    print 'Testing ApplyResult.get() with timeout:',
    res = pool.apply_async(calculate, TASKS[0])
    while 1:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % res.get(0.02))
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print
    print

    print 'Testing IMapIterator.next() with timeout:',
    it = pool.imap(calculatestar, TASKS)
    while 1:
        sys.stdout.flush()
        try:
            sys.stdout.write('\n\t%s' % it.next(0.02))
        except StopIteration:
            break
        except multiprocessing.TimeoutError:
            sys.stdout.write('.')
    print
    print

    #
    # Testing callback
    #

    print 'Testing callback:'

    A = []
    B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

    r = pool.apply_async(mul, (7, 8), callback=A.append)
    r.wait()

    r = pool.map_async(pow3, range(10), callback=A.extend)
    r.wait()

    if A == B:
        print '\tcallbacks succeeded\n'
    else:
        print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B)

    #
    # Check there are no outstanding tasks
    #

    assert not pool._cache, 'cache = %r' % pool._cache

    #
    # Check close() methods
    #

    print 'Testing close():'

    for worker in pool._pool:
        assert worker.is_alive()

    result = pool.apply_async(time.sleep, [0.5])
    pool.close()
    pool.join()

    assert result.get() is None

    for worker in pool._pool:
        assert not worker.is_alive()

    print '\tclose() succeeded\n'

    #
    # Check terminate() method
    #

    print 'Testing terminate():'

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    ignore = pool.apply(pow3, [2])
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
    pool.terminate()
    pool.join()

    for worker in pool._pool:
        assert not worker.is_alive()

    print '\tterminate() succeeded\n'

    #
    # Check garbage collection
    #

    print 'Testing garbage collection:'

    pool = multiprocessing.Pool(2)
    DELTA = 0.1
    processes = pool._pool
    ignore = pool.apply(pow3, [2])
    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]

    results = pool = None

    time.sleep(DELTA * 2)

    for worker in processes:
        assert not worker.is_alive()

    print '\tgarbage collection succeeded\n'

if __name__ == '__main__':
    multiprocessing.freeze_support()

    assert len(sys.argv) in (1, 2)

    if len(sys.argv) == 1 or sys.argv[1] == 'processes':
        print ' Using processes '.center(79, '-')
    elif sys.argv[1] == 'threads':
        print ' Using threads '.center(79, '-')
        import multiprocessing.dummy as multiprocessing
    else:
        print 'Usage:\n\t%s [processes | threads]' % sys.argv[0]
        raise SystemExit(2)

    test()

同步类型,如锁,条件和队列:

#
# A test file for the `multiprocessing` package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import time, sys, random
from Queue import Empty

import multiprocessing               # may get overwritten

#### TEST_VALUE

def value_func(running, mutex):
    random.seed()
    time.sleep(random.random()*4)

    mutex.acquire()
    print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished'
    running.value -= 1
    mutex.release()

def test_value():
    TASKS = 10
    running = multiprocessing.Value('i', TASKS)
    mutex = multiprocessing.Lock()

    for i in range(TASKS):
        p = multiprocessing.Process(target=value_func, args=(running, mutex))
        p.start()

    while running.value > 0:
        time.sleep(0.08)
        mutex.acquire()
        print running.value,
        sys.stdout.flush()
        mutex.release()

    print
    print 'No more running processes'

#### TEST_QUEUE

def queue_func(queue):
    for i in range(30):
        time.sleep(0.5 * random.random())
        queue.put(i*i)
    queue.put('STOP')

def test_queue():
    q = multiprocessing.Queue()

    p = multiprocessing.Process(target=queue_func, args=(q,))
    p.start()

    o = None
    while o != 'STOP':
        try:
            o = q.get(timeout=0.3)
            print o,
            sys.stdout.flush()
        except Empty:
            print 'TIMEOUT'

    print

#### TEST_CONDITION

def condition_func(cond):
    cond.acquire()
    print '\t' + str(cond)
    time.sleep(2)
    print '\tchild is notifying'
    print '\t' + str(cond)
    cond.notify()
    cond.release()

def test_condition():
    cond = multiprocessing.Condition()

    p = multiprocessing.Process(target=condition_func, args=(cond,))
    print cond

    cond.acquire()
    print cond
    cond.acquire()
    print cond

    p.start()

    print 'main is waiting'
    cond.wait()
    print 'main has woken up'

    print cond
    cond.release()
    print cond
    cond.release()

    p.join()
    print cond

#### TEST_SEMAPHORE

def semaphore_func(sema, mutex, running):
    sema.acquire()

    mutex.acquire()
    running.value += 1
    print running.value, 'tasks are running'
    mutex.release()

    random.seed()
    time.sleep(random.random()*2)

    mutex.acquire()
    running.value -= 1
    print '%s has finished' % multiprocessing.current_process()
    mutex.release()

    sema.release()

def test_semaphore():
    sema = multiprocessing.Semaphore(3)
    mutex = multiprocessing.RLock()
    running = multiprocessing.Value('i', 0)

    processes = [
        multiprocessing.Process(target=semaphore_func,
                                args=(sema, mutex, running))
        for i in range(10)
        ]

    for p in processes:
        p.start()

    for p in processes:
        p.join()

#### TEST_JOIN_TIMEOUT

def join_timeout_func():
    print '\tchild sleeping'
    time.sleep(5.5)
    print '\n\tchild terminating'

def test_join_timeout():
    p = multiprocessing.Process(target=join_timeout_func)
    p.start()

    print 'waiting for process to finish'

    while 1:
        p.join(timeout=1)
        if not p.is_alive():
            break
        print '.',
        sys.stdout.flush()

#### TEST_EVENT

def event_func(event):
    print '\t%r is waiting' % multiprocessing.current_process()
    event.wait()
    print '\t%r has woken up' % multiprocessing.current_process()

def test_event():
    event = multiprocessing.Event()

    processes = [multiprocessing.Process(target=event_func, args=(event,))
                 for i in range(5)]

    for p in processes:
        p.start()

    print 'main is sleeping'
    time.sleep(2)

    print 'main is setting event'
    event.set()

    for p in processes:
        p.join()

#### TEST_SHAREDVALUES

def sharedvalues_func(values, arrays, shared_values, shared_arrays):
    for i in range(len(values)):
        v = values[i][1]
        sv = shared_values[i].value
        assert v == sv

    for i in range(len(values)):
        a = arrays[i][1]
        sa = list(shared_arrays[i][:])
        assert a == sa

    print 'Tests passed'

def test_sharedvalues():
    values = [
        ('i', 10),
        ('h', -2),
        ('d', 1.25)
        ]
    arrays = [
        ('i', range(100)),
        ('d', [0.25 * i for i in range(100)]),
        ('H', range(1000))
        ]

    shared_values = [multiprocessing.Value(id, v) for id, v in values]
    shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]

    p = multiprocessing.Process(
        target=sharedvalues_func,
        args=(values, arrays, shared_values, shared_arrays)
        )
    p.start()
    p.join()

    assert p.exitcode == 0

####

def test(namespace=multiprocessing):
    global multiprocessing

    multiprocessing = namespace

    for func in [ test_value, test_queue, test_condition,
                  test_semaphore, test_join_timeout, test_event,
                  test_sharedvalues ]:

        print '\n\t######## %s\n' % func.__name__
        func()

    ignore = multiprocessing.active_children()      # cleanup any old processes
    if hasattr(multiprocessing, '_debug_info'):
        info = multiprocessing._debug_info()
        if info:
            print info
            raise ValueError('there should be no positive refcounts left')

if __name__ == '__main__':
    multiprocessing.freeze_support()

    assert len(sys.argv) in (1, 2)

    if len(sys.argv) == 1 or sys.argv[1] == 'processes':
        print ' Using processes '.center(79, '-')
        namespace = multiprocessing
    elif sys.argv[1] == 'manager':
        print ' Using processes and a manager '.center(79, '-')
        namespace = multiprocessing.Manager()
        namespace.Process = multiprocessing.Process
        namespace.current_process = multiprocessing.current_process
        namespace.active_children = multiprocessing.active_children
    elif sys.argv[1] == 'threads':
        print ' Using threads '.center(79, '-')
        import multiprocessing.dummy as namespace
    else:
        print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]
        raise SystemExit(2)

    test(namespace)

一个示例,显示如何使用队列将任务提供给工作进程集合并收集结果:

#
# Simple example which uses a pool of workers to carry out some tasks.
#
# Notice that the results will probably not come out of the output
# queue in the same in the same order as the corresponding tasks were
# put on the input queue.  If it is important to get the results back
# in the original order then consider using `Pool.map()` or
# `Pool.imap()` (which will save on the amount of code needed anyway).
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print 'Unordered results:'
    for i in range(len(TASKS1)):
        print '\t', done_queue.get()

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print '\t', done_queue.get()

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')

if __name__ == '__main__':
    freeze_support()
    test()

一个工作进程池如何在共享一个侦听套接字的同时每个进程可以运行SimpleHTTPServer.HttpServer实例的示例。

#
# Example where a pool of http servers share a single listening socket
#
# On Windows this module depends on the ability to pickle a socket
# object so that the worker processes can inherit a copy of the server
# object.  (We import `multiprocessing.reduction` to enable this pickling.)
#
# Not sure if we should synchronize access to `socket.accept()` method by
# using a process-shared lock -- does not seem to be necessary.
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import os
import sys

from multiprocessing import Process, current_process, freeze_support
from BaseHTTPServer import HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler

if sys.platform == 'win32':
    import multiprocessing.reduction    # make sockets pickable/inheritable

def note(format, *args):
    sys.stderr.write('[%s]\t%s\n' % (current_process().name, format%args))

class RequestHandler(SimpleHTTPRequestHandler):
    # we override log_message() to show which process is handling the request
    def log_message(self, format, *args):
        note(format, *args)

def serve_forever(server):
    note('starting server')
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass

def runpool(address, number_of_processes):
    # create a single server object -- children will each inherit a copy
    server = HTTPServer(address, RequestHandler)

    # create child processes to act as workers
    for i in range(number_of_processes-1):
        Process(target=serve_forever, args=(server,)).start()

    # main process also acts as a worker
    serve_forever(server)

def test():
    DIR = os.path.join(os.path.dirname(__file__), '..')
    ADDRESS = ('localhost', 8000)
    NUMBER_OF_PROCESSES = 4

    print 'Serving at http://%s:%d using %d worker processes' % \
          (ADDRESS[0], ADDRESS[1], NUMBER_OF_PROCESSES)
    print 'To exit press Ctrl-' + ['C', 'Break'][sys.platform=='win32']

    os.chdir(DIR)
    runpool(ADDRESS, NUMBER_OF_PROCESSES)

if __name__ == '__main__':
    freeze_support()
    test()

一些简单的基准比较multiprocessingthreading

#
# Simple benchmarks for the multiprocessing package
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import time, sys, multiprocessing, threading, Queue, gc

if sys.platform == 'win32':
    _timer = time.clock
else:
    _timer = time.time

delta = 1

#### TEST_QUEUESPEED

def queuespeed_func(q, c, iterations):
    a = '0' * 256
    c.acquire()
    c.notify()
    c.release()

    for i in xrange(iterations):
        q.put(a)

    q.put('STOP')

def test_queuespeed(Process, q, c):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        p = Process(target=queuespeed_func, args=(q, c, iterations))
        c.acquire()
        p.start()
        c.wait()
        c.release()

        result = None
        t = _timer()

        while result != 'STOP':
            result = q.get()

        elapsed = _timer() - t

        p.join()

    print iterations, 'objects passed through the queue in', elapsed, 'seconds'
    print 'average number/sec:', iterations/elapsed

#### TEST_PIPESPEED

def pipe_func(c, cond, iterations):
    a = '0' * 256
    cond.acquire()
    cond.notify()
    cond.release()

    for i in xrange(iterations):
        c.send(a)

    c.send('STOP')

def test_pipespeed():
    c, d = multiprocessing.Pipe()
    cond = multiprocessing.Condition()
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        p = multiprocessing.Process(target=pipe_func,
                                    args=(d, cond, iterations))
        cond.acquire()
        p.start()
        cond.wait()
        cond.release()

        result = None
        t = _timer()

        while result != 'STOP':
            result = c.recv()

        elapsed = _timer() - t
        p.join()

    print iterations, 'objects passed through connection in',elapsed,'seconds'
    print 'average number/sec:', iterations/elapsed

#### TEST_SEQSPEED

def test_seqspeed(seq):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        t = _timer()

        for i in xrange(iterations):
            a = seq[5]

        elapsed = _timer()-t

    print iterations, 'iterations in', elapsed, 'seconds'
    print 'average number/sec:', iterations/elapsed

#### TEST_LOCK

def test_lockspeed(l):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        t = _timer()

        for i in xrange(iterations):
            l.acquire()
            l.release()

        elapsed = _timer()-t

    print iterations, 'iterations in', elapsed, 'seconds'
    print 'average number/sec:', iterations/elapsed

#### TEST_CONDITION

def conditionspeed_func(c, N):
    c.acquire()
    c.notify()

    for i in xrange(N):
        c.wait()
        c.notify()

    c.release()

def test_conditionspeed(Process, c):
    elapsed = 0
    iterations = 1

    while elapsed < delta:
        iterations *= 2

        c.acquire()
        p = Process(target=conditionspeed_func, args=(c, iterations))
        p.start()

        c.wait()

        t = _timer()

        for i in xrange(iterations):
            c.notify()
            c.wait()

        elapsed = _timer()-t

        c.release()
        p.join()

    print iterations * 2, 'waits in', elapsed, 'seconds'
    print 'average number/sec:', iterations * 2 / elapsed

####

def test():
    manager = multiprocessing.Manager()

    gc.disable()

    print '\n\t######## testing Queue.Queue\n'
    test_queuespeed(threading.Thread, Queue.Queue(),
                    threading.Condition())
    print '\n\t######## testing multiprocessing.Queue\n'
    test_queuespeed(multiprocessing.Process, multiprocessing.Queue(),
                    multiprocessing.Condition())
    print '\n\t######## testing Queue managed by server process\n'
    test_queuespeed(multiprocessing.Process, manager.Queue(),
                    manager.Condition())
    print '\n\t######## testing multiprocessing.Pipe\n'
    test_pipespeed()

    print

    print '\n\t######## testing list\n'
    test_seqspeed(range(10))
    print '\n\t######## testing list managed by server process\n'
    test_seqspeed(manager.list(range(10)))
    print '\n\t######## testing Array("i", ..., lock=False)\n'
    test_seqspeed(multiprocessing.Array('i', range(10), lock=False))
    print '\n\t######## testing Array("i", ..., lock=True)\n'
    test_seqspeed(multiprocessing.Array('i', range(10), lock=True))

    print

    print '\n\t######## testing threading.Lock\n'
    test_lockspeed(threading.Lock())
    print '\n\t######## testing threading.RLock\n'
    test_lockspeed(threading.RLock())
    print '\n\t######## testing multiprocessing.Lock\n'
    test_lockspeed(multiprocessing.Lock())
    print '\n\t######## testing multiprocessing.RLock\n'
    test_lockspeed(multiprocessing.RLock())
    print '\n\t######## testing lock managed by server process\n'
    test_lockspeed(manager.Lock())
    print '\n\t######## testing rlock managed by server process\n'
    test_lockspeed(manager.RLock())

    print

    print '\n\t######## testing threading.Condition\n'
    test_conditionspeed(threading.Thread, threading.Condition())
    print '\n\t######## testing multiprocessing.Condition\n'
    test_conditionspeed(multiprocessing.Process, multiprocessing.Condition())
    print '\n\t######## testing condition managed by a server process\n'
    test_conditionspeed(multiprocessing.Process, manager.Condition())

    gc.enable()

if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()
首页