Shortcuts

torch.distributed.elastic.rendezvous.etcd_rendezvous 的源代码

#!/usr/bin/env python3

# 版权所有 (c) Facebook, Inc. 及其附属公司。
# 保留所有权利。
#
# 本源代码根据在源代码树的根目录中的LICENSE文件中找到的BSD风格许可证进行许可。

import json
import logging
import sys
import threading
import time
from typing import Optional

import etcd  # type: ignore[import]
from torch.distributed.elastic.rendezvous import (
    RendezvousClosedError,
    RendezvousError,
    RendezvousHandler,
    RendezvousParameters,
    RendezvousTimeoutError,
)

from .utils import parse_rendezvous_endpoint
from .etcd_store import EtcdStore, cas_delay


_log_fmt = logging.Formatter("%(levelname)s %(asctime)s %(message)s")
_log_handler = logging.StreamHandler(sys.stderr)
_log_handler.setFormatter(_log_fmt)

log = logging.getLogger(__name__)
log.propagate = False
log.setLevel(logging.INFO)
log.addHandler(_log_handler)


# 可重试的失败异常意味着我们太晚进行所需的状态转换(例如由于竞争条件),
# 现在应该从头开始重新启动。
# 建议设置一个小延迟以避免对Etcd进行垃圾邮件攻击。
class EtcdRendezvousRetryableFailure(Exception):
    pass


# 类似于可重试的失败,但观察到的新状态表明我们可以立即重试,
# 即不需要“安全延迟”。
class EtcdRendezvousRetryImmediately(Exception):
    pass


# 默认的rendezvous超时时间。
_DEFAULT_TIMEOUT: int = 600  # 10分钟

# 在达到最小节点数后,如果rendezvous是弹性的(min != max),则额外等待时间。
_DEFAULT_LAST_CALL_TIMEOUT: int = 30  # 30秒

# EtcdRendezvous内部使用的各种常量
CONST_ETCD_SETUP_TTL = 5
CONST_ETCD_FROZEN_TTL = 10
CONST_ETCD_JOINABLE_EPHEMERAL_TTL = 10

# 工作者的keep-alive键的临时节点TTL:
CONST_WORKER_KEEPALIVE_TTL = 10

# 特定run_id目录的临时TTL。所有特定run_id(作业实例)的rendezvous状态数据
# 都包含在此目录中。其唯一的作用是清理旧运行的rendezvous数据(以防
# etcd服务器是持久的),并且不会影响正确性,但应大于任何工作者进程预期存活的超时时间:
CONST_RUNID_SUBROOT_TTL = 7200  # 2小时


[docs]class EtcdRendezvousHandler(RendezvousHandler): """ 实现一个 :py:class:`torch.distributed.elastic.rendezvous.RendezvousHandler`接口 :py:class:`torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvous`支持。 ``EtcdRendezvousHandler``使用URL来配置要使用的rendezvous类型,并将实现特定的配置传递给rendezvous模块。基本的etcd rendezvous配置URL如下所示 :: etcd://:/?min_workers=&max_workers= # noqa: W605 -- 示例 -- etcd://localhost:2379/1234?min_workers=1&max_workers=3 上面的URL解释如下: 1. 使用注册了``etcd``方案的rendezvous处理程序 2. 要使用的``etcd``端点是``localhost:2379`` 3. ``job_id == 1234``用作etcd中的前缀(这允许在多个作业之间共享一个公共的etcd服务器,只要``job_ids``保证是唯一的)。请注意,作业ID可以是任何字符串(例如,不需要是数字),只要它是唯一的。 4. ``min_workers=1``和``max_workers=3``指定成员大小的范围 - Torch Distributed Elastic在集群大小大于或等于``min_workers``时开始运行作业,并允许最多``max_workers``加入集群。 以下是可以传递给etcd rendezvous的完整参数列表: +--------------------------------------------+--------------------------+ | 参数 | 描述 | +============================================+==========================+ | min_workers | 最小工作者数量 | | | 以使rendezvous有效 | +--------------------------------------------+--------------------------+ | max_workers | 允许加入的最大工作者数量 | +--------------------------------------------+--------------------------+ | timeout | 总超时时间 | | | 在此时间内next_rendezvous | | | 预期成功(默认600秒) | +--------------------------------------------+--------------------------+ | last_call_timeout | 在达到最小工作者数量后 | | | 的额外等待时间(“最后一次调用”)| | | 默认30秒 | +--------------------------------------------+--------------------------+ | etcd_prefix | 路径前缀(从etcd根目录) | | | 所有etcd节点将在此路径下创建| | | 默认是``/torchelastic/p2p``| +--------------------------------------------+--------------------------+ """ def __init__(self, rdzv_impl): self._rdzv_impl = rdzv_impl def __del__(self): # TODO: 考虑使用weakref代替。 del self._rdzv_impl def get_backend(self) -> str: return "etcd" def next_rendezvous(self): rdzv_version, rank, world_size = self._rdzv_impl.rendezvous_barrier() log.info("Creating EtcdStore as the c10d::Store implementation") store = self._rdzv_impl.setup_kv_store(rdzv_version) return store, rank, world_size def is_closed(self): try: _, state = self._rdzv_impl.get_rdzv_state() return state["status"] == "closed" except etcd.EtcdKeyNotFound: # 没有rendezvous状态,所以它不能是关闭的。 return False
优云智算