Shortcuts

分布式数据并行

class torch.nn.parallel.DistributedDataParallel(module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True, process_group=None, bucket_cap_mb=25, find_unused_parameters=False, check_reduction=False, gradient_as_bucket_view=False, static_graph=False, delay_all_reduce_named_params=None, param_to_hook_all_reduce=None, mixed_precision=None, device_mesh=None)[源代码]

在模块级别基于 torch.distributed 实现分布式数据并行。

此容器通过在每个模型副本之间同步梯度来提供数据并行性。要同步的设备由输入 process_group 指定,默认情况下是整个世界。请注意,DistributedDataParallel 不会对输入进行分块或分片到参与的 GPU 上;用户负责定义如何执行此操作,例如通过使用 DistributedSampler

参见: 基础使用 nn.parallel.DistributedDataParallel 代替 multiprocessing 或 nn.DataParallel. 与 torch.nn.DataParallel 中的输入约束相同。

创建此类需要先通过调用torch.distributed.init_process_group()来初始化torch.distributed

DistributedDataParallel 已被证明比 torch.nn.DataParallel 在单节点多GPU数据并行训练中显著更快。

要在拥有 N 个 GPU 的主机上使用 DistributedDataParallel,您应该启动 N 个进程,确保每个进程仅在单个 从 0 到 N-1 的 GPU 上工作。这可以通过为每个进程设置 CUDA_VISIBLE_DEVICES 或通过调用以下命令来完成:

>>> torch.cuda.set_device(i)

其中 i 从 0 到 N-1。在每个过程中,您应参考以下内容来构建此模块:

>>> torch.distributed.init_process_group(
>>>     backend='nccl', world_size=N, init_method='...'
>>> )
>>> model = DistributedDataParallel(model, device_ids=[i], output_device=i)

为了在每个节点上启动多个进程,您可以使用torch.distributed.launchtorch.multiprocessing.spawn

注意

请参阅PyTorch分布式概述 以简要了解与分布式训练相关的所有功能。

注意

DistributedDataParallel 可以与 torch.distributed.optim.ZeroRedundancyOptimizer 结合使用,以减少每个等级的优化器状态的内存占用。请参阅 ZeroRedundancyOptimizer 配方 了解更多详情。

注意

nccl 后端是目前使用GPU时最快且强烈推荐的后端。这适用于单节点和多节点分布式训练。

注意

此模块还支持混合精度分布式训练。 这意味着您的模型可以具有不同类型的参数,例如混合类型的 fp16fp32,这些混合类型参数的梯度缩减将正常工作。

注意

如果你在一个进程中使用torch.save来保存模块的检查点,并在其他进程中使用torch.load来恢复它,请确保为每个进程正确配置map_location。如果没有map_locationtorch.load会将模块恢复到它保存时的设备上。

注意

当一个模型在M个节点上训练,并且batch=N时,与在单个节点上训练的相同模型相比,梯度将缩小M倍,如果损失是在批次中的实例上求和的(而不是像通常那样平均),因为不同节点之间的梯度是平均的。当你想要获得与本地训练过程在数学上等效的训练过程时,你应该考虑这一点。但在大多数情况下,你可以将一个被DistributedDataParallel包装的模型、一个被DataParallel包装的模型和一个在单个GPU上的普通模型视为相同的(例如,为等效的批次大小使用相同的学习率)。

注意

参数从不在进程之间广播。该模块对梯度执行全归约步骤,并假设它们将由优化器在所有进程中以相同的方式进行修改。缓冲区(例如,BatchNorm 统计数据)从排名为 0 的进程中的模块广播到系统中的所有其他副本,在每次迭代中进行广播。

注意

如果你在使用 DistributedDataParallel 与 分布式 RPC 框架,你应该始终使用 torch.distributed.autograd.backward() 来计算梯度,并使用 torch.distributed.optim.DistributedOptimizer 来优化参数。

示例:

