Shortcuts

过期计时器

过期计时器与代理程序设置在同一进程中,并从您的脚本中使用,以处理卡住的工人。当您进入一个有可能卡住的代码块时,您可以获取一个过期计时器,该计时器指示计时器服务器在进程未能在自设的过期截止日期前释放计时器时终止该进程。

用法:

import torchelastic.timer as timer
import torchelastic.agent.server as agent

def main():
    start_method = "spawn"
    message_queue = mp.get_context(start_method).Queue()
    server = timer.LocalTimerServer(message, max_interval=0.01)
    server.start() # 非阻塞

    spec = WorkerSpec(
                fn=trainer_func,
                args=(message_queue,),
                ...<OTHER_PARAMS...>)
    agent = agent.LocalElasticAgent(spec, start_method)
    agent.run()

def trainer_func(message_queue):
    timer.configure(timer.LocalTimerClient(message_queue))
    with timer.expires(after=60): # 60秒过期
        # 做一些工作

在上面的示例中,如果 trainer_func 的执行时间超过60秒,那么工作进程将被终止,并且代理将重试工作组。

客户端方法

torch.distributed.elastic.timer.configure(timer_client)[源代码]

配置一个定时器客户端。在使用expires之前必须调用。

torch.distributed.elastic.timer.expires(after, scope=None, client=None)[源码]

获取一个倒计时器,该计时器将在after秒后到期,除非它所包裹的代码块在此时间范围内完成。当计时器到期时,此工作线程有资格被回收。“回收”的确切含义取决于客户端实现。在大多数情况下,回收意味着终止工作线程进程。请注意,工作线程并不保证在time.now() + after时刻被回收,而是工作线程“有资格”被回收,客户端与之通信的TimerServer将最终决定何时以及如何回收具有过期计时器的工作线程。

用法:

torch.distributed.elastic.timer.configure(LocalTimerClient())
with expires(after=10):
    torch.distributed.all_reduce(...)

服务器/客户端实现

以下是torchelastic提供的计时服务器和客户端对。

注意

计时器服务器和客户端总是需要成对实现和使用,因为服务器和客户端之间存在消息协议。

下面是一对基于multiprocess.Queue实现的定时器服务器和客户端。

class torch.distributed.elastic.timer.LocalTimerServer(mp_queue, max_interval=60, daemon=True)[源代码]

LocalTimerClient一起工作的服务器。客户端预计是运行此服务器的父进程的子进程。作业中的每个主机预计都会在本地启动自己的计时器服务器,每个服务器实例管理本地工作线程的计时器(运行在同一主机上的进程)。

class torch.distributed.elastic.timer.LocalTimerClient(mp_queue)[源代码]

LocalTimerServer 的客户端。此客户端旨在在与 LocalTimerServer 运行的同一主机上使用,并使用 pid 来唯一标识一个工作进程。这在每台主机上有多个 GPU 设备的情况下,每个 GPU 生成一个子进程(训练器)时特别有用。

下面是另一对基于命名管道实现的定时器服务器和客户端。

class torch.distributed.elastic.timer.FileTimerServer(file_path, max_interval=10, daemon=True, log_event=None)[源代码]

FileTimerClient一起工作的服务器。客户端预计将在运行此服务器的同一主机上运行。 作业中的每个主机预计将在本地启动自己的计时器服务器,并且每个服务器实例管理本地工作者的计时器(运行在同一主机上的进程)。

Parameters
  • file_path (str) – 字符串,要创建的FIFO特殊文件的路径。

  • max_interval (float) – 浮点数,每个看门狗循环的最大间隔时间,单位为秒。

  • 守护进程 (bool) – 布尔值,是否以守护模式运行看门狗线程。 守护线程不会阻止进程停止。

  • log_event (可选[可调用[[字符串, 可选[FileTimerRequest]], ]]) – 可调用[[Dict[str, str]], None],用于以JSON格式记录事件的可选回调。

class torch.distributed.elastic.timer.FileTimerClient(file_path, signal=Signals.SIGKILL)[源代码]

FileTimerServer 的客户端。此客户端旨在与运行 FileTimerServer 的同一主机上使用,并使用 pid 来唯一标识一个工作进程。 此客户端使用命名管道向 FileTimerServer 发送定时器请求。此客户端是生产者,而 FileTimerServer 是消费者。多个客户端可以与同一个 FileTimerServer 一起工作。

Parameters
  • file_path (str) – 字符串,FIFO 特殊文件的路径。FileTimerServer 必须通过调用 os.mkfifo() 创建它。

  • 信号 – 信号,用于终止进程的信号。使用负值或零信号将不会终止进程。

编写自定义计时器服务器/客户端

要编写自己的定时器服务器和客户端,请扩展 torch.distributed.elastic.timer.TimerServer 作为服务器, torch.distributed.elastic.timer.TimerClient 作为客户端。 TimerRequest 对象用于在服务器和客户端之间传递消息。

class torch.distributed.elastic.timer.TimerRequest(worker_id, scope_id, expiration_time)[源代码]

表示倒计时定时器获取和释放的数据对象,用于TimerClientTimerServer之间。 负的expiration_time应被解释为“释放”请求。

注意

类型 worker_id 是实现特定的。 它是 TimerServer 和 TimerClient 实现中用于唯一标识工作者的内容。

class torch.distributed.elastic.timer.TimerServer(request_queue, max_interval, daemon=True)[源代码]

监控活动计时器并在适当时间过期它们的实体。该服务器负责回收已过期计时器的工人。

abstract clear_timers(worker_ids)[源代码]

清除给定 worker_ids 的所有计时器。

abstract get_expired_timers(deadline)[源代码]

返回每个worker_id的所有已过期计时器。已过期计时器是指其expiration_time小于或等于提供的截止时间的计时器。

Return type

字典[字符串, 列表[TimerRequest]]

abstract register_timers(timer_requests)[源代码]

处理传入的定时器请求并将它们注册到服务器。 定时器请求可以是获取定时器或释放定时器请求。 具有负的 expiration_time 的定时器请求应被解释为释放定时器请求。

class torch.distributed.elastic.timer.TimerClient[源代码]

客户端库,用于通过与 TimerServer 通信来获取和释放倒计时计时器。

abstract acquire(scope_id, expiration_time)[源代码]

为持有此客户端对象的工作线程获取一个计时器,基于给定的scope_id和expiration_time。通常会向TimerServer注册该计时器。

abstract release(scope_id)[源代码]

释放此客户端所代表的工作线程上的scope_id的计时器。调用此方法后,作用域上的倒计时器将不再有效。