• Docs >
  • Distributed Optimizers
Shortcuts

分布式优化器

警告

当前不支持在使用 CUDA 张量时使用分布式优化器

torch.distributed.optim 提供了 DistributedOptimizer,它接受一个远程参数列表(RRef)并在参数所在的本地工作节点上运行优化器。分布式优化器可以使用任何本地优化器的 基类 来在每个工作节点上应用梯度。

class torch.distributed.optim.DistributedOptimizer(optimizer_class, params_rref, *args, **kwargs)[源代码]

DistributedOptimizer 获取分散在各个工作节点上的参数的远程引用,并在本地为每个参数应用给定的优化器。

此类使用 get_gradients() 来检索特定参数的梯度。

step() 的并发调用,无论是来自相同的还是不同的客户端,都将在每个工作节点上序列化执行——因为每个工作节点的优化器一次只能处理一组梯度。然而,并不能保证完整的正向-反向-优化器序列会为单个客户端依次执行。这意味着正在应用的梯度可能并不对应于在给定工作节点上执行的最新前向传播。此外,跨工作节点之间没有保证的顺序。

DistributedOptimizer 默认情况下使用启用了 TorchScript 的本地优化器创建,以便在多线程训练(例如分布式模型并行)的情况下,优化器更新不会被 Python 全局解释器锁(GIL)阻塞。此功能目前适用于大多数优化器。您还可以按照 PyTorch 教程中的 配方 为您的自定义优化器启用 TorchScript 支持。

Parameters
  • optimizer_class (optim.Optimizer) – 在每个工作线程上实例化的优化器类。

  • params_rref (list[RRef]) – 要优化的本地或远程参数的RRef列表。

  • args – 传递给每个工作节点上优化器构造函数的参数。

  • kwargs – 传递给每个工作节点上优化器构造函数的参数。

Example::
>>> import torch.distributed.autograd as dist_autograd
>>> import torch.distributed.rpc as rpc
>>> from torch import optim
>>> from torch.distributed.optim import DistributedOptimizer
>>>
>>> with dist_autograd.context() as context_id:
>>>   # 前向传播。
>>>   rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
>>>   rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1))
>>>   loss = rref1.to_here() + rref2.to_here()
>>>
>>>   # 反向传播。
>>>   dist_autograd.backward(context_id, [loss.sum()])
>>>
>>>   # 优化器。
>>>   dist_optim = DistributedOptimizer(
>>>      optim.SGD,
>>>      [rref1, rref2],
>>>      lr=0.05,
>>>   )
>>>   dist_optim.step(context_id)
step(context_id)[源代码]

执行单个优化步骤。

这将在每个包含需要优化的参数的工作节点上调用 torch.optim.Optimizer.step(),并会阻塞直到所有工作节点返回。提供的 context_id 将用于检索对应的 context,其中包含应应用于参数的梯度。

Parameters

context_id – 我们应该运行优化器步骤的自动微分上下文ID。

class torch.distributed.optim.PostLocalSGDOptimizer(optim, averager)[源代码]

包装一个任意的 torch.optim.Optimizer 并运行 post-local SGD, 该优化器在每一步运行本地优化器。 在预热阶段之后,它在应用本地优化器之后定期平均参数。

Parameters
  • optim (优化器) – 本地优化器。

  • averager (ModelAverager) – 一个用于运行post-localSGD算法的模型平均实例。

示例:

