• Docs >
  • DDP Communication Hooks
Shortcuts

DDP 通信钩子

DDP通信钩子是一个通用接口,用于通过覆盖DistributedDataParallel中的vanilla allreduce来控制如何在工作者之间传递梯度。提供了一些内置的通信钩子,用户可以轻松应用这些钩子来优化通信。此外,钩子接口还可以支持用户定义的通信策略,以满足更高级的使用场景。

如何使用通信钩子?

要使用通信钩子,用户只需在训练循环之前让DDP模型注册钩子,如下所示。

torch.nn.parallel.DistributedDataParallel.register_comm_hook()

通信钩子操作在什么上?

通信钩子提供了一种灵活的方式来进行梯度allreduce操作。因此,它主要在每个副本上的梯度进行allreduce之前操作,这些梯度被分桶以增加通信和计算之间的重叠。特别是,torch.distributed.GradBucket表示一个待进行allreduce的梯度张量桶。

class torch.distributed.GradBucket

此类主要传递一个扁平化的梯度张量 (由 buffer() 返回) 给 DDP 通信钩子。 该张量可以进一步分解为该桶中的每个参数张量列表 (由 get_per_parameter_tensors() 返回) 以应用逐层操作。

torch.distributed.GradBucket.index(self: torch._C._distributed_c10d.GradBucket) int

警告

由于在第一次迭代后会重建桶,因此在训练开始时不应依赖索引。

Returns

存储几个连续层梯度的桶的索引。 所有梯度都被分桶。

torch.distributed.GradBucket.buffer(self: torch._C._distributed_c10d.GradBucket) torch.Tensor
Returns

一个展平的1D torch.Tensor 缓冲区, 可以进一步分解为该桶中每个参数的张量列表。

torch.distributed.GradBucket.gradients(self: torch._C._distributed_c10d.GradBucket) List[torch.Tensor]
Returns

一个包含 torch.Tensor 的列表。列表中的每个张量对应一个梯度。

torch.distributed.GradBucket.is_last(self: torch._C._distributed_c10d.GradBucket) bool
Returns

这个桶是否是迭代中最后一个进行allreduce的桶。这也意味着这个桶对应于前向传播中的前几层。

torch.distributed.GradBucket.set_buffer(self: torch._C._distributed_c10d.GradBucket, buffer: torch.Tensor) None

将桶中的张量替换为输入的张量缓冲区。

torch.distributed.GradBucket.parameters(self: torch._C._distributed_c10d.GradBucket) List[torch.Tensor]
Returns

一个包含 torch.Tensor 的列表。列表中的每个张量对应于一个模型参数。

默认通信钩子

默认的通信钩子是简单的无状态钩子,因此在register_comm_hook中的输入状态要么是一个进程组,要么是None。输入的bucket是一个torch.distributed.GradBucket对象。

torch.distributed.algorithms.ddp_comm_hooks.default_hooks.allreduce_hook(process_group, bucket)[源代码]

使用 GradBucket 张量调用 allreduce

一旦梯度张量在所有工作节点上聚合,其 then 回调函数会取平均值并返回结果。

如果用户注册了这个DDP通信钩子, DDP结果预期与未注册钩子的情况相同。 因此,这不会改变DDP的行为,用户可以将其作为参考 或修改此钩子以记录有用信息或用于其他目的,同时不影响DDP行为。

Example::
>>> ddp_model.register_comm_hook(process_group, allreduce_hook)
Return type

未来[张量]

torch.distributed.algorithms.ddp_comm_hooks.default_hooks.fp16_compress_hook(process_group, bucket)[源代码]

通过将 GradBucket 转换为 torch.float16 并除以进程组大小来进行压缩。

此DDP通信钩子实现了一种简单的梯度压缩方法,将GradBucket张量转换为半精度浮点格式(torch.float16),然后将其除以进程组大小。 它对这些float16梯度张量进行allreduce操作。一旦压缩的梯度张量完成allreduce,链式回调decompress将其转换回输入数据类型(如float32)。