>>> import torch.distributed.autograd as dist_autograd
>>> from torch.nn.parallel import DistributedDataParallel as DDP
>>> import torch
>>> from torch import optim
>>> from torch.distributed.optim import DistributedOptimizer
>>> import torch.distributed.rpc as rpc
>>> from torch.distributed.rpc import RRef
>>>
>>> t1 = torch.rand((3, 3), requires_grad=True)
>>> t2 = torch.rand((3, 3), requires_grad=True)
>>> rref = rpc.remote("worker1", torch.add, args=(t1, t2))
>>> ddp_model = DDP(my_model)
>>>
>>> # 设置优化器
>>> optimizer_params = [rref]
>>> for param in ddp_model.parameters():
>>>     optimizer_params.append(RRef(param))
>>>
>>> dist_optim = DistributedOptimizer(
>>>     optim.SGD,
>>>     optimizer_params,
>>>     lr=0.05,
>>> )
>>>
>>> with dist_autograd.context() as context_id:
>>>     pred = ddp_model(rref.to_here())
>>>     loss = loss_func(pred, target)
>>>     dist_autograd.backward(context_id, [loss])
>>>     dist_optim.step(context_id)

注意

DistributedDataParallel 目前对梯度检查点功能的支持有限,该功能通过 torch.utils.checkpoint() 实现。如果使用 use_reentrant=False(推荐)进行检查点操作,DDP 将按预期工作,没有任何限制。然而,如果使用 use_reentrant=True(默认值)进行检查点操作,当模型中没有未使用的参数且每个层最多被检查点一次时(确保你没有传递 find_unused_parameters=True 给 DDP),DDP 将按预期工作。我们目前不支持一个层被多次检查点的情况,也不支持检查点模型中存在未使用参数的情况。

注意

要让非DDP模型加载DDP模型的状态字典, consume_prefix_in_state_dict_if_present() 需要在加载之前应用于去除DDP状态字典中的前缀“module.”。

警告

构造函数、前向方法以及输出的微分(或该模块输出的函数)是分布式同步点。如果不同的进程可能执行不同的代码,请考虑这一点。

警告

本模块假设所有参数在模型创建时都已注册。之后不应添加或删除任何参数。同样适用于缓冲区。

警告

本模块假设所有参数在每个分布式进程的模型中都按相同的顺序注册。模块本身将按照模型注册参数的相反顺序执行梯度allreduce。换句话说,确保每个分布式进程具有完全相同的模型,从而具有完全相同的参数注册顺序,是用户的责任。

警告

此模块允许具有非行优先连续步长的参数。 例如,您的模型可能包含一些参数,其 torch.memory_formattorch.contiguous_format 而其他参数的格式为 torch.channels_last。 然而, 不同进程中的相应参数必须具有相同的步长。

警告

此模块不与 torch.autograd.grad() 一起工作(即,它仅在梯度被累积到参数的 .grad 属性中时才有效)。

警告

如果你计划将此模块与使用nccl后端或使用Infiniband的gloo后端一起使用,并且与使用多个工作线程的DataLoader一起使用,请将多进程启动方法更改为forkserver(仅限Python 3)或spawn。不幸的是,使用Infiniband的Gloo和NCCL2不是fork安全的,如果你不更改此设置,可能会遇到死锁。

警告

在将模型封装为 DistributedDataParallel 后,您永远不应尝试更改模型的参数。因为,当使用 DistributedDataParallel 封装模型时,DistributedDataParallel 的构造函数会在构造时为模型本身的所有参数注册额外的梯度缩减函数。如果您随后更改模型的参数,梯度缩减函数将不再与正确的参数集匹配。

警告

使用 DistributedDataParallel分布式 RPC 框架 是实验性的,可能会发生变化。

