Shortcuts

会合

在 Torch Distributed Elastic 的上下文中,我们使用术语 rendezvous 来指代一种将 分布式同步 原语与 对等发现 结合起来的特定功能。

它被 Torch Distributed Elastic 用于收集训练作业的参与者(即节点),以便它们都能就相同的参与者列表和每个人的角色达成一致,并做出一致的集体决策,以确定训练何时可以开始/恢复。

Torch Distributed Elastic rendezvous 提供了以下关键功能:

屏障

执行集合的节点将全部阻塞,直到集合被视为完成——当至少有min个节点加入集合屏障(对于同一个任务)时,集合即被视为完成。这也意味着屏障的大小不一定固定。

在达到 min 节点数量后,还有一个额外的小等待时间 - 这是为了确保不会“太快”完成会合(这可能会排除在同一时间尝试加入的其他节点)。

如果max数量的节点在屏障处聚集,则立即完成会合。

还有一个总体的超时机制,如果在规定时间内未达到最小节点数,则会导致会合失败。这是为了在资源管理器出现问题时,帮助释放部分分配的作业资源,应被视为不可重试的简单安全措施。

排他性

一个简单的分布式屏障是不够的,因为我们也需要确保在任何给定的时间(对于一个给定的作业)只有一个节点组存在。换句话说,新节点(即迟加入的节点)不应该能够形成一个并行的独立工作组来执行相同的作业。

Torch Distributed Elastic rendezvous 确保如果一组节点已经完成了一次rendezvous(因此可能已经在训练中),那么尝试进行rendezvous的额外“迟到”节点只会宣布自己处于等待状态,并且必须等到现有的(之前完成的)rendezvous被销毁后才能继续。

一致性

当一次集合完成时,所有成员将就任务成员资格和每个人在其中扮演的角色达成一致。这个角色用一个称为rank的整数表示,该整数介于0和world size之间。

请注意,排名是不稳定的,因为在下一次(重新)会合时,同一个节点可能会被分配不同的排名。

容错性:

Torch Distributed Elastic rendezvous 旨在容忍在 rendezvous 过程中出现的节点故障。如果在加入 rendezvous 和完成 rendezvous 之间,某个进程崩溃(或失去网络连接等),那么将自动与剩余的健康节点重新进行 rendezvous。

一个节点也可以在完成(或被其他节点观察到已完成)会合之后失败——这种情况将由Torch Distributed Elastic的train_loop处理(其中也会触发重新会合)。

共享键值存储

当会合完成后,会创建并返回一个共享的键值存储。该存储实现了 torch.distributed.Store API(参见 分布式通信文档)。

此存储仅由已完成会合的成员共享。它旨在由 Torch Distributed Elastic 用于交换初始化作业控制和数据平面所需的信息。

等待工作线程和集合点关闭

Torch Distributed Elastic 集合处理对象提供了额外的功能,这些功能在技术上并不属于集合过程的一部分:

  1. 查询有多少工人在屏障处迟到,谁可以参加下一次会合。

  2. 将集合点设置为关闭,以通知所有节点不要参与下一次集合点。

动态会合处理器:

Torch Distributed Elastic 带有 DynamicRendezvousHandler 类,该类实现了上述的 rendezvous 机制。它是一个与后端无关的类型,期望在构造期间指定一个特定的 RendezvousBackend 实例。

Torch distributed 用户可以实现自己的后端类型,或者使用 PyTorch 自带的以下实现之一:

下面是一个描述如何进行会合的状态图。

../_images/etcd_rdzv_diagram.png

注册表

class torch.distributed.elastic.rendezvous.RendezvousParameters(backend, endpoint, run_id, min_nodes, max_nodes, local_addr=None, **kwargs)[源代码]

保留参数以构造一个RendezvousHandler

Parameters
  • backend (str) – 用于处理rendezvous的后端名称。

  • 端点 (str) – 集合点端点,通常为 <主机名>[:<端口>] 形式。

  • run_id (str) – 集合的ID。

  • min_nodes (int) – 允许加入仲裁的最小节点数。

  • max_nodes (int) – 允许加入集合的最大节点数。

  • local_addr (可选[字符串]) – 本地节点的地址。

  • **kwargs – 指定后端的附加参数。

get(key, default=None)[源代码]

如果key存在,则返回key的值,否则返回default

Return type

任意

get_as_bool(key, default=None)[源代码]

返回 key 的值作为 bool

Return type

可选[布尔值]

get_as_int(key, default=None)[源代码]

返回 key 的值作为 int

Return type

可选[整数]

class torch.distributed.elastic.rendezvous.RendezvousHandlerRegistry[源代码]

