Shortcuts

torch.distributed.elastic.events.api 的源代码

#!/usr/bin/env python3

# 版权所有 (c) Facebook, Inc. 及其附属公司。
# 保留所有权利。
#
# 此源代码根据在此源树根目录下的LICENSE文件中找到的BSD风格许可证进行许可。

import json
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Dict, Union, Optional

__all__ = ['EventSource', 'Event', 'NodeState', 'RdzvEvent']

EventMetadataValue = Union[str, int, float, bool, None]


[docs]class EventSource(str, Enum): """已知的事件生产者标识符。""" AGENT = "AGENT" WORKER = "WORKER"
[docs]@dataclass class Event: """ 该类表示在torchelastic作业执行期间发生的通用事件。 事件可以是任何有意义的行为。 参数: name: 事件名称。 source: 事件生产者,例如代理或工作者 timestamp: 事件发生时的时间戳(以毫秒为单位)。 metadata: 与事件相关的附加数据。 """ name: str source: EventSource timestamp: int = 0 metadata: Dict[str, EventMetadataValue] = field(default_factory=dict) def __str__(self): return self.serialize() @staticmethod def deserialize(data: Union[str, "Event"]) -> "Event": if isinstance(data, Event): return data if isinstance(data, str): data_dict = json.loads(data) data_dict["source"] = EventSource[data_dict["source"]] # type: ignore[possibly-undefined] return Event(**data_dict) def serialize(self) -> str: return json.dumps(asdict(self))
class NodeState(str, Enum): """节点在rendezvous中可以处于的状态。""" INIT = "INIT" RUNNING = "RUNNING" SUCCEEDED = "SUCCEEDED" FAILED = "FAILED" @dataclass class RdzvEvent: """ 表示任何rendezvous事件的dataclass。 参数: name: 事件名称。(例如:当前正在执行的操作) run_id: rendezvous的运行ID message: 描述事件的消息 hostname: 节点的hostname pid: 节点的进程ID node_state: 节点的状态(INIT, RUNNING, SUCCEEDED, FAILED) master_endpoint: rendezvous存储的主端点(如果已知) rank: 节点的rank(如果已知) local_id: 节点的local_id(如果在dynamic_rendezvous.py中定义) error_trace: 错误堆栈跟踪(如果是错误事件)。 """ name: str run_id: str message: str hostname: str pid: int node_state: NodeState master_endpoint: str = "" rank: Optional[int] = None local_id: Optional[int] = None error_trace: str = "" def __str__(self): return self.serialize() @staticmethod def deserialize(data: Union[str, "RdzvEvent"]) -> "RdzvEvent": if isinstance(data, RdzvEvent): return data if isinstance(data, str): data_dict = json.loads(data) data_dict["node_state"] = NodeState[data_dict["node_state"]] # type: ignore[possibly-undefined] return RdzvEvent(**data_dict) def serialize(self) -> str: return json.dumps(asdict(self))
优云智算