torchx.runner¶
运行器允许您在支持的调度器之一上将组件作为独立作业运行。运行器接受一个specs.AppDef对象,该对象是通过使用一组用户提供的参数评估组件函数的结果,以及调度器名称和调度器参数(也称为runcfg或runopts),并将组件作为作业提交(见下图)。
运行器函数¶
- torchx.runner.get_runner(name: Optional[str] = None, component_defaults: Optional[Dict[str, Dict[str, str]]] = None, **scheduler_params: Any) Runner[source]¶
构造并获取Runner对象的便捷方法。用法:
with get_runner() as runner: app_handle = runner.run(component(args), scheduler="kubernetes", runcfg) print(runner.status(app_handle))
或者,
runner = get_runner() try: app_handle = runner.run(component(args), scheduler="kubernetes", runcfg) print(runner.status(app_handle)) finally: runner.close()
- Parameters:
name – 人类可读的名称,将作为所有启动作业的一部分包含在内。
scheduler_params – 将传递给所有可用调度器构造函数的额外参数。
运行器类¶
- class torchx.runner.Runner(name: str, scheduler_factories: Dict[str, SchedulerFactory], component_defaults: Optional[Dict[str, Dict[str, str]]] = None, scheduler_params: Optional[Dict[str, object]] = None)[source]¶
TorchX 单个组件运行器。具有供用户操作
AppDefs的方法。如果应用程序是在本地启动的,Runner将缓存有关启动的应用程序的信息,否则由特定的调度器实现决定。- cancel(app_handle: str) None[source]¶
停止应用程序,有效地指示调度程序取消作业。如果应用程序不存在,则不执行任何操作。
注意
此方法在取消请求提交给调度程序后立即返回。应用程序将处于
RUNNING状态,直到调度程序实际终止作业。如果调度程序成功中断作业并终止它,最终状态将为CANCELLED,否则将为FAILED。
- close() None[source]¶
关闭此运行器并释放/清理所有已分配的资源。 递归调用所有调度器上的
close()方法。 一旦在运行器上调用此方法,运行器对象将被视为无效,并且在运行器对象上调用的任何方法以及与此运行器关联的调度器都具有未定义的行为。 可以在同一运行器对象上多次调用此方法。
- describe(app_handle: str) Optional[AppDef][source]¶
根据应用程序句柄重建应用程序(尽可能)。 请注意,重建的应用程序可能不是通过运行API提交的完整应用程序。 能够重建多少应用程序取决于调度程序。
- Returns:
AppDef 或 None,如果应用程序不再存在或调度程序不支持描述应用程序句柄
- dryrun(app: AppDef, scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) AppDryRunInfo[source]¶
在给定的调度程序上使用提供的运行配置对应用程序进行试运行。 实际上不会提交应用程序,而是返回将会提交的内容。返回的
AppDryRunInfo经过美化格式化,可以直接打印或记录。用法:
dryrun_info = session.dryrun(app, scheduler="local", cfg) print(dryrun_info)
- dryrun_component(component: str, component_args: List[str], scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) AppDryRunInfo[source]¶
run_component()的试运行版本。实际上不会运行组件,只是返回“将会”运行的内容。
- list(scheduler: str) List[ListAppResponse][source]¶
对于在调度程序上启动的应用程序,此API返回一个ListAppResponse对象列表,每个对象都有应用程序ID、应用程序句柄及其状态。 注意:此API处于原型阶段,可能会发生变化。
- log_lines(app_handle: str, role_name: str, k: int = 0, regex: Optional[str] = None, since: Optional[datetime] = None, until: Optional[datetime] = None, should_tail: bool = False, streams: Optional[Stream] = None) Iterable[str][source]¶
返回指定作业容器日志行的迭代器。
注意
k是节点(主机)ID,而不是rank。since和until不一定总是被遵守(取决于调度程序)。
警告
返回的迭代器的语义和保证高度依赖于调度器。有关此日志迭代器的高级语义,请参见
torchx.specs.api.Scheduler.log_iter。因此,强烈不建议使用此方法生成输出以传递给下游函数/依赖项。此方法不保证返回100%的日志行。如果调度器已经完全或部分清除应用程序的日志记录,则此方法返回无或部分日志行是完全有效的。返回的行将包括空白字符,例如
\n或\r。在输出这些行时,您应确保避免添加 额外的换行符。用法:
app_handle = session.run(app, scheduler="local", cfg=Dict[str, ConfigValue]()) print("== trainer node 0 logs ==") for line in session.log_lines(app_handle, "trainer", k=0): # for prints newlines will already be present in the line print(line, end="") # when writing to a file nothing extra is necessary f.write(line)
不鼓励的反模式:
# DO NOT DO THIS! # parses accuracy metric from log and reports it for this experiment run accuracy = -1 for line in session.log_lines(app_handle, "trainer", k=0): if matches_regex(line, "final model_accuracy:[0-9]*"): accuracy = parse_accuracy(line) break report(experiment_name, accuracy)
- Parameters:
app_handle – 应用程序句柄
role_name – 应用程序中的角色(例如 训练师)
k – 获取日志的角色的第k个副本
regex – 可选的正则表达式过滤器,如果留空则返回所有行
since – 基于日期时间的起始游标。如果留空,则从第一条日志行(作业开始)开始。
until – 基于日期时间的结束游标。如果留空,则跟随日志输出直到作业完成并且所有日志行都被消耗。
- Returns:
指定应用程序的角色第k个副本的迭代器。
- Raises:
UnknownAppException – 如果调度程序中不存在该应用程序
- run(app: AppDef, scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) str[source]¶
在指定模式下运行给定的应用程序。
注意
Runner的子类应该实现schedule方法,而不是直接覆盖这个方法。- Returns:
用于调用应用程序上其他操作API的应用程序句柄。
- run_component(component: str, component_args: List[str], scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) str[source]¶
运行一个组件。
componenthas the following resolution order(high to low):- User-registered components. Users can register components via
https://packaging.python.org/specifications/entry-points/。该方法在组
torchx.components中查找入口点。
- Builtin components relative to torchx.components. The path to the component should
模块名称相对于torchx.components,函数名称格式为:
$module.$function。
- File-based components in format:
$FILE_PATH:FUNCTION_NAME. Both relative and 支持绝对路径。
- File-based components in format:
用法:
# resolved to torchx.components.distributed.ddp() runner.run_component("distributed.ddp", ...) # resolved to my_component() function in ~/home/components.py runner.run_component("~/home/components.py:my_component", ...)
- Returns:
用于调用应用程序上其他操作API的应用程序句柄
- Raises:
ComponentValidationException – 如果组件无效。
ComponentNotFoundException – 如果无法解析
component_path。
- schedule(dryrun_info: AppDryRunInfo) str[source]¶
实际上从给定的dryrun信息运行应用程序。 当需要覆盖调度程序请求中不可通过对象API配置的参数时,这很有用。
警告
请谨慎使用此方法,因为滥用此方法覆盖原始调度程序请求中的许多参数可能会导致长期使用TorchX不符合规定。此方法旨在短期内让用户能够尝试某些特定于调度程序的功能,而无需等待TorchX在其API中公开调度程序功能。
注意
建议
Session的子类实现此方法,而不是直接实现run方法。用法:
dryrun_info = session.dryrun(app, scheduler="default", cfg) # overwrite parameter "foo" to "bar" dryrun_info.request.foo = "bar" app_handle = session.submit(dryrun_info)
- scheduler_run_opts(scheduler: str) runopts[source]¶
返回支持的调度器后端的
runopts。用法:
local_runopts = session.scheduler_run_opts("local_cwd") print("local scheduler run options: {local_runopts}")
- Returns:
指定调度器类型的
runopts。
- status(app_handle: str) Optional[AppStatus][source]¶
- Returns:
应用程序的状态,如果应用程序不再存在(例如,过去已停止并从调度程序的后端移除),则为
None。