会合¶
在 Torch Distributed Elastic 的上下文中,我们使用术语 rendezvous 来指代一种将 分布式同步 原语与 对等发现 结合起来的特定功能。
它被 Torch Distributed Elastic 用于收集训练作业的参与者(即节点),以便它们都能就相同的参与者列表和每个人的角色达成一致,并做出一致的集体决策,以确定训练何时可以开始/恢复。
Torch Distributed Elastic rendezvous 提供了以下关键功能:
屏障:
执行集合的节点将全部阻塞,直到集合被视为完成——当至少有min个节点加入集合屏障(对于同一个任务)时,集合即被视为完成。这也意味着屏障的大小不一定固定。
在达到 min 节点数量后,还有一个额外的小等待时间 - 这是为了确保不会“太快”完成会合(这可能会排除在同一时间尝试加入的其他节点)。
如果max数量的节点在屏障处聚集,则立即完成会合。
还有一个总体的超时机制,如果在规定时间内未达到最小节点数,则会导致会合失败。这是为了在资源管理器出现问题时,帮助释放部分分配的作业资源,应被视为不可重试的简单安全措施。
排他性:
一个简单的分布式屏障是不够的,因为我们也需要确保在任何给定的时间(对于一个给定的作业)只有一个节点组存在。换句话说,新节点(即迟加入的节点)不应该能够形成一个并行的独立工作组来执行相同的作业。
Torch Distributed Elastic rendezvous 确保如果一组节点已经完成了一次rendezvous(因此可能已经在训练中),那么尝试进行rendezvous的额外“迟到”节点只会宣布自己处于等待状态,并且必须等到现有的(之前完成的)rendezvous被销毁后才能继续。
一致性:
当一次集合完成时,所有成员将就任务成员资格和每个人在其中扮演的角色达成一致。这个角色用一个称为rank的整数表示,该整数介于0和world size之间。
请注意,排名是不稳定的,因为在下一次(重新)会合时,同一个节点可能会被分配不同的排名。
容错性:
Torch Distributed Elastic rendezvous 旨在容忍在 rendezvous 过程中出现的节点故障。如果在加入 rendezvous 和完成 rendezvous 之间,某个进程崩溃(或失去网络连接等),那么将自动与剩余的健康节点重新进行 rendezvous。
一个节点也可以在完成(或被其他节点观察到已完成)会合之后失败——这种情况将由Torch Distributed Elastic的train_loop处理(其中也会触发重新会合)。
共享键值存储:
当会合完成后,会创建并返回一个共享的键值存储。该存储实现了 torch.distributed.Store API(参见 分布式通信文档)。
此存储仅由已完成会合的成员共享。它旨在由 Torch Distributed Elastic 用于交换初始化作业控制和数据平面所需的信息。
等待工作线程和集合点关闭:
Torch Distributed Elastic 集合处理对象提供了额外的功能,这些功能在技术上并不属于集合过程的一部分:
查询有多少工人在屏障处迟到,谁可以参加下一次会合。
将集合点设置为关闭,以通知所有节点不要参与下一次集合点。
动态会合处理器:
Torch Distributed Elastic 带有 DynamicRendezvousHandler
类,该类实现了上述的 rendezvous 机制。它是一个与后端无关的类型,期望在构造期间指定一个特定的 RendezvousBackend 实例。
Torch distributed 用户可以实现自己的后端类型,或者使用 PyTorch 自带的以下实现之一:
C10dRendezvousBackend: 使用C10d存储(默认情况下为TCPStore)作为rendezvous后端。使用C10d存储的主要优点是它不需要第三方依赖(如etcd)来建立rendezvous。EtcdRendezvousBackend: 取代了旧的EtcdRendezvousHandler类。将一个EtcdRendezvousBackend实例传递给DynamicRendezvousHandler在功能上等同于 实例化一个EtcdRendezvousHandler。store = TCPStore("localhost") backend = C10dRendezvousBackend(store, "my_run_id") rdzv_handler = DynamicRendezvousHandler.from_backend( run_id="my_run_id", store=store, backend=backend, min_nodes=2, max_nodes=4 )
下面是一个描述如何进行会合的状态图。
注册表¶
- class torch.distributed.elastic.rendezvous.RendezvousParameters(backend, endpoint, run_id, min_nodes, max_nodes, local_addr=None, **kwargs)[源代码]¶
保留参数以构造一个
RendezvousHandler。- Parameters
- class torch.distributed.elastic.rendezvous.RendezvousHandlerRegistry[源代码]¶
表示一个
RendezvousHandler后端的注册表。- create_handler(params)[源代码]¶
创建一个新的
RendezvousHandler。- Return type
- register(backend, creator)[源代码]¶
注册一个新的会合后端。
- Parameters
后端 (str) – 后端的名称。
创建者 (可调用[[RendezvousParameters], RendezvousHandler]) – 用于调用以构造
RendezvousHandler的回调。
处理器¶
- class torch.distributed.elastic.rendezvous.RendezvousHandler[源代码]¶
主要集合接口。
注意
分布式 Torch 用户通常不需要实现自己的
RendezvousHandler。基于 C10d Store 的实现已经提供,并且推荐给大多数用户使用。- abstract get_run_id()[源代码]¶
返回rendezvous的运行ID。
运行ID是一个用户定义的ID,用于唯一标识分布式应用程序的一个实例。它通常映射到一个作业ID,并用于允许节点加入正确的分布式应用程序。
- Return type
- abstract is_closed()[源代码]¶
检查是否已关闭会合。
关闭的约会意味着在同一任务中所有未来的重新约会尝试都将失败。
is_closed()和set_closed()具有最终传播的语义,不应用于同步。其意图是,如果至少有一个节点决定作业已完成,它将关闭rendezvous,其他节点将很快观察到这一点并停止运行。- Return type
- abstract next_rendezvous()[源代码]¶
集合屏障的主要入口点。
阻塞直到会合完成并且当前进程被包含在形成的worker组中,或者发生超时,或者会合被标记为关闭。
- Returns
一个包含
torch.distributed.Store、rank和world size的元组。- Raises
RendezvousClosedError – 该集合点已关闭。
RendezvousConnectionError – 连接到rendezvous后端失败。
RendezvousStateError – 会合状态已损坏。
RendezvousTimeoutError – 集合未能在规定时间内完成。
- Return type
- abstract num_nodes_waiting()[源代码]¶
返回在集合屏障处迟到的节点数量,因此未被包含在当前工作组中。
调用者应定期调用此方法以检查是否有新节点等待加入任务,如果有,则通过调用
next_rendezvous()(重新集合)来接纳它们。- Return type
异常¶
实现¶
动态会合¶
- torch.distributed.elastic.rendezvous.dynamic_rendezvous.create_handler(store, backend, params)[源代码]¶
从指定的参数创建一个新的
DynamicRendezvousHandler。- Parameters
存储 (存储) – 作为rendezvous一部分返回的C10d存储。
后端 (RendezvousBackend) – 用于保存集合状态的后端。
- Return type
参数
描述
加入超时
预期的会合完成总时间,单位为秒。默认为600秒。
上次调用超时
在达到最小节点数量后,完成会合前的额外等待时间,以秒为单位。默认为30秒。
关闭超时
在调用
RendezvousHandler.set_closed()或RendezvousHandler.shutdown()后,预期会合关闭的时间,以秒为单位。默认为30秒。
- class torch.distributed.elastic.rendezvous.dynamic_rendezvous.DynamicRendezvousHandler[源代码]¶
表示一个处理程序,用于在一组节点之间设置会合点。
- classmethod from_backend(run_id, store, backend, min_nodes, max_nodes, local_addr=None, timeout=None)[源代码]¶
创建一个新的
DynamicRendezvousHandler。- Parameters
run_id (str) – 集合的运行ID。
存储 (存储) – 作为rendezvous一部分返回的C10d存储。
后端 (RendezvousBackend) – 用于保存集合状态的后端。
min_nodes (int) – 允许加入仲裁的最小节点数。
max_nodes (int) – 允许加入集合的最大节点数。
超时 (可选[RendezvousTimeout]) – 集合点的超时配置。
- class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousBackend[源代码]¶
表示一个持有集合状态的后端。
- abstract get_state()[源代码]¶
获取约会状态。
- Returns
编码后的rendezvous状态及其fencing令牌的元组,或者如果后端中未找到状态,则为
None。- Raises
RendezvousConnectionError – 与后端的连接失败。
RendezvousStateError – 会合状态已损坏。
- Return type
- abstract set_state(state, token=None)[源代码]¶
设置会合状态。
新的会合状态是根据条件设置的:
如果指定的
token与存储在后台的 fencing token 匹配,状态将被更新。新的状态将连同其 fencing token 一起返回给调用者。如果指定的
token与存储在后台的防护令牌不匹配,状态将不会被更新;相反,现有的状态及其防护令牌将被返回给调用者。如果指定的
token是None,则仅当后端没有现有状态时,才会设置新状态。新状态或现有状态及其防护令牌将返回给调用者。
- Parameters
状态 (字节) – 编码的会合状态。
token (可选[任意]) – 一个可选的防护令牌,该令牌是通过之前的调用 到
get_state()或set_state()获取的。
- Returns
序列化的rendezvous状态、其防护令牌以及一个布尔值,指示我们的设置尝试是否成功。
- Raises
RendezvousConnectionError – 与后端的连接失败。
RendezvousStateError – 会合状态已损坏。
- Return type
- class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousTimeout(join=None, last_call=None, close=None, heartbeat=None)[源代码]¶
保持一个集合点的超时配置。
- Parameters
C10d 后端¶
- torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.create_backend(params)[源代码]¶
从指定的参数创建一个新的
C10dRendezvousBackend。参数
描述
存储类型
C10d 存储的类型。目前支持的类型是“tcp”和“file”,分别对应于
torch.distributed.TCPStore和torch.distributed.FileStore。 默认为“tcp”。读取超时
存储操作的读取超时时间,单位为秒。 默认为60秒。
请注意,这仅适用于
torch.distributed.TCPStore。与torch.distributed.FileStore无关,后者不 将超时作为参数。是主机
一个布尔值,指示此后端实例是否将托管C10d存储。如果未指定,将通过匹配此机器的主机名或IP地址与指定的汇合点来启发式推断。默认为
None。请注意,此配置选项仅适用于
torch.distributed.TCPStore。在正常情况下,您可以安全地跳过它;唯一需要它的情况是当其值无法正确确定时(例如,rendezvous端点的主机名是CNAME或与机器的FQDN不匹配)。- Return type
- class torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.C10dRendezvousBackend(store, run_id)[源代码]¶
表示一个基于C10d的rendezvous后端。
- Parameters
存储 (存储) – 用于与C10d存储进行通信的
torch.distributed.Store实例。run_id (str) – 集合的运行ID。
Etcd 后端¶
- torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.create_backend(params)[源代码]¶
从指定的参数创建一个新的
EtcdRendezvousBackend。参数
描述
读取超时
etcd操作的读取超时时间,单位为秒。 默认为60秒。
协议
用于与 etcd 通信的协议。有效值为“http”和“https”。默认为“http”。
ssl_cert
用于HTTPS的SSL客户端证书的路径。默认为
None。ssl_cert_key
用于与HTTPS一起使用的SSL客户端证书的私钥路径。默认为
None。ca_cert
根SSL权威证书的路径。默认为
None。- Return type
Etcd 会合 (旧版)¶
警告
The DynamicRendezvousHandler 类取代了 EtcdRendezvousHandler 类,并且推荐给大多数用户使用。EtcdRendezvousHandler 目前处于维护模式,并将在未来被弃用。
- class torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvousHandler(rdzv_impl)[源代码]¶
实现了一个
torch.distributed.elastic.rendezvous.RendezvousHandler接口 由torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvous支持。EtcdRendezvousHandler使用一个URL来配置要使用的rendezvous类型,并将实现特定的配置传递给rendezvous模块。基本的etcd rendezvous配置URL如下所示etcd://
: / ?min_workers= &max_workers= # noqa: W605 -- 示例 -- etcd://localhost:2379/1234?min_workers=1&max_workers=3 上述URL解释如下:
使用注册到
etcd方案的 rendezvous 处理程序要使用的
etcd端点是localhost:2379job_id == 1234被用作etcd中的前缀(这允许在多个作业之间共享一个公共的etcd服务器,只要job_ids保证是唯一的)。请注意,作业ID可以是任何字符串(例如,不需要是数字),只要它是唯一的。min_workers=1和max_workers=3指定了一个成员大小的范围 - Torch Distributed Elastic 只要集群大小大于或等于min_workers就开始运行作业,并允许最多max_workers加入集群。
以下是可传递给 etcd 集合点的完整参数列表:
参数
描述
最小工作线程数
使会合有效的最少工人数量
最大工作线程数
允许的最大工作线程数
超时
在 下一次会合预期成功 的总超时时间 (默认 600秒)
上次调用超时
在达到最小工作线程数后,额外的等待时间(“最后一次呼叫”)(默认为30秒)
etcd前缀
路径前缀(从 etcd 根目录开始),所有 etcd 节点将在此路径下创建(默认为
/torchelastic/p2p)
Etcd 存储¶
当使用etcd作为rendezvous后端时,EtcdStore 是由 next_rendezvous() 返回的C10d Store 实例类型。
- class torch.distributed.elastic.rendezvous.etcd_store.EtcdStore(etcd_client, etcd_store_prefix, timeout=None)[源代码]¶
通过利用rendezvous etcd实例来实现c10 Store接口。
这是由
EtcdRendezvous返回的存储对象。
Etcd 服务器¶
The EtcdServer 是一个方便的类,使您可以轻松地在子进程上启动和停止 etcd 服务器。这对于测试或单节点(多工作者)部署非常有用,在这些情况下手动设置 etcd 服务器会非常麻烦。
警告
对于生产和多节点部署,请考虑正确部署一个高可用的etcd服务器,因为这是您分布式作业的单点故障。
- class torch.distributed.elastic.rendezvous.etcd_server.EtcdServer(data_dir=None)[源代码]¶
注意
在 etcd 服务器 v3.4.3 上测试。
在随机空闲端口上启动和停止本地独立的etcd服务器。适用于单节点、多工作进程启动或测试,此时使用边车etcd服务器比单独设置etcd服务器更为方便。
此类注册一个终止处理程序,以便在退出时关闭etcd子进程。此终止处理程序不能替代调用
stop()方法。以下回退机制用于查找 etcd 二进制文件:
使用环境变量 TORCHELASTIC_ETCD_BINARY_PATH
如果存在,使用
file root>/bin/etcd 使用来自
PATH的etcd
用法
server = EtcdServer("/usr/bin/etcd", 2379, "/tmp/default.etcd") server.start() client = server.get_client() # 使用客户端 server.stop()
- Parameters
etcd_binary_path – etcd 服务器二进制文件的路径(见上文以获取回退路径)