torch.distributed.elastic.events.api 的源代码
#!/usr/bin/env python3
# 版权所有 (c) Facebook, Inc. 及其附属公司。
# 保留所有权利。
#
# 此源代码根据在此源树根目录下的LICENSE文件中找到的BSD风格许可证进行许可。
import json
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Dict, Union, Optional
__all__ = ['EventSource', 'Event', 'NodeState', 'RdzvEvent']
EventMetadataValue = Union[str, int, float, bool, None]
[docs]class EventSource(str, Enum):
"""已知的事件生产者标识符。"""
AGENT = "AGENT"
WORKER = "WORKER"
[docs]@dataclass
class Event:
"""
该类表示在torchelastic作业执行期间发生的通用事件。
事件可以是任何有意义的行为。
参数:
name: 事件名称。
source: 事件生产者,例如代理或工作者
timestamp: 事件发生时的时间戳(以毫秒为单位)。
metadata: 与事件相关的附加数据。
"""
name: str
source: EventSource
timestamp: int = 0
metadata: Dict[str, EventMetadataValue] = field(default_factory=dict)
def __str__(self):
return self.serialize()
@staticmethod
def deserialize(data: Union[str, "Event"]) -> "Event":
if isinstance(data, Event):
return data
if isinstance(data, str):
data_dict = json.loads(data)
data_dict["source"] = EventSource[data_dict["source"]] # type: ignore[possibly-undefined]
return Event(**data_dict)
def serialize(self) -> str:
return json.dumps(asdict(self))
class NodeState(str, Enum):
"""节点在rendezvous中可以处于的状态。"""
INIT = "INIT"
RUNNING = "RUNNING"
SUCCEEDED = "SUCCEEDED"
FAILED = "FAILED"
@dataclass
class RdzvEvent:
"""
表示任何rendezvous事件的dataclass。
参数:
name: 事件名称。(例如:当前正在执行的操作)
run_id: rendezvous的运行ID
message: 描述事件的消息
hostname: 节点的hostname
pid: 节点的进程ID
node_state: 节点的状态(INIT, RUNNING, SUCCEEDED, FAILED)
master_endpoint: rendezvous存储的主端点(如果已知)
rank: 节点的rank(如果已知)
local_id: 节点的local_id(如果在dynamic_rendezvous.py中定义)
error_trace: 错误堆栈跟踪(如果是错误事件)。
"""
name: str
run_id: str
message: str
hostname: str
pid: int
node_state: NodeState
master_endpoint: str = ""
rank: Optional[int] = None
local_id: Optional[int] = None
error_trace: str = ""
def __str__(self):
return self.serialize()
@staticmethod
def deserialize(data: Union[str, "RdzvEvent"]) -> "RdzvEvent":
if isinstance(data, RdzvEvent):
return data
if isinstance(data, str):
data_dict = json.loads(data)
data_dict["node_state"] = NodeState[data_dict["node_state"]] # type: ignore[possibly-undefined]
return RdzvEvent(**data_dict)
def serialize(self) -> str:
return json.dumps(asdict(self))