>>> import torch
>>> import torch.distributed as dist
>>> import torch.distributed.algorithms.model_averaging.averagers as averagers
>>> import torch.nn as nn
>>> from torch.distributed.optim import PostLocalSGDOptimizer
>>> from torch.distributed.algorithms.ddp_comm_hooks.post_localSGD_hook import (
>>>   PostLocalSGDState,
>>>   post_localSGD_hook,
>>> )
>>>
>>> model = nn.parallel.DistributedDataParallel(
>>>    module, device_ids=[rank], output_device=rank
>>> )
>>>
>>> # 注册一个post-localSGD通信钩子。
>>> state = PostLocalSGDState(process_group=None, subgroup=None, start_localSGD_iter=100)
>>> model.register_comm_hook(state, post_localSGD_hook)
>>>
>>> # 创建一个post-localSGD优化器,该优化器包装了一个本地优化器。
>>> # 注意,``PostLocalSGDOptimizer``中使用的``warmup_steps``必须与``PostLocalSGDState``中使用的``start_localSGD_iter``相同。
>>> local_optim = torch.optim.SGD(params=model.parameters(), lr=0.01)
>>> opt = PostLocalSGDOptimizer(
>>>     optim=local_optim,
>>>     averager=averagers.PeriodicModelAverager(period=4, warmup_steps=100)
>>> )
>>>
>>> # 在前100步中,DDP在每一步运行全局梯度平均。
>>> # 在100步之后,DDP在每个子组内(默认情况下是节点内)运行梯度平均,
>>> # 并且在应用本地优化器之后,post-localSGD优化器每4步运行一次全局模型平均。
>>> for step in range(0, 200):
>>>    opt.zero_grad()
>>>    loss = loss_fn(output, labels)
>>>    loss.backward()
>>>    opt.step()
load_state_dict(state_dict)[源代码]

这与 torch.optim.Optimizerload_state_dict() 相同, 但还会将模型平均器的步数值恢复为 保存在提供的 state_dict 中的值。

如果在state_dict中没有"step"条目, 它将发出警告并将模型平均器的步骤初始化为0。

state_dict()[源代码]

这与 torch.optim.Optimizer state_dict() 相同, 但会在检查点中添加一个额外的条目以记录模型平均器的步骤, 以确保重新加载不会导致不必要的再次预热。

step()[源代码]

执行单个优化步骤(参数更新)。

class torch.distributed.optim.ZeroRedundancyOptimizer(params, optimizer_class, process_group=None, parameters_as_bucket_view=False, overlap_with_ddp=False, **defaults)[源代码]

将任意的 optim.Optimizer 包装起来,并在组内的各个等级之间分片其状态。

共享方式如ZeRO所述。

每个rank中的本地优化器实例仅负责更新大约1 / world_size的参数,因此只需要保持1 / world_size的优化器状态。在本地更新参数后,每个rank将广播其参数给所有其他对等节点,以保持所有模型副本处于相同状态。ZeroRedundancyOptimizer可以与torch.nn.parallel.DistributedDataParallel结合使用,以减少每个rank的峰值内存消耗。

ZeroRedundancyOptimizer 使用一种排序贪婪算法在每个等级中打包多个参数。每个参数属于一个单一的等级,并且不会在等级之间分割。分区是任意的,可能与参数注册或使用顺序不匹配。

Parameters

参数 (Iterable) – 一个包含 torch.Tensordict 的可迭代对象,这些参数将在各个rank之间进行分片。

