多进程最佳实践¶
torch.multiprocessing
是 Python 的 multiprocessing
模块的直接替代品。它支持完全相同的操作,但对其进行了扩展,因此通过 multiprocessing.Queue
发送的所有张量,其数据将被移动到共享内存中,并且只会发送一个句柄到另一个进程。
注意
当一个Tensor
被发送到另一个进程时,Tensor
数据是共享的。如果torch.Tensor.grad
不是None
,它也会被共享。在一个没有torch.Tensor.grad
字段的Tensor
被发送到另一个进程后,它会创建一个标准的进程特定的.grad
Tensor
,这个Tensor
的数据不会像之前那样在所有进程中自动共享。
这使得可以实现各种训练方法,如Hogwild、A3C,或任何其他需要异步操作的方法。
多进程中的CUDA¶
CUDA 运行时不支持 fork
启动方法;在子进程中使用 CUDA 需要使用 spawn
或 forkserver
启动方法。
注意
可以通过使用
multiprocessing.get_context(...)
创建上下文或直接使用
multiprocessing.set_start_method(...)
来设置 start 方法。
与CPU张量不同,发送进程需要保持原始张量,只要接收进程保留该张量的副本。这是在底层实现的,但要求用户遵循最佳实践,以确保程序正确运行。例如,只要消费者进程有对该张量的引用,发送进程就必须保持活动状态,并且如果消费者进程通过致命信号异常退出,引用计数也无法拯救你。请参阅本节。
另请参阅: 使用 nn.parallel.DistributedDataParallel 代替 multiprocessing 或 nn.DataParallel
最佳实践和技巧¶
避免和解决死锁¶
当一个新的进程被创建时,可能会出现很多问题,其中最常见的原因是后台线程导致的死锁。如果有任何线程持有锁或导入了模块,并且调用了fork
,那么子进程很可能会处于损坏状态,并可能发生死锁或以其他方式失败。请注意,即使你自己没有这样做,Python内置库也会这样做——无需进一步查看multiprocessing
。
multiprocessing.Queue
实际上是一个非常复杂的类,它会生成多个用于序列化、发送和接收对象的线程,这些线程也可能导致上述问题。如果你发现自己处于这种情况,尝试使用一个SimpleQueue
,它不会使用任何额外的线程。
我们正在尽最大努力使操作变得简单,并确保这些死锁不会发生,但有些事情是我们无法控制的。如果你遇到暂时无法解决的问题,可以尝试在论坛上寻求帮助,我们会看看是否可以解决这个问题。
重用通过队列传递的缓冲区¶
请记住,每次将 Tensor
放入
multiprocessing.Queue
时,都必须将其移动到共享内存中。
如果它已经共享,则不会产生任何操作,否则将产生额外的内存复制,这可能会减慢整个过程。即使你有一组进程向单个进程发送数据,也要让它发送缓冲区回来——这几乎是免费的,并且可以让你在发送下一批数据时避免复制。
异步多进程训练(例如 Hogwild)¶
使用 torch.multiprocessing
,可以异步训练模型,参数可以始终共享,或者定期同步。在第一种情况下,我们建议发送整个模型对象,而在后者中,我们建议仅发送 state_dict()
。
我们建议使用 multiprocessing.Queue
在进程之间传递所有类型的 PyTorch 对象。在使用 fork
启动方法时,可以继承已经在共享内存中的张量和存储,但这非常容易出错,应谨慎使用,并且仅由高级用户使用。队列虽然有时不是最优雅的解决方案,但在所有情况下都能正常工作。
警告
你应该小心使用全局语句,这些语句没有用if __name__ == '__main__'
保护。如果使用的是fork
以外的启动方法,它们将在所有子进程中执行。
Hogwild¶
可以在示例仓库中找到具体的Hogwild实现, 但为了展示代码的整体结构,下面也有一个最小的示例:
import torch.multiprocessing as mp
from model import MyModel
def train(model):
# 构建数据加载器、优化器等
for data, labels in data_loader:
optimizer.zero_grad()
loss_fn(model(data), labels).backward()
optimizer.step() # 这将更新共享参数
if __name__ == '__main__':
num_processes = 4
model = MyModel()
# 注意:这对于 ``fork`` 方法正常工作是必需的
model.share_memory()
processes = []
for rank in range(num_processes):
p = mp.Process(target=train, args=(model,))
p.start()
processes.append(p)
for p in processes:
p.join()
多进程中的CPU¶
不适当的并行处理可能导致CPU过载,使得不同进程之间竞争CPU资源,从而导致效率低下。
本教程将解释什么是CPU超额订阅以及如何避免它。
CPU 超额订阅¶
CPU 超额订阅是一个技术术语,指的是系统中分配的 vCPU 总数超过硬件上可用的 vCPU 总数的情况。
这导致了对CPU资源的严重争用。在这种情况下,进程之间频繁切换,增加了进程切换开销,降低了整体系统效率。
请参阅在示例仓库中找到的Hogwild实现中的代码示例,了解CPU超额订阅的情况。
当使用以下命令在CPU上运行训练示例时,使用4个进程:
python main.py --num-processes 4
假设机器上有N个vCPU可用,执行上述命令将生成4个子进程。每个子进程将为自己分配N个vCPU,导致总共需要4*N个vCPU。然而,机器上只有N个vCPU可用。因此,不同的进程将竞争资源,导致频繁的进程切换。
以下观察结果表明存在CPU过度订阅:
高CPU利用率:通过使用
htop
命令,您可以观察到CPU利用率持续高企,通常达到或超过其最大容量。这表明对CPU资源的需求超过了可用的物理核心,导致进程之间对CPU时间的争夺和竞争。频繁的上下文切换与低系统效率:在超额订阅的CPU场景下,进程之间竞争CPU时间,操作系统需要快速地在不同进程之间切换以公平分配资源。这种频繁的上下文切换增加了开销并降低了整体系统效率。
避免CPU过度分配¶
避免CPU过度订阅的一个好方法是进行适当的资源分配。 确保同时运行的进程或线程数量不超过可用的CPU资源。
在这种情况下,解决方案是在子进程中指定适当数量的线程。这可以通过在子进程中使用 torch.set_num_threads(int)
函数来设置每个进程的线程数来实现。
假设机器上有N个vCPU,并且将生成M个进程,每个进程使用的最大num_threads
值将是floor(N/M)
。为了避免在mnist_hogwild示例中出现CPU过度订阅,需要在示例仓库中的train.py
文件中进行以下更改。
def train(rank, args, model, device, dataset, dataloader_kwargs):
torch.manual_seed(args.seed + rank)
#### 定义当前子进程中使用的线程数
torch.set_num_threads(floor(N/M))
train_loader = torch.utils.data.DataLoader(dataset, **dataloader_kwargs)
optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)
for epoch in range(1, args.epochs + 1):
train_epoch(epoch, args, model, device, train_loader, optimizer)
为每个进程设置num_thread
,使用
torch.set_num_threads(floor(N/M))
。其中,N替换为可用vCPU的数量,M替换为选择的进程数量。
适当的num_thread
值会根据具体任务的不同而变化。然而,作为一般指导原则,
num_thread
的最大值应为floor(N/M)
,以避免CPU过度订阅。
在mnist_hogwild训练示例中,避免CPU过度订阅后,您可以实现30倍的性能提升。