Shortcuts

torch.distributed.elastic.rendezvous.etcd_server 的源代码

#!/usr/bin/env python3

# 版权所有 (c) Facebook, Inc. 及其附属公司。
# 保留所有权利。
#
# 本源代码根据在源代码树的根目录中的LICENSE文件中找到的BSD风格许可证进行许可。
import atexit
import logging
import os
import shlex
import shutil
import socket
import subprocess
import tempfile
import time
from typing import Optional, TextIO, Union

try:
    import etcd  # type: ignore[import]
except ModuleNotFoundError:
    pass


log = logging.getLogger(__name__)


def find_free_port():
    """
    查找一个空闲端口并将临时套接字绑定到该端口,以便端口可以“保留”直到使用。

    .. 注意:: 返回的套接字在使用端口之前必须关闭,
              否则会发生“地址已在使用中”的错误。
              套接字应尽可能接近端口的消费者持有并关闭,
              否则,存在更大的竞争条件风险,其他进程可能会看到端口为空闲并占用它。

    返回: 绑定到保留的空闲端口的套接字

    用法::

    sock = find_free_port()
    port = sock.getsockname()[1]
    sock.close()
    use_port(port)
    """
    addrs = socket.getaddrinfo(
        host="localhost", port=None, family=socket.AF_UNSPEC, type=socket.SOCK_STREAM
    )

    for addr in addrs:
        family, type, proto, _, _ = addr
        try:
            s = socket.socket(family, type, proto)
            s.bind(("localhost", 0))
            s.listen(0)
            return s
        except OSError as e:
            s.close()  # type: ignore[possibly-undefined]
            print(f"Socket creation attempt failed: {e}")
    raise RuntimeError("Failed to create a socket")


def stop_etcd(subprocess, data_dir: Optional[str] = None):
    if subprocess and subprocess.poll() is None:
        log.info("停止 etcd 服务器")
        subprocess.terminate()
        subprocess.wait()

    if data_dir:
        log.info("删除 etcd 数据目录: %s", data_dir)
        shutil.rmtree(data_dir, ignore_errors=True)


[docs]class EtcdServer: """ .. 注意:: 在 etcd 服务器 v3.4.3 上测试。 在随机空闲端口上启动和停止本地独立 etcd 服务器。适用于单节点、多工作进程启动或测试, 其中边车 etcd 服务器比单独设置 etcd 服务器更方便。 此类注册了一个终止处理程序,以在退出时关闭 etcd 子进程。此终止处理程序不能替代调用 ``stop()`` 方法。 以下回退机制用于查找 etcd 二进制文件: 1. 使用环境变量 TORCHELASTIC_ETCD_BINARY_PATH 2. 如果存在,使用 ``<此文件根目录>/bin/etcd`` 3. 使用 ``PATH`` 中的 ``etcd`` 用法 :: server = EtcdServer("/usr/bin/etcd", 2379, "/tmp/default.etcd") server.start() client = server.get_client() # 使用客户端 server.stop() 参数: etcd_binary_path: etcd 服务器二进制文件的路径(见上文回退路径) """ def __init__(self, data_dir: Optional[str] = None): self._port = -1 self._host = "localhost" root = os.path.dirname(__file__) default_etcd_bin = os.path.join(root, "bin/etcd") self._etcd_binary_path = os.environ.get( "TORCHELASTIC_ETCD_BINARY_PATH", default_etcd_bin ) if not os.path.isfile(self._etcd_binary_path): self._etcd_binary_path = "etcd" self._base_data_dir = ( data_dir if data_dir else tempfile.mkdtemp(prefix="torchelastic_etcd_data") ) self._etcd_cmd = None self._etcd_proc: Optional[subprocess.Popen] = None def _get_etcd_server_process(self) -> subprocess.Popen: if not <span class="bp
优云智算