• Docs >
  • Multiprocessing package - torch.multiprocessing
Shortcuts

多进程包 - torch.multiprocessing

torch.multiprocessing 是围绕原生 multiprocessing 模块的封装。

它注册了自定义的reducers,这些reducers使用共享内存来在不同进程中提供对相同数据的共享视图。一旦张量/存储被移动到共享内存(参见share_memory_()),就可以将其发送到其他进程而无需进行任何复制。

该API与原始模块100%兼容 - 只需将import multiprocessing更改为import torch.multiprocessing,即可将通过队列或其他机制发送或共享的所有张量移动到共享内存中。

由于API的相似性,我们没有对本包的大部分内容进行文档化,我们建议参考原始模块的非常完善的文档。

警告

如果主进程突然退出(例如因为收到信号),Python的multiprocessing有时无法清理其子进程。这是一个已知的注意事项,所以如果你在中断解释器后看到任何资源泄漏,这可能意味着这种情况刚刚发生在你身上。

策略管理

torch.multiprocessing.get_all_sharing_strategies()[源代码]

返回当前系统上支持的一组共享策略。

torch.multiprocessing.get_sharing_strategy()[源代码]

返回当前共享CPU张量的策略。

torch.multiprocessing.set_sharing_strategy(new_strategy)[源代码]

设置共享CPU张量的策略。

Parameters

new_strategy (str) – 所选策略的名称。应该是由 get_all_sharing_strategies() 返回的值之一。

共享CUDA张量

在进程间共享CUDA张量仅在Python 3中支持,使用spawnforkserver启动方法。

与CPU张量不同,发送进程需要保留原始张量,只要接收进程保留该张量的副本。引用计数是在幕后实现的,但要求用户遵循以下最佳实践。

警告

如果消费者进程因致命信号异常终止,只要发送进程仍在运行,共享张量可能会永远保存在内存中。

  1. 在消费者中尽快释放内存。

## 好
x = queue.get()
# 对x进行一些操作
del x
## 不好
x = queue.get()
# 对x进行一些操作
# 做其他所有事情(生产者必须将x保存在内存中)

2. 保持生产者进程运行,直到所有消费者退出。这将防止生产者进程释放仍在被消费者使用的内存的情况。

## 生产者
# 发送张量,执行某些操作
event.wait()
## 消费者
# 接收张量并使用它们
event.set()
  1. 不要传递接收到的张量。

# 不会工作
x = queue.get()
queue_2.put(x)
# 你需要创建一个进程本地的副本
x = queue.get()
x_clone = x.clone()
queue_2.put(x_clone)
# 在同一进程中从同一队列进行放置和获取可能会导致段错误
queue.put(tensor)
x = queue.get()

共享策略

本节简要概述了不同共享策略的工作原理。请注意,这仅适用于CPU张量 - CUDA张量将始终使用CUDA API,因为这是它们唯一可以共享的方式。

文件描述符 - file_descriptor

注意

这是默认策略(macOS 和 OS X 除外,这些系统不支持此策略)。

此策略将使用文件描述符作为共享内存句柄。每当一个存储被移动到共享内存时,从shm_open获得的文件描述符会与对象一起缓存,当它要被发送到其他进程时,文件描述符将被传输(例如通过UNIX套接字)到该进程。接收方也会缓存文件描述符并mmap它,以获得对存储数据的共享视图。

请注意,如果有大量共享的张量,此策略将保持大量文件描述符长时间打开。如果您的系统对打开文件描述符的数量限制较低,并且无法提高这些限制,您应该使用file_system策略。

文件系统 - file_system

此策略将使用传递给 shm_open 的文件名来识别共享内存区域。这样做的好处是不需要实现缓存从它获得的文件描述符,但同时容易导致共享内存泄漏。文件在创建后不能立即删除,因为其他进程需要访问它以打开它们的视图。如果进程发生致命崩溃或被终止,并且没有调用存储析构函数,文件将保留在系统中。这是一个非常严重的问题,因为它们会一直占用内存,直到系统重新启动或手动释放它们。

为了解决共享内存文件泄漏的问题,torch.multiprocessing 将启动一个名为 torch_shm_manager 的守护进程,该守护进程将隔离自身 于当前进程组,并跟踪所有共享内存分配。 一旦所有连接到它的进程退出,它将等待片刻以确保 不会有新的连接,并将遍历该组分配的所有共享内存文件。 如果发现其中任何一个仍然存在,它们将被释放。我们已经测试了这种方法, 并证明它对各种故障具有鲁棒性。尽管如此,如果您的系统有足够高的限制, 并且 file_descriptor 是受支持的策略,我们不建议切换到此策略。

生成子进程

注意

适用于 Python >= 3.4。

这取决于Python的multiprocessing包中的spawn启动方法。

生成多个子进程来执行某些功能可以通过创建Process实例并调用join来等待它们完成。这种方法在处理单个子进程时效果良好,但在处理多个进程时可能会出现潜在问题。

即,顺序连接进程意味着它们将顺序终止。如果它们不终止,并且第一个进程不终止,进程终止将不会被注意到。此外,没有用于错误传播的原生设施。

下面的 spawn 函数解决了这些问题,并处理了错误传播、无序终止,并在检测到其中一个进程出错时主动终止进程。

torch.multiprocessing.spawn.spawn(fn, args=(), nprocs=1, join=True, daemon=False, start_method='spawn')[源代码]

生成 nprocs 个进程,这些进程运行 fn 并带有 args

如果其中一个进程以非零退出状态退出,剩余的进程将被终止,并抛出一个异常,说明终止的原因。如果子进程中捕获了异常,该异常将被转发,并且其回溯信息将包含在父进程抛出的异常中。

Parameters
  • fn (函数) –

    函数作为生成的进程的入口点被调用。此函数必须在模块的顶层定义,以便可以进行pickle处理并生成。这是由multiprocessing强加的要求。

    函数被调用为 fn(i, *args),其中 i 是进程索引,args 是通过的参数元组。

  • args (元组) – 传递给 fn 的参数。

  • nprocs (int) – 要生成的进程数。

  • join (bool) – 在所有进程上执行阻塞连接。

  • 守护进程 (布尔值) – 生成的进程的守护进程标志。如果设置为True,将创建守护进程。

  • start_method (str) – (已弃用) 此方法将始终使用 spawn 作为启动方法。要使用不同的启动方法,请使用 start_processes()

Returns

如果 joinTrue,则为 None, 如果 joinFalse,则为 ProcessContext

class torch.multiprocessing.SpawnContext[源代码]

spawn() 在调用时返回,参数为 join=False

join(timeout=None)

在spawn上下文中加入一个或多个进程。

尝试在此spawn上下文中加入一个或多个进程。 如果其中一个进程以非零退出状态退出,此函数 将杀死剩余的进程并引发一个异常,说明第一个进程退出的原因。

如果所有进程都已成功加入,则返回True, 如果有更多进程需要加入,则返回False

Parameters

超时 (浮点数) – 在放弃等待之前等待这么长时间。

优云智算