Shortcuts

多进程

库,用于启动和管理由函数或二进制文件指定的n个工作子进程的副本。

对于函数,它使用 torch.multiprocessing(因此使用 Python 的 multiprocessing)来生成/分叉工作进程。对于二进制文件,它使用 Python 的 subprocessing.Popen 来创建工作进程。

用法1:作为函数启动两个训练器

from torch.distributed.elastic.multiprocessing import Std, start_processes

def trainer(a, b, c):
    pass # 训练


# 运行两个训练器
# LOCAL_RANK=0 trainer(1,2,3)
# LOCAL_RANK=1 trainer(4,5,6)
ctx = start_processes(
        name="trainer",
        entrypoint=trainer,
        args={0: (1,2,3), 1: (4,5,6)},
        envs={0: {"LOCAL_RANK": 0}, 1: {"LOCAL_RANK": 1}},
        log_dir="/tmp/foobar",
        redirects=Std.ALL, # 将所有工作者的标准输出/错误写入日志文件
        tee={0: Std.ERR}, # 仅将本地排名为0的错误输出到控制台
      )

# 等待所有训练器副本完成
ctx.wait()

用法 2:以二进制形式启动 2 个 echo 工作进程

# 与调用相同
# echo hello
# echo world > stdout.log
ctx = start_processes(
        name="echo"
        entrypoint="echo",
        log_dir="/tmp/foobar",
        args={0: "hello", 1: "world"},
        redirects={1: Std.OUT},
       )

就像 torch.multiprocessing 一样,函数 start_processes() 的返回值是一个进程上下文(api.PContext)。如果启动了一个函数,则返回一个 api.MultiprocessContext,如果启动了一个二进制文件,则返回一个 api.SubprocessContext。两者都是父类 api.PContext 的具体实现。

启动多个工作进程

torch.distributed.elastic.multiprocessing.start_processes(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None, start_method='spawn')[源代码]

启动 nentrypoint 进程,并使用提供的选项。

entrypoint 可以是 Callable(函数)或 str(二进制)。 副本的数量由 argsenvs 参数的条目数决定,这些参数需要具有相同的键集。

argsenv 参数是传递给由副本索引(本地等级)映射的入口点的参数和环境变量。 所有本地等级都必须被考虑。 也就是说,键集应该是 {0,1,...,(nprocs-1)}

注意

entrypoint 是一个二进制文件(str)时,args 只能是字符串。如果提供了其他类型,则会被转换为字符串表示形式(例如 str(arg1))。此外,二进制失败时,只有在主函数被标注为 torch.distributed.elastic.multiprocessing.errors.record 时,才会写入一个 error.json 错误文件。对于函数启动,这是默认完成的,无需手动使用 @record 标注。

redirectstee 是指定将哪些标准流重定向到 log_dir 中的日志文件的位掩码。有效的掩码值在 Std 中定义。 要仅重定向/分流某些本地排名,请将 redirects 作为映射传递,键为本地排名,以指定重定向行为。 任何缺失的本地排名将默认为 Std.NONE

tee 的作用类似于 Unix 的“tee”命令,因为它重定向并打印到控制台。 为了避免工作进程的 stdout/stderr 打印到控制台,请使用 redirects 参数。

对于每个进程,log_dir 将包含:

  1. {local_rank}/error.json: 如果进程失败,包含错误信息的文件

  2. {local_rank}/stdout.json: 如果 redirect & STDOUT == STDOUT

  3. {local_rank}/stderr.json: 如果 redirect & STDERR == STDERR

注意

预期 log_dir 存在、为空且是一个目录。

示例:

log_dir = "/tmp/test"