Keyword Arguments
  • optimizer_class (torch.nn.Optimizer) – 本地优化器的类。

  • process_group (ProcessGroup, 可选) – torch.distributed ProcessGroup (默认值: dist.group.WORLDtorch.distributed.init_process_group() 初始化).

  • parameters_as_bucket_view (bool, 可选) – 如果True,参数被打包到桶中以加快通信速度,并且param.data字段指向不同偏移量的桶视图;如果False,每个单独的参数分别进行通信,并且每个params.data保持完整(默认值:False)。

  • overlap_with_ddp (布尔值, 可选) – 如果Truestep()DistributedDataParallel的梯度同步重叠;这需要(1)optimizer_class参数的功能优化器或具有功能等效的优化器,以及(2)从ddp_zero_hook.py中的函数之一构造的DDP通信钩子的注册;参数被打包到与DistributedDataParallel中的桶匹配的桶中,这意味着parameters_as_bucket_view参数被忽略。如果Falsestep()在反向传播后独立运行(按常规)。 (默认值:False

  • **defaults – 任何尾随参数,这些参数会被转发到本地优化器。

示例:

>>> import torch.nn as nn
>>> from torch.distributed.optim import ZeroRedundancyOptimizer
>>> from torch.nn.parallel import DistributedDataParallel as DDP
>>> model = nn.Sequential(*[nn.Linear(2000, 2000).to(rank) for _ in range(20)])
>>> ddp = DDP(model, device_ids=[rank])
>>> opt = ZeroRedundancyOptimizer(
>>>     ddp.parameters(),
>>>     optimizer_class=torch.optim.Adam,
>>>     lr=0.01
>>> )
>>> ddp(inputs).sum().backward()
>>> opt.step()

警告

目前,ZeroRedundancyOptimizer 要求所有传入的参数都是相同的密集类型。

警告

如果你传递 overlap_with_ddp=True,请注意以下事项:鉴于当前 DistributedDataParallelZeroRedundancyOptimizer 重叠的实现方式,在前两到三次训练迭代中,优化器步骤不会执行参数更新,具体取决于 static_graph=False 还是 static_graph=True。这是因为需要关于 DistributedDataParallel 使用的梯度分桶策略的信息,该信息在 static_graph=False 时在第二次前向传播时才最终确定,而在 static_graph=True 时在第三次前向传播时才最终确定。为了调整这一点,一个选项是预先添加虚拟输入。

警告

ZeroRedundancyOptimizer 是实验性的,可能会发生变化。

add_param_group(param_group)[源代码]

将参数组添加到 Optimizerparam_groups

这在微调预训练网络时非常有用,因为冻结的层可以变为可训练的,并随着训练的进行添加到优化器中。

Parameters

param_group (字典) – 指定要优化的参数和组特定的优化选项。

警告

此方法处理更新所有分区上的分片,但需要在所有rank上调用。如果在部分rank上调用此方法,将导致训练挂起,因为通信原语是根据托管参数调用的,并期望所有rank在同一组参数上参与。

consolidate_state_dict(to=0)[源代码]

在目标rank上合并一个包含state_dict的列表(每个rank一个)。

Parameters

(int) – 接收优化器状态的等级(默认值:0)。

Raises

RuntimeError – 如果 overlap_with_ddp=True 并且在此方法被调用之前,此 ZeroRedundancyOptimizer 实例尚未完全初始化,这种情况发生在 DistributedDataParallel 梯度桶重建之后。

警告

这需要在所有rank上调用。

property join_device: 设备

返回默认设备。

join_hook(**kwargs)[源代码]

返回 ZeRO 连接钩子。

它通过在优化器步骤中隐藏集体通信来实现对不均匀输入的训练。

梯度必须在此钩子被调用之前正确设置。

Parameters

kwargs (字典) – 一个包含任何关键字参数的字典,用于在运行时修改连接钩子的行为;所有共享相同连接上下文管理器的Joinable实例都会接收到相同的kwargs值。

此钩子不支持任何关键字参数;即 kwargs 未使用。

property join_process_group: Any

返回进程组。

load_state_dict(state_dict)[源代码]

从输入的 state_dict 中加载与给定等级相关的状态,并根据需要更新本地优化器。

Parameters

state_dict (字典) – 优化器状态;应为调用 state_dict() 返回的对象。

Raises

RuntimeError – 如果 overlap_with_ddp=True 并且在此方法被调用之前,此 ZeroRedundancyOptimizer 实例尚未完全初始化,这种情况发生在 DistributedDataParallel 梯度桶重建之后。

state_dict()[源代码]

返回此rank已知的最后一个全局优化器状态。

Raises

RuntimeError – 如果 overlap_with_ddp=True 并且在此方法被调用之前,此 ZeroRedundancyOptimizer 实例尚未完全初始化,这种情况发生在 DistributedDataParallel 梯度桶重建之后;或者如果在此方法被调用之前没有调用 consolidate_state_dict()

Return type

字典[字符串, 任意]

step(closure=None, **kwargs)[源代码]

执行一次优化器步骤,并在所有等级之间同步参数。

Parameters

闭包 (可调用对象) – 一个重新评估模型并返回损失的闭包;大多数优化器可选。

Returns

可选的损失函数,取决于底层局部优化器。

Return type

可选[浮点数]