• Docs >
  • Distributed Data Parallel
Shortcuts

分布式数据并行

警告

实现 torch.nn.parallel.DistributedDataParallel 随着时间的推移而发展。本设计说明是基于截至v1.4的状态编写的。

torch.nn.parallel.DistributedDataParallel(DDP)透明地执行分布式数据并行训练。本页描述了其工作原理并揭示了实现细节。

示例

让我们从一个简单的 torch.nn.parallel.DistributedDataParallel 示例开始。这个示例使用一个 torch.nn.Linear 作为本地模型,将其与DDP包装,然后运行一次前向传递、一次反向传递和一个优化器步骤在DDP模型上。之后,本地模型上的参数将被更新,并且不同进程上的所有模型应该完全相同。

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
import os
from torch.nn.parallel import DistributedDataParallel as DDP


def example(rank, world_size):
    # 创建默认进程组
    dist.init_process_group("gloo", rank=rank, world_size=world_size)
    # 创建本地模型
    model = nn.Linear(10, 10).to(rank)
    # 构建DDP模型
    ddp_model = DDP(model, device_ids=[rank])
    # 定义损失函数和优化器
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    # 前向传播
    outputs = ddp_model(torch.randn(20, 10).to(rank))
    labels = torch.randn(20, 10).to(rank)
    # 反向传播
    loss_fn(outputs, labels).backward()
    # 更新参数
    optimizer.step()

def main():
    world_size = 2
    mp.spawn(example,
        args=(world_size,),
        nprocs=world_size,
        join=True)

if __name__=="__main__":
    # 使用c10d的默认“env”初始化模式时需要设置的环境变量
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"
    main()

DDP 与 TorchDynamo 一起工作。当与 TorchDynamo 一起使用时,在编译模型之前应用 DDP 模型包装器,以便 torchdynamo 可以根据 DDP 桶大小应用 DDPOptimizer(图中断优化)。(有关更多信息,请参阅 TorchDynamo DDPOptimizer。)

ddp_model = DDP(model, device_ids=[rank])
ddp_model = torch.compile(ddp_model)

内部设计

本节揭示了通过深入每个步骤的细节,了解torch.nn.parallel.DistributedDataParallel在迭代中的工作原理。

  • 先决条件:DDP依赖于c10d ProcessGroup 进行通信。 因此,应用程序必须在构建DDP之前创建 ProcessGroup 实例。

  • 构造: DDP 构造函数接受一个本地模块的引用,并将排名为 0 的进程中的 state_dict() 广播给组中的所有其他进程,以确保所有模型副本从完全相同的状态开始。然后,每个 DDP 进程创建一个本地的 Reducer,它将在反向传播期间负责梯度同步。为了提高通信效率,Reducer 将参数梯度组织成桶,并一次减少一个桶。桶的大小可以通过在 DDP 构造函数中设置 bucket_cap_mb 参数来配置。参数梯度到桶的映射是在构造时确定的,基于桶大小限制和参数大小。模型参数以(大致)与给定模型的 Model.parameters() 相反的顺序分配到桶中。使用相反顺序的原因是 DDP 期望梯度在反向传播期间以大约该顺序变为就绪状态。下图显示了一个示例。请注意,grad0grad1bucket1 中,而其他两个梯度在 bucket0 中。当然,这种假设可能并不总是成立,当这种情况发生时,可能会影响 DDP 反向传播的速度,因为 Reducer 无法在最早可能的时间启动通信。除了分桶之外,Reducer 还在构造期间注册自动求导钩子,每个参数一个钩子。这些钩子将在梯度变为就绪状态时在反向传播期间触发。

  • 前向传播:DDP 接收输入并将其传递给本地模型,然后如果 find_unused_parameters 设置为 True,则分析本地模型的输出。此模式允许在模型的子图上进行反向传播,DDP 通过从模型输出遍历自动求导图来确定哪些参数参与了反向传播,并将所有未使用的参数标记为准备进行规约。在反向传播过程中,Reducer 只会等待未准备好的参数,但它仍会规约所有桶。将参数梯度标记为准备状态目前不会帮助 DDP 跳过桶,但它会防止 DDP 在反向传播过程中永远等待缺失的梯度。请注意,遍历自动求导图会引入额外的开销,因此应用程序应仅在必要时将 find_unused_parameters 设置为 True

  • 反向传播:在损失Tensor上直接调用backward()函数,这一过程超出了DDP的控制范围,DDP通过在构造时注册的autograd钩子来触发梯度同步。当一个梯度准备就绪时,其对应的梯度累加器上的DDP钩子将被触发,然后DDP将标记该参数梯度为准备进行缩减。当一个桶中的所有梯度都准备就绪时,Reducer将对该桶启动一个异步的allreduce操作,以计算所有进程中梯度的平均值。当所有桶都准备就绪时,Reducer将阻塞等待所有allreduce操作完成。完成后,平均梯度将被写入所有参数的param.grad字段。因此,在反向传播之后,不同DDP进程中相同参数的grad字段应该是相同的。

  • 优化器步骤:从优化器的角度来看,它正在优化一个局部模型。所有DDP进程上的模型副本可以保持同步,因为它们都从相同的状态开始,并且在每次迭代中都具有相同的平均梯度。