Parameters
  • 模块 (模块) – 要并行化的模块

  • device_ids (列表 of 整数 or torch.device) –

    CUDA 设备。 1) 对于单设备模块,device_ids 可以 包含恰好一个设备 ID,表示唯一 CUDA 设备,该设备是与此进程对应的输入模块所在的位置。 或者,device_ids 也可以是 None。 2) 对于多设备模块和 CPU 模块, device_ids 必须为 None

    device_ids 在两种情况下都为 None 时, 前向传递的输入数据和实际模块 必须放置在正确的设备上。 (默认值: None)

  • output_device (inttorch.device) – 单设备 CUDA 模块的输出设备位置。对于多设备模块和 CPU 模块,它必须为 None,并且模块本身决定输出位置。(默认值:device_ids[0] 对于单设备模块)

  • broadcast_buffers (bool) – 标志,用于在forward函数开始时启用模块缓冲区的同步(广播)。(默认值: True)

  • process_group – 用于分布式数据全归约的进程组。如果 None,将使用默认的进程组,该进程组由 torch.distributed.init_process_group() 创建。(默认值:None

  • bucket_cap_mbDistributedDataParallel 会将参数分组到多个桶中,以便每个桶的梯度减少可以潜在地与反向计算重叠。 bucket_cap_mb 控制桶的大小,单位为兆字节(MB)。(默认值:25)

  • find_unused_parameters (bool) – 从包装模块的 forward 函数返回值中包含的所有张量开始遍历 autograd 图。在此图中未接收到梯度的参数会被预先标记为准备减少。此外,可能在包装模块的 forward 函数中使用过但未参与损失计算且因此也不会接收到梯度的参数也会被预先标记为准备减少。(默认值: False)

  • check_reduction – 此参数已弃用。

  • gradient_as_bucket_view (bool) – 当设置为 True 时,梯度将是视图,指向 allreduce 通信桶的不同偏移量。这可以减少峰值内存使用,节省的内存大小将等于总梯度大小。此外,它避免了梯度与 allreduce 通信桶之间的复制开销。当梯度是视图时,不能对梯度调用 detach_()。如果遇到此类错误,请参考 zero_grad() 函数在 torch/optim/optimizer.py 中作为解决方案进行修复。请注意,梯度将在第一次迭代后成为视图,因此应在第一次迭代后检查峰值内存节省情况。

  • static_graph (bool) –

    当设置为True时,DDP知道训练的图是静态的。静态图意味着1) 在整个训练循环中,已使用和未使用的参数集不会改变;在这种情况下,用户是否设置find_unused_parameters = True并不重要。2) 在整个训练循环中,图的训练方式不会改变(意味着没有依赖于迭代的控制流)。 当static_graph设置为True时,DDP将支持过去无法支持的情况: 1) 可重入的反向传播。 2) 多次激活检查点。 3) 当模型有未使用的参数时进行激活检查点。 4) 模型参数位于前向函数之外。 5) 当存在未使用的参数时,可能会提高性能,因为当static_graph设置为True时,DDP不会在每次迭代中搜索图来检测未使用的参数。 要检查是否可以将static_graph设置为True,一种方法是检查前一次模型训练结束时的ddp日志数据,如果ddp_logging_data.get("can_set_static_graph") == True,那么你也可以将static_graph = True

    示例::
    >>> model_DDP = torch.nn.parallel.DistributedDataParallel(model)
    >>> # 训练循环
    >>> ...
    >>> ddp_logging_data = model_DDP._get_ddp_logging_data()
    >>> static_graph = ddp_logging_data.get("can_set_static_graph")
    

  • delay_all_reduce_named_params (列表元组字符串和torch.nn.Parameter) – 一个列表 包含的命名参数,当指定在param_to_hook_all_reduce中的参数的梯度准备好时,这些参数的全部减少操作将被延迟。DDP的其他参数不适用于在此参数中指定的命名参数,因为这些命名参数将被DDP reducer忽略。

  • param_to_hook_all_reduce (torch.nn.Parameter) – 一个用于挂钩延迟所有减少的参数,参数在 delay_all_reduce_named_params 中指定。

Variables

模块 (模块) – 要并行化的模块。

示例:

>>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...')
>>> net = torch.nn.parallel.DistributedDataParallel(model)
join(divide_by_initial_world_size=True, enable=True, throw_on_early_termination=False)[源代码]

