multiprocessing.shared_memory —提供共享内存以便跨进程直接访问

源代码: Lib/multiprocessing/shared_memory.py

3.8 版的新Function。


此模块提供了一个类SharedMemory,用于分配和 Management 将由多核或对称 multiprocessing 器(SMP)计算机上的一个或多个进程访问的共享内存。为了帮助共享内存的生命周期 Management(尤其是跨不同进程),在multiprocessing.managers模块中还提供了BaseManager子类SharedMemoryManager

在此模块中,共享内存是指“ System V 样式”共享内存块(尽管不一定如此显式实现),并且不涉及“分布式共享内存”。这种共享存储器的样式允许不同的进程潜在地读写易失性存储器的公共(或共享)区域。常规上,进程被限制为只能访问其自己的进程存储空间,但是共享内存允许进程之间共享数据,从而避免了在包含该数据的进程之间发送消息的需求。与pass磁盘或插槽或其他需要序列化/反序列化和复制数据的通信共享数据相比,直接pass内存共享数据可以提供显着的性能优势。

    • class * multiprocessing.shared_memory. SharedMemory(* name = None create = False size = 0 *)
    • 创建一个新的共享内存块或附加到现有的共享内存块。每个共享内存块都分配有唯一的名称。这样,一个进程可以创建具有特定名称的共享存储块,而另一个进程可以使用相同的名称附加到该共享存储块。

作为跨进程共享数据的资源,共享内存块可能会超过创建它们的原始进程的寿命。当一个进程不再需要访问其他进程可能仍需要的共享内存块时,应调用close()方法。当任何进程不再需要共享内存块时,应调用unlink()方法以确保正确清理。

  • name *是请求的共享内存的唯一名称,指定为字符串。创建新的共享内存块时,如果名称提供None(默认值),则会生成一个新名称。

  • create *控制创建新的共享存储块(True)还是附加现有的共享存储块(False)。

  • size *指定创建新的共享内存块时请求的字节数。由于某些平台选择基于该平台的内存页面大小来分配内存块,因此共享内存块的确切大小可能大于或等于请求的大小。连接到现有共享内存块时,size参数将被忽略。

  • close ( )

    • 关闭从此实例对共享内存的访问。为了确保正确清理资源,一旦不再需要该实例,所有实例应调用close()。请注意,调用close()不会导致共享内存块本身被破坏。
  • buf

    • 共享内存块内容的 memoryview。
  • name

    • 以只读方式访问共享内存块的唯一名称。
  • size

    • 只读访问共享内存块的大小(以字节为单位)。

下面的示例演示了SharedMemory实例的低级用法:

>>> from multiprocessing import shared_memory
>>> shm_a = shared_memory.SharedMemory(create=True, size=10)
>>> type(shm_a.buf)
<class 'memoryview'>
>>> buffer = shm_a.buf
>>> len(buffer)
10
>>> buffer[:4] = bytearray([22, 33, 44, 55])  # Modify multiple at once
>>> buffer[4] = 100                           # Modify single byte at a time
>>> # Attach to an existing shared memory block
>>> shm_b = shared_memory.SharedMemory(shm_a.name)
>>> import array
>>> array.array('b', shm_b.buf[:5])  # Copy the data into a new array.array
array('b', [22, 33, 44, 55, 100])
>>> shm_b.buf[:5] = b'howdy'  # Modify via shm_b using bytes
>>> bytes(shm_a.buf[:5])      # Access via shm_a
b'howdy'
>>> shm_b.close()   # Close each SharedMemory instance
>>> shm_a.close()
>>> shm_a.unlink()  # Call unlink only once to release the shared memory

下面的示例演示了SharedMemory类和NumPy arrays的实际用法,可以从两个不同的 Python shell 访问相同的numpy.ndarray

>>> # In the first Python interactive shell
>>> import numpy as np
>>> a = np.array([1, 1, 2, 3, 5, 8])  # Start with an existing NumPy array
>>> from multiprocessing import shared_memory
>>> shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
>>> # Now create a NumPy array backed by shared memory
>>> b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
>>> b[:] = a[:]  # Copy the original data into shared memory
>>> b
array([1, 1, 2, 3, 5, 8])
>>> type(b)
<class 'numpy.ndarray'>
>>> type(a)
<class 'numpy.ndarray'>
>>> shm.name  # We did not specify a name so one was chosen for us
'psm_21467_46075'

>>> # In either the same shell or a new Python shell on the same machine
>>> import numpy as np
>>> from multiprocessing import shared_memory
>>> # Attach to the existing shared memory block
>>> existing_shm = shared_memory.SharedMemory(name='psm_21467_46075')
>>> # Note that a.shape is (6,) and a.dtype is np.int64 in this example
>>> c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
>>> c
array([1, 1, 2, 3, 5, 8])
>>> c[-1] = 888
>>> c
array([  1,   1,   2,   3,   5, 888])

>>> # Back in the first Python interactive shell, b reflects this change
>>> b
array([  1,   1,   2,   3,   5, 888])

>>> # Clean up from within the second Python shell
>>> del c  # Unnecessary; merely emphasizing the array is no longer used
>>> existing_shm.close()