表示一个RendezvousHandler后端的注册表。

create_handler(params)[源代码]

创建一个新的 RendezvousHandler

Return type

RendezvousHandler

register(backend, creator)[源代码]

注册一个新的会合后端。

Parameters

处理器

class torch.distributed.elastic.rendezvous.RendezvousHandler[源代码]

主要集合接口。

注意

分布式 Torch 用户通常不需要实现自己的 RendezvousHandler。基于 C10d Store 的实现已经提供,并且推荐给大多数用户使用。

abstract get_backend()[源代码]

返回集合后端的名称。

Return type

str

abstract get_run_id()[源代码]

返回rendezvous的运行ID。

运行ID是一个用户定义的ID,用于唯一标识分布式应用程序的一个实例。它通常映射到一个作业ID,并用于允许节点加入正确的分布式应用程序。

Return type

str

abstract is_closed()[源代码]

检查是否已关闭会合。

关闭的约会意味着在同一任务中所有未来的重新约会尝试都将失败。

is_closed()set_closed() 具有最终传播的语义,不应用于同步。其意图是,如果至少有一个节点决定作业已完成,它将关闭rendezvous,其他节点将很快观察到这一点并停止运行。

Return type

bool

abstract next_rendezvous()[源代码]

集合屏障的主要入口点。

阻塞直到会合完成并且当前进程被包含在形成的worker组中,或者发生超时,或者会合被标记为关闭。

Returns

一个包含 torch.distributed.Storerankworld size 的元组。

Raises
Return type

元组[存储, 整数, 整数]

abstract num_nodes_waiting()[源代码]

返回在集合屏障处迟到的节点数量,因此未被包含在当前工作组中。

调用者应定期调用此方法以检查是否有新节点等待加入任务,如果有,则通过调用next_rendezvous()(重新集合)来接纳它们。

Return type

int

abstract set_closed()[源代码]

将约会标记为已关闭。

abstract shutdown()[源代码]

关闭为会合打开的所有资源。

示例:

rdzv_handler = ...
try:
    store, rank, world_size = rdzv_handler.next_rendezvous()
finally:
    rdzv_handler.shutdown()
Return type

bool

异常

class torch.distributed.elastic.rendezvous.RendezvousError[源代码]

表示用于表示集合点错误的基类型。

class torch.distributed.elastic.rendezvous.RendezvousClosedError[源代码]

当一个会合被关闭时引发。

class torch.distributed.elastic.rendezvous.RendezvousTimeoutError[源代码]

当一个会面未能在规定时间内完成时引发。

class torch.distributed.elastic.rendezvous.RendezvousConnectionError[源代码]

当与一个会合后端的连接失败时引发。

class torch.distributed.elastic.rendezvous.RendezvousStateError[源代码]

当会合状态损坏时引发。

class torch.distributed.elastic.rendezvous.RendezvousGracefulExitError[源代码]

当节点未被包含在会合中并优雅地退出时引发。

异常是一种退出堆栈的机制,但这并不意味着失败。

实现

动态会合

torch.distributed.elastic.rendezvous.dynamic_rendezvous.create_handler(store, backend, params)[源代码]

从指定的参数创建一个新的 DynamicRendezvousHandler

Parameters
  • 存储 (存储) – 作为rendezvous一部分返回的C10d存储。

  • 后端 (RendezvousBackend) – 用于保存集合状态的后端。

Return type

动态会合处理器

参数

描述

加入超时

预期的会合完成总时间,单位为秒。默认为600秒。

上次调用超时

在达到最小节点数量后,完成会合前的额外等待时间,以秒为单位。默认为30秒。

关闭超时

在调用RendezvousHandler.set_closed()RendezvousHandler.shutdown()后,预期会合关闭的时间,以秒为单位。默认为30秒。

class torch.distributed.elastic.rendezvous.dynamic_rendezvous.DynamicRendezvousHandler[源代码]

表示一个处理程序,用于在一组节点之间设置会合点。

classmethod from_backend(run_id, store, backend, min_nodes, max_nodes, local_addr=None, timeout=None)[源代码]

创建一个新的 DynamicRendezvousHandler

Parameters
  • run_id (str) – 集合的运行ID。

  • 存储 (存储) – 作为rendezvous一部分返回的C10d存储。

  • 后端 (RendezvousBackend) – 用于保存集合状态的后端。

  • min_nodes (int) – 允许加入仲裁的最小节点数。

  • max_nodes (int) – 允许加入集合的最大节点数。

  • local_addr (可选[字符串]) – 本地节点地址。

  • 超时 (可选[RendezvousTimeout]) – 集合点的超时配置。

class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousBackend[源代码]

