• Docs >
  • Multiprocessing best practices
Shortcuts

多进程最佳实践

torch.multiprocessing 是 Python 的 multiprocessing 模块的直接替代品。它支持完全相同的操作,但对其进行了扩展,因此通过 multiprocessing.Queue 发送的所有张量,其数据将被移动到共享内存中,并且只会发送一个句柄到另一个进程。

注意

当一个Tensor被发送到另一个进程时,Tensor数据是共享的。如果torch.Tensor.grad不是None,它也会被共享。在一个没有torch.Tensor.grad字段的Tensor被发送到另一个进程后,它会创建一个标准的进程特定的.gradTensor,这个Tensor的数据不会像之前那样在所有进程中自动共享。

这使得可以实现各种训练方法,如Hogwild、A3C,或任何其他需要异步操作的方法。

多进程中的CUDA

CUDA 运行时不支持 fork 启动方法;在子进程中使用 CUDA 需要使用 spawnforkserver 启动方法。

注意

可以通过使用 multiprocessing.get_context(...) 创建上下文或直接使用 multiprocessing.set_start_method(...) 来设置 start 方法。

与CPU张量不同,发送进程需要保持原始张量,只要接收进程保留该张量的副本。这是在底层实现的,但要求用户遵循最佳实践,以确保程序正确运行。例如,只要消费者进程有对该张量的引用,发送进程就必须保持活动状态,并且如果消费者进程通过致命信号异常退出,引用计数也无法拯救你。请参阅本节

另请参阅: 使用 nn.parallel.DistributedDataParallel 代替 multiprocessing 或 nn.DataParallel

最佳实践和技巧

避免和解决死锁

当一个新的进程被创建时,可能会出现很多问题,其中最常见的原因是后台线程导致的死锁。如果有任何线程持有锁或导入了模块,并且调用了fork,那么子进程很可能会处于损坏状态,并可能发生死锁或以其他方式失败。请注意,即使你自己没有这样做,Python内置库也会这样做——无需进一步查看multiprocessingmultiprocessing.Queue实际上是一个非常复杂的类,它会生成多个用于序列化、发送和接收对象的线程,这些线程也可能导致上述问题。如果你发现自己处于这种情况,尝试使用一个SimpleQueue,它不会使用任何额外的线程。

我们正在尽最大努力使操作变得简单,并确保这些死锁不会发生,但有些事情是我们无法控制的。如果你遇到暂时无法解决的问题,可以尝试在论坛上寻求帮助,我们会看看是否可以解决这个问题。

重用通过队列传递的缓冲区

请记住,每次将 Tensor 放入 multiprocessing.Queue 时,都必须将其移动到共享内存中。 如果它已经共享,则不会产生任何操作,否则将产生额外的内存复制,这可能会减慢整个过程。即使你有一组进程向单个进程发送数据,也要让它发送缓冲区回来——这几乎是免费的,并且可以让你在发送下一批数据时避免复制。

异步多进程训练(例如 Hogwild)

使用 torch.multiprocessing,可以异步训练模型,参数可以始终共享,或者定期同步。在第一种情况下,我们建议发送整个模型对象,而在后者中,我们建议仅发送 state_dict()

我们建议使用 multiprocessing.Queue 在进程之间传递所有类型的 PyTorch 对象。在使用 fork 启动方法时,可以继承已经在共享内存中的张量和存储,但这非常容易出错,应谨慎使用,并且仅由高级用户使用。队列虽然有时不是最优雅的解决方案,但在所有情况下都能正常工作。

警告

你应该小心使用全局语句,这些语句没有用if __name__ == '__main__'保护。如果使用的是fork以外的启动方法,它们将在所有子进程中执行。

Hogwild

可以在示例仓库中找到具体的Hogwild实现, 但为了展示代码的整体结构,下面也有一个最小的示例:

import torch.multiprocessing as mp
from model import MyModel

def train(model):
    # 构建数据加载器、优化器等
    for data, labels in data_loader:
        optimizer.zero_grad()
        loss_fn(model(data), labels).backward()
        optimizer.step()  # 这将更新共享参数

if __name__ == '__main__':
    num_processes = 4
    model = MyModel()
    # 注意:这对于 ``fork`` 方法正常工作是必需的
    model.share_memory()
    processes = []
    for rank in range(num_processes):
        p = mp.Process(target=train, args=(model,))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()

多进程中的CPU

不适当的并行处理可能导致CPU过载,使得不同进程之间竞争CPU资源,从而导致效率低下。

本教程将解释什么是CPU超额订阅以及如何避免它。

CPU 超额订阅

CPU 超额订阅是一个技术术语,指的是系统中分配的 vCPU 总数超过硬件上可用的 vCPU 总数的情况。

这导致了对CPU资源的严重争用。在这种情况下,进程之间频繁切换,增加了进程切换开销,降低了整体系统效率。

请参阅在示例仓库中找到的Hogwild实现中的代码示例,了解CPU超额订阅的情况。

当使用以下命令在CPU上运行训练示例时,使用4个进程:

python main.py --num-processes 4

假设机器上有N个vCPU可用,执行上述命令将生成4个子进程。每个子进程将为自己分配N个vCPU,导致总共需要4*N个vCPU。然而,机器上只有N个vCPU可用。因此,不同的进程将竞争资源,导致频繁的进程切换。

以下观察结果表明存在CPU过度订阅:

  1. 高CPU利用率:通过使用htop命令,您可以观察到CPU利用率持续高企,通常达到或超过其最大容量。这表明对CPU资源的需求超过了可用的物理核心,导致进程之间对CPU时间的争夺和竞争。

  2. 频繁的上下文切换与低系统效率:在超额订阅的CPU场景下,进程之间竞争CPU时间,操作系统需要快速地在不同进程之间切换以公平分配资源。这种频繁的上下文切换增加了开销并降低了整体系统效率。

避免CPU过度分配

避免CPU过度订阅的一个好方法是进行适当的资源分配。 确保同时运行的进程或线程数量不超过可用的CPU资源。

在这种情况下,解决方案是在子进程中指定适当数量的线程。这可以通过在子进程中使用 torch.set_num_threads(int) 函数来设置每个进程的线程数来实现。

假设机器上有N个vCPU,并且将生成M个进程,每个进程使用的最大num_threads值将是floor(N/M)。为了避免在mnist_hogwild示例中出现CPU过度订阅,需要在示例仓库中的train.py文件中进行以下更改。

def train(rank, args, model, device, dataset, dataloader_kwargs):
    torch.manual_seed(args.seed + rank)

    #### 定义当前子进程中使用的线程数
    torch.set_num_threads(floor(N/M))

    train_loader = torch.utils.data.DataLoader(dataset, **dataloader_kwargs)

    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)
    for epoch in range(1, args.epochs + 1):
        train_epoch(epoch, args, model, device, train_loader, optimizer)

为每个进程设置num_thread,使用 torch.set_num_threads(floor(N/M))。其中,N替换为可用vCPU的数量,M替换为选择的进程数量。 适当的num_thread值会根据具体任务的不同而变化。然而,作为一般指导原则, num_thread的最大值应为floor(N/M),以避免CPU过度订阅。 在mnist_hogwild训练示例中,避免CPU过度订阅后,您可以实现30倍的性能提升。