Shortcuts

Introduction || 什么是DDP || 单节点多GPU训练 || 容错 || 多节点训练 || minGPT训练

使用DDP进行多GPU训练

创建于:2022年9月27日 | 最后更新:2024年11月3日 | 最后验证:未验证

作者: Suraj Subramanian

What you will learn
  • 如何通过DDP将单GPU训练脚本迁移到多GPU

  • 设置分布式进程组

  • 在分布式设置中保存和加载模型

查看本教程中使用的代码在 GitHub

Prerequisites
  • DDP 工作原理的高级概述

  • 一台配备多个GPU的机器(本教程使用AWS p3.8xlarge实例)

  • PyTorch installed 已安装 CUDA

跟随下面的视频或在youtube上观看。

之前的教程中,我们获得了DDP工作原理的高级概述;现在我们将看到如何在代码中使用DDP。 在本教程中,我们从单GPU训练脚本开始,并将其迁移到在单个节点的4个GPU上运行。 在此过程中,我们将在代码中实现这些重要概念的同时,讨论分布式训练中的重要概念。

注意

如果你的模型包含任何BatchNorm层,它需要被转换为SyncBatchNorm以同步BatchNorm层的运行统计信息。

使用辅助函数 torch.nn.SyncBatchNorm.convert_sync_batchnorm(model) 将模型中的所有 BatchNorm 层转换为 SyncBatchNorm

比较 single_gpu.pymultigpu.py 的差异

这些是您通常对单GPU训练脚本所做的更改,以启用DDP。

导入

  • torch.multiprocessing 是 PyTorch 对 Python 原生多进程的封装

  • 分布式进程组包含所有可以相互通信和同步的进程。

import torch
import torch.nn.functional as F
from utils import MyTrainDataset

import torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import os

构建进程组

  • 首先,在初始化组进程之前,调用set_device,它为每个进程设置默认的GPU。这对于防止在GPU:0上出现挂起或过度内存使用非常重要。

  • 进程组可以通过TCP(默认)或从共享文件系统初始化。了解更多关于进程组初始化

  • init_process_group 初始化分布式进程组。

  • 了解更多关于选择DDP后端

def ddp_setup(rank: int, world_size: int):
   """
   Args:
       rank: Unique identifier of each process
      world_size: Total number of processes
   """
   os.environ["MASTER_ADDR"] = "localhost"
   os.environ["MASTER_PORT"] = "12355"
   torch.cuda.set_device(rank)
   init_process_group(backend="nccl", rank=rank, world_size=world_size)

构建DDP模型

self.model = DDP(model, device_ids=[gpu_id])

分发输入数据

  • DistributedSampler 将输入数据在所有分布式进程中分块。

  • The DataLoader combines a dataset and a

    采样器,并提供对给定数据集的可迭代访问。

  • 每个进程将接收一个包含32个样本的输入批次;有效批次大小为32 * nprocs,当使用4个GPU时为128。

train_data = torch.utils.data.DataLoader(
    dataset=train_dataset,
    batch_size=32,
    shuffle=False,  # We don't shuffle
    sampler=DistributedSampler(train_dataset), # Use the Distributed Sampler here.
)
  • 在每个epoch开始时调用set_epoch()方法是必要的,以确保在多个epoch中正确地进行洗牌。否则,每个epoch将使用相同的顺序。

def _run_epoch(self, epoch):
    b_sz = len(next(iter(self.train_data))[0])
    self.train_data.sampler.set_epoch(epoch)   # call this additional line at every epoch
    for source, targets in self.train_data:
      ...
      self._run_batch(source, targets)

保存模型检查点

  • 我们只需要从一个进程中保存模型检查点。如果没有这个条件,每个进程都会保存其相同的模式副本。了解更多关于使用DDP保存和加载模型的信息这里

- ckp = self.model.state_dict()
+ ckp = self.model.module.state_dict()
...
...
- if epoch % self.save_every == 0:
+ if self.gpu_id == 0 and epoch % self.save_every == 0:
  self._save_checkpoint(epoch)

警告

Collective calls 是在所有分布式进程上运行的函数,用于将某些状态或值收集到特定进程。集体调用要求所有等级都运行集体代码。在这个例子中,_save_checkpoint 不应该有任何集体调用,因为它只在 rank:0 进程上运行。如果你需要进行任何集体调用,应该在 if self.gpu_id == 0 检查之前进行。

运行分布式训练任务

  • 包括新的参数 rank(替换 device)和 world_size

  • rank 在调用 mp.spawn 时由 DDP 自动分配。

  • world_size 是训练任务中的进程数量。对于GPU训练,这对应于使用的GPU数量,每个进程在一个专用的GPU上工作。

- def main(device, total_epochs, save_every):
+ def main(rank, world_size, total_epochs, save_every):
+  ddp_setup(rank, world_size)
   dataset, model, optimizer = load_train_objs()
   train_data = prepare_dataloader(dataset, batch_size=32)
-  trainer = Trainer(model, train_data, optimizer, device, save_every)
+  trainer = Trainer(model, train_data, optimizer, rank, save_every)
   trainer.train(total_epochs)
+  destroy_process_group()

if __name__ == "__main__":
   import sys
   total_epochs = int(sys.argv[1])
   save_every = int(sys.argv[2])
-  device = 0      # shorthand for cuda:0
-  main(device, total_epochs, save_every)
+  world_size = torch.cuda.device_count()
+  mp.spawn(main, args=(world_size, total_epochs, save_every,), nprocs=world_size)

以下是代码的样子:

进一步阅读

优云智算