torch.distributed.elastic.rendezvous.etcd_rendezvous 的源代码
#!/usr/bin/env python3
# 版权所有 (c) Facebook, Inc. 及其附属公司。
# 保留所有权利。
#
# 本源代码根据在源代码树的根目录中的LICENSE文件中找到的BSD风格许可证进行许可。
import json
import logging
import sys
import threading
import time
from typing import Optional
import etcd # type: ignore[import]
from torch.distributed.elastic.rendezvous import (
RendezvousClosedError,
RendezvousError,
RendezvousHandler,
RendezvousParameters,
RendezvousTimeoutError,
)
from .utils import parse_rendezvous_endpoint
from .etcd_store import EtcdStore, cas_delay
_log_fmt = logging.Formatter("%(levelname)s %(asctime)s %(message)s")
_log_handler = logging.StreamHandler(sys.stderr)
_log_handler.setFormatter(_log_fmt)
log = logging.getLogger(__name__)
log.propagate = False
log.setLevel(logging.INFO)
log.addHandler(_log_handler)
# 可重试的失败异常意味着我们太晚进行所需的状态转换(例如由于竞争条件),
# 现在应该从头开始重新启动。
# 建议设置一个小延迟以避免对Etcd进行垃圾邮件攻击。
class EtcdRendezvousRetryableFailure(Exception):
pass
# 类似于可重试的失败,但观察到的新状态表明我们可以立即重试,
# 即不需要“安全延迟”。
class EtcdRendezvousRetryImmediately(Exception):
pass
# 默认的rendezvous超时时间。
_DEFAULT_TIMEOUT: int = 600 # 10分钟
# 在达到最小节点数后,如果rendezvous是弹性的(min != max),则额外等待时间。
_DEFAULT_LAST_CALL_TIMEOUT: int = 30 # 30秒
# EtcdRendezvous内部使用的各种常量
CONST_ETCD_SETUP_TTL = 5
CONST_ETCD_FROZEN_TTL = 10
CONST_ETCD_JOINABLE_EPHEMERAL_TTL = 10
# 工作者的keep-alive键的临时节点TTL:
CONST_WORKER_KEEPALIVE_TTL = 10
# 特定run_id目录的临时TTL。所有特定run_id(作业实例)的rendezvous状态数据
# 都包含在此目录中。其唯一的作用是清理旧运行的rendezvous数据(以防
# etcd服务器是持久的),并且不会影响正确性,但应大于任何工作者进程预期存活的超时时间:
CONST_RUNID_SUBROOT_TTL = 7200 # 2小时
[docs]class EtcdRendezvousHandler(RendezvousHandler):
"""
实现一个
:py:class:`torch.distributed.elastic.rendezvous.RendezvousHandler`接口
由
:py:class:`torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvous`支持。
``EtcdRendezvousHandler``使用URL来配置要使用的rendezvous类型,并将实现特定的配置传递给rendezvous模块。基本的etcd rendezvous配置URL如下所示
::
etcd://:/?min_workers=&max_workers= # noqa: W605
-- 示例 --
etcd://localhost:2379/1234?min_workers=1&max_workers=3
上面的URL解释如下:
1. 使用注册了``etcd``方案的rendezvous处理程序
2. 要使用的``etcd``端点是``localhost:2379``
3. ``job_id == 1234``用作etcd中的前缀(这允许在多个作业之间共享一个公共的etcd服务器,只要``job_ids``保证是唯一的)。请注意,作业ID可以是任何字符串(例如,不需要是数字),只要它是唯一的。
4. ``min_workers=1``和``max_workers=3``指定成员大小的范围 - Torch Distributed Elastic在集群大小大于或等于``min_workers``时开始运行作业,并允许最多``max_workers``加入集群。
以下是可以传递给etcd rendezvous的完整参数列表:
+--------------------------------------------+--------------------------+
| 参数 | 描述 |
+============================================+==========================+
| min_workers | 最小工作者数量 |
| | 以使rendezvous有效 |
+--------------------------------------------+--------------------------+
| max_workers | 允许加入的最大工作者数量 |
+--------------------------------------------+--------------------------+
| timeout | 总超时时间 |
| | 在此时间内next_rendezvous |
| | 预期成功(默认600秒) |
+--------------------------------------------+--------------------------+
| last_call_timeout | 在达到最小工作者数量后 |
| | 的额外等待时间(“最后一次调用”)|
| | 默认30秒 |
+--------------------------------------------+--------------------------+
| etcd_prefix | 路径前缀(从etcd根目录) |
| | 所有etcd节点将在此路径下创建|
| | 默认是``/torchelastic/p2p``|
+--------------------------------------------+--------------------------+
"""
def __init__(self, rdzv_impl):
self._rdzv_impl = rdzv_impl
def __del__(self):
# TODO: 考虑使用weakref代替。
del self._rdzv_impl
def get_backend(self) -> str:
return "etcd"
def next_rendezvous(self):
rdzv_version, rank, world_size = self._rdzv_impl.rendezvous_barrier()
log.info("Creating EtcdStore as the c10d::Store implementation")
store = self._rdzv_impl.setup_kv_store(rdzv_version)
return store, rank, world_size
def is_closed(self):
try:
_, state = self._rdzv_impl.get_rdzv_state()
return state["status"] == "closed"
except etcd.EtcdKeyNotFound:
# 没有rendezvous状态,所以它不能是关闭的。
return False