#!/usr/bin/env python3
# 版权所有 (c) Facebook, Inc. 及其附属公司。
# 保留所有权利。
#
# 本源代码根据在此源树根目录下的LICENSE文件中找到的BSD风格许可证进行许可。
"""
启动和管理 ``n`` 个工作子进程的库,这些子进程可以由函数或二进制文件指定。
对于函数,它使用 ``torch.multiprocessing``(因此使用python的 ``multiprocessing``)来生成/分叉工作进程。对于二进制文件,它使用python的 ``subprocessing.Popen`` 来创建工作进程。
用法1:作为函数启动两个训练器
::
from torch.distributed.elastic.multiprocessing import Std, start_processes
def trainer(a, b, c):
pass # 训练
# 运行两个训练器
# LOCAL_RANK=0 trainer(1,2,3)
# LOCAL_RANK=1 trainer(4,5,6)
ctx = start_processes(
name="trainer",
entrypoint=trainer,
args={0: (1,2,3), 1: (4,5,6)},
envs={0: {"LOCAL_RANK": 0}, 1: {"LOCAL_RANK": 1}},
log_dir="/tmp/foobar",
redirects=Std.ALL, # 将所有工作进程的stdout/stderr写入日志文件
tee={0: Std.ERR}, # 仅将本地排名0的stderr输出到控制台
)
# 等待所有训练器副本完成
ctx.wait()
用法2:作为二进制文件启动2个echo工作进程
::
# 等同于调用
# echo hello
# echo world > stdout.log
ctx = start_processes(
name="echo"
entrypoint="echo",
log_dir="/tmp/foobar",
args={0: "hello", 1: "world"},
redirects={1: Std.OUT},
)
与 ``torch.multiprocessing`` 类似,函数 :func:`start_processes` 的返回值是一个进程上下文 (:class:`api.PContext`)。如果启动了函数,则返回 :class:`api.MultiprocessContext`,如果启动了二进制文件,则返回 :class:`api.SubprocessContext`。两者都是父类 :class:`api.PContext` 的具体实现。
"""
import os
from typing import Callable, Dict, Optional, Tuple, Union, Set
from torch.distributed.elastic.multiprocessing.api import ( # noqa: F401
_validate_full_rank,
DefaultLogsSpecs,
LogsDest,
LogsSpecs,
MultiprocessContext,
PContext,
ProcessFailure,
RunProcsResult,
SignalException,
Std,
SubprocessContext,
to_map,
)
from torch.distributed.elastic.utils.logging import get_logger
__all__ = [
"start_processes",
"MultiprocessContext",
"PContext",
"ProcessFailure",
"RunProcsResult",
"SignalException",
"Std",
"LogsDest",
"LogsSpecs",
"DefaultLogsSpecs",
"SubprocessContext",
"to_map",
]
log = get_logger(__name__)
[docs]def start_processes(
name: str,
entrypoint: Union[Callable, str],
args: Dict[int, Tuple],
envs: Dict[int, Dict[str, str]],
logs_specs: LogsSpecs,
log_line_prefixes: Optional[Dict[int, str]] = None,
start_method: str = "spawn",
) -> PContext:
"""
使用提供的选项启动 ``n`` 个 ``entrypoint`` 进程的副本。
``entrypoint`` 可以是 ``Callable``(函数)或 ``str``(二进制文件)。
副本的数量由 ``args`` 和 ``envs`` 参数的条目数决定,它们需要具有相同的键集。
``args`` 和 ``env`` 参数是要传递给入口点的参数和环境变量,按副本索引(本地排名)映射。
所有本地排名都必须被考虑。
也就是说,键集应该是 ``{0,1,...,(nprocs-1)}``。
.. 注意:: 当 ``entrypoint`` 是二进制文件(``str``)时,``args`` 只能是字符串。
如果提供了任何其他类型,则将其转换为字符串表示形式
(例如 ``str(arg1)``)。此外,二进制文件失败时,只有在主函数被注释为
``torch.distributed.elastic.multiprocessing.errors.record`` 时,才会写入 ``error.json`` 错误文件。对于函数启动,
这是默认完成的,无需手动使用 ``@record`` 注释。
``redirects`` 和 ``tee`` 是指定将哪些标准流重定向到 ``log_dir`` 中的日志文件的位掩码。
有效的掩码值在 ``Std`` 中定义。
要仅重定向/tee某些本地排名,请将 ``redirects`` 作为映射传递,键为
本地排名,以指定重定向行为。
任何缺失的本地排名将默认为 ``Std.NONE``。
``tee`` 的作用类似于unix的 "tee" 命令,即重定向 + 打印到控制台。
要避免工作进程的stdout/stderr打印到控制台,请使用 ``redirects`` 参数。
对于每个进程,``log_dir`` 将包含:
#. ``{local_rank}/error.json``: 如果进程失败,包含错误信息的文件
#. ``{local_rank}/stdout.json``: 如果 ``redirect & STDOUT == STDOUT``
#. ``{local_rank}/stderr.json``: 如果 ``redirect & STDERR == STDERR``
.. 注意:: 期望 ``log_dir`` 存在、为空且为目录。
示例:
::
log_dir = "/tmp/test"
# 正常;两个foo的副本:foo("bar0"),foo("bar1")
start_processes(
name="trainer",
entrypoint=foo,
args:{0:("bar0",), 1:("bar1",),
envs:{0:{}, 1:{}},
log_dir=log_dir
)
# 无效;缺少本地排名1的环境变量
start_processes(
name="trainer",
entrypoint=foo,
args:{0:("bar0",), 1:("bar1",),
envs:{0:{}},
log_dir=log_dir
)
# 正常;两个/usr/bin/touch的副本:touch file1,touch file2
start_processes(
name="trainer",
entrypoint="/usr/bin/touch",
args:{0:("file1",), 1:("file2",),
envs:{0:{}, 1:{}},
log_dir=log_dir
)
# 注意;参数被转换为字符串,运行:
# echo "1" "2" "3" 和 echo "[1, 2, 3]"
start_processes(
name="trainer",
entrypoint="/usr/bin/echo",
args:{0:(1,2,3), 1:([1,2,3],),
envs:{0:{}, 1:{}},
log_dir=log_dir
)
参数:
name: 描述进程的人类可读的短名称
(在tee stdout/stderr输出时用作标题)
entrypoint: 可以是 ``Callable``(函数)或 ``cmd``(二进制文件)
args: 每个副本的参数
envs: 每个副本的环境变量
log_dir: 用于写入日志文件的目录
<