Example::
>>> ddp_model.register_comm_hook(process_group, fp16_compress_hook)
Return type

未来[张量]

torch.distributed.algorithms.ddp_comm_hooks.default_hooks.bf16_compress_hook(process_group, bucket)[源代码]

警告:此API是实验性的,并且需要NCCL版本高于2.9.6。

此DDP通信钩子实现了一种简单的梯度压缩方法,将GradBucket张量转换为半精度 脑浮点格式torch.bfloat16) 然后将其除以进程组大小。 它对这些bfloat16梯度张量进行allreduce操作。一旦压缩的梯度张量完成allreduce操作,链式回调decompress将其转换回输入数据类型(例如float32)。

Example::
>>> ddp_model.register_comm_hook(process_group, bf16_compress_hook)
Return type

未来[张量]

此外,提供了一个通信钩子包装器,以支持 fp16_compress_hook()bf16_compress_hook() 作为包装器, 可以与其他通信钩子结合使用。

torch.distributed.algorithms.ddp_comm_hooks.default_hooks.fp16_compress_wrapper(hook)[源代码]

将输入张量转换为 torch.float16,将钩子的结果转换回输入的数据类型。

这个包装器将给定DDP通信钩子的输入梯度张量转换为半精度浮点格式(torch.float16),并将给定钩子的结果张量转换回输入数据类型,例如float32。 因此,fp16_compress_hook等同于fp16_compress_wrapper(allreduce_hook)

Example::
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1, start_powerSGD_iter=10)
>>> ddp_model.register_comm_hook(state, fp16_compress_wrapper(powerSGD_hook))
Return type

可调用[[任意, 梯度桶], 未来[张量]]

torch.distributed.algorithms.ddp_comm_hooks.default_hooks.bf16_compress_wrapper(hook)[源代码]

警告:此API是实验性的,并且需要NCCL版本高于2.9.6。

这个包装器将给定 DDP 通信钩子的输入梯度张量转换为半精度 Brain 浮点格式 `_ (``torch.bfloat16`), 并将给定钩子的结果张量转换回输入数据类型,例如 float32

因此,bf16_compress_hook 等同于 bf16_compress_wrapper(allreduce_hook)

Example::
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1, start_powerSGD_iter=10)
>>> ddp_model.register_comm_hook(state, bf16_compress_wrapper(powerSGD_hook))
Return type

可调用[[任意, 梯度桶], 未来[张量]]

PowerSGD 通信钩子

PowerSGD (Vogels 等人, NeurIPS 2019) 是一种梯度压缩算法,可以提供非常高的压缩率并加速带宽受限的分布式训练。 该算法需要维护一些超参数和内部状态。因此,PowerSGD 通信钩子是一个有状态的钩子, 用户需要提供一个如下定义的状态对象。

PowerSGD 状态

class torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook.PowerSGDState(process_group, matrix_approximation_rank=1, start_powerSGD_iter=1000, min_compression_rate=2, use_error_feedback=True, warm_start=True, orthogonalization_epsilon=0, random_seed=0, compression_stats_logging_frequency=10000, batch_tensors_with_same_shape=False)[源代码]

在训练期间存储算法的超参数和所有梯度的内部状态。

特别是,matrix_approximation_rankstart_powerSGD_iter 是用户应调整的主要超参数。 为了性能,我们建议保持二进制超参数 use_error_feedbackwarm_start 开启。

  1. matrix_approximation_rank 控制压缩低秩张量的大小,这决定了压缩率。秩越低,压缩越强。

    1.1. 如果 matrix_approximation_rank 太低,完整模型的质量将需要更多的训练步骤才能达到,或者永远无法达到,从而导致准确性损失。

    1.2. 增加matrix_approximation_rank可能会显著增加压缩的计算成本,并且在超过某个matrix_approximation_rank阈值后,精度可能不会再进一步提高。

要调整matrix_approximation_rank,我们建议从1开始,并以2的倍数增加(类似于指数网格搜索,1, 2, 4, …),直到达到满意的精度。通常只使用较小的值1-4。对于某些NLP任务(如原始论文附录D所示),此值已增加到32。

  1. start_powerSGD_iter 推迟 PowerSGD 压缩直到步骤 start_powerSGD_iter,并且在步骤 start_powerSGD_iter 之前运行普通的 allreduce。这种 vanilla allreduce + PowerSGD 的混合方案可以有效提高准确性,即使使用相对较小的 matrix_approximation_rank。这是因为训练阶段的开始通常对不准确的梯度非常敏感,过早压缩梯度可能会使训练迅速采取次优轨迹,这可能会对准确性造成不可恢复的影响。

要调整 start_powerSGD_iter,我们建议从总训练步骤的10%开始,并逐渐增加,直到达到满意的准确度。如果在训练中有预热阶段,start_powerSGD_iter 通常不应少于预热步骤的数量。

  1. min_compression_rate 是压缩层时所需的最小压缩率。由于压缩带来的计算开销,只有当带宽能够有足够的节省时,张量才值得压缩,其中 (num_rows + num_cols) * matrix_approximation_rank * min_compression_rate < num_rows * num_cols。如果指定的压缩率阈值无法满足,张量将直接进行allreduce而不进行压缩。

压缩统计信息在PowerSGD压缩开始后,每compression_stats_logging_frequency次迭代记录一次。

  1. orthogonalization_epsilon 可以是一个非常小的值(例如,1e-8),在正交化步骤中添加到每个归一化矩阵列中,以防止在任何列全为0时出现除以零的错误。如果这已经可以通过(例如,批量归一化)来防止,则建议使用0的epsilon以提高准确性。

  2. batch_tensors_with_same_shape 控制是否在批处理操作中压缩和解压缩具有相同形状的张量以实现更高的并行性。请注意,您还应增加桶大小(即 DDP 构造函数中的 bucket_cap_mb 参数),以使更多相同形状的张量出现在同一个桶中,但这可能会减少计算和通信之间的重叠,并由于堆叠相同形状的张量而增加内存占用。如果压缩/解压缩计算成为瓶颈,请设置为 True

警告

如果启用了错误反馈或预热,DDP中允许的start_powerSGD_iter的最小值为2。 这是因为DDP中存在另一个内部优化,在迭代1时重建桶, 这可能会与重建过程之前记忆的任何张量发生冲突。

PowerSGD 钩子

警告

PowerSGD 通常需要额外的内存,其大小与模型的梯度相同,以启用误差反馈,这可以补偿有偏差的压缩通信并提高准确性。

警告

PowerSGD 钩子可能与 Apex 自动混合精度包 冲突。 请改用 PyTorch 原生自动混合精度包

torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook.powerSGD_hook(state, bucket)[源代码]

实现 PowerSGD 算法。

此DDP通信钩子实现了PowerSGD梯度压缩算法,该算法在论文中进行了描述。一旦梯度张量在所有工作节点上聚合,此钩子将按如下方式应用压缩:

  1. 将输入的展平1D梯度张量视为一组按参数划分的张量,并将所有张量分为两组:

    1.1 在所有归约之前应压缩的张量,因为压缩可以在带宽上提供足够的节省。

    1.2 其余的张量将直接进行allreduce操作而不进行压缩,包括所有的向量张量(用于偏置)。

  2. 处理未压缩的张量:

    2.1. 为这些未压缩的张量分配连续内存,并将所有未压缩的张量作为一个批次进行allreduce操作,不进行压缩;

    2.2. 将连续内存中的各个未压缩张量复制回输入张量。

  3. 处理应由PowerSGD压缩的张量:

    3.1. 对于每个张量 M,创建两个低秩张量 P 和 Q 用于分解 M,使得 M = PQ^T,其中 Q 从标准正态分布初始化并进行正交化;

    3.2. 计算每个 P 在 Ps 中,其等于 MQ;

    3.3. 将所有减少操作作为一批处理;

    3.4. 对Ps中的每个P进行正交化;

    3.5. 计算每个 Q 在 Qs 中,其近似等于 M^TP;

    3.6. 将Allreduces Qs作为一个批次处理;

    3.7. 计算所有压缩张量中的每个M,其近似等于PQ^T。

请注意,此通信钩子在前state.start_powerSGD_iter次迭代中强制执行vanilla allreduce。 这不仅让用户能够更好地控制加速与精度之间的权衡, 还有助于为未来的通信钩子开发者抽象掉DDP内部优化的一些复杂性。

Parameters
  • state (PowerSGDState) – 用于配置压缩率和支持误差反馈、热启动等的状态信息。 要调整压缩配置,主要需要调整 matrix_approximation_rankstart_powerSGD_itermin_compression_rate

  • bucket (dist.GradBucket) – 存储一维扁平化梯度张量的桶,该张量批量存储多个每个变量的张量。 请注意,由于DDP通信钩子仅支持单进程单设备模式, 此桶中仅存储一个张量。

Returns

通信的未来处理程序,用于就地更新梯度。

Return type

未来[张量]

Example::
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1,
                          start_powerSGD_iter=10, min_compression_rate=0.5)
>>> ddp_model.register_comm_hook(state, powerSGD_hook)
torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook.batched_powerSGD_hook(state, bucket)[源代码]

实现简化的PowerSGD算法。

这个DDP通信钩子实现了简化版的PowerSGD梯度压缩算法,该算法在论文中有所描述。 这个变体不是逐层压缩梯度,而是压缩批量所有梯度的展平输入张量。 因此,它比powerSGD_hook()更快, 但通常会导致精度大幅降低,除非matrix_approximation_rank为1。

警告

增加 matrix_approximation_rank 在这里可能不一定能提高准确性, 因为对每个参数张量进行批处理而不进行列/行对齐可能会破坏低秩结构。 因此,用户应始终首先考虑 powerSGD_hook(), 并且只有在 matrix_approximation_rank 为 1 时能够达到满意的准确性时才考虑此变体。

一旦梯度张量在所有工作节点上聚合,此钩子将按如下方式应用压缩:

  1. 将输入的展平1D梯度张量视为带有0填充的方形张量M;

  2. 创建两个低秩张量 P 和 Q 用于分解 M,使得 M = PQ^T,其中 Q 从标准正态分布初始化并进行正交化;

  3. 计算P,其等于MQ;

  4. Allreduces P;

  5. 正交化 P;

  6. 计算Q,其近似等于M^TP;

  7. Allreduces Q;

  8. 计算M,其近似等于PQ^T。

  9. 将输入张量截断为原始长度。

请注意,此通信钩子在前state.start_powerSGD_iter次迭代中强制执行vanilla allreduce。 这不仅让用户能够更好地控制加速与精度之间的权衡, 还有助于为未来的通信钩子开发者抽象掉DDP内部优化的一些复杂性。

Parameters
  • state (PowerSGDState) – 用于配置压缩率和支持误差反馈、热启动等的状态信息。 要调整压缩配置,主要需要调整 matrix_approximation_rankstart_powerSGD_iter

  • bucket (dist.GradBucket) – 存储一维扁平化梯度张量的桶,该张量批量存储多个每个变量的张量。 请注意,由于DDP通信钩子仅支持单进程单设备模式, 此桶中仅存储一个张量。

Returns

通信的未来处理程序,用于就地更新梯度。

Return type

未来[张量]

Example::
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1)
>>> ddp_model.register_comm_hook(state, batched_powerSGD_hook)