用于在DDP中跨进程处理不均匀输入的训练上下文管理器。

此上下文管理器将跟踪已加入的DDP进程,并通过插入集体通信操作来“影子”前向和后向传递,以匹配由未加入的DDP进程创建的操作。这将确保每个集体调用都有一个由已加入的DDP进程进行的相应调用,从而防止在使用不均匀输入进行训练时出现挂起或错误。或者,如果将标志throw_on_early_termination指定为True,则当某个进程耗尽输入时,所有训练器将抛出一个错误,允许根据应用程序逻辑捕获和处理这些错误。

一旦所有DDP进程都已加入,上下文管理器将把与最后加入的进程对应的模型广播到所有进程,以确保模型在所有进程中是相同的(这是由DDP保证的)。

要使用此功能在跨进程的不均匀输入上进行训练,只需将此上下文管理器包裹在您的训练循环周围。无需对模型或数据加载进行进一步修改。

警告

如果此上下文管理器所包裹的模型或训练循环中包含额外的分布式集体操作,例如模型前向传递中的SyncBatchNorm,则必须启用throw_on_early_termination标志。这是因为此上下文管理器无法感知非DDP集体通信。此标志将导致当任何一个进程耗尽输入时,所有进程都会抛出异常,从而允许在所有进程中捕获和恢复这些错误。

Parameters
  • divide_by_initial_world_size (bool) – 如果为True,将把梯度除以DDP训练启动时的初始world_size。如果为False,将计算有效的world size(尚未耗尽输入的rank数量)并在allreduce期间将梯度除以该值。设置divide_by_initial_world_size=True以确保每个输入样本(包括不均匀的输入)在全局梯度中的贡献权重相等。这是通过始终将梯度除以初始world_size来实现的,即使在遇到不均匀输入时也是如此。如果将此设置为False,我们将梯度除以剩余的节点数量。这确保了与在较小的world_size上训练的等价性,尽管这也意味着不均匀的输入将对全局梯度贡献更多。通常,您会希望在训练作业的最后几个输入不均匀的情况下将此设置为True。在极端情况下,输入数量差异较大时,将此设置为False可能会提供更好的结果。

  • 启用 (布尔值) – 是否启用不均匀输入检测。传入 enable=False 以在您知道输入在参与进程中均匀分布的情况下禁用此功能。默认值为 True

  • throw_on_early_termination (bool) – 当至少一个rank耗尽输入时,是否抛出错误或继续训练。如果True,将在第一个rank到达数据末尾时抛出错误。如果False,将继续训练,有效世界大小将变小,直到所有rank都加入。请注意,如果指定了此标志,则标志divide_by_initial_world_size将被忽略。默认值为False

示例:

>>> import torch
>>> import torch.distributed as dist
>>> import os
>>> import torch.multiprocessing as mp
>>> import torch.nn as nn
>>> # 在每个生成的worker上
>>> def worker(rank):
>>>     dist.init_process_group("nccl", rank=rank, world_size=2)
>>>     torch.cuda.set_device(rank)
>>>     model = nn.Linear(1, 1, bias=False).to(rank)
>>>     model = torch.nn.parallel.DistributedDataParallel(
>>>         model, device_ids=[rank], output_device=rank
>>>     )
>>>     # 等级1比等级0多一个输入。
>>>     inputs = [torch.tensor([1]).float() for _ in range(10 + rank)]
>>>     with model.join():
>>>         for _ in range(5):
>>>             for inp in inputs:
>>>                 loss = model(inp).sum()
>>>                 loss.backward()
>>>     # 如果没有join() API,下面的同步将会挂起
>>>     # 阻塞等待等级1的allreduce完成。
>>>     torch.cuda.synchronize(device=rank)
join_hook(**kwargs)[源代码]

DDP join hook 通过在前向和后向传播中镜像通信,使得在不均匀输入上进行训练成为可能。

Parameters

