多进程¶
库,用于启动和管理由函数或二进制文件指定的n
个工作子进程的副本。
对于函数,它使用 torch.multiprocessing
(因此使用 Python 的
multiprocessing
)来生成/分叉工作进程。对于二进制文件,它使用 Python 的
subprocessing.Popen
来创建工作进程。
用法1:作为函数启动两个训练器
from torch.distributed.elastic.multiprocessing import Std, start_processes
def trainer(a, b, c):
pass # 训练
# 运行两个训练器
# LOCAL_RANK=0 trainer(1,2,3)
# LOCAL_RANK=1 trainer(4,5,6)
ctx = start_processes(
name="trainer",
entrypoint=trainer,
args={0: (1,2,3), 1: (4,5,6)},
envs={0: {"LOCAL_RANK": 0}, 1: {"LOCAL_RANK": 1}},
log_dir="/tmp/foobar",
redirects=Std.ALL, # 将所有工作者的标准输出/错误写入日志文件
tee={0: Std.ERR}, # 仅将本地排名为0的错误输出到控制台
)
# 等待所有训练器副本完成
ctx.wait()
用法 2:以二进制形式启动 2 个 echo 工作进程
# 与调用相同
# echo hello
# echo world > stdout.log
ctx = start_processes(
name="echo"
entrypoint="echo",
log_dir="/tmp/foobar",
args={0: "hello", 1: "world"},
redirects={1: Std.OUT},
)
就像 torch.multiprocessing
一样,函数
start_processes()
的返回值是一个进程上下文(api.PContext
)。如果启动了一个函数,则返回一个 api.MultiprocessContext
,如果启动了一个二进制文件,则返回一个 api.SubprocessContext
。两者都是父类 api.PContext
的具体实现。
启动多个工作进程¶
- torch.distributed.elastic.multiprocessing.start_processes(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None, start_method='spawn')[源代码]¶
启动
n
个entrypoint
进程,并使用提供的选项。entrypoint
可以是Callable
(函数)或str
(二进制)。 副本的数量由args
和envs
参数的条目数决定,这些参数需要具有相同的键集。args
和env
参数是传递给由副本索引(本地等级)映射的入口点的参数和环境变量。 所有本地等级都必须被考虑。 也就是说,键集应该是{0,1,...,(nprocs-1)}
。注意
当
entrypoint
是一个二进制文件(str
)时,args
只能是字符串。如果提供了其他类型,则会被转换为字符串表示形式(例如str(arg1)
)。此外,二进制失败时,只有在主函数被标注为torch.distributed.elastic.multiprocessing.errors.record
时,才会写入一个error.json
错误文件。对于函数启动,这是默认完成的,无需手动使用@record
标注。redirects
和tee
是指定将哪些标准流重定向到log_dir
中的日志文件的位掩码。有效的掩码值在Std
中定义。 要仅重定向/分流某些本地排名,请将redirects
作为映射传递,键为本地排名,以指定重定向行为。 任何缺失的本地排名将默认为Std.NONE
。tee
的作用类似于 Unix 的“tee”命令,因为它重定向并打印到控制台。 为了避免工作进程的 stdout/stderr 打印到控制台,请使用redirects
参数。对于每个进程,
log_dir
将包含:{local_rank}/error.json
: 如果进程失败,包含错误信息的文件{local_rank}/stdout.json
: 如果redirect & STDOUT == STDOUT
{local_rank}/stderr.json
: 如果redirect & STDERR == STDERR
注意
预期
log_dir
存在、为空且是一个目录。示例:
log_dir = "/tmp/test" # 好的;两个foo的副本:foo("bar0"), foo("bar1") start_processes( name="trainer", entrypoint=foo, args:{0:("bar0",), 1:("bar1",), envs:{0:{}, 1:{}}, log_dir=log_dir ) # 无效;缺少本地排名1的环境 start_processes( name="trainer", entrypoint=foo, args:{0:("bar0",), 1:("bar1",), envs:{0:{}}, log_dir=log_dir ) # 好的;两个/usr/bin/touch的副本:touch file1, touch file2 start_processes( name="trainer", entrypoint="/usr/bin/touch", args:{0:("file1",), 1:("file2",), envs:{0:{}, 1:{}}, log_dir=log_dir ) # 注意;参数被转换为字符串,运行: # echo "1" "2" "3" 和 echo "[1, 2, 3]" start_processes( name="trainer", entrypoint="/usr/bin/echo", args:{0:(1,2,3), 1:([1,2,3],), envs:{0:{}, 1:{}}, log_dir=log_dir )
- Parameters
- Return type
进程上下文¶
- class torch.distributed.elastic.multiprocessing.api.PContext(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None)[源代码]¶
标准化通过不同机制启动的一组进程操作的基类。
名称
PContext
是有意为之,以与torch.multiprocessing.ProcessContext
区分开来。警告
标准输出和标准错误应始终是tee_stdouts和tee_stderrs的超集(分别),这是因为tee是通过重定向+tail -f
实现的。
- class torch.distributed.elastic.multiprocessing.api.MultiprocessContext(name, entrypoint, args, envs, start_method, logs_specs, log_line_prefixes=None)[源代码]¶
PContext
持有作为函数调用的工作进程。
- class torch.distributed.elastic.multiprocessing.api.SubprocessContext(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None)[源代码]¶
PContext
持有作为二进制文件调用的工作进程。
- class torch.distributed.elastic.multiprocessing.api.RunProcsResult(return_values=<factory>, failures=<factory>, stdouts=<factory>, stderrs=<factory>)[源代码]¶
已完成运行的进程的结果,这些进程是通过
start_processes()
启动的。由PContext
返回。请注意以下内容:
所有字段均按本地排名映射
return_values
- 仅对函数(不是二进制文件)填充。stdouts
- 标准输出日志的路径(如果没有重定向则为空字符串)stderrs
- 指向 stderr.log 的路径(如果没有重定向则为空字符串)
- class torch.distributed.elastic.multiprocessing.api.DefaultLogsSpecs(log_dir=None, redirects=Std.NONE, tee=Std.NONE, local_ranks_filter=None)[源代码]¶
默认的 LogsSpecs 实现:
log_dir 如果不存在将会被创建
为每次尝试和排名生成嵌套文件夹。
- class torch.distributed.elastic.multiprocessing.api.LogsDest(stdouts=<factory>, stderrs=<factory>, tee_stdouts=<factory>, tee_stderrs=<factory>, error_files=<factory>)[源代码]¶
对于每种日志类型,保存本地排名ID到文件路径的映射。