调试通信钩子

顾名思义,调试通信钩子用于调试和性能优化目的。

警告

调试通信钩子不一定输出正确的结果。

torch.distributed.algorithms.ddp_comm_hooks.debugging_hooks.noop_hook(_, bucket)[源代码]

返回一个包装输入的未来,因此它是一个不产生任何通信开销的空操作。

此钩子应用于所有reduce优化的头部空间分析,而不是正常的梯度同步。 例如,如果在注册此钩子后只能观察到不到10%的训练时间加速, 通常意味着在这种情况下,所有reduce并不是性能瓶颈。 如果无法轻松获取GPU跟踪或跟踪分析因某些因素(如所有reduce与计算之间的重叠或跨等级的不同步)而变得复杂, 这种工具尤其有用。

Example::
>>> ddp_model.register_comm_hook(None, noop_hook)
Return type

未来[张量]

通信钩子的检查点

有状态的通信钩子可以保存为模型检查点的一部分,以支持训练器重新启动。 要使钩子可序列化,应定义__setstate____getstate__

警告

__getstate__ 应该从返回的字典中排除不可序列化的属性。

警告

__setstate__ 应该正确初始化非序列化属性,这些属性从提供的 state 中排除。

PowerSGDState 实现了 __setstate____getstate__,可以作为参考使用。

class torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook.PowerSGDState[源代码]
__getstate__()[源代码]

返回一个Dict[str, Any],它将被序列化并保存。

process_group 不可序列化,并从返回的状态中排除。

__setstate__(state)[源代码]

获取提供的 state 并设置为此 PowerSGDState 实例。

process_group 设置为默认值。

这是一个简单的、端到端的保存和重新加载PowerSGD状态和钩子的示例。

import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp

from torch.nn.parallel import DistributedDataParallel
from torch.distributed.algorithms.ddp_comm_hooks import powerSGD_hook as powerSGD

class SimpleModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(24,24)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(24,12)

    def forward(self, x):
        return self.fc2(self.relu(self.fc1(x)))

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # 初始化进程组
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

def run_demo(demo_fn, world_size):
    mp.spawn(
        demo_fn,
        args=(world_size,),
        nprocs=world_size,
        join=True)

def demo_serialization(rank, world_size):
    setup(rank, world_size)

    CHECKPOINT = tempfile.gettempdir() + "/checkpoint.pt"

    model = SimpleModel().to(rank)
    ddp_model = DistributedDataParallel(model, device_ids=[rank])

    powersgd_hook = powerSGD.powerSGD_hook
    powersgd_state = powerSGD.PowerSGDState(process_group=None)

    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
    ddp_model.register_comm_hook(powersgd_state, powersgd_hook)

    state = {
        'state_dict': ddp_model.state_dict(),
        'comm_hook': powersgd_hook,
        'comm_hook_state': powersgd_state}

    if rank == 0:
        torch.save(state, CHECKPOINT)

    dist.barrier()
    map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
    checkpoint = torch.load(CHECKPOINT, map_location=map_location)

    new_ddp_model = DistributedDataParallel(SimpleModel().to(rank), device_ids=[rank])
    new_ddp_model.load_state_dict(checkpoint['state_dict'])
    powersgd_hook = checkpoint['comm_hook']
    powersgd_state = checkpoint['comm_hook_state']

    new_ddp_model.register_comm_hook(powersgd_state, powersgd_hook)

    if rank == 0:
        os.remove(CHECKPOINT)

    cleanup()

if __name__ == "__main__":
    n_gpus = torch.cuda.device_count()
    assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
    world_size = n_gpus
    run_demo(demo_serialization, world_size)

致谢

非常感谢 PowerSGD 论文作者 Thijs Vogels 对 PowerSGD 通信钩子的代码审查,以及 对比实验, 这些实验表明 PowerSGD 通信钩子的性能与 原始 论文 中的实现相当。