# 好的;两个foo的副本:foo("bar0"), foo("bar1")
start_processes(
   name="trainer",
   entrypoint=foo,
   args:{0:("bar0",), 1:("bar1",),
   envs:{0:{}, 1:{}},
   log_dir=log_dir
)

# 无效;缺少本地排名1的环境
start_processes(
   name="trainer",
   entrypoint=foo,
   args:{0:("bar0",), 1:("bar1",),
   envs:{0:{}},
   log_dir=log_dir
)

# 好的;两个/usr/bin/touch的副本:touch file1, touch file2
start_processes(
   name="trainer",
   entrypoint="/usr/bin/touch",
   args:{0:("file1",), 1:("file2",),
   envs:{0:{}, 1:{}},
   log_dir=log_dir
 )

# 注意;参数被转换为字符串,运行:
# echo "1" "2" "3" 和 echo "[1, 2, 3]"
start_processes(
   name="trainer",
   entrypoint="/usr/bin/echo",
   args:{0:(1,2,3), 1:([1,2,3],),
   envs:{0:{}, 1:{}},
   log_dir=log_dir
 )
Parameters
  • 名称 (str) – 一个描述进程的人类可读的简短名称 (在tee stdout/stderr输出时用作标题)

  • entrypoint (Union[Callable, str]) – 可以是 Callable(函数)或 cmd(二进制文件)

  • args (字典[整数, 元组]) – 传递给每个副本的参数

  • envs (Dict[int, Dict[str, str]]) – 每个副本的环境变量

  • log_dir – 用于写入日志文件的目录

  • start_method (str) – 多进程启动方法(spawn, fork, forkserver) 对于二进制文件忽略

  • 重定向 – 将哪些标准流重定向到日志文件

  • tee – 重定向的标准流 + 打印到控制台

  • local_ranks_filter – 哪些等级的日志打印到控制台

Return type

PContext

进程上下文

class torch.distributed.elastic.multiprocessing.api.PContext(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None)[源代码]

标准化通过不同机制启动的一组进程操作的基类。

名称 PContext 是有意为之,以与 torch.multiprocessing.ProcessContext 区分开来。

警告

标准输出和标准错误应始终是tee_stdouts和tee_stderrs的超集(分别),这是因为tee是通过重定向+tail -f 实现的。

class torch.distributed.elastic.multiprocessing.api.MultiprocessContext(name, entrypoint, args, envs, start_method, logs_specs, log_line_prefixes=None)[源代码]

PContext 持有作为函数调用的工作进程。

class torch.distributed.elastic.multiprocessing.api.SubprocessContext(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None)[源代码]

PContext 持有作为二进制文件调用的工作进程。

class torch.distributed.elastic.multiprocessing.api.RunProcsResult(return_values=<factory>, failures=<factory>, stdouts=<factory>, stderrs=<factory>)[源代码]

已完成运行的进程的结果,这些进程是通过 start_processes() 启动的。由 PContext 返回。

请注意以下内容:

  1. 所有字段均按本地排名映射

  2. return_values - 仅对函数(不是二进制文件)填充。

  3. stdouts - 标准输出日志的路径(如果没有重定向则为空字符串)

  4. stderrs - 指向 stderr.log 的路径(如果没有重定向则为空字符串)

class torch.distributed.elastic.multiprocessing.api.DefaultLogsSpecs(log_dir=None, redirects=Std.NONE, tee=Std.NONE, local_ranks_filter=None)[源代码]

默认的 LogsSpecs 实现:

  • log_dir 如果不存在将会被创建

  • 为每次尝试和排名生成嵌套文件夹。

reify(envs)[源代码]

使用以下方案构建日志目标路径:

  • //attempt_//stdout.log

  • //attempt_//stderr.log

  • //attempt_//error.json

Return type

日志目标

class torch.distributed.elastic.multiprocessing.api.LogsDest(stdouts=<factory>, stderrs=<factory>, tee_stdouts=<factory>, tee_stderrs=<factory>, error_files=<factory>)[源代码]

对于每种日志类型,保存本地排名ID到文件路径的映射。

class torch.distributed.elastic.multiprocessing.api.LogsSpecs(log_dir=None, redirects=Std.NONE, tee=Std.NONE, local_ranks_filter=None)[源代码]

定义每个工作进程的日志处理和重定向。

Parameters
  • log_dir (可选[字符串]) – 日志将被写入的基本目录。

  • 重定向 (联合[标准, 字典[整数, 标准]]) – 重定向到文件的流。传递一个单独的 Std 枚举以重定向所有工作线程,或通过 local_rank 键控的映射来选择性重定向。

  • tee (联合[Std, 字典[整数, Std]]) – 要复制到标准输出/标准错误的流。 传递一个单独的 Std 枚举以复制所有工作线程的流, 或按 local_rank 键控的映射以选择性复制。

abstract reify(envs)[源代码]

给定环境变量,构建每个本地排名的日志文件目标路径。

Envs 参数包含每个本地 rank 的环境变量字典,其中条目在以下位置定义: _start_workers()

Return type

日志目标