Shortcuts

torchrun (弹性启动)

超集 torch.distributed.launch

torchrun 提供了比 torch.distributed.launch 更多的功能,包括以下附加功能:

  1. 工作节点故障通过重新启动所有工作节点来优雅地处理。

  2. Worker RANKWORLD_SIZE 是自动分配的。

  3. 节点数量可以在最小和最大尺寸之间变化(弹性)。

注意

torchrun 是一个指向主模块 torch.distributed.run控制台脚本, 在 setup.pyentry_points 配置中声明。 它等同于调用 python -m torch.distributed.run

从 torch.distributed.launch 过渡到 torchrun

torchrun 支持与 torch.distributed.launch 相同的参数,除了 --use-env 现在已被弃用。要从 torch.distributed.launch 迁移到 torchrun,请按照以下步骤操作:

  1. 如果你的训练脚本已经在从 LOCAL_RANK 环境变量中读取 local_rank。 那么你只需要简单地省略 --use-env 标志,例如:

    torch.distributed.launch

    torchrun

    $ python -m torch.distributed.launch --use-env train_script.py
    
    $ torchrun train_script.py
    
  2. 如果你的训练脚本从 --local-rank 命令行参数读取本地排名。 请将你的训练脚本修改为从 LOCAL_RANK 环境变量读取, 如下列代码片段所示:

    torch.distributed.launch

    torchrun

    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--local-rank", type=int)
    args = parser.parse_args()
    
    local_rank = args.local_rank
    
    import os
    local_rank = int(os.environ["LOCAL_RANK"])
    

上述更改足以从 torch.distributed.launch 迁移到 torchrun。 为了利用 torchrun 的新功能,如弹性、容错和错误报告,请参考:

  • 训练脚本 了解更多关于编写符合 torchrun 的训练脚本的信息。

  • 本页其余部分提供了有关 torchrun 功能的更多信息。

用法

单节点多工作线程

torchrun
    --standalone
    --nnodes=1
    --nproc-per-node=$NUM_TRAINERS
    YOUR_TRAINING_SCRIPT.py (--arg1 ... 训练脚本参数...)

堆叠单节点多工作节点

要在同一主机上运行多个实例(单独的任务)的单节点、多工作节点,我们需要确保每个实例(任务)设置在不同的端口上,以避免端口冲突(或者更糟的是,两个任务合并为一个任务)。为此,您必须使用--rdzv-backend=c10d运行,并通过设置--rdzv-endpoint=localhost:$PORT_k指定不同的端口。对于--nodes=1,通常方便让torchrun自动选择一个空闲的随机端口,而不是为每次运行手动分配不同的端口。

torchrun
    --rdzv-backend=c10d
    --rdzv-endpoint=localhost:0
    --nnodes=1
    --nproc-per-node=$NUM_TRAINERS
    YOUR_TRAINING_SCRIPT.py (--arg1 ... 训练脚本参数...)

容错(固定数量的工作节点,无弹性,容忍3次失败)

torchrun
    --nnodes=$NUM_NODES
    --nproc-per-node=$NUM_TRAINERS
    --max-restarts=3
    --rdzv-id=$JOB_ID
    --rdzv-backend=c10d
    --rdzv-endpoint=$HOST_NODE_ADDR
    YOUR_TRAINING_SCRIPT.py (--arg1 ... 训练脚本参数...)

HOST_NODE_ADDR,格式为 <主机>[:<端口>](例如 node1.example.com:29400),指定节点和端口,C10d rendezvous 后端应在此节点和端口上实例化并托管。它可以是训练集群中的任何节点,但理想情况下,您应选择一个具有高带宽的节点。

注意

如果没有指定端口号,HOST_NODE_ADDR 默认设置为 29400。

弹性(min=1, max=4, 容忍最多3次成员变更或故障)

torchrun
    --nnodes=1:4
    --nproc-per-node=$NUM_TRAINERS
    --max-restarts=3
    --rdzv-id=$JOB_ID
    --rdzv-backend=c10d
    --rdzv-endpoint=$HOST_NODE_ADDR
    YOUR_TRAINING_SCRIPT.py (--arg1 ... 训练脚本参数...)

HOST_NODE_ADDR,格式为 <主机>[:<端口>](例如 node1.example.com:29400),指定节点和端口,C10d rendezvous 后端应在此节点和端口上实例化并托管。它可以是训练集群中的任何节点,但理想情况下,您应选择一个具有高带宽的节点。

注意

如果没有指定端口号,HOST_NODE_ADDR 默认设置为 29400。

关于rendezvous后端的说明

对于多节点训练,您需要指定:

  1. --rdzv-id: 一个唯一的作业ID(由参与作业的所有节点共享)

  2. --rdzv-backend: 一个实现 torch.distributed.elastic.rendezvous.RendezvousHandler

  3. --rdzv-endpoint: 协调后端运行的端点;通常为 host:port形式。

目前支持开箱即用的 rendezvous 后端包括 c10d(推荐)、etcd-v2etcd(遗留)。要使用 etcd-v2etcd,请设置一个启用了 v2 API 的 etcd 服务器(例如 --enable-v2)。

警告

etcd-v2etcd 会合使用 etcd API v2。您必须在 etcd 服务器上启用 v2 API。我们的测试使用 etcd v3.4.3。

警告

对于基于etcd的会合,我们推荐使用etcd-v2而不是etcd,后者在功能上是等效的,但使用了修订后的实现。etcd目前处于维护模式,并将在未来的版本中被移除。

定义

  1. Node - 物理实例或容器;映射到作业管理器所处理的单元。

  2. Worker - 分布式训练中的一个工作节点。

  3. WorkerGroup - 执行相同功能的工人集合(例如:训练者)。

  4. LocalWorkerGroup - 在同一节点上运行的工作组中的工作子集。

  5. RANK - 工人在工作组中的排名。

  6. WORLD_SIZE - 工作组中工作者的总数。

  7. LOCAL_RANK - 本地工作组中工作者的排名。

  8. LOCAL_WORLD_SIZE - 本地工作组的大小。

  9. rdzv_id - 一个用户定义的ID,用于唯一标识作业的工作组。每个节点使用此ID加入特定的工作组。

  1. rdzv_backend - 集合点(例如 c10d)的后端。这通常是一个强一致性的键值存储。

  2. rdzv_endpoint - 集合后端端点;通常为 <主机>:<端口> 形式。

一个 Node 运行 LOCAL_WORLD_SIZE 个工作线程,这些工作线程组成了一个 LocalWorkerGroup。作业中所有节点中的所有 LocalWorkerGroups 的并集构成了 WorkerGroup

环境变量

以下环境变量在您的脚本中可用:

  1. LOCAL_RANK - 本地排名。

  2. RANK - 全球排名。

  3. GROUP_RANK - 工作组的排名。一个介于0和max_nnodes之间的数字。当每个节点运行一个工作组时,这是节点的排名。

  4. ROLE_RANK - 在具有相同角色的所有工作者中的排名。工作者的角色在WorkerSpec中指定。

  5. LOCAL_WORLD_SIZE - 本地世界大小(例如,本地运行的工作进程数量);等于在torchrun上指定的--nproc-per-node

  6. WORLD_SIZE - 世界大小(作业中工作者的总数)。

  7. ROLE_WORLD_SIZE - 与在WorkerSpec中指定的相同角色一起启动的工作者总数。

  8. MASTER_ADDR - 运行rank为0的worker的主机的FQDN;用于初始化Torch分布式后端。

  9. MASTER_PORT - 在 MASTER_ADDR 上的端口,可用于托管 C10d TCP 存储。

  10. TORCHELASTIC_RESTART_COUNT - 到目前为止工作组重启的次数。

  11. TORCHELASTIC_MAX_RESTARTS - 配置的最大重启次数。

  12. TORCHELASTIC_RUN_ID - 等于rendezvous run_id(例如,唯一的作业ID)。

  13. PYTHON_EXEC - 系统可执行文件覆盖。如果提供,python用户脚本将使用PYTHON_EXEC的值作为可执行文件。默认情况下使用sys.executable

部署

  1. (C10d 后端不需要)启动 rendezvous 后端服务器并获取端点(作为 --rdzv-endpoint 传递给启动脚本)

  2. 单节点多工作线程:在主机上启动启动器以启动代理进程,该进程创建并监控本地工作线程组。

  3. 多节点多工作进程:在参与训练的所有节点上使用相同的参数启动启动器。

当使用作业/集群管理器时,多节点作业的入口点命令应为该启动器。

故障模式

  1. Worker failure: 对于一个有 n 个 worker 的训练任务,如果 k<=n 个 worker 失败,所有 worker 都会停止并重新启动,最多重启 max_restarts 次。

  2. 代理失败:代理失败会导致本地工作组失败。由作业管理器决定是使整个作业失败(gang语义)还是尝试替换节点。代理支持这两种行为。

  3. 节点故障:与代理故障相同。

会员变动

  1. 节点离开(缩减规模):代理被告知节点离开,所有现有工作进程被停止,形成一个新的 WorkerGroup,并且所有工作进程以新的 RANKWORLD_SIZE 启动。

  2. 节点到达(扩展):新节点被接纳到任务中,所有现有工作节点停止, 形成一个新的 WorkerGroup,并且所有工作节点以新的 RANKWORLD_SIZE 启动。

重要通知

  1. 此工具和多进程分布式(单节点或多节点)GPU训练目前仅在使用NCCL分布式后端时才能达到最佳性能。因此,NCCL后端是用于GPU训练的推荐后端。

  2. 初始化 Torch 进程组所需的环境变量由本模块提供,无需手动传递 RANK。要在训练脚本中初始化进程组,只需运行:

>>> import torch.distributed as dist
>>> dist.init_process_group(backend="gloo|nccl")
  1. 在您的训练程序中,您可以使用常规的分布式函数,或者使用torch.nn.parallel.DistributedDataParallel()模块。如果您的训练程序使用GPU进行训练,并且您希望使用torch.nn.parallel.DistributedDataParallel()模块,以下是如何配置它的方法。

local_rank = int(os.environ["LOCAL_RANK"])
model = torch.nn.parallel.DistributedDataParallel(model,
                                                  device_ids=[local_rank],
                                                  output_device=local_rank)

请确保 device_ids 参数设置为代码将操作的唯一GPU设备ID。这通常是进程的本地等级。换句话说,device_ids 需要是 [int(os.environ("LOCAL_RANK"))],并且 output_device 需要是 int(os.environ("LOCAL_RANK")) 以便使用此工具。

  1. 在失败或成员变更时,所有幸存的 worker 会立即被终止。请确保对您的进度进行检查点操作。检查点的频率应取决于您的作业对丢失工作的容忍度。

  2. 此模块仅支持同构的 LOCAL_WORLD_SIZE。也就是说,假设所有节点运行相同数量的本地工作线程(每个角色)。

  3. RANK 是不稳定的。在重新启动之间,节点上的本地工作线程可能会被分配与之前不同的排名范围。切勿对排名的稳定性或 RANKLOCAL_RANK 之间的某些关联进行硬编码假设。

  4. 当使用弹性(min_size!=max_size)时,不要对WORLD_SIZE进行硬编码假设,因为世界大小可能会随着节点的离开和加入而改变。

  5. 建议您的脚本具有以下结构:

def main():
  load_checkpoint(checkpoint_path)
  initialize()
  train()

def train():
  for batch in iter(dataset):
    train_step(batch)

    if should_checkpoint:
      save_checkpoint(checkpoint_path)
  1. (推荐) 在工作节点发生错误时,此工具将汇总错误详情(例如时间、排名、主机、进程ID、回溯等)。在每个节点上,按时间戳排序的第一个错误会被启发式地报告为“根本原因”错误。要获取作为错误摘要一部分的回溯信息,您必须在训练脚本的主入口函数中进行装饰,如以下示例所示。如果未进行装饰,则摘要将不包含异常的回溯信息,而仅包含退出代码。有关torchelastic错误处理的详细信息,请参阅:https://pytorch.org/docs/stable/elastic/errors.html

from torch.distributed.elastic.multiprocessing.errors import record

@record
def main():
    # 进行训练
    pass

if __name__ == "__main__":
    main()