torch.distributed.elastic.rendezvous.etcd_server 的源代码
#!/usr/bin/env python3
# 版权所有 (c) Facebook, Inc. 及其附属公司。
# 保留所有权利。
#
# 本源代码根据在源代码树的根目录中的LICENSE文件中找到的BSD风格许可证进行许可。
import atexit
import logging
import os
import shlex
import shutil
import socket
import subprocess
import tempfile
import time
from typing import Optional, TextIO, Union
try:
import etcd # type: ignore[import]
except ModuleNotFoundError:
pass
log = logging.getLogger(__name__)
def find_free_port():
"""
查找一个空闲端口并将临时套接字绑定到该端口,以便端口可以“保留”直到使用。
.. 注意:: 返回的套接字在使用端口之前必须关闭,
否则会发生“地址已在使用中”的错误。
套接字应尽可能接近端口的消费者持有并关闭,
否则,存在更大的竞争条件风险,其他进程可能会看到端口为空闲并占用它。
返回: 绑定到保留的空闲端口的套接字
用法::
sock = find_free_port()
port = sock.getsockname()[1]
sock.close()
use_port(port)
"""
addrs = socket.getaddrinfo(
host="localhost", port=None, family=socket.AF_UNSPEC, type=socket.SOCK_STREAM
)
for addr in addrs:
family, type, proto, _, _ = addr
try:
s = socket.socket(family, type, proto)
s.bind(("localhost", 0))
s.listen(0)
return s
except OSError as e:
s.close() # type: ignore[possibly-undefined]
print(f"Socket creation attempt failed: {e}")
raise RuntimeError("Failed to create a socket")
def stop_etcd(subprocess, data_dir: Optional[str] = None):
if subprocess and subprocess.poll() is None:
log.info("停止 etcd 服务器")
subprocess.terminate()
subprocess.wait()
if data_dir:
log.info("删除 etcd 数据目录: %s", data_dir)
shutil.rmtree(data_dir, ignore_errors=True)
[docs]class EtcdServer:
"""
.. 注意:: 在 etcd 服务器 v3.4.3 上测试。
在随机空闲端口上启动和停止本地独立 etcd 服务器。适用于单节点、多工作进程启动或测试,
其中边车 etcd 服务器比单独设置 etcd 服务器更方便。
此类注册了一个终止处理程序,以在退出时关闭 etcd 子进程。此终止处理程序不能替代调用 ``stop()`` 方法。
以下回退机制用于查找 etcd 二进制文件:
1. 使用环境变量 TORCHELASTIC_ETCD_BINARY_PATH
2. 如果存在,使用 ``<此文件根目录>/bin/etcd``
3. 使用 ``PATH`` 中的 ``etcd``
用法
::
server = EtcdServer("/usr/bin/etcd", 2379, "/tmp/default.etcd")
server.start()
client = server.get_client()
# 使用客户端
server.stop()
参数:
etcd_binary_path: etcd 服务器二进制文件的路径(见上文回退路径)
"""
def __init__(self, data_dir: Optional[str] = None):
self._port = -1
self._host = "localhost"
root = os.path.dirname(__file__)
default_etcd_bin = os.path.join(root, "bin/etcd")
self._etcd_binary_path = os.environ.get(
"TORCHELASTIC_ETCD_BINARY_PATH", default_etcd_bin
)
if not os.path.isfile(self._etcd_binary_path):
self._etcd_binary_path = "etcd"
self._base_data_dir = (
data_dir if data_dir else tempfile.mkdtemp(prefix="torchelastic_etcd_data")
)
self._etcd_cmd = None
self._etcd_proc: Optional[subprocess.Popen] = None
def _get_etcd_server_process(self) -> subprocess.Popen:
if not <span class="bp