kwargs (字典) – 一个包含任何关键字参数的字典,用于在运行时修改连接钩子的行为;所有共享相同连接上下文管理器的Joinable实例都会接收到相同的kwargs值。

The hook supports the following keyword arguments:
divide_by_initial_world_size (bool, optional):

如果 True,则梯度将除以DDP启动时的初始世界大小。 如果 False,则梯度将除以有效世界大小(即非加入进程的数量),这意味着不均匀的输入对全局梯度的贡献更大。 通常,如果不均匀程度较小,则应将其设置为 True,但在极端情况下可以设置为 False 以可能获得更好的结果。 默认值为 True

no_sync()[源代码]

上下文管理器,用于在DDP进程之间禁用梯度同步。

在此上下文中,梯度将累积在模块变量上,这些变量将在第一次前向-后向传递退出上下文时同步。

示例:

>>> ddp = torch.nn.parallel.DistributedDataParallel(model, pg)
>>> with ddp.no_sync():
>>>     for input in inputs:
>>>         ddp(input).backward()  # 不同步,累积梯度
>>> ddp(another_input).backward()  # 同步梯度

警告

前向传播应该包含在上下文管理器中,否则梯度仍然会被同步。

register_comm_hook(state, hook)[源代码]

为跨多个工作节点的用户定义DDP梯度聚合注册通信钩子。

这个钩子对研究人员尝试新想法非常有用。例如,这个钩子可以用于实现几种算法,如GossipGrad和梯度压缩,这些算法在运行分布式数据并行训练时涉及不同的参数同步通信策略。

Parameters
  • 状态 (对象) –

    传递给钩子以在训练过程中维护任何状态信息。 示例包括梯度压缩中的错误反馈, GossipGrad中下一个要通信的对等方等。

    它由每个工作节点本地存储, 并由工作节点上的所有梯度张量共享。

  • 钩子 (可调用对象) –

    具有以下签名的可调用对象: hook(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]:

    一旦桶准备就绪,就会调用此函数。钩子可以执行所需的任何处理,并返回一个表示异步工作(例如:allreduce)完成的Future。如果钩子不执行任何通信,它仍然必须返回一个已完成的Future。Future应包含梯度桶张量的新值。一旦桶准备就绪,c10d reducer将调用此钩子,并使用Future返回的张量将梯度复制到各个参数中。请注意,Future的返回类型必须是一个单一的张量。

    我们还提供了一个名为get_future的API,用于检索与c10d.ProcessGroup.Work完成相关联的Future。get_future目前支持NCCL,并且也支持GLOO和MPI上的大多数操作,除了点对点操作(send/recv)。

警告

梯度桶的张量将不会被world_size预先分割。用户需要在像allreduce这样的操作中负责除以world_size。

警告

DDP通信钩子只能注册一次,并且应该在调用反向传播之前注册。

警告

hook 返回的 Future 对象应包含一个与 grad bucket 内张量形状相同的张量。

警告

get_future API 支持 NCCL,并且部分支持 GLOO 和 MPI 后端(不支持点对点操作,如 send/recv),并将返回一个 torch.futures.Future

Example::

下面是一个返回相同张量的noop钩子的示例。

>>> def noop(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]:
>>>     fut = torch.futures.Future()
>>>     fut.set_result(bucket.buffer())
>>>     return fut
>>> ddp.register_comm_hook(state=None, hook=noop)
Example::

下面是一个并行SGD算法的示例,其中梯度在allreduce之前进行编码,然后在allreduce之后进行解码。

>>> def encode_and_decode(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]:
>>>     encoded_tensor = encode(bucket.buffer())  # 编码梯度
>>>     fut = torch.distributed.all_reduce(encoded_tensor).get_future()
>>>     # 定义解码的回调函数
>>>     def decode(fut):
>>>         decoded_tensor = decode(fut.value()[0])  # 解码梯度
>>>         return decoded_tensor
>>>     return fut.then(decode)
>>> ddp.register_comm_hook(state=None, hook=encode_and_decode)