ddp_grad_sync.png

注意

DDP 要求所有进程上的 Reducer 实例以完全相同的顺序调用 allreduce, 这是通过始终按照桶索引顺序而不是实际的桶就绪顺序来运行 allreduce 来实现的。 进程间 allreduce 顺序不匹配可能导致错误结果或 DDP 反向传播挂起。

实现

以下是指向DDP实现组件的指针。堆叠图显示了代码的结构。

进程组

  • ProcessGroup.hpp: 包含所有进程组实现的抽象API。c10d 库提供了3种开箱即用的实现,即, ProcessGroupGloo, ProcessGroupNCCL, 和 ProcessGroupMPIDistributedDataParallel 使用 ProcessGroup::broadcast() 在初始化期间将模型状态从排名为0的进程发送到其他进程,并使用 ProcessGroup::allreduce() 来求和梯度。

  • Store.hpp: 协助进程组实例的会合服务以找到彼此。

分布式数据并行

  • distributed.py: 是DDP的Python入口点。它实现了初始化步骤和 forward函数,用于nn.parallel.DistributedDataParallel 模块,该模块调用C++库。其_sync_param函数执行 当一个DDP进程在多个设备上工作时的进程内参数同步,并且它还将模型缓冲区从排名为0的进程广播到 所有其他进程。进程间参数同步发生在 Reducer.cpp中。

  • comm.h: 实现了合并广播辅助函数,该函数在初始化期间被调用以广播模型状态,并在前向传递之前同步模型缓冲区。

  • reducer.h: 提供了反向传播中梯度同步的核心实现。它有三个入口点函数:

    • Reducer: 构造函数在 distributed.py 中被调用,该函数将 Reducer::autograd_hook() 注册到梯度累加器中。

    • autograd_hook() 函数将在梯度准备好时由自动求导引擎调用。

    • prepare_for_backward()distributed.py 中的 DDP 前向传播结束时被调用。当在 DDP 构造函数中将 find_unused_parameters 设置为 True 时,它会遍历自动求导图以查找未使用的参数。

ddp_code.png

TorchDynamo DDPOptimizer

DDP的性能优势来自于在反向传播过程中重叠allreduce集合操作与计算。 当与TorchDynamo一起使用AotAutograd进行编译整个前向和整个反向图时, 由于allreduce操作是在整个优化的反向传播计算完成后由autograd钩子启动的,因此这种重叠被阻止了。

TorchDynamo 的 DDPOptimizer 通过在反向传播过程中在 DDP 的 allreduce 桶的逻辑边界处中断前向图来提供帮助。注意:目标是中断反向传播过程中的图,最简单的实现是中断前向图,然后对每个部分调用 AotAutograd 和编译。这允许 DDP 的 allreduce 钩子在反向传播的各个部分之间触发,并安排通信与计算重叠。

有关更深入的解释和实验结果,请参阅这篇博客文章,或者阅读文档和代码位于 torch/_dynamo/optimizations/distributed.py

要调试 DDPOptimizer,请设置 TORCH_LOGS=’ddp_graphs’ 以获取完整的图形转储。对于没有图形的日志,请将 ‘dynamo’、‘distributed’ 或 ‘dist_ddp’ 添加到 TORCH_LOGS 中(以获取有关桶边界的基本信息)。要禁用 DDPOptimizer,请设置 torch._dynamo.config.optimize_ddp=False。 DDP 和 TorchDynamo 在没有 DDPOptimizer 的情况下仍应正常工作,但性能会有所下降。