torch.distributed.elastic.rendezvous.etcd_store 的源代码
```html
# 版权所有 (c) Facebook, Inc. 及其附属公司。 # 保留所有权利。 # # 本源代码根据在此源树根目录下的LICENSE文件中找到的BSD风格许可证进行授权。 import datetime import random import time from base64 import b64decode, b64encode from typing import Optional import etcd # type: ignore[import] # pyre-ignore[21]: 在 `torch.distributed` 中找不到名称 `Store`。 from torch.distributed import Store # 延迟(睡眠)一小段随机时间以减少CAS失败。 # 这不会影响正确性,但会减少对etcd服务器的请求。 def cas_delay(): time.sleep(random.uniform(0, 0.1)) # pyre-fixme[11]: 注释 `Store` 未定义为类型。[docs]class EtcdStore(Store): """ 通过搭便车在rendezvous etcd实例上实现c10 Store接口。 这是由 ``EtcdRendezvous`` 返回的存储对象。 """ def __init__( self, etcd_client, etcd_store_prefix, # 默认超时与c10d/Store.hpp中的相同 timeout: Optional[datetime.timedelta] = None, ): super().__init__() # 需要用于pybind trampoline。 self.client = etcd_client self.prefix = etcd_store_prefix if timeout is not None: self.set_timeout(timeout) if not self.prefix.endswith("/"): self.prefix += "/"[docs] def set(self, key, value): """ 将键/值对写入 ``EtcdStore``。 键和值可以是Python ``str`` 或 ``bytes``。 """ self.client.set(key=self.prefix + self._encode(key), value=self._encode(value))[docs] def get(self, key) -> bytes: """ 通过键获取值,可能会进行阻塞等待。 如果键没有立即存在,将进行阻塞等待 最多 ``timeout`` 时间,或直到键被发布。 返回: 值 ``(bytes)`` 引发: LookupError - 如果在超时后键仍未发布 """ b64_key = self.prefix + self._encode(key) kvs = self._try_wait_get([b64_key]) if kvs is None: raise LookupError(f"键 {key} 在 EtcdStore 中未找到") return self._decode(kvs[b64_key])[docs] def add(self, key, num: int) -> int: """ 原子地按整数增量增加值。 整数表示为使用基数10的字符串。如果键不存在, 将假定默认值为 ``0``。 返回: 新的(增加的)值 """ b64_key = self._encode(key) # c10d Store假设值是一个表示为十进制字符串的整数 try: # 如果此键尚未存在,则假定默认值 "0": node = self.client.write( key=self.prefix + b64_key, value=self._encode(str(num)), # 即 0 + num prevExist=False, ) return int(self._decode(node.value)) except etcd.EtcdAlreadyExist: pass while True: # 注意:c10d Store没有删除键的方法,因此我们可以 # 确定它仍然存在。 node = self.client.get(key=self.prefix + b64_key) new_value = self._encode(str(int(self._decode(node.value)) + num)) try: node = self.client.test_and_set( key=node.key<