Shortcuts

torch.distributed.elastic.rendezvous.etcd_store 的源代码

```html
# 版权所有 (c) Facebook, Inc. 及其附属公司。
# 保留所有权利。
#
# 本源代码根据在此源树根目录下的LICENSE文件中找到的BSD风格许可证进行授权。

import datetime
import random
import time
from base64 import b64decode, b64encode
from typing import Optional

import etcd  # type: ignore[import]

# pyre-ignore[21]: 在 `torch.distributed` 中找不到名称 `Store`。
from torch.distributed import Store


# 延迟(睡眠)一小段随机时间以减少CAS失败。
# 这不会影响正确性,但会减少对etcd服务器的请求。
def cas_delay():
    time.sleep(random.uniform(0, 0.1))


# pyre-fixme[11]: 注释 `Store` 未定义为类型。
[docs]class EtcdStore(Store): """ 通过搭便车在rendezvous etcd实例上实现c10 Store接口。 这是由 ``EtcdRendezvous`` 返回的存储对象。 """ def __init__( self, etcd_client, etcd_store_prefix, # 默认超时与c10d/Store.hpp中的相同 timeout: Optional[datetime.timedelta] = None, ): super().__init__() # 需要用于pybind trampoline。 self.client = etcd_client self.prefix = etcd_store_prefix if timeout is not None: self.set_timeout(timeout) if not self.prefix.endswith("/"): self.prefix += "/"
[docs] def set(self, key, value): """ 将键/值对写入 ``EtcdStore``。 键和值可以是Python ``str`` 或 ``bytes``。 """ self.client.set(key=self.prefix + self._encode(key), value=self._encode(value))
[docs] def get(self, key) -> bytes: """ 通过键获取值,可能会进行阻塞等待。 如果键没有立即存在,将进行阻塞等待 最多 ``timeout`` 时间,或直到键被发布。 返回: 值 ``(bytes)`` 引发: LookupError - 如果在超时后键仍未发布 """ b64_key = self.prefix + self._encode(key) kvs = self._try_wait_get([b64_key]) if kvs is None: raise LookupError(f"键 {key} 在 EtcdStore 中未找到") return self._decode(kvs[b64_key])
[docs] def add(self, key, num: int) -> int: """ 原子地按整数增量增加值。 整数表示为使用基数10的字符串。如果键不存在, 将假定默认值为 ``0``。 返回: 新的(增加的)值 """ b64_key = self._encode(key) # c10d Store假设值是一个表示为十进制字符串的整数 try: # 如果此键尚未存在,则假定默认值 "0": node = self.client.write( key=self.prefix + b64_key, value=self._encode(str(num)), # 即 0 + num prevExist=False, ) return int(self._decode(node.value)) except etcd.EtcdAlreadyExist: pass while True: # 注意:c10d Store没有删除键的方法,因此我们可以 # 确定它仍然存在。 node = self.client.get(key=self.prefix + b64_key) new_value = self._encode(str(int(self._decode(node.value)) + num)) try: node = self.client.test_and_set( key=node.key<
优云智算