>>> # Clean up from within the first Python shell
>>> del b  # Unnecessary; merely emphasizing the array is no longer used
>>> shm.close()
>>> shm.unlink()  # Free and release the shared memory block at the very end
    • class * multiprocessing.managers. SharedMemoryManager([* address * [,* authkey *]])
    • BaseManager的子类,可用于跨进程 Management 共享内存块。

SharedMemoryManager实例上调用start()会导致启动新进程。这个新过程的唯一目的是 Management pass它创建的所有共享内存块的生命周期。要触发该进程 Management 的所有共享内存块的释放,请在实例上调用shutdown()。这将触发该进程 Management 的所有SharedMemory对象的SharedMemory.unlink()调用,然后停止该进程本身。passpassSharedMemoryManager创建SharedMemory实例,我们避免了手动跟踪和触发共享内存资源释放的需要。

此类提供了用于创建和返回SharedMemory实例以及创建由共享内存支持的类似列表的对象(ShareableList)的方法。

有关继承的* address authkey *可选 Importing 参数的描述,以及如何将它们用于从其他进程连接到现有的SharedMemoryManager服务,请参考multiprocessing.managers.BaseManager

  • SharedMemory(大小)

    • 创建并返回一个具有指定size字节的新SharedMemory对象。
  • ShareableList(序列)

    • 创建并返回一个新的ShareableList对象,该对象由 Importingsequence的值初始化。

以下示例演示了SharedMemoryManager的基本机制:

>>> from multiprocessing.managers import SharedMemoryManager
>>> smm = SharedMemoryManager()
>>> smm.start()  # Start the process that manages the shared memory blocks
>>> sl = smm.ShareableList(range(4))
>>> sl
ShareableList([0, 1, 2, 3], name='psm_6572_7512')
>>> raw_shm = smm.SharedMemory(size=128)
>>> another_sl = smm.ShareableList('alpha')
>>> another_sl
ShareableList(['a', 'l', 'p', 'h', 'a'], name='psm_6572_12221')
>>> smm.shutdown()  # Calls unlink() on sl, raw_shm, and another_sl

下面的示例描述了passwith语句使用SharedMemoryManager对象以确保不再需要所有共享内存块时将其释放的可能更方便的模式:

>>> with SharedMemoryManager() as smm:
...     sl = smm.ShareableList(range(2000))
...     # Divide the work among two processes, storing partial results in sl
...     p1 = Process(target=do_work, args=(sl, 0, 1000))
...     p2 = Process(target=do_work, args=(sl, 1000, 2000))
...     p1.start()
...     p2.start()  # A multiprocessing.Pool might be more efficient
...     p1.join()
...     p2.join()   # Wait for all work to complete in both processes
...     total_result = sum(sl)  # Consolidate the partial results now in sl

with语句中使用SharedMemoryManager时,使用with语句的代码块完成执行后,将释放使用该 Management 器创建的共享内存块。

    • class * multiprocessing.shared_memory. ShareableList(* sequence = None **,* name = None *)
    • 提供一个可变的类似列表的对象,其中存储的所有值都存储在共享内存块中。这将可存储的值仅限制为intfloatboolstr(每个小于 10M 字节),bytes(每个小于 10M 字节)和None内置数据类型。它与内置list类型的显着不同还在于,这些列表不能更改其总长度(即,没有追加,插入等),并且不支持pass切片动态创建新的ShareableList实例。
  • sequence *用于填充新的ShareableList值。设置为None即可pass其唯一的共享内存名称附加到已存在的ShareableList

  • name *是请求的共享内存的唯一名称,如SharedMemory的定义中所述。附加到现有的ShareableList时,指定其共享内存块的唯一名称,同时将sequence设置为None

  • count(* value *)

    • 返回value的出现次数。
  • index(* value *)

    • 返回第一个索引位置value。如果不存在value,则提高ValueError
  • format

    • 只读属性,包含所有当前存储的值使用的struct包装格式。
  • shm

以下示例演示了ShareableList实例的基本用法:

>>> from multiprocessing import shared_memory
>>> a = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42])
>>> [ type(entry) for entry in a ]
[<class 'str'>, <class 'bytes'>, <class 'float'>, <class 'int'>, <class 'NoneType'>, <class 'bool'>, <class 'int'>]
>>> a[2]
-273.154
>>> a[2] = -78.5
>>> a[2]
-78.5
>>> a[2] = 'dry ice'  # Changing data types is supported as well
>>> a[2]
'dry ice'
>>> a[2] = 'larger than previously allocated storage space'
Traceback (most recent call last):
  ...
ValueError: exceeds available storage for existing str
>>> a[2]
'dry ice'
>>> len(a)
7
>>> a.index(42)
6
>>> a.count(b'howdy')
0
>>> a.count(b'HoWdY')
1
>>> a.shm.close()
>>> a.shm.unlink()
>>> del a  # Use of a ShareableList after call to unlink() is unsupported

以下示例描述了一个,两个或多个进程如何pass在其后面提供共享内存块的名称来访问相同的ShareableList

>>> b = shared_memory.ShareableList(range(5))         # In a first process
>>> c = shared_memory.ShareableList(name=b.shm.name)  # In a second process
>>> c
ShareableList([0, 1, 2, 3, 4], name='...')
>>> c[-1] = -999
>>> b[-1]
-999
>>> b.shm.close()
>>> c.shm.close()
>>> c.shm.unlink()