表示一个持有集合状态的后端。

abstract get_state()[源代码]

获取约会状态。

Returns

编码后的rendezvous状态及其fencing令牌的元组,或者如果后端中未找到状态,则为None

Raises
Return type

可选[元组[字节, 任意]]

abstract property name: str

获取后端名称。

abstract set_state(state, token=None)[源代码]

设置会合状态。

新的会合状态是根据条件设置的:

  • 如果指定的 token 与存储在后台的 fencing token 匹配,状态将被更新。新的状态将连同其 fencing token 一起返回给调用者。

  • 如果指定的 token 与存储在后台的防护令牌不匹配,状态将不会被更新;相反,现有的状态及其防护令牌将被返回给调用者。

  • 如果指定的 tokenNone,则仅当后端没有现有状态时,才会设置新状态。新状态或现有状态及其防护令牌将返回给调用者。

Parameters
  • 状态 (字节) – 编码的会合状态。

  • token (可选[任意]) – 一个可选的防护令牌,该令牌是通过之前的调用 到 get_state()set_state() 获取的。

Returns

序列化的rendezvous状态、其防护令牌以及一个布尔值,指示我们的设置尝试是否成功。

Raises
Return type

可选[元组[字节, 任意, 布尔]]

class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousTimeout(join=None, last_call=None, close=None, heartbeat=None)[源代码]

保持一个集合点的超时配置。

Parameters
  • join (可选[时间差]) – 预期完成会合的时间。

  • last_call (可选[时间差]) – 在达到所需的最少参与者数量后,完成会合前的额外等待时间。

  • 关闭 (可选[时间间隔]) – 在调用 RendezvousHandler.set_closed()RendezvousHandler.shutdown() 后,预期会合关闭的时间。

  • keep_alive – 在此时间内应完成保持活动的心跳。

property close: timedelta

获取关闭超时时间。

property heartbeat: timedelta

获取保持活动心跳超时时间。

property join: timedelta

获取连接超时时间。

property last_call: timedelta

获取最后一次调用的超时时间。

C10d 后端

torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.create_backend(params)[源代码]

从指定的参数创建一个新的 C10dRendezvousBackend

参数

描述

存储类型

C10d 存储的类型。目前支持的类型是“tcp”和“file”,分别对应于 torch.distributed.TCPStoretorch.distributed.FileStore。 默认为“tcp”。

读取超时

存储操作的读取超时时间,单位为秒。 默认为60秒。

请注意,这仅适用于 torch.distributed.TCPStore。与 torch.distributed.FileStore 无关,后者不 将超时作为参数。

是主机

一个布尔值,指示此后端实例是否将托管C10d存储。如果未指定,将通过匹配此机器的主机名或IP地址与指定的汇合点来启发式推断。默认为None

请注意,此配置选项仅适用于 torch.distributed.TCPStore。在正常情况下,您可以安全地跳过它;唯一需要它的情况是当其值无法正确确定时(例如,rendezvous端点的主机名是CNAME或与机器的FQDN不匹配)。

Return type

元组[C10dRendezvousBackend, 存储]

class torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.C10dRendezvousBackend(store, run_id)[源代码]

表示一个基于C10d的rendezvous后端。

Parameters
get_state()[源码]

请参阅基类。

Return type

可选[元组[字节, 任意]]

property name: str

查看基类。

set_state(state, token=None)[源码]

查看基类。

Return type

可选[元组[字节, 任意, 布尔]]

Etcd 后端

torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.create_backend(params)[源代码]

从指定的参数创建一个新的 EtcdRendezvousBackend

参数

描述

读取超时

etcd操作的读取超时时间,单位为秒。 默认为60秒。

协议

用于与 etcd 通信的协议。有效值为“http”和“https”。默认为“http”。

ssl_cert

用于HTTPS的SSL客户端证书的路径。默认为None

ssl_cert_key

用于与HTTPS一起使用的SSL客户端证书的私钥路径。默认为 None

ca_cert

根SSL权威证书的路径。默认为None

Return type

元组[EtcdRendezvousBackend, 存储]

class torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.EtcdRendezvousBackend(client, run_id, key_prefix=None, ttl=None)[源代码]

表示一个基于etcd的集合后端。

Parameters
  • 客户端 (客户端) – 用于与 etcd 通信的 etcd.Client 实例。

  • run_id (str) – 集合的运行ID。

  • key_prefix (可选[字符串]) – 在 etcd 中存储 rendezvous 状态的路径。

  • ttl (可选[整数]) – 会合状态的TTL。如果未指定,默认为两小时。

get_state()[源代码]

查看基类。

Return type

可选[元组[字节, 任意]]

property name: str

查看基类。

set_state(state, token=None)[源代码]

查看基类。

Return type

可选[元组[字节, 任意, 布尔]]

Etcd 会合 (旧版)

警告

The DynamicRendezvousHandler 类取代了 EtcdRendezvousHandler 类,并且推荐给大多数用户使用。EtcdRendezvousHandler 目前处于维护模式,并将在未来被弃用。

class torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvousHandler(rdzv_impl)[源代码]

实现了一个 torch.distributed.elastic.rendezvous.RendezvousHandler 接口 由 torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvous 支持。 EtcdRendezvousHandler 使用一个URL来配置要使用的rendezvous类型,并将实现特定的配置传递给rendezvous模块。基本的etcd rendezvous配置URL如下所示

etcd://:/?min_workers=&max_workers=  # noqa: W605

-- 示例 --

etcd://localhost:2379/1234?min_workers=1&max_workers=3

上述URL解释如下:

  1. 使用注册到 etcd 方案的 rendezvous 处理程序

  2. 要使用的 etcd 端点是 localhost:2379

  3. job_id == 1234 被用作etcd中的前缀(这允许在多个作业之间共享一个公共的etcd服务器,只要job_ids保证是唯一的)。请注意,作业ID可以是任何字符串(例如,不需要是数字),只要它是唯一的。

  4. min_workers=1max_workers=3 指定了一个成员大小的范围 - Torch Distributed Elastic 只要集群大小大于或等于 min_workers 就开始运行作业,并允许最多 max_workers 加入集群。

以下是可传递给 etcd 集合点的完整参数列表:

参数

描述

最小工作线程数

使会合有效的最少工人数量

最大工作线程数

允许的最大工作线程数

超时

在 下一次会合预期成功 的总超时时间 (默认 600秒)

上次调用超时

在达到最小工作线程数后,额外的等待时间(“最后一次呼叫”)(默认为30秒)

etcd前缀

路径前缀(从 etcd 根目录开始),所有 etcd 节点将在此路径下创建(默认为 /torchelastic/p2p

Etcd 存储

当使用etcd作为rendezvous后端时,EtcdStore 是由 next_rendezvous() 返回的C10d Store 实例类型。

class torch.distributed.elastic.rendezvous.etcd_store.EtcdStore(etcd_client, etcd_store_prefix, timeout=None)[源代码]

通过利用rendezvous etcd实例来实现c10 Store接口。

这是由 EtcdRendezvous 返回的存储对象。

add(key, num)[源代码]

原子性地将一个值增加一个整数。

整数以字符串形式表示,使用基数10。如果键不存在,将假定默认值为0

Returns

新的(递增的)值

Return type

int

check(keys)[源代码]

检查所有键是否立即存在(无需等待)。

Return type

bool

get(key)[源代码]

通过键获取值,可能进行阻塞等待。

如果键没有立即存在,将会进行阻塞等待,最多等待timeout时长,或者直到键被发布。

Returns

(字节)

Raises

LookupError - 如果在超时后键仍未发布

Return type

字节

set(key, value)[源代码]

将键/值对写入 EtcdStore

键和值可以是 Python strbytes

wait(keys, override_timeout=None)[源代码]

等待所有密钥发布,或直到超时。

Raises

LookupError - 如果发生超时

Etcd 服务器

The EtcdServer 是一个方便的类,使您可以轻松地在子进程上启动和停止 etcd 服务器。这对于测试或单节点(多工作者)部署非常有用,在这些情况下手动设置 etcd 服务器会非常麻烦。

警告

对于生产和多节点部署,请考虑正确部署一个高可用的etcd服务器,因为这是您分布式作业的单点故障。

class torch.distributed.elastic.rendezvous.etcd_server.EtcdServer(data_dir=None)[源代码]

注意

在 etcd 服务器 v3.4.3 上测试。

在随机空闲端口上启动和停止本地独立的etcd服务器。适用于单节点、多工作进程启动或测试,此时使用边车etcd服务器比单独设置etcd服务器更为方便。

此类注册一个终止处理程序,以便在退出时关闭etcd子进程。此终止处理程序不能替代调用stop()方法。

以下回退机制用于查找 etcd 二进制文件:

  1. 使用环境变量 TORCHELASTIC_ETCD_BINARY_PATH

  2. 如果存在,使用 file root>/bin/etcd

  3. 使用来自 PATHetcd

用法

server = EtcdServer("/usr/bin/etcd", 2379, "/tmp/default.etcd")
server.start()
client = server.get_client()
# 使用客户端
server.stop()
Parameters

etcd_binary_path – etcd 服务器二进制文件的路径(见上文以获取回退路径)

优云智算