分布式通信包 - torch.distributed¶
注意
请参阅PyTorch分布式概述 以简要了解与分布式训练相关的所有功能。
后端¶
torch.distributed
支持三种内置后端,每种后端具有不同的功能。下表显示了哪些函数可用于 CPU / CUDA 张量。
MPI 仅在用于构建 PyTorch 的实现支持 CUDA 时才支持 CUDA。
后端 |
|
|
|
|||
---|---|---|---|---|---|---|
设备 |
中央处理器 |
GPU |
中央处理器 |
GPU |
中央处理器 |
GPU |
发送 |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
接收 |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
广播 |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
all_reduce |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
减少 |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
all_gather |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
收集 |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
散点图 |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
减少分散 |
✘ |
✘ |
✘ |
✘ |
✘ |
✓ |
全部到全部 |
✘ |
✘ |
✓ |
? |
✘ |
✓ |
障碍 |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
PyTorch自带的后端¶
PyTorch 分布式包支持 Linux(稳定版)、MacOS(稳定版)和 Windows(原型版)。 默认情况下,对于 Linux,Gloo 和 NCCL 后端会被编译并包含在 PyTorch 分布式中(仅在使用 CUDA 编译时包含 NCCL)。MPI 是一个可选的后端,只有在从源代码构建 PyTorch 时才能包含(例如,在安装了 MPI 的主机上构建 PyTorch)。
注意
截至 PyTorch v1.8,Windows 支持所有集体通信后端,但不包括 NCCL,
如果 init_method 参数的 init_process_group()
指向一个文件,它必须遵循以下模式:
本地文件系统,
init_method="file:///d:/tmp/some_file"
共享文件系统,
init_method="file://////{machine_name}/{share_folder_name}/some_file"
与在Linux平台上相同,您可以通过设置环境变量MASTER_ADDR和MASTER_PORT来启用TcpStore。
使用哪个后端?¶
在过去,我们经常被问到:“我应该使用哪个后端?”。
经验法则
使用NCCL后端进行分布式GPU训练
使用 Gloo 后端进行分布式 CPU 训练。
配备InfiniBand互连的GPU主机
使用NCCL,因为它是目前唯一支持InfiniBand和GPUDirect的后端。
具有以太网互连的GPU主机
使用NCCL,因为它目前提供了最佳的分布式GPU训练性能,特别是在多进程单节点或多节点分布式训练中。如果你遇到任何NCCL的问题,可以使用Gloo作为备用选项。(请注意,Gloo目前在GPU上运行速度比NCCL慢。)
使用InfiniBand互连的CPU主机
如果你的InfiniBand已启用IP over IB,请使用Gloo,否则,请使用MPI。我们计划在即将发布的版本中为Gloo添加InfiniBand支持。
使用以太网互连的CPU主机
使用 Gloo,除非你有特定的原因使用 MPI。
常见环境变量¶
选择要使用的网络接口¶
默认情况下,NCCL 和 Gloo 后端都会尝试找到合适的网络接口来使用。如果自动检测到的接口不正确,您可以使用以下环境变量来覆盖它(适用于各自的后端):
NCCL_SOCKET_IFNAME,例如
export NCCL_SOCKET_IFNAME=eth0
GLOO_SOCKET_IFNAME,例如
export GLOO_SOCKET_IFNAME=eth0
如果你使用的是Gloo后端,你可以通过用逗号分隔来指定多个接口,如下所示:export GLOO_SOCKET_IFNAME=eth0,eth1,eth2,eth3
。
后端将在这几个接口之间以轮询方式分发操作。
所有进程都必须在这个变量中指定相同数量的接口,这一点至关重要。
其他NCCL环境变量¶
调试 - 如果遇到NCCL失败,您可以设置NCCL_DEBUG=INFO
来打印一个明确的警告消息以及基本的NCCL初始化信息。
您还可以使用 NCCL_DEBUG_SUBSYS
来获取有关 NCCL 特定方面的更多详细信息。例如,NCCL_DEBUG_SUBSYS=COLL
将打印集体调用的日志,这在调试挂起问题时可能会有所帮助,特别是那些由集体类型或消息大小不匹配引起的问题。如果拓扑检测失败,设置 NCCL_DEBUG_SUBSYS=GRAPH
将有助于检查详细的检测结果,并在需要 NCCL 团队进一步帮助时将其保存为参考。
性能调优 - NCCL 根据其拓扑检测自动进行调优,以节省用户的调优工作。在某些基于套接字的系统上,用户仍可以尝试调整 NCCL_SOCKET_NTHREADS
和 NCCL_NSOCKS_PERTHREAD
以增加套接字网络带宽。这两个环境变量已由 NCCL 为某些云提供商(如 AWS 或 GCP)进行了预调优。
有关NCCL环境变量的完整列表,请参阅 NVIDIA NCCL的官方文档
基础¶
torch.distributed 包提供了 PyTorch 支持和通信原语,用于在运行在一个或多个机器上的多个计算节点之间进行多进程并行。类 torch.nn.parallel.DistributedDataParallel()
基于此功能,提供了一个同步分布式训练的包装器,适用于任何 PyTorch 模型。这与 Multiprocessing package - torch.multiprocessing 和 torch.nn.DataParallel()
提供的并行类型不同,因为它支持多个网络连接的机器,并且用户必须显式地为每个进程启动主训练脚本的一个单独副本。
在单机同步情况下,torch.distributed 或 torch.nn.parallel.DistributedDataParallel()
包装器相比其他数据并行方法,包括 torch.nn.DataParallel()
,仍然具有优势:
每个进程维护自己的优化器,并在每次迭代中执行完整的优化步骤。虽然这可能看起来是多余的,因为梯度已经在进程之间被收集并平均,因此对于每个进程来说都是相同的,但这意味着不需要参数广播步骤,从而减少了在节点之间传输张量所花费的时间。
每个进程包含一个独立的 Python 解释器,消除了从单个 Python 进程驱动多个执行线程、模型副本或 GPU 所带来的额外解释器开销和“GIL 争用”。这对于大量使用 Python 运行时的模型尤其重要,包括具有循环层或许多小组件的模型。
初始化¶
在使用任何其他方法之前,需要使用 torch.distributed.init_process_group()
或 torch.distributed.device_mesh.init_device_mesh()
函数初始化包。
两者都会阻塞,直到所有进程都加入。
- torch.distributed.is_available()[源代码]¶
如果分布式包可用,则返回
True
。否则,
torch.distributed
不暴露任何其他API。目前,torch.distributed
在Linux、MacOS和Windows上可用。在从源代码构建PyTorch时,设置USE_DISTRIBUTED=1
以启用它。 目前,Linux和Windows的默认值是USE_DISTRIBUTED=1
, MacOS的默认值是USE_DISTRIBUTED=0
。- Return type
- torch.distributed.init_process_group(backend=None, init_method=None, timeout=None, world_size=-1, rank=-1, store=None, group_name='', pg_options=None, device_id=None)[源代码]¶
初始化默认的分布式进程组。
这也将初始化分布式包。
- There are 2 main ways to initialize a process group:
明确指定
store
、rank
和world_size
。指定
init_method
(一个URL字符串),用于指示如何发现对等点。可以选择指定rank
和world_size
,或者将所有必需的参数编码在URL中并省略它们。
如果两者都未指定,
init_method
将被假定为 “env://”。- Parameters
backend (str 或 Backend,可选) – 要使用的后端。根据构建时的配置,有效值包括
mpi
、gloo
、nccl
和ucc
。如果未提供后端,则将创建gloo
和nccl
后端,请参阅下面的注释以了解如何管理多个后端。此字段可以作为小写字符串提供(例如,"gloo"
),也可以通过Backend
属性访问(例如,Backend.GLOO
)。如果使用nccl
后端在每台机器上使用多个进程,则每个进程必须独占访问其使用的每个 GPU,因为进程之间共享 GPU 可能会导致死锁。ucc
后端是实验性的。init_method (str, 可选) – 指定如何初始化进程组的URL。如果没有指定
init_method
或store
,则默认为“env://”。与store
互斥。world_size (int, 可选) – 参与作业的进程数。如果指定了
store
,则为必需。rank (int, 可选) – 当前进程的等级(它应该是一个介于0和
world_size
-1之间的数字)。 如果指定了store
,则需要此参数。存储 (存储,可选) – 所有工作节点均可访问的键/值存储,用于交换连接/地址信息。 与
init_method
互斥。超时 (timedelta, 可选) – 针对进程组执行操作的超时时间。默认值为NCCL为10分钟,其他后端为30分钟。 这是集体操作在异步中止后进程崩溃的持续时间。 这样做是因为CUDA执行是异步的,由于失败的异步NCCL操作可能导致后续CUDA操作在损坏的数据上运行,因此不再安全继续执行用户代码。 当设置TORCH_NCCL_BLOCKING_WAIT时,进程将阻塞并等待此超时。
group_name (str, 可选, 已弃用) – 组名。此参数被忽略
pg_options (ProcessGroupOptions, 可选) – 进程组选项 指定在构造特定进程组时需要传递的额外选项。目前,我们唯一支持的选项是
ProcessGroupNCCL.Options
用于nccl
后端,可以指定is_high_priority_stream
,以便 当有计算内核等待时,nccl 后端可以选择高优先级的 cuda 流。device_id (torch.device, 可选) – 一个特定的设备,用于“绑定”此进程,允许进行特定后端的优化。目前,这有两个效果,仅在NCCL下:通信器会立即形成(调用
ncclCommInit*
立即进行,而不是通常的延迟调用),并且子组将在可能的情况下使用ncclCommSplit
以避免不必要的组创建开销。如果你想提前知道NCCL初始化错误,你也可以使用此字段。
注意
要启用
backend == Backend.MPI
,PyTorch 需要在支持 MPI 的系统上从源代码构建。注意
对多个后端的支持是实验性的。目前,当没有指定后端时,将创建
gloo
和nccl
后端。gloo
后端将用于 CPU 张量的集合操作,而nccl
后端将用于 CUDA 张量的集合操作。可以通过传递格式为“: , : ”的字符串来指定自定义后端,例如“cpu:gloo,cuda:custom_backend”。
- torch.distributed.device_mesh.init_device_mesh(device_type, mesh_shape, *, mesh_dim_names=None)[源代码]¶
基于device_type、mesh_shape和mesh_dim_names参数初始化一个DeviceMesh。
这将创建一个具有n维数组布局的DeviceMesh,其中n是mesh_shape的长度。 如果提供了mesh_dim_names,则每个维度被标记为mesh_dim_names[i]。
注意
init_device_mesh 遵循SPMD编程模型,这意味着相同的PyTorch Python程序在集群中的所有进程/等级上运行。确保 mesh_shape(描述设备布局的nD数组的维度)在所有等级上相同。不一致的 mesh_shape 可能导致挂起。
注意
如果没有找到进程组,init_device_mesh 将初始化分布式进程组/组,这些进程组/组是分布式通信所必需的。
- Parameters
- Returns
一个表示设备布局的
DeviceMesh
对象。- Return type
- Example::
>>> from torch.distributed.device_mesh import init_device_mesh >>> >>> mesh_1d = init_device_mesh("cuda", mesh_shape=(8,)) >>> mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp"))
- torch.distributed.is_torchelastic_launched()[源代码]¶
检查此进程是否通过
torch.distributed.elastic
(即 torchelastic)启动。环境变量
TORCHELASTIC_RUN_ID
的存在被用作判断当前进程是否通过 torchelastic 启动的代理。这是一个合理的代理,因为TORCHELASTIC_RUN_ID
映射到 rendezvous id,它始终是一个非空值,用于对等发现的工作标识。- Return type
目前支持三种初始化方法:
TCP 初始化¶
有两种使用TCP进行初始化的方法,这两种方法都需要一个所有进程都可以访问的网络地址和一个所需的world_size
。第一种方法需要指定一个属于rank 0进程的地址。这种初始化方法要求所有进程手动指定rank。
请注意,在最新的分布式包中不再支持多播地址。group_name
也已被弃用。
import torch.distributed as dist
# 使用其中一台机器的地址
dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456',
rank=args.rank, world_size=4)
环境变量初始化¶
此方法将从环境变量中读取配置,允许用户完全自定义信息的获取方式。需要设置的变量包括:
MASTER_PORT
- 必需;必须是rank 0机器上的空闲端口MASTER_ADDR
- 必需(除了rank 0);rank 0节点的地址WORLD_SIZE
- 必需;可以在这里设置,也可以在调用初始化函数时设置RANK
- 必需;可以在这里设置,也可以在调用初始化函数时设置
排名为0的机器将用于建立所有连接。
这是默认方法,意味着不需要指定init_method
(或者可以是env://
)。
初始化后¶
一旦运行了torch.distributed.init_process_group()
,就可以使用以下函数。要检查进程组是否已经初始化,请使用torch.distributed.is_initialized()
。
- class torch.distributed.Backend(name)[源代码]¶
一个类似枚举的类,用于后端。
可用的后端:GLOO、NCCL、UCC、MPI及其他已注册的后端。
此类型的值为小写字符串,例如
"gloo"
。它们可以通过属性访问,例如Backend.NCCL
。此类可以直接调用以解析字符串,例如,
Backend(backend_str)
将检查backend_str
是否有效,如果有效则返回解析后的字符串(小写)。它也接受大写字符串, 例如,Backend("GLOO")
返回"gloo"
。注意
条目
Backend.UNDEFINED
存在,但仅用作某些字段的初始值。用户不应直接使用它,也不应假设其存在。- classmethod register_backend(name, func, extended_api=False, devices=None)[源代码]¶
使用给定的名称和实例化函数注册一个新的后端。
此类方法由第三方
ProcessGroup
扩展用于注册新的后端。- Parameters
名称 (字符串) –
ProcessGroup
扩展的后端名称。它应该与init_process_group()
中的名称匹配。func (function) – 实例化后端的函数处理程序。 该函数应在后端扩展中实现,并接受四个参数,包括
store
,rank
,world_size
, 和timeout
。extended_api (bool, 可选) – 后端是否支持扩展参数结构。 默认值:
False
。如果设置为True
,后端 将获得一个c10d::DistributedBackendOptions
的实例,以及 由后端实现定义的进程组选项对象。设备 (字符串 或 字符串列表, 可选) – 此后端支持的设备类型,例如“cpu”、“cuda”等。如果 None,则假设支持“cpu”和“cuda”
注意
对第三方后端的支持是实验性的,可能会发生变化。
- torch.distributed.get_backend(group=None)[源代码]¶
返回给定进程组的底层实现。
- Parameters
组 (进程组, 可选) – 要操作的进程组。默认是通用主进程组。如果指定了另一个特定的组,调用进程必须是
group
的一部分。- Returns
给定进程组的后端,作为小写字符串。
- Return type
分布式键值存储¶
分布式包附带了一个分布式键值存储,可以用于在组内的进程之间共享信息,以及在torch.distributed.init_process_group()
中初始化分布式包(通过显式创建存储作为指定init_method
的替代方法)。键值存储有3种选择:TCPStore
、FileStore
和HashStore
。
- class torch.distributed.Store¶
所有存储实现的基类,例如 PyTorch 分布式提供的 3 种实现:(
TCPStore
,FileStore
, 和HashStore
).
- class torch.distributed.TCPStore¶
基于TCP的分布式键值存储实现。服务器存储保存数据,而客户端存储可以通过TCP连接到服务器存储并执行操作,例如
set()
插入键值对,get()
检索键值对等。始终应初始化一个服务器存储,因为客户端存储将等待服务器建立连接。- Parameters
host_name (str) – 服务器存储应运行的主机名或IP地址。
端口 (int) – 服务器存储应监听传入请求的端口。
world_size (int, 可选) – 存储用户的总数(客户端数量 + 1 用于服务器)。默认值为 None(None 表示非固定数量的存储用户)。
is_master (bool, 可选) – 初始化服务器存储时为True,客户端存储时为False。默认值为False。
超时 (timedelta, 可选) – 在初始化和诸如
get()
和wait()
等方法中,存储使用的超时时间。默认值为 timedelta(seconds=300)wait_for_workers (bool, 可选) – 是否等待所有工作线程连接到服务器存储。这仅在world_size为固定值时适用。默认值为True。
multi_tenant (bool, 可选) – 如果为True,当前进程中所有具有相同主机/端口的
TCPStore
实例将使用相同的底层TCPServer
。默认为False。master_listen_fd (int, 可选) – 如果指定,底层
TCPServer
将监听此文件描述符,该描述符必须是一个已经绑定到port
的套接字。在某些场景下,这有助于避免端口分配的竞争。默认值为 None(表示服务器创建一个新套接字并尝试将其绑定到port
)。
- Example::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # 在进程1上运行(服务器) >>> server_store = dist.TCPStore("127.0.0.1", 1234, 2, True, timedelta(seconds=30)) >>> # 在进程2上运行(客户端) >>> client_store = dist.TCPStore("127.0.0.1", 1234, 2, False) >>> # 初始化后,客户端或服务器都可以使用存储方法 >>> server_store.set("first_key", "first_value") >>> client_store.get("first_key")
- class torch.distributed.HashStore¶
基于底层哈希表的线程安全存储实现。此存储可以在同一进程内使用(例如,由其他线程使用),但不能跨进程使用。
- Example::
>>> import torch.distributed as dist >>> store = dist.HashStore() >>> # 可以从其他线程使用存储 >>> # 初始化后可以使用任何存储方法 >>> store.set("first_key", "first_value")
- class torch.distributed.FileStore¶
一个使用文件来存储底层键值对的存储实现。
- Example::
>>> import torch.distributed as dist >>> store1 = dist.FileStore("/tmp/filestore", 2) >>> store2 = dist.FileStore("/tmp/filestore", 2) >>> # 在初始化后,可以使用客户端或服务器中的任何存储方法 >>> store1.set("first_key", "first_value") >>> store2.get("first_key")
- class torch.distributed.PrefixStore¶
一个围绕任何三种键值存储(
TCPStore
、FileStore
和HashStore
) 的包装器,它在插入到存储中的每个键前添加一个前缀。- Parameters
前缀 (str) – 在插入存储之前,添加到每个键的前缀字符串。
存储 (torch.distributed.store) – 一个形成底层键值存储的存储对象。
- torch.distributed.Store.set(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) None ¶
根据提供的
键
和值
将键值对插入到存储中。如果键
已经存在于存储中,它将用新提供的值
覆盖旧值。- Example::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # 应该返回 "first_value" >>> store.get("first_key")
- torch.distributed.Store.get(self: torch._C._distributed_c10d.Store, arg0: str) bytes ¶
检索存储中与给定的
key
关联的值。如果key
在存储中不存在,函数将等待timeout
,该超时在初始化存储时定义,然后抛出异常。- Parameters
键 (str) – 该函数将返回与此键关联的值。
- Returns
如果
key
存在于存储中,则与key
关联的值。
- Example::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # 应该返回 "first_value" >>> store.get("first_key")
- torch.distributed.Store.add(self: torch._C._distributed_c10d.Store, arg0: str, arg1: int) int ¶
对于给定的
key
,第一次调用 add 会在存储中创建一个与key
关联的计数器,并初始化为amount
。后续对相同key
的 add 调用会将计数器增加指定的amount
。调用add()
时,如果该 key 已经通过set()
在存储中设置,将会导致异常。- Example::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # 以TCPStore为例,其他存储类型也可以使用 >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.add("first_key", 1) >>> store.add("first_key", 6) >>> # 应该返回7 >>> store.get("first_key")
- torch.distributed.Store.compare_set(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str, arg2: str) bytes ¶
根据提供的
key
将键值对插入到存储中,并在插入之前比较expected_value
和desired_value
。只有在key
在存储中已经存在expected_value
或者expected_value
为空字符串时,才会设置desired_value
。- Parameters
- Example::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("key", "first_value") >>> store.compare_set("key", "first_value", "second_value") >>> # 应该返回 "second_value" >>> store.get("key")
- torch.distributed.Store.wait(*args, **kwargs)¶
重载函数。
等待(self: torch._C._distributed_c10d.Store, arg0: List[str]) -> None
等待
keys
中的每个键被添加到存储中。如果在存储初始化时设置的timeout
之前没有设置所有键,则wait
将抛出异常。- Parameters
keys (列表) – 在存储中等待直到它们被设置的键的列表。
- Example::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # 使用 TCPStore 作为示例,其他存储类型也可以使用 >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> # 这将在30秒后抛出异常 >>> store.wait(["bad_key"])
wait(self: torch._C._distributed_c10d.Store, arg0: List[str], arg1: datetime.timedelta) -> None
等待
keys
中的每个键被添加到存储中,如果在提供的timeout
时间内键未被设置,则抛出异常。- Parameters
keys (列表) – 在存储中等待直到它们被设置的键的列表。
超时 (时间增量) – 在抛出异常之前等待键添加的时间。
- Example::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # 使用 TCPStore 作为示例,其他存储类型也可以使用 >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> # 这将在10秒后抛出异常 >>> store.wait(["bad_key"], timedelta(seconds=10))
- torch.distributed.Store.num_keys(self: torch._C._distributed_c10d.Store) int ¶
返回存储中设置的键的数量。请注意,这个数量通常会比通过
set()
和add()
添加的键的数量多一个,因为一个键用于协调使用存储的所有工作线程。警告
当与
TCPStore
一起使用时,num_keys
返回写入底层文件的键的数量。如果存储被销毁并且使用相同的文件创建了另一个存储,原始键将被保留。- Returns
存储中存在的键的数量。
- Example::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # 以TCPStore为例,其他存储类型也可以使用 >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # 这应该返回2 >>> store.num_keys()
- torch.distributed.Store.delete_key(self: torch._C._distributed_c10d.Store, arg0: str) bool ¶
从存储中删除与
key
关联的键值对。如果键成功删除,则返回 true,如果未删除,则返回 false。- Parameters
键 (str) – 要从存储中删除的键
- Returns
True 如果
key
被删除,否则 False。
- Example::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # 使用 TCPStore 作为示例,HashStore 也可以使用 >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key") >>> # 这应该返回 true >>> store.delete_key("first_key") >>> # 这应该返回 false >>> store.delete_key("bad_key")
- torch.distributed.Store.set_timeout(self: torch._C._distributed_c10d.Store, arg0: datetime.timedelta) None ¶
设置商店的默认超时时间。此超时时间在初始化和
wait()
以及get()
中使用。- Parameters
超时 (时间间隔) – 在存储中设置的超时时间。
- Example::
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # 使用 TCPStore 作为示例,其他存储类型也可以使用 >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set_timeout(timedelta(seconds=10)) >>> # 这将在10秒后抛出异常 >>> store.wait(["bad_key"])
组¶
默认情况下,集体操作在默认组(也称为世界)上进行,并要求所有进程进入分布式函数调用。然而,某些工作负载可以从更细粒度的通信中受益。这就是分布式组发挥作用的地方。new_group()
函数可用于创建新组,包含所有进程的任意子集。它返回一个不透明的组句柄,可以作为 group
参数传递给所有集体操作(集体操作是用于在某些已知的编程模式中交换信息的分布式函数)。
- torch.distributed.new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local_synchronization=False)[源代码]¶
创建一个新的分布式组。
此函数要求主组中的所有进程(即分布式作业的所有进程)进入此函数,即使它们不是该组的成员。此外,所有进程应按相同顺序创建组。
警告
同时使用多个进程组与
NCCL
后端是不安全的,用户应在应用程序中执行显式同步,以确保一次只使用一个进程组。这意味着一个进程组的集合操作应在设备上完成执行(不仅仅是入队,因为CUDA执行是异步的),然后才能将另一个进程组的集合操作入队。有关更多详细信息,请参阅同时使用多个NCCL通信器。- Parameters
超时 (时间增量, 可选) – 详情和默认值请参见 init_process_group。
backend (str 或 Backend,可选) – 要使用的后端。根据构建时的配置,有效值为
gloo
和nccl
。 默认使用与全局组相同的pg_options (进程组选项, 可选) – 指定在构造特定进程组时需要传递的额外选项。例如,对于
nccl
后端,可以指定is_high_priority_stream
,以便进程组可以选择高优先级的cuda流。use_local_synchronization (bool, 可选) – 在进程组创建结束时执行组内本地屏障。这与非成员等级不需要调用API并且不加入屏障的情况不同。
- Returns
一个分布式组的句柄,可以传递给集体调用,如果等级不在
ranks
中则为None。
注意:use_local_synchronization 与 MPI 不兼容。
注意:虽然 use_local_synchronization=True 在大规模集群和小进程组中可以显著加快,但由于它改变了集群行为,非成员等级不加入组屏障(),因此需要谨慎使用。
注意:当每个rank创建多个重叠的进程组时,use_local_synchronization=True可能会导致死锁。为避免这种情况,请确保所有rank遵循相同的全球创建顺序。
- torch.distributed.get_group_rank(group, global_rank)[源代码]¶
将全球排名转换为组排名。
global_rank
必须是group
的一部分,否则会引发 RuntimeError。- Parameters
组 (进程组) – 要查找相对排名的进程组。
global_rank (int) – 要查询的全局排名。
- Returns
相对于
group
的global_rank
的组排名- Return type
注意:在默认进程组上调用此函数将返回标识
设备网格¶
DeviceMesh 是一个更高层次的抽象,用于管理进程组(或 NCCL 通信器)。
它允许用户轻松创建节点间和节点内的进程组,而无需担心如何为不同的子进程组正确设置排名,并且它有助于轻松管理这些分布式进程组。init_device_mesh()
函数可以用于创建新的 DeviceMesh,使用描述设备拓扑的网格形状。
- class torch.distributed.device_mesh.DeviceMesh(device_type, mesh, *, mesh_dim_names=None)[源代码]¶
DeviceMesh 表示一个设备网格,其中设备的布局可以表示为一个 n 维数组,并且 n 维数组中的每个值是默认进程组排名的全局 ID。
DeviceMesh 可以用来描述集群中设备的布局,并作为集群内设备列表之间通信的代理。
DeviceMesh 可以用作上下文管理器。
注意
DeviceMesh 遵循 SPMD 编程模型,这意味着相同的 PyTorch Python 程序在集群中的所有进程/等级上运行。因此,用户需要确保 mesh 数组(描述设备布局)在所有等级中应保持一致。不一致的 mesh 将导致静默挂起。
- Parameters
device_type (str) – 网格的设备类型。目前支持:“cpu”,“cuda/cuda-like”。
mesh (ndarray) – 一个多维数组或一个整数张量,描述设备的布局,其中ID是默认进程组的全球ID。
- Returns
一个表示设备布局的
DeviceMesh
对象。- Return type
以下程序以SPMD方式在每个进程/秩上运行。在这个例子中,我们有两个主机,每个主机有4个GPU。 对mesh的第一个维度进行归约将跨列进行归约(0, 4),.. 和(3, 7),对mesh的第二个维度进行归约将跨行进行归约(0, 1, 2, 3)和(4, 5, 6, 7)。
- Example::
>>> from torch.distributed.device_mesh import DeviceMesh >>> >>> # 初始化设备网格为 (2, 4) 以表示拓扑结构 >>> # 跨主机(维度 0)和主机内(维度 1)。 >>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]])
点对点通信¶
isend()
和 irecv()
在使用时返回分布式请求对象。通常,该对象的类型是未指定的,因为它们不应该手动创建,但它们保证支持两种方法:
is_completed()
- 如果操作已完成,则返回Truewait()
- 将阻塞进程直到操作完成。is_completed()
一旦返回,保证返回 True。
- torch.distributed.isend(tensor, dst, group=None, tag=0)[源代码]¶
异步发送一个张量。
警告
在请求完成之前修改
tensor
会导致未定义的行为。警告
tag
在NCCL后端中不受支持。
- torch.distributed.batch_isend_irecv(p2p_op_list)[源代码]¶
异步发送或接收一批张量并返回请求列表。
处理
p2p_op_list
中的每个操作并返回相应的请求。目前支持NCCL、Gloo和UCC后端。- Parameters
p2p_op_list – 一个点对点操作的列表(每个操作符的类型是
torch.distributed.P2POp
)。列表中isend/irecv的顺序 很重要,并且需要与远程端相应的isend/irecv匹配。- Returns
通过调用 op_list 中相应的 op 返回的分布式请求对象列表。
示例
>>> send_tensor = torch.arange(2, dtype=torch.float32) + 2 * rank >>> recv_tensor = torch.randn(2, dtype=torch.float32) >>> send_op = dist.P2POp(dist.isend, send_tensor, (rank + 1)%world_size) >>> recv_op = dist.P2POp(dist.irecv, recv_tensor, (rank - 1 + world_size)%world_size) >>> reqs = batch_isend_irecv([send_op, recv_op]) >>> for req in reqs: >>> req.wait() >>> recv_tensor tensor([2, 3]) # 等级 0 tensor([0, 1]) # 等级 1
注意
请注意,当此API与NCCL PG后端一起使用时,用户必须使用torch.cuda.set_device设置当前GPU设备,否则会导致意外的挂起问题。
此外,如果这是在传递给
dist.P2POp
的group
中的第一个集体调用,则group
的所有等级都必须参与此 API 调用;否则,行为是未定义的。如果此 API 调用不是group
中的第一个集体调用,则允许仅涉及group
中一部分等级的批量点对点操作。
同步和异步集体操作¶
每个集体操作函数都支持以下两种操作,具体取决于传入的 async_op
标志的设置:
同步操作 - 默认模式,当 async_op
设置为 False
时。
当函数返回时,可以保证集体操作已经执行。对于CUDA操作,不能保证CUDA操作已经完成,因为CUDA操作是异步的。对于CPU集体操作,任何
进一步的函数调用利用集体调用的输出将按预期行为。对于CUDA集体操作,在同一CUDA流上利用输出的函数调用将按预期行为。用户必须注意
在不同流下运行的同步。有关CUDA语义的详细信息,例如流同步,请参阅 CUDA Semantics。
请参阅下面的脚本,查看CPU和CUDA操作在这些语义上的差异示例。
异步操作 - 当 async_op
设置为 True 时。集体操作函数返回一个分布式请求对象。通常,您不需要手动创建它,并且它保证支持两种方法:
is_completed()
- 在CPU集体操作的情况下,如果完成则返回True
。在CUDA操作的情况下,如果操作已成功排队到CUDA流并且可以在默认流上使用输出而无需进一步同步,则返回True
。wait()
- 在CPU集体操作的情况下,将阻塞进程直到操作完成。在CUDA集体操作的情况下,将阻塞直到操作已成功排队到CUDA流并且输出可以在默认流上使用而无需进一步同步。get_future()
- 返回torch._C.Future
对象。支持 NCCL,也支持大多数 GLOO 和 MPI 操作,除了点对点操作。 注意:随着我们继续采用 Futures 并合并 API,get_future()
调用可能会变得多余。
示例
以下代码可以作为在使用分布式集合时关于CUDA操作语义的参考。 它展示了在使用不同CUDA流上的集合输出时显式同步的必要性:
# 代码在每个rank上运行。
dist.init_process_group("nccl", rank=rank, world_size=2)
output = torch.tensor([rank]).cuda(rank)
s = torch.cuda.Stream()
handle = dist.all_reduce(output, async_op=True)
# 等待确保操作已入队,但不一定完成。
handle.wait()
# 在非默认流上使用结果。
with torch.cuda.stream(s):
s.wait_stream(torch.cuda.default_stream())
output.add_(100)
if rank == 0:
# 如果省略了对wait_stream的显式调用,下面的输出将是
# 不确定的1或101,取决于allreduce是否在add完成之后覆盖了
# 该值。
print(output)
集体函数¶
- torch.distributed.broadcast(tensor, src, group=None, async_op=False)[源代码]¶
将张量广播到整个组。
tensor
在所有参与集体通信的进程中必须具有相同数量的元素。
- torch.distributed.broadcast_object_list(object_list, src=0, group=None, device=None)[源代码]¶
将可序列化的对象广播到整个组中。
类似于
broadcast()
,但可以传递 Python 对象。 请注意,object_list
中的所有对象都必须可序列化,以便进行广播。- Parameters
object_list (列表[任意]) – 要广播的输入对象列表。 每个对象必须是可拾取的。只有
src
等级上的对象会被广播,但每个等级必须提供大小相等的列表。src (int) – 要广播的源等级
object_list
。 源等级基于全局进程组(无论group
参数如何)组 – (ProcessGroup, 可选): 要操作的进程组。如果为 None,将使用默认的进程组。默认值为
None
。设备 (
torch.device
, 可选) – 如果非空,对象将被序列化并转换为张量,然后在广播之前移动到设备
。默认值为None
。
- Returns
None
。如果等级是组的一部分,object_list
将包含从src
等级广播的对象。
注意
对于基于NCCL的进程组,对象的内部张量表示必须在通信发生之前移动到GPU设备。在这种情况下,使用的设备由
torch.cuda.current_device()
给出,并且用户有责任确保通过torch.cuda.set_device()
设置此设备,以便每个rank都有一个单独的GPU。注意
请注意,此API与
all_gather()
集合操作略有不同,因为它不提供async_op
句柄,因此将是一个阻塞调用。警告
broadcast_object_list()
隐式使用pickle
模块,该模块已知是不安全的。可以构造恶意的 pickle 数据,在解封时执行任意代码。仅在信任数据的情况下调用此函数。警告
使用GPU张量调用
broadcast_object_list()
不受支持且效率低下,因为它会导致GPU到CPU的传输,因为张量会被序列化。请考虑使用broadcast()
代替。- Example::
>>> # 注意:每个rank上的进程组初始化省略。 >>> import torch.distributed as dist >>> if dist.get_rank() == 0: >>> # 假设world_size为3。 >>> objects = ["foo", 12, {1: 2}] # 任何可pickle的对象 >>> else: >>> objects = [None, None, None] >>> # 假设后端不是NCCL >>> device = torch.device("cpu") >>> dist.broadcast_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}]
- torch.distributed.all_reduce(tensor, op=<RedOpType.SUM: 0>, group=None, async_op=False)[源代码]¶
在所有机器上减少张量数据,使得所有机器都能获得最终结果。
调用后,
tensor
在所有进程中将是逐位相同的。支持复数张量。
- Parameters
- Returns
异步工作句柄,如果 async_op 设置为 True。 如果没有异步操作或不属于该组,则为 None。
示例
>>> # 下面所有的张量都是torch.int64类型。 >>> # 我们有2个进程组,2个等级。 >>> device = torch.device(f'cuda:{rank}') >>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor tensor([1, 2], device='cuda:0') # 等级0 tensor([3, 4], device='cuda:1') # 等级1 >>> dist.all_reduce(tensor, op=ReduceOp.SUM) >>> tensor tensor([4, 6], device='cuda:0') # 等级0 tensor([4, 6], device='cuda:1') # 等级1
>>> # 下面的所有张量都是torch.cfloat类型。 >>> # 我们有2个进程组,2个等级。 >>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat, device=device) + 2 * rank * (1+1j) >>> tensor tensor([1.+1.j, 2.+2.j], device='cuda:0') # 等级0 tensor([3.+3.j, 4.+4.j], device='cuda:1') # 等级1 >>> dist.all_reduce(tensor, op=ReduceOp.SUM) >>> tensor tensor([4.+4.j, 6.+6.j], device='cuda:0') # 等级0 tensor([4.+4.j, 6.+6.j], device='cuda:1') # 等级1
- torch.distributed.reduce(tensor, dst, op=<RedOpType.SUM: 0>, group=None, async_op=False)[源代码]¶
减少所有机器上的张量数据。
只有排名为
dst
的进程将会接收最终结果。
- torch.distributed.all_gather(tensor_list, tensor, group=None, async_op=False)[源代码]¶
将整个组中的张量收集到一个列表中。
支持复数张量。
- Parameters
- Returns
异步工作句柄,如果 async_op 设置为 True。 如果没有异步操作或不属于该组,则为 None。
示例
>>> # 下面的所有张量都是torch.int64类型。 >>> # 我们有2个进程组,2个等级。 >>> device = torch.device(f'cuda:{rank}') >>> tensor_list = [torch.zeros(2, dtype=torch.int64, device=device) for _ in range(2)] >>> tensor_list [tensor([0, 0], device='cuda:0'), tensor([0, 0], device='cuda:0')] # 等级0 [tensor([0, 0], device='cuda:0'), tensor([0, 0], device='cuda:1')] # 等级1 >>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor tensor([1, 2], device='cuda:0') # 等级0 tensor([3, 4], device='cuda:1') # 等级1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1, 2], device='cuda:0'), tensor([3, 4], device='cuda:0')] # 等级0 [tensor([1, 2], device='cuda:1'), tensor([3, 4], device='cuda:1')] # 等级1
>>> # 下面的所有张量都是torch.cfloat类型。 >>> # 我们有2个进程组,2个等级。 >>> tensor_list = [torch.zeros(2, dtype=torch.cfloat, device=device) for _ in range(2)] >>> tensor_list [tensor([0.+0.j, 0.+0.j], device='cuda:0'), tensor([0.+0.j, 0.+0.j], device='cuda:0')] # 等级0 [tensor([0.+0.j, 0.+0.j], device='cuda:1'), tensor([0.+0.j, 0.+0.j], device='cuda:1')] # 等级1 >>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat, device=device) + 2 * rank * (1+1j) >>> tensor tensor([1.+1.j, 2.+2.j], device='cuda:0') # 等级0 tensor([3.+3.j, 4.+4.j], device='cuda:1') # 等级1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1.+1.j, 2.+2.j], device='cuda:0'), tensor([3.+3.j, 4.+4.j], device='cuda:0')] # 等级0 [tensor([1.+1.j, 2.+2.j], device='cuda:1'), tensor([3.+3.j, 4.+4.j], device='cuda:1')] # 等级1
- torch.distributed.all_gather_into_tensor(output_tensor, input_tensor, group=None, async_op=False)[源代码]¶
从所有等级收集张量并将它们放入单个输出张量中。
- Parameters
output_tensor (Tensor) – 用于容纳所有rank的tensor元素的输出tensor。它必须正确地设置大小,以具有以下形式之一: (i) 沿主维度连接所有输入tensor;关于“连接”的定义,请参见
torch.cat()
; (ii) 沿主维度堆叠所有输入tensor;关于“堆叠”的定义,请参见torch.stack()
。 下面的示例可能更好地解释了支持的输出形式。input_tensor (张量) – 要从当前等级收集的张量。 与
all_gather
API 不同,此 API 中的输入张量在所有等级中必须具有相同的大小。组 (进程组, 可选) – 要操作的进程组。如果为None,将使用默认的进程组。
async_op (布尔值, 可选) – 此操作是否应为异步操作
- Returns
异步工作句柄,如果 async_op 设置为 True。 如果没有异步操作或不属于该组,则为 None。
示例
>>> # 以下所有张量均为torch.int64类型,并且位于CUDA设备上。 >>> # 我们有两个rank。 >>> device = torch.device(f'cuda:{rank}') >>> tensor_in = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor_in tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> # 以拼接形式输出 >>> tensor_out = torch.zeros(world_size * 2, dtype=torch.int64, device=device) >>> dist.all_gather_into_tensor(tensor_out, tensor_in) >>> tensor_out tensor([1, 2, 3, 4], device='cuda:0') # Rank 0 tensor([1, 2, 3, 4], device='cuda:1') # Rank 1 >>> # 以堆叠形式输出 >>> tensor_out2 = torch.zeros(world_size, 2, dtype=torch.int64, device=device) >>> dist.all_gather_into_tensor(tensor_out2, tensor_in) >>> tensor_out2 tensor([[1, 2], [3, 4]], device='cuda:0') # Rank 0 tensor([[1, 2], [3, 4]], device='cuda:1') # Rank 1
警告
Gloo 后端不支持此 API。
- torch.distributed.all_gather_object(object_list, obj, group=None)[源代码]¶
从整个组中收集可序列化的对象到一个列表中。
类似于
all_gather()
,但可以传递 Python 对象。 请注意,对象必须可序列化才能被收集。- Parameters
object_list (列表[任意类型]) – 输出列表。它应该正确地调整为该集体的组大小,并将包含输出。
obj (任意) – 要从当前进程广播的可拾取的Python对象。
组 (进程组, 可选) – 要操作的进程组。如果为None,将使用默认的进程组。默认值为
None
。
- Returns
无。如果调用等级是该组的一部分,集体的输出将被填充到输入的
object_list
中。如果调用等级不是该组的一部分,传入的object_list
将保持不变。
注意
请注意,此API与
all_gather()
集合操作略有不同,因为它不提供async_op
句柄,因此将是一个阻塞调用。注意
对于基于NCCL的处理组,对象的内部张量表示必须在通信发生之前移动到GPU设备上。在这种情况下,使用的设备由
torch.cuda.current_device()
给出,并且用户有责任确保通过torch.cuda.set_device()
设置,使得每个rank都有一个单独的GPU。警告
all_gather_object()
隐式使用pickle
模块,该模块已知是不安全的。可以构造恶意的 pickle 数据,在解封时执行任意代码。仅在信任数据时调用此函数。警告
使用 GPU 张量调用
all_gather_object()
不受支持且效率低下,因为它会导致 GPU -> CPU 传输,因为张量会被序列化。请考虑使用all_gather()
代替。- Example::
>>> # 注意:每个rank上的进程组初始化省略。 >>> import torch.distributed as dist >>> # 假设world_size为3。 >>> gather_objects = ["foo", 12, {1: 2}] # 任何可pickle的对象 >>> output = [None for _ in gather_objects] >>> dist.all_gather_object(output, gather_objects[dist.get_rank()]) >>> output ['foo', 12, {1: 2}]
- torch.distributed.gather(tensor, gather_list=None, dst=0, group=None, async_op=False)[源代码]¶
在一个进程中收集张量列表。
- torch.distributed.gather_object(obj, object_gather_list=None, dst=0, group=None)[源代码]¶
从整个组中的所有进程收集可序列化的对象。
类似于
gather()
,但可以传递 Python 对象。请注意,对象必须可序列化以便进行收集。- Parameters
- Returns
无。在
dst
等级上,object_gather_list
将包含集体的输出。
注意
请注意,此API与gather集体操作略有不同,因为它不提供async_op句柄,因此将是一个阻塞调用。
注意
对于基于NCCL的处理组,对象的内部张量表示必须在通信发生之前移动到GPU设备上。在这种情况下,使用的设备由
torch.cuda.current_device()
给出,并且用户有责任确保通过torch.cuda.set_device()
设置,使得每个rank都有一个单独的GPU。警告
gather_object()
隐式使用pickle
模块,该模块已知是不安全的。可以构造恶意的 pickle 数据,在解封时执行任意代码。请仅在信任数据的情况下调用此函数。警告
使用 GPU 张量调用
gather_object()
不受支持且效率低下,因为它会导致 GPU -> CPU 传输,因为张量会被序列化。请考虑使用gather()
代替。- Example::
>>> # 注意:每个rank上的进程组初始化省略。 >>> import torch.distributed as dist >>> # 假设world_size为3。 >>> gather_objects = ["foo", 12, {1: 2}] # 任何可pickle的对象 >>> output = [None for _ in gather_objects] >>> dist.gather_object( ... gather_objects[dist.get_rank()], ... output if dist.get_rank() == 0 else None, ... dst=0 ... ) >>> # 在rank 0上 >>> output ['foo', 12, {1: 2}]
- torch.distributed.scatter(tensor, scatter_list=None, src=0, group=None, async_op=False)[源代码]¶
将一组张量分散到组中的所有进程。
每个进程将接收一个张量,并将其数据存储在
tensor
参数中。支持复数张量。
- Parameters
- Returns
异步工作句柄,如果 async_op 设置为 True。 如果没有异步操作或不属于该组,则为 None。
注意
请注意,scatter_list 中的所有张量必须具有相同的大小。
- Example::
>>> # 注意:每个rank上的进程组初始化省略。 >>> import torch.distributed as dist >>> tensor_size = 2 >>> t_ones = torch.ones(tensor_size) >>> t_fives = torch.ones(tensor_size) * 5 >>> output_tensor = torch.zeros(tensor_size) >>> if dist.get_rank() == 0: >>> # 假设world_size为2。 >>> # 仅支持张量,所有张量必须大小相同。 >>> scatter_list = [t_ones, t_fives] >>> else: >>> scatter_list = None >>> dist.scatter(output_tensor, scatter_list, src=0) >>> # Rank i 获取 scatter_list[i]。例如,在rank 1上: >>> output_tensor tensor([5., 5.])
- torch.distributed.scatter_object_list(scatter_object_output_list, scatter_object_input_list, src=0, group=None)[源代码]¶
将可序列化的对象分散到整个组中。
类似于
scatter()
,但可以传入 Python 对象。在每个 rank 上,分散的对象将存储为scatter_object_output_list
的第一个元素。请注意,scatter_object_input_list
中的所有对象必须可序列化以便进行分散。- Parameters
scatter_object_output_list (列表[任意类型]) – 非空列表,其第一个元素将存储分散到此等级的对象。
scatter_object_input_list (列表[任意]) – 要分散的输入对象列表。 每个对象必须是可拾取的。只有位于
src
等级的对象会被分散,而非src等级的参数可以是None
。src (int) – 从哪个源等级进行分散
scatter_object_input_list
。 源等级基于全局进程组(无论group
参数如何)。组 – (ProcessGroup, 可选): 要操作的进程组。如果为 None,将使用默认的进程组。默认值为
None
。
- Returns
None
。如果等级是组的一部分,scatter_object_output_list
将把它的第一个元素设置为这个等级的分散对象。
注意
请注意,此API与scatter collective略有不同,因为它不提供
async_op
句柄,因此将是一个阻塞调用。警告
scatter_object_list()
隐式使用pickle
模块,该模块已知是不安全的。可以构造恶意的 pickle 数据,在解封时执行任意代码。仅在信任数据的情况下调用此函数。警告
使用GPU张量调用
scatter_object_list()
不受支持且效率低下,因为它会导致GPU到CPU的传输,因为张量会被序列化。请考虑使用scatter()
代替。- Example::
>>> # 注意:每个rank上的进程组初始化省略。 >>> import torch.distributed as dist >>> if dist.get_rank() == 0: >>> # 假设world_size为3。 >>> objects = ["foo", 12, {1: 2}] # 任何可pickle的对象 >>> else: >>> # 非src rank上的列表可以是任何内容,元素不会被使用。 >>> objects = [None, None, None] >>> output_list = [None] >>> dist.scatter_object_list(output_list, objects, src=0) >>> # Rank i 获取 objects[i]。例如,在rank 2上: >>> output_list [{1: 2}]
- torch.distributed.reduce_scatter(output, input_list, op=<RedOpType.SUM: 0>, group=None, async_op=False)[源代码]¶
减少,然后将张量列表分散到组中的所有进程。
- torch.distributed.reduce_scatter_tensor(output, input, op=<RedOpType.SUM: 0>, group=None, async_op=False)[源代码]¶
减少,然后将张量分散到组中的所有等级。
- Parameters
- Returns
异步工作句柄,如果 async_op 设置为 True。 如果没有异步操作或不属于该组,则为 None。
示例
>>> # 下面的所有张量都是torch.int64类型,并且位于CUDA设备上。 >>> # 我们有两个rank。 >>> device = torch.device(f'cuda:{rank}') >>> tensor_out = torch.zeros(2, dtype=torch.int64, device=device) >>> # 输入为拼接形式 >>> tensor_in = torch.arange(world_size * 2, dtype=torch.int64, device=device) >>> tensor_in tensor([0, 1, 2, 3], device='cuda:0') # Rank 0 tensor([0, 1, 2, 3], device='cuda:1') # Rank 1 >>> dist.reduce_scatter_tensor(tensor_out, tensor_in) >>> tensor_out tensor([0, 2], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1 >>> # 输入为堆叠形式 >>> tensor_in = torch.reshape(tensor_in, (world_size, 2)) >>> tensor_in tensor([[0, 1], [2, 3]], device='cuda:0') # Rank 0 tensor([[0, 1], [2, 3]], device='cuda:1') # Rank 1 >>> dist.reduce_scatter_tensor(tensor_out, tensor_in) >>> tensor_out tensor([0, 2], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1
警告
Gloo 后端不支持此 API。
- torch.distributed.all_to_all_single(output, input, output_split_sizes=None, input_split_sizes=None, group=None, async_op=False)[源代码]¶
拆分输入张量,然后将拆分后的列表分散到组中的所有进程。
随后,接收到的张量将从组中的所有进程中连接起来,并作为一个单一的输出张量返回。
支持复数张量。
- Parameters
- Returns
异步工作句柄,如果 async_op 设置为 True。 如果没有异步操作或不属于该组,则为 None。
警告
all_to_all_single 是实验性的,可能会发生变化。
示例
>>> input = torch.arange(4) + rank * 4 >>> input tensor([0, 1, 2, 3]) # 等级 0 tensor([4, 5, 6, 7]) # 等级 1 tensor([8, 9, 10, 11]) # 等级 2 tensor([12, 13, 14, 15]) # 等级 3 >>> output = torch.empty([4], dtype=torch.int64) >>> dist.all_to_all_single(output, input) >>> output tensor([0, 4, 8, 12]) # 等级 0 tensor([1, 5, 9, 13]) # 等级 1 tensor([2, 6, 10, 14]) # 等级 2 tensor([3, 7, 11, 15]) # 等级 3
>>> # 本质上,它类似于以下操作: >>> scatter_list = list(input.chunk(world_size)) >>> gather_list = list(output.chunk(world_size)) >>> for i in range(world_size): >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src = i)
>>> # 另一个不均匀分割的例子 >>> input tensor([0, 1, 2, 3, 4, 5]) # 秩 0 tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # 秩 1 tensor([20, 21, 22, 23, 24]) # 秩 2 tensor([30, 31, 32, 33, 34, 35, 36]) # 秩 3 >>> input_splits [2, 2, 1, 1] # 秩 0 [3, 2, 2, 2] # 秩 1 [2, 1, 1, 1] # 秩 2 [2, 2, 2, 1] # 秩 3 >>> output_splits [2, 3, 2, 2] # 秩 0 [2, 2, 1, 2] # 秩 1 [1, 2, 1, 2] # 秩 2 [1, 2, 1, 1] # 秩 3 >>> output = ... >>> dist.all_to_all_single(output, input, output_splits, input_splits) >>> output tensor([ 0, 1, 10, 11, 12, 20, 21, 30, 31]) # 秩 0 tensor([ 2, 3, 13, 14, 22, 32, 33]) # 秩 1 tensor([ 4, 15, 16, 23, 34, 35]) # 秩 2 tensor([ 5, 17, 18, 24, 36]) # 秩 3
>>> # 另一个使用torch.cfloat类型张量的示例。 >>> input = torch.tensor([1+1j, 2+2j, 3+3j, 4+4j], dtype=torch.cfloat) + 4 * rank * (1+1j) >>> input tensor([1+1j, 2+2j, 3+3j, 4+4j]) # 等级 0 tensor([5+5j, 6+6j, 7+7j, 8+8j]) # 等级 1 tensor([9+9j, 10+10j, 11+11j, 12+12j]) # 等级 2 tensor([13+13j, 14+14j, 15+15j, 16+16j]) # 等级 3 >>> output = torch.empty([4], dtype=torch.int64) >>> dist.all_to_all_single(output, input) >>> output tensor([1+1j, 5+5j, 9+9j, 13+13j]) # 等级 0 tensor([2+2j, 6+6j, 10+10j, 14+14j]) # 等级 1 tensor([3+3j, 7+7j, 11+11j, 15+15j]) # 等级 2 tensor([4+4j, 8+8j, 12+12j, 16+16j]) # 等级 3
- torch.distributed.all_to_all(output_tensor_list, input_tensor_list, group=None, async_op=False)[源代码]¶
将输入张量的列表分散到组中的所有进程,并在输出列表中返回收集的张量列表。
支持复数张量。
- Parameters
- Returns
异步工作句柄,如果 async_op 设置为 True。 如果没有异步操作或不属于该组,则为 None。
警告
all_to_all 是实验性的,可能会发生变化。
示例
>>> input = torch.arange(4) + rank * 4 >>> input = list(input.chunk(4)) >>> input [tensor([0]), tensor([1]), tensor([2]), tensor([3])] # 等级 0 [tensor([4]), tensor([5]), tensor([6]), tensor([7])] # 等级 1 [tensor([8]), tensor([9]), tensor([10]), tensor([11])] # 等级 2 [tensor([12]), tensor([13]), tensor([14]), tensor([15])] # 等级 3 >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4)) >>> dist.all_to_all(output, input) >>> output [tensor([0]), tensor([4]), tensor([8]), tensor([12])] # 等级 0 [tensor([1]), tensor([5]), tensor([9]), tensor([13])] # 等级 1 [tensor([2]), tensor([6]), tensor([10]), tensor([14])] # 等级 2 [tensor([3]), tensor([7]), tensor([11]), tensor([15])] # 等级 3
>>> # 本质上,它类似于以下操作: >>> scatter_list = input >>> gather_list = output >>> for i in range(world_size): >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src=i)
>>> input tensor([0, 1, 2, 3, 4, 5]) # 秩 0 tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # 秩 1 tensor([20, 21, 22, 23, 24]) # 秩 2 tensor([30, 31, 32, 33, 34, 35, 36]) # 秩 3 >>> input_splits [2, 2, 1, 1] # 秩 0 [3, 2, 2, 2] # 秩 1 [2, 1, 1, 1] # 秩 2 [2, 2, 2, 1] # 秩 3 >>> output_splits [2, 3, 2, 2] # 秩 0 [2, 2, 1, 2] # 秩 1 [1, 2, 1, 2] # 秩 2 [1, 2, 1, 1] # 秩 3 >>> input = list(input.split(input_splits)) >>> input [tensor([0, 1]), tensor([2, 3]), tensor([4]), tensor([5])] # 秩 0 [tensor([10, 11, 12]), tensor([13, 14]), tensor([15, 16]), tensor([17, 18])] # 秩 1 [tensor([20, 21]), tensor([22]), tensor([23]), tensor([24])] # 秩 2 [tensor([30, 31]), tensor([32, 33]), tensor([34, 35]), tensor([36])] # 秩 3 >>> output = ... >>> dist.all_to_all(output, input) >>> output [tensor([0, 1]), tensor([10, 11, 12]), tensor([20, 21]), tensor([30, 31])] # 秩 0 [tensor([2, 3]), tensor([13, 14]), tensor([22]), tensor([32, 33])] # 秩 1 [tensor([4]), tensor([15, 16]), tensor([23]), tensor([34, 35])] # 秩 2 [tensor([5]), tensor([17, 18]), tensor([24]), tensor([36])] # 秩 3
>>> # 另一个使用torch.cfloat类型张量的示例。 >>> input = torch.tensor([1+1j, 2+2j, 3+3j, 4+4j], dtype=torch.cfloat) + 4 * rank * (1+1j) >>> input = list(input.chunk(4)) >>> input [tensor([1+1j]), tensor([2+2j]), tensor([3+3j]), tensor([4+4j])] # 等级 0 [tensor([5+5j]), tensor([6+6j]), tensor([7+7j]), tensor([8+8j])] # 等级 1 [tensor([9+9j]), tensor([10+10j]), tensor([11+11j]), tensor([12+12j])] # 等级 2 [tensor([13+13j]), tensor([14+14j]), tensor([15+15j]), tensor([16+16j])] # 等级 3 >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4)) >>> dist.all_to_all(output, input) >>> output [tensor([1+1j]), tensor([5+5j]), tensor([9+9j]), tensor([13+13j])] # 等级 0 [tensor([2+2j]), tensor([6+6j]), tensor([10+10j]), tensor([14+14j])] # 等级 1 [tensor([3+3j]), tensor([7+7j]), tensor([11+11j]), tensor([15+15j])] # 等级 2 [tensor([4+4j]), tensor([8+8j]), tensor([12+12j]), tensor([16+16j])] # 等级 3
- torch.distributed.barrier(group=None, async_op=False, device_ids=None)[源代码]¶
同步所有进程。
这个集体会阻塞进程,直到整个组进入此函数,如果 async_op 为 False,或者如果在 wait() 上调用了异步工作句柄。
- Parameters
- Returns
异步工作句柄,如果 async_op 设置为 True。 如果没有异步操作或不属于该组,则为 None。
注意
ProcessGroupNCCL 现在依赖于流同步而不是设备同步来阻塞CPU。因此,请不要假设barrier()会执行设备同步。
- torch.distributed.monitored_barrier(group=None, timeout=None, wait_all_ranks=False)[源代码]¶
同步类似于
torch.distributed.barrier
的进程,但考虑可配置的超时时间。它能够在提供的超时时间内报告未通过此屏障的排名。 具体来说,对于非零排名,将阻塞直到从排名0接收到发送/接收处理。 排名0将阻塞直到处理完来自其他排名的所有发送/接收,并将报告未及时响应的排名失败。 请注意,如果一个排名未达到monitored_barrier(例如由于挂起),所有其他排名将在monitored_barrier中失败。
这个集合将阻塞组中的所有进程/等级,直到整个组成功退出函数,这对于调试和同步非常有用。然而,它可能会对性能产生影响,因此应仅用于调试或需要主机端完全同步点的场景。为了调试目的,可以在应用程序的集合调用之前插入此屏障,以检查是否有任何等级不同步。
注意
请注意,此集合操作仅在使用GLOO后端时支持。
- Parameters
组 (进程组, 可选) – 要操作的进程组。如果
None
,将使用默认的进程组。超时 (datetime.timedelta, 可选) – 监控屏障的超时时间。 如果
None
,将使用默认的进程组超时时间。wait_all_ranks (bool, 可选) – 是否收集所有失败的排名。默认情况下,这是
False
,并且排名0上的monitored_barrier
会在遇到第一个失败的排名时抛出异常,以便快速失败。通过设置wait_all_ranks=True
,monitored_barrier
将收集所有失败的排名,并抛出一个包含所有失败排名信息的错误。
- Returns
无
.
- Example::
>>> # 注意:每个rank上的进程组初始化省略。 >>> import torch.distributed as dist >>> if dist.get_rank() != 1: >>> dist.monitored_barrier() # 引发异常,指示rank 1没有调用monitored_barrier。 >>> # 使用wait_all_ranks=True的示例 >>> if dist.get_rank() == 0: >>> dist.monitored_barrier(wait_all_ranks=True) # 引发异常 >>> # 指示rank 1, 2, ... world_size - 1没有调用monitored_barrier。
- class torch.distributed.Work¶
一个工作对象表示在PyTorch分布式包中对一个挂起的异步操作的句柄。它由非阻塞的集体操作返回,例如dist.all_reduce(tensor, async_op=True)。
- class torch.distributed.ReduceOp¶
一个类似于枚举的类,用于可用的归约操作:
SUM
,PRODUCT
,MIN
,MAX
,BAND
,BOR
,BXOR
, 和PREMUL_SUM
。BAND
,BOR
, 和BXOR
归约在使用NCCL
后端时不可用。AVG
在跨rank求和之前,将值除以世界大小。AVG
仅在使用NCCL
后端时可用, 并且仅适用于NCCL版本2.10或更高版本。PREMUL_SUM
在本地将输入乘以给定的标量后再进行归约。PREMUL_SUM
仅在使用NCCL
后端时可用, 并且仅适用于 NCCL 版本 2.11 或更高版本。用户应使用torch.distributed._make_nccl_premul_sum
。此外,
MAX
、MIN
和PRODUCT
不支持复数张量。这个类的值可以通过属性访问,例如,
ReduceOp.SUM
。 它们用于指定归约集体的策略,例如,reduce()
。此类不支持
__members__
属性。
分析集体通信¶
请注意,您可以使用 torch.profiler
(推荐,仅在1.8.1之后可用)或 torch.autograd.profiler
来分析此处提到的集体通信和点对点通信API。所有开箱即用的后端(gloo
、nccl
、mpi
)都受支持,并且集体通信的使用将在分析输出/跟踪中按预期呈现。分析您的代码与任何常规的torch操作符相同:
import torch
import torch.distributed as dist
with torch.profiler():
tensor = torch.randn(20, 10)
dist.all_reduce(tensor)
请参阅分析器文档以获取分析器功能的完整概述。
多GPU集体函数¶
警告
多GPU函数(每个CPU线程对应多个GPU)已被弃用。从今天起,PyTorch Distributed的首选编程模型是每个线程对应一个设备,如本文档中的API示例所示。如果您是后端开发者,并希望支持每个线程对应多个设备,请联系PyTorch Distributed的维护者。
第三方后端¶
除了内置的GLOO/MPI/NCCL后端外,PyTorch分布式支持通过运行时注册机制支持第三方后端。
有关如何通过C++扩展开发第三方后端的参考信息,请参阅教程 - 自定义C++和CUDA扩展和
test/cpp_extensions/cpp_c10d_extension.cpp
。第三方后端的功能由其自身的实现决定。
新的后端派生自 c10d::ProcessGroup
并通过 torch.distributed.Backend.register_backend()
在导入时注册后端名称和实例化接口。
当手动导入此后端并使用相应的后端名称调用 torch.distributed.init_process_group()
时,torch.distributed
包将在新后端上运行。
警告
第三方后端的支持是实验性的,可能会发生变化。
启动工具¶
The torch.distributed 包还提供了一个启动工具在 torch.distributed.launch。这个辅助工具可以用于为分布式训练每个节点启动多个进程。
模块 torch.distributed.launch
。
torch.distributed.launch
是一个在每个训练节点上生成多个分布式训练进程的模块。
警告
此模块将被弃用,取而代之的是 torchrun。
该工具可用于单节点分布式训练,其中每个节点将启动一个或多个进程。该工具可用于CPU训练或GPU训练。如果该工具用于GPU训练,每个分布式进程将在单个GPU上运行。这可以显著提高单节点训练性能。它还可以用于多节点分布式训练,通过在每个节点上启动多个进程来显著提高多节点分布式训练性能。这对于具有多个支持直接GPU的Infiniband接口的系统尤其有益,因为所有接口都可以用于聚合通信带宽。
在单节点分布式训练或多节点分布式训练的两种情况下,此工具将启动每个节点的给定数量的进程(--nproc-per-node
)。如果用于GPU训练,此数量需要小于或等于当前系统上的GPU数量(nproc_per_node
),并且每个进程将从GPU 0到GPU (nproc_per_node - 1)操作单个GPU。
如何使用此模块:
单节点多进程分布式训练
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 和 所有 其他
参数 of your training script)
多节点多进程分布式训练:(例如,两个节点)
节点 1: (IP: 192.168.1.1, 并且有一个空闲端口: 1234)
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
--nnodes=2 --node-rank=0 --master-addr="192.168.1.1"
--master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
and 所有 其他 参数 of your 训练 脚本)
节点 2:
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
--nnodes=2 --node-rank=1 --master-addr="192.168.1.1"
--master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
和 所有 其他 参数 of your 训练 脚本)
要查看此模块提供的可选参数:
python -m torch.distributed.launch --help
重要通知:
1. 此工具和多进程分布式(单节点或多节点)GPU训练目前仅在使用NCCL分布式后端时能达到最佳性能。因此,NCCL后端是用于GPU训练的推荐后端。
2. 在您的训练程序中,您必须解析命令行参数:
--local-rank=LOCAL_PROCESS_RANK
,该参数将由本模块提供。
如果您的训练程序使用GPU,您应确保代码仅在LOCAL_PROCESS_RANK的GPU设备上运行。这可以通过以下方式实现:
解析 local_rank 参数
>>> import argparse
>>> parser = argparse.ArgumentParser()
>>> parser.add_argument("--local-rank", type=int)
>>> args = parser.parse_args()
将您的设备设置为本地排名,使用以下任一方法
>>> torch.cuda.set_device(args.local_rank) # 在你的代码运行之前
或
>>> with torch.cuda.device(args.local_rank):
>>> # 你的代码运行
>>> ...
3. 在你的训练程序中,你需要在开始时调用以下函数来启动分布式后端。强烈建议使用init_method=env://
。其他初始化方法(例如tcp://
)可能有效,但env://
是该模块官方支持的方法。
>>> torch.distributed.init_process_group(backend='YOUR BACKEND',
>>> init_method='env://')
4. 在您的训练程序中,您可以使用常规的分布式函数,或者使用torch.nn.parallel.DistributedDataParallel()
模块。如果您的训练程序使用GPU进行训练,并且您希望使用torch.nn.parallel.DistributedDataParallel()
模块,以下是如何配置它的方法。
>>> model = torch.nn.parallel.DistributedDataParallel(model,
>>> device_ids=[args.local_rank],
>>> output_device=args.local_rank)
请确保 device_ids
参数设置为代码将操作的唯一GPU设备ID。这通常是进程的本地等级。换句话说,device_ids
需要是 [args.local_rank]
,并且 output_device
需要是 args.local_rank
以便使用此工具。
5. 另一种通过环境变量 LOCAL_RANK
将 local_rank
传递给子进程的方法。当你使用 --use-env=True
启动脚本时,此行为将被启用。你必须调整上述子进程示例,将 args.local_rank
替换为 os.environ['LOCAL_RANK']
;当你指定此标志时,启动器不会传递 --local-rank
。
警告
local_rank
不是全局唯一的:它仅在机器上的每个进程中是唯一的。因此,不要使用它来决定是否应该,例如,写入网络文件系统。有关如果不正确执行此操作可能会出现问题的示例,请参见 https://github.com/pytorch/pytorch/issues/12042。
生成实用程序¶
The 多进程包 - torch.multiprocessing 包还提供了一个 spawn
函数在 torch.multiprocessing.spawn()
。这个辅助函数
可以用来生成多个进程。它通过传入你想要运行的函数并生成N个进程来运行它。这也可以用于多进程分布式训练。
有关如何使用的参考信息,请参阅 PyTorch 示例 - ImageNet 实现
请注意,此函数需要 Python 3.4 或更高版本。
调试 torch.distributed
应用程序¶
调试分布式应用程序可能会因为难以理解的挂起、崩溃或跨进程的不一致行为而变得具有挑战性。torch.distributed
提供了一套工具,以自助方式帮助调试训练应用程序:
Python 断点¶
在分布式环境中使用Python的调试器非常方便,但由于它不能开箱即用,许多人根本不使用它。 PyTorch提供了一个围绕pdb的自定义包装器,简化了这一过程。
torch.distributed.breakpoint 使这个过程变得简单。在内部,它通过两种方式自定义 pdb 的断点行为,但其他方面与普通的 pdb 行为相同。 1. 仅在指定的一个rank(由用户指定)上附加调试器。 2. 通过使用 torch.distributed.barrier() 确保所有其他rank停止,一旦被调试的rank发出 continue 命令,屏障就会释放。 3. 将子进程的标准输入重新路由,使其连接到您的终端。
要使用它,只需在所有rank上发出torch.distributed.breakpoint(rank),在每种情况下使用相同的rank值。
监控屏障¶
自v1.10起,torch.distributed.monitored_barrier()
作为 torch.distributed.barrier()
的替代方案存在,当崩溃时,它会提供有关哪个等级可能出现故障的有用信息,即并非所有等级都在提供的超时时间内调用 torch.distributed.monitored_barrier()
。torch.distributed.monitored_barrier()
使用 send
/recv
通信原语在主机端实现了一个类似于确认的屏障,允许等级0报告哪些等级未能及时确认屏障。例如,考虑以下函数,其中等级1未能调用 torch.distributed.monitored_barrier()
(在实践中,这可能是由于应用程序错误或在前一个集体中的挂起):
import os
from datetime import timedelta
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
# 监控屏障需要gloo进程组来执行主机端同步。
group_gloo = dist.new_group(backend="gloo")
if rank not in [1]:
dist.monitored_barrier(group=group_gloo, timeout=timedelta(seconds=2))
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
mp.spawn(worker, nprocs=2, args=())
以下错误信息在rank 0上生成,允许用户确定哪些rank可能存在故障并进一步调查:
RuntimeError: Rank 1 failed to pass monitoredBarrier in 2000 ms
Original exception:
[gloo/transport/tcp/pair.cc:598] Connection closed by peer [2401:db00:eef0:1100:3560:0:1c05:25d]:8594
TORCH_DISTRIBUTED_DEBUG
¶
使用 TORCH_CPP_LOG_LEVEL=INFO
,环境变量 TORCH_DISTRIBUTED_DEBUG
可以用于触发额外的有用日志记录和集体同步检查,以确保所有等级都适当同步。TORCH_DISTRIBUTED_DEBUG
可以设置为 OFF
(默认)、INFO
或 DETAIL
,具体取决于所需的调试级别。请注意,最详细的选项 DETAIL
可能会影响应用程序性能,因此应仅在调试问题时使用。
设置 TORCH_DISTRIBUTED_DEBUG=INFO
将在使用 torch.nn.parallel.DistributedDataParallel()
训练的模型初始化时产生额外的调试日志,而
TORCH_DISTRIBUTED_DEBUG=DETAIL
将额外记录运行时性能统计数据,并在选定的迭代次数中进行日志记录。这些运行时统计数据
包括前向时间、后向时间、梯度通信时间等。例如,给定以下应用程序:
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
class TwoLinLayerNet(torch.nn.Module):
def __init__(self):
super().__init__()
self.a = torch.nn.Linear(10, 10, bias=False)
self.b = torch.nn.Linear(10, 1, bias=False)
def forward(self, x):
a = self.a(x)
b = self.b(x)
return (a, b)
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
torch.cuda.set_device(rank)
print("初始化模型")
model = TwoLinLayerNet().cuda()
print("初始化DDP")
ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])
inp = torch.randn(10, 10).cuda()
print("训练")
for _ in range(20):
output = ddp_model(inp)
loss = output[0] + output[1]
loss.sum().backward()
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
os.environ[
"TORCH_DISTRIBUTED_DEBUG"
] = "DETAIL" # 设置为DETAIL以记录运行时日志。
mp.spawn(worker, nprocs=2, args=())
以下日志在初始化时渲染:
I0607 16:10:35.739390 515217 logger.cpp:173] [Rank 0]: DDP Initialized with:
broadcast_buffers: 1
bucket_cap_bytes: 26214400
find_unused_parameters: 0
gradient_as_bucket_view: 0
is_multi_device_module: 0
iteration: 0
num_parameter_tensors: 2
output_device: 0
rank: 0
total_parameter_size_bytes: 440
world_size: 2
backend_name: nccl
bucket_sizes: 440
cuda_visible_devices: N/A
device_ids: 0
dtypes: float
master_addr: localhost
master_port: 29501
module_name: TwoLinLayerNet
nccl_async_error_handling: N/A
nccl_blocking_wait: N/A
nccl_debug: WARN
nccl_ib_timeout: N/A
nccl_nthreads: N/A
nccl_socket_ifname: N/A
torch_distributed_debug: INFO
以下日志在运行时渲染(当设置TORCH_DISTRIBUTED_DEBUG=DETAIL
时):
I0607 16:18:58.085681 544067 logger.cpp:344] [Rank 1 / 2] 训练 TwoLinLayerNet unused_parameter_size=0
Avg 前向计算时间: 40838608
Avg 反向计算时间: 5983335
Avg 反向通信时间: 4326421
Avg 反向通信/计算重叠时间: 4207652
I0607 16:18:58.085693 544066 logger.cpp:344] [Rank 0 / 2] 训练 TwoLinLayerNet unused_parameter_size=0
Avg 前向计算时间: 42850427
Avg 反向计算时间: 3885553
Avg 反向通信时间: 2357981
Avg 反向通信/计算重叠时间: 2234674
此外,TORCH_DISTRIBUTED_DEBUG=INFO
增强了由于模型中未使用参数而在 torch.nn.parallel.DistributedDataParallel()
中的崩溃日志记录。目前,如果前向传递中可能存在未使用的参数,则必须将 find_unused_parameters=True
传递到 torch.nn.parallel.DistributedDataParallel()
初始化中,并且从 v1.10 开始,所有模型输出都需要在损失计算中使用,因为 torch.nn.parallel.DistributedDataParallel()
不支持反向传递中的未使用参数。这些约束对于较大的模型尤其具有挑战性,因此当崩溃并出现错误时,torch.nn.parallel.DistributedDataParallel()
将记录所有未使用的参数的完全限定名称。例如,在上面的应用程序中,如果我们修改 loss
以改为计算为 loss = output[1]
,那么 TwoLinLayerNet.a
在反向传递中不会接收到梯度,因此导致 DDP
失败。在崩溃时,用户会收到有关未使用参数的信息,这对于大型模型来说可能难以手动查找:
运行时错误:期望在开始新的迭代之前完成先前的归约。此错误表明您的模块中存在未用于生成损失的参数。您可以通过将关键字参数 `find_unused_parameters=True` 传递给 `torch.nn.parallel.DistributedDataParallel` 来启用未使用参数的检测,并确保所有 `forward` 函数输出都参与计算损失。
如果已经完成了上述操作,那么分布式数据并行模块无法在模块的 `forward` 函数的返回值中定位输出张量。请在报告此问题时包含损失函数以及模块的 `forward` 函数的返回值的结构(例如列表、字典、可迭代对象)。
在 rank 0 上未收到梯度的参数:a.weight
在 rank 0 上未收到梯度的参数索引:0
设置 TORCH_DISTRIBUTED_DEBUG=DETAIL
将触发在用户直接或间接(例如 DDP allreduce
)发出的每个集体调用上进行额外的统一性和同步检查。这是通过创建一个包装进程组来实现的,该进程组包装了由 torch.distributed.init_process_group()
和 torch.distributed.new_group()
API 返回的所有进程组。因此,这些 API 将返回一个包装进程组,该进程组可以像常规进程组一样使用,但在将集体分派到基础进程组之前执行一致性检查。目前,这些检查包括一个 torch.distributed.monitored_barrier()
,它确保所有等级完成其未完成的集体调用并报告卡住的等级。接下来,通过确保所有集体函数匹配并且使用一致的张量形状调用,对集体本身进行一致性检查。如果不是这种情况,应用程序崩溃时会包含详细的错误报告,而不是挂起或无信息的错误消息。例如,考虑以下函数,该函数在 torch.distributed.all_reduce()
中输入形状不匹配:
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
torch.cuda.set_device(rank)
tensor = torch.randn(10 if rank == 0 else 20).cuda()
dist.all_reduce(tensor)
torch.cuda.synchronize(device=rank)
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
os.environ["TORCH_DISTRIBUTED_DEBUG"] = "DETAIL"
mp.spawn(worker, nprocs=2, args=())
使用 NCCL
后端时,此类应用程序可能会导致挂起,这在非平凡场景中可能难以找到根本原因。如果用户启用
TORCH_DISTRIBUTED_DEBUG=DETAIL
并重新运行应用程序,以下错误消息揭示了根本原因:
work = default_pg.allreduce([tensor], opts)
RuntimeError: 在验证集体ALLREDUCE的形状张量时出错,位于秩0。这可能表明输入到集体的形状在各个秩之间不匹配。得到的形状:10
[ torch.LongTensor{1} ]
注意
为了在运行时对调试级别进行细粒度控制,还可以使用函数 torch.distributed.set_debug_level()
、torch.distributed.set_debug_level_from_env()
和
torch.distributed.get_debug_level()
。
此外,TORCH_DISTRIBUTED_DEBUG=DETAIL 可以与 TORCH_SHOW_CPP_STACKTRACES=1 一起使用,以在检测到集体不同步时记录整个调用堆栈。这些集体不同步检查将适用于所有使用 c10d
集体调用的应用程序,这些调用由通过 torch.distributed.init_process_group()
和 torch.distributed.new_group()
API 创建的进程组支持。
日志记录¶
除了通过torch.distributed.monitored_barrier()
和TORCH_DISTRIBUTED_DEBUG
提供的显式调试支持外,torch.distributed
的底层C++库还在不同级别输出日志消息。这些消息有助于理解分布式训练作业的执行状态,并解决诸如网络连接失败等问题。下表展示了如何通过TORCH_CPP_LOG_LEVEL
和TORCH_DISTRIBUTED_DEBUG
环境变量的组合来调整日志级别。
|
|
有效日志级别 |
---|---|---|
|
忽略 |
错误 |
|
忽略 |
警告 |
|
忽略 |
信息 |
|
|
调试 |
|
|
跟踪(又名全部) |
分布式组件引发自定义异常类型,这些类型派生自RuntimeError:
torch.distributed.DistError: 这是所有分布式异常的基类型。
torch.distributed.DistBackendError: 当发生特定于后端的错误时,会抛出此异常。例如,如果使用NCCL后端,而用户尝试使用NCCL库不可用的GPU。
torch.distributed.DistNetworkError: 当网络库遇到错误时抛出此异常(例如:连接被对端重置)
torch.distributed.DistStoreError: 当存储遇到错误时抛出此异常(例如:TCPStore 超时)
- class torch.distributed.DistError¶
当分布式库中发生错误时引发的异常
- class torch.distributed.DistBackendError¶
当分布式系统中发生后端错误时引发的异常
- class torch.distributed.DistNetworkError¶
当分布式系统中发生网络错误时引发的异常
- class torch.distributed.DistStoreError¶
分布式存储中发生错误时引发的异常
如果你正在进行单节点训练,交互式地设置断点可能会很方便。我们提供了一种方便的方法来为单个rank设置断点: