Shortcuts

Elastic Agent

服务器

弹性代理是torchelastic的控制平面。

这是一个启动和管理底层工作进程的过程。 代理负责:

  1. 使用分布式torch:工作进程启动时携带了所有必要的信息,可以成功且轻松地调用 torch.distributed.init_process_group()

  2. 容错性:监控工作节点,在检测到工作节点故障或不健康时,关闭所有工作节点并重新启动所有节点。

  3. 弹性:响应成员变化并使用新成员重新启动工作进程。

最简单的代理是按节点部署的,并与本地进程一起工作。 一个更高级的代理可以远程启动和管理工作线程。代理可以是完全去中心化的,根据其管理的工作线程做出决策。 或者可以是被协调的,与其他代理(管理同一作业中的工作线程)进行通信,以做出集体决策。

下面是一个管理本地工作组代理的示意图。

../_images/agent_diagram.jpg

概念

本节描述了与理解torchelastic中agent角色相关的高级类和概念。

class torch.distributed.elastic.agent.server.ElasticAgent[源代码]

负责管理一个或多个工作进程的代理进程。

工作进程被假定为常规的分布式PyTorch脚本。当工作进程由代理创建时,代理提供必要的信息,以便工作进程能够正确初始化一个torch进程组。

确切的部署拓扑结构和代理与工作者的比例取决于代理的具体实现和用户的作业放置偏好。例如,要在GPU上运行一个分布式训练作业,使用8个训练器(每个GPU一个),可以:

  1. 使用8个单GPU实例,每个实例放置一个代理,每个代理管理1个工作线程。

  2. 使用4个双GPU实例,每个实例放置一个代理,每个代理管理2个工作线程。

  3. 使用2个四重GPU实例,每个实例放置一个代理,每个代理管理4个工作线程。

  4. 使用1个8 GPU实例,每个实例放置一个代理,每个代理管理8个工作线程。

用法

group_result = agent.run()
 if group_result.is_failed():
   # 工作失败
   failure = group_result.failures[0]
   log.exception("worker 0 failed with exit code : %s", failure.exit_code)
 else:
   return group_result.return_values[0] # 返回排名0的结果
abstract get_worker_group(role='default')[源代码]

返回给定角色工作组

请注意,工作组是一个可变对象,因此在多线程/多进程环境中,其状态可能会发生变化。 实现者被鼓励(但不是必须)返回一个防御性的只读副本。

Return type

工作组

abstract run(role='default')[源代码]

运行代理。

支持在失败时最多重试工作组 max_restarts 次。

Returns

执行结果,包含每个工作者的返回值或失败详情,按工作者的全局排名映射。

Raises

异常 - 与工作进程无关的其他失败

Return type

运行结果

class torch.distributed.elastic.agent.server.WorkerSpec(role, local_world_size, rdzv_handler, fn=None, entrypoint=None, args=(), max_restarts=3, monitor_interval=30.0, master_port=None, master_addr=None, local_addr=None)[源代码]

关于某种特定类型工人的蓝图信息。

对于给定的角色,必须只存在一个工作器规格。 工作器规格预计在所有节点(机器)上是一致的, 即每个节点为特定规格运行相同数量的工作器。

Parameters
  • 角色 (str) – 为具有此规格的工作者定义的用户角色

  • local_world_size (int) – 要运行的本地工作者的数量

  • fn (可选[可调用]) – (已弃用,请改用入口点)

  • entrypoint可选[联合[可调用, 字符串]])– 工作函数或命令

  • args (元组) – 传递给 entrypoint 的参数

  • rdzv_handler (RendezvousHandler) – 处理这组工作者的rdzv

  • max_restarts (int) – 工作者的最大重试次数

  • monitor_interval (float) – 每 n 秒监控一次工作线程的状态

  • master_port (可选[整数]) – 在rank 0上运行c10d存储的固定端口 如果未指定,则将选择一个随机的空闲端口

  • master_addr (可选[字符串]) – 在rank 0上运行c10d存储的固定master_addr 如果未指定,则将在代理rank 0上选择主机名

  • 重定向 – 将标准流重定向到文件, 通过传递映射选择性地为特定本地等级重定向

  • tee – 将指定的标准流(s)同时输出到控制台和文件, 通过传递一个映射,可以选择性地为特定的本地排名进行tee操作, 优先于redirects设置。

get_entrypoint_name()[源代码]

获取入口点名称。

如果入口点是一个函数(例如 Callable),则返回其 __qualname__,否则如果入口点是一个二进制文件(例如 str),则返回二进制文件名。

class torch.distributed.elastic.agent.server.WorkerState(value)[源代码]

一个WorkerGroup的状态。

工作组中的工作者会作为一个整体改变状态。如果工作组中的任何一个工作者失败,整个工作组都会被视为失败:

未知 - 代理失去了对工作组状态的跟踪,无法恢复
初始化 - 工作组对象已创建但尚未启动
健康 - 工作正常运行且健康
不健康 - 工作正常运行但不健康
已停止 - 工作被代理中断停止
成功 - 工作完成运行(退出代码 0)
失败 - 工作未能成功完成(退出代码非 0)

一个工作组从初始的 INIT 状态开始, 然后进入 HEALTHYUNHEALTHY 状态, 最终达到终端的 SUCCEEDEDFAILED 状态。

工作组可以被中断并暂时被代理置于停止状态。处于停止状态的工作者将由代理在不久的将来重新调度启动。一些工作者被置于停止状态的例子包括:

  1. 观察到工作组故障|不健康

  2. 检测到成员变更

当对工作组执行操作(启动、停止、rdzv、重试等)失败,并且导致操作仅部分应用于工作组时,状态将变为未知。通常这种情况发生在代理在状态变更事件期间遇到未捕获/未处理的异常时。代理不应尝试恢复处于未知状态的工作组,而是最好自我终止,并允许作业管理器重试该节点。

static is_running(state)[源代码]

返回 Worker 的状态。

Returns

如果工作线程状态表示工作线程仍在运行(例如,进程存在但不一定是健康的),则为真。

Return type

bool

class torch.distributed.elastic.agent.server.Worker(local_rank, global_rank=-1, role_rank=-1, world_size=-1, role_world_size=-1)[源代码]

一个工作实例。

与此相对的是WorkerSpec,它表示一个工作者的规格。一个Worker是从一个WorkerSpec创建的。一个Worker对于一个WorkerSpec就像一个对象对于一个类。

工作者的idElasticAgent的具体实现来解释。对于本地代理,它可能是工作者的pid (int),对于远程代理,它可能被编码为host:port (string)

Parameters
  • id (任意) – 唯一标识一个工作者(由代理解释)

  • local_rank (int) – 工作者的本地排名

  • global_rank (int) – 工作者的全局排名

  • role_rank (int) – 在具有相同角色的所有工作者中的排名

  • world_size (int) – 工作者的数量(全局)

  • role_world_size (int) – 具有相同角色的工作者的数量

class torch.distributed.elastic.agent.server.WorkerGroup(spec)[源代码]

一组 Worker 实例。

该类为给定的 WorkerSpec 定义了一组由 ElasticAgent 管理的 Worker 实例。工作组是否包含跨实例的工作者取决于代理的实现。

实现

以下是torchelastic提供的代理实现。

class torch.distributed.elastic.agent.server.local_elastic_agent.LocalElasticAgent(spec, logs_specs, start_method='spawn', exit_barrier_timeout=300, log_line_prefix_template=None)[源代码]

一个处理本地主机工作者的 torchelastic.agent.server.ElasticAgent 实现。

此代理按主机部署,并配置为生成n个工作线程。 当使用GPU时,n映射到主机上可用的GPU数量。

本地代理不会与其他主机上部署的其他本地代理进行通信,即使工作进程可能进行跨主机通信。工作进程ID被解释为本地进程。代理作为一个单一单元启动和停止所有工作进程。

工作函数和传递给工作函数的参数必须是与Python多进程兼容的。为了将多进程数据结构传递给工作进程,您可以在与指定的start_method相同的多进程上下文中创建数据结构,并将其作为函数参数传递。

The exit_barrier_timeout 指定等待其他代理完成的时间(以秒为单位)。这作为一个安全网,用于处理工作线程在不同时间完成的情况,以防止代理将提前完成的工作线程视为缩减事件。强烈建议用户代码处理确保工作线程以同步方式终止,而不是依赖于 exit_barrier_timeout。

如果在 `LocalElasticAgent` 进程中定义了值为 1 的环境变量 TORCHELASTIC_ENABLE_FILE_TIMER,则可以启用基于命名管道的看门狗。 可选地,可以设置另一个环境变量 `TORCHELASTIC_TIMER_FILE`,并为其指定一个唯一的文件名用于命名管道。如果未设置环境变量 `TORCHELASTIC_TIMER_FILE``LocalElasticAgent` 将在内部创建一个唯一的文件名,并将其设置为环境变量 `TORCHELASTIC_TIMER_FILE`,该环境变量将被传播到工作进程,以允许它们连接到 `LocalElasticAgent` 使用的同一命名管道。

日志被写入指定的日志目录。每条日志行默认会以[${role_name}${local_rank}]:为前缀(例如[trainer0]: foobar)。日志前缀可以通过传递一个模板字符串作为log_line_prefix_template参数来自定义。以下宏(标识符)在运行时会被替换:${role_name}, ${local_rank}, ${rank}。例如,要将每条日志行的前缀设置为全局排名而不是本地排名,请设置log_line_prefix_template = "[${rank}]:

示例启动函数

def trainer(args) -> str:
    return "do train"

def main():
    start_method="spawn"
    shared_queue= multiprocessing.get_context(start_method).Queue()
    spec = WorkerSpec(
                role="trainer",
                local_world_size=nproc_per_process,
                entrypoint=trainer,
                args=("foobar",),
                ...<OTHER_PARAMS...>)
    agent = LocalElasticAgent(spec, start_method)
    results = agent.run()

    if results.is_failed():
        print("训练失败")
    else:
        print(f"rank 0 返回值: {results.return_values[0]}")
        # 打印 -> rank 0 返回值: do train

示例启动二进制文件

def main():
    spec = WorkerSpec(
                role="trainer",
                local_world_size=nproc_per_process,
                entrypoint="/usr/local/bin/trainer",
                args=("--trainer-args", "foobar"),
                ...<OTHER_PARAMS...>)
    agent = LocalElasticAgent(spec)
    results = agent.run()

    if not results.is_failed():
        print("二进制启动没有返回值")

扩展代理

要扩展代理,您可以直接实现 `ElasticAgent,但我们建议您改为扩展 SimpleElasticAgent,它提供了大部分的框架,并让您只需实现几个特定的抽象方法。

class torch.distributed.elastic.agent.server.SimpleElasticAgent(spec, exit_barrier_timeout=300)[源代码]

一个管理特定类型工作角色的ElasticAgent

一个管理工作者(WorkerGroup)的ElasticAgent,用于单个WorkerSpec,例如一种特定类型的工作者角色。

_assign_worker_ranks(store, group_rank, group_world_size, spec)[源代码]

确定工作进程的适当等级。

排名分配是根据以下算法完成的:

  1. 每个代理将其配置(group_rank、group_world_size、num_workers)写入公共存储。

  2. 每个代理检索所有代理的配置,并使用角色和等级进行两级排序。

  3. 确定全局排名:当前代理的工人的全局排名是代理的group_rank之前的infos数组的偏移量。偏移量计算为所有排名小于group_rank的代理的local_world_size的总和。工人的排名将是:[offset, offset+local_world_size)

  4. 确定角色等级:角色等级是使用第3点中的算法确定的,但偏移量是从与当前角色相同且具有最小组等级的第一个代理开始的。

Return type

列表[Worker]

_exit_barrier()[源代码]

定义一个屏障,使代理进程保持活动状态,直到所有工作线程完成。

等待所有代理完成执行其本地工作程序(无论成功与否)的exit_barrier_timeout秒。这作为防止用户脚本在不同时间终止的安全措施。

_initialize_workers(worker_group)[源代码]

为 worker_group 启动一组新的 worker。

本质上,这是一个会合操作,随后是start_workers。 调用者应首先调用_stop_workers()来停止正在运行的工作线程,然后再调用此方法。

乐观地将刚刚启动的工作组状态设置为健康,并将实际的状态监控委托给_monitor_workers()方法

abstract _monitor_workers(worker_group)[源代码]

检查 worker_group 的工作者。

此函数还会返回工作组的新的状态。

Return type

运行结果

_rendezvous(worker_group)[源代码]

为工作器规范中指定的工作器运行会合。

为工作者分配新的全局等级和世界大小。 更新工作者组的rendezvous存储。

_restart_workers(worker_group)[源代码]

重新启动(停止、会合、启动)组中所有本地工作线程。

abstract _shutdown(death_sig=Signals.SIGTERM)[源代码]

清理代理工作期间分配的任何资源。

Parameters

death_sig (信号) – 发送给子进程的信号,默认是SIGTERM

abstract _start_workers(worker_group)[源代码]

启动 worker_group.spec.local_world_size 数量的工作进程。

这是根据工作组的工作规范。 返回一个从 local_rank 到工作 id 的映射。

Return type

字典[整数, 任意]

abstract _stop_workers(worker_group)[源代码]

停止指定工作组中的所有工作线程。

实现者必须处理由 WorkerState定义的所有状态的工人。也就是说,它必须优雅地处理停止不存在的工人、不健康的(卡住的)工人等。

class torch.distributed.elastic.agent.server.api.RunResult(state, return_values=<factory>, failures=<factory>)[源代码]

返回工作执行的结果。

运行结果遵循“全有或全无”策略,即当且仅当该代理管理的所有本地工作线程都成功完成时,运行才算成功。

如果结果是成功的(例如 is_failed() = False),那么 return_values 字段包含由该代理管理的工人的输出(返回值),这些输出按其全局排名映射。即 result.return_values[0] 是全局排名为 0 的返回值。

注意

return_values 仅在工作入口点是一个函数时才有意义。以二进制入口点指定的工作者通常没有返回值,return_values 字段是无意义的,可能为空。

如果 is_failed() 返回 True,那么 failures 字段包含失败信息,同样,按失败工作者的全局排名映射。

return_valuesfailures 中的键是互斥的,也就是说, 一个工作者的最终状态只能是以下之一:成功、失败。根据代理的重启策略,由代理故意终止的工作者,不会在 return_valuesfailures 中表示。

代理中的看门狗

如果在 `LocalElasticAgent` 进程中定义了值为 1 的环境变量 TORCHELASTIC_ENABLE_FILE_TIMER,则可以启用基于命名管道的看门狗。 可选地,可以设置另一个环境变量 `TORCHELASTIC_TIMER_FILE`,并为其指定一个唯一的文件名作为命名管道。如果未设置环境变量 `TORCHELASTIC_TIMER_FILE``LocalElasticAgent` 将在内部创建一个唯一的文件名,并将其设置为环境变量 `TORCHELASTIC_TIMER_FILE`,该环境变量将被传播到工作进程,以允许它们连接到 `LocalElasticAgent` 使用的同一命名管道。

优云智算