Shortcuts

torchx.runner

运行器允许您在支持的调度器之一上将组件作为独立作业运行。运行器接受一个specs.AppDef对象,该对象是通过使用一组用户提供的参数评估组件函数的结果,以及调度器名称和调度器参数(也称为runcfgrunopts),并将组件作为作业提交(见下图)。

_images/runner_diagram.png

运行器函数

torchx.runner.get_runner(name: Optional[str] = None, component_defaults: Optional[Dict[str, Dict[str, str]]] = None, **scheduler_params: Any) Runner[source]

构造并获取Runner对象的便捷方法。用法:

with get_runner() as runner:
  app_handle = runner.run(component(args), scheduler="kubernetes", runcfg)
  print(runner.status(app_handle))

或者,

runner = get_runner()
try:
   app_handle = runner.run(component(args), scheduler="kubernetes", runcfg)
   print(runner.status(app_handle))
finally:
   runner.close()
Parameters:
  • name – 人类可读的名称,将作为所有启动作业的一部分包含在内。

  • scheduler_params – 将传递给所有可用调度器构造函数的额外参数。

运行器类

class torchx.runner.Runner(name: str, scheduler_factories: Dict[str, SchedulerFactory], component_defaults: Optional[Dict[str, Dict[str, str]]] = None, scheduler_params: Optional[Dict[str, object]] = None)[source]

TorchX 单个组件运行器。具有供用户操作 AppDefs 的方法。如果应用程序是在本地启动的,Runner 将缓存有关启动的应用程序的信息,否则由特定的调度器实现决定。

cancel(app_handle: str) None[source]

停止应用程序,有效地指示调度程序取消作业。如果应用程序不存在,则不执行任何操作。

注意

此方法在取消请求提交给调度程序后立即返回。应用程序将处于RUNNING状态,直到调度程序实际终止作业。如果调度程序成功中断作业并终止它,最终状态将为CANCELLED,否则将为FAILED

close() None[source]

关闭此运行器并释放/清理所有已分配的资源。 递归调用所有调度器上的close()方法。 一旦在运行器上调用此方法,运行器对象将被视为无效,并且在运行器对象上调用的任何方法以及与此运行器关联的调度器都具有未定义的行为。 可以在同一运行器对象上多次调用此方法。

describe(app_handle: str) Optional[AppDef][source]

根据应用程序句柄重建应用程序(尽可能)。 请注意,重建的应用程序可能不是通过运行API提交的完整应用程序。 能够重建多少应用程序取决于调度程序。

Returns:

AppDef 或 None,如果应用程序不再存在或调度程序不支持描述应用程序句柄

dryrun(app: AppDef, scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) AppDryRunInfo[source]

在给定的调度程序上使用提供的运行配置对应用程序进行试运行。 实际上不会提交应用程序,而是返回将会提交的内容。返回的AppDryRunInfo经过美化格式化,可以直接打印或记录。

用法:

dryrun_info = session.dryrun(app, scheduler="local", cfg)
print(dryrun_info)
dryrun_component(component: str, component_args: List[str], scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) AppDryRunInfo[source]

run_component() 的试运行版本。实际上不会运行组件,只是返回“将会”运行的内容。

list(scheduler: str) List[ListAppResponse][source]

对于在调度程序上启动的应用程序,此API返回一个ListAppResponse对象列表,每个对象都有应用程序ID、应用程序句柄及其状态。 注意:此API处于原型阶段,可能会发生变化。

log_lines(app_handle: str, role_name: str, k: int = 0, regex: Optional[str] = None, since: Optional[datetime] = None, until: Optional[datetime] = None, should_tail: bool = False, streams: Optional[Stream] = None) Iterable[str][source]

返回指定作业容器日志行的迭代器。

注意

  1. k 是节点(主机)ID,而不是 rank

  2. sinceuntil 不一定总是被遵守(取决于调度程序)。

警告

返回的迭代器的语义和保证高度依赖于调度器。有关此日志迭代器的高级语义,请参见torchx.specs.api.Scheduler.log_iter。因此,强烈不建议使用此方法生成输出以传递给下游函数/依赖项。此方法不保证返回100%的日志行。如果调度器已经完全或部分清除应用程序的日志记录,则此方法返回无或部分日志行是完全有效的。

返回的行将包括空白字符,例如 \n\r。在输出这些行时,您应确保避免添加 额外的换行符。

用法:

app_handle = session.run(app, scheduler="local", cfg=Dict[str, ConfigValue]())

print("== trainer node 0 logs ==")
for line in session.log_lines(app_handle, "trainer", k=0):
   # for prints newlines will already be present in the line
   print(line, end="")

   # when writing to a file nothing extra is necessary
   f.write(line)

不鼓励的反模式:

# DO NOT DO THIS!
# parses accuracy metric from log and reports it for this experiment run
accuracy = -1
for line in session.log_lines(app_handle, "trainer", k=0):
   if matches_regex(line, "final model_accuracy:[0-9]*"):
       accuracy = parse_accuracy(line)
       break
report(experiment_name, accuracy)
Parameters:
  • app_handle – 应用程序句柄

  • role_name – 应用程序中的角色(例如 训练师)

  • k – 获取日志的角色的第k个副本

  • regex – 可选的正则表达式过滤器,如果留空则返回所有行

  • since – 基于日期时间的起始游标。如果留空,则从第一条日志行(作业开始)开始。

  • until – 基于日期时间的结束游标。如果留空,则跟随日志输出直到作业完成并且所有日志行都被消耗。

Returns:

指定应用程序的角色第k个副本的迭代器。

Raises:

UnknownAppException – 如果调度程序中不存在该应用程序

run(app: AppDef, scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) str[source]

在指定模式下运行给定的应用程序。

注意

Runner 的子类应该实现 schedule 方法,而不是直接覆盖这个方法。

Returns:

用于调用应用程序上其他操作API的应用程序句柄。

run_component(component: str, component_args: List[str], scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) str[source]

运行一个组件。

component has the following resolution order(high to low):
  • User-registered components. Users can register components via

    https://packaging.python.org/specifications/entry-points/。该方法在组 torchx.components 中查找入口点。

  • Builtin components relative to torchx.components. The path to the component should

    模块名称相对于torchx.components,函数名称格式为: $module.$function

  • File-based components in format: $FILE_PATH:FUNCTION_NAME. Both relative and

    支持绝对路径。

用法:

# resolved to torchx.components.distributed.ddp()
runner.run_component("distributed.ddp", ...)

# resolved to my_component() function in ~/home/components.py
runner.run_component("~/home/components.py:my_component", ...)
Returns:

用于调用应用程序上其他操作API的应用程序句柄

Raises:
  • ComponentValidationException – 如果组件无效。

  • ComponentNotFoundException – 如果无法解析component_path

schedule(dryrun_info: AppDryRunInfo) str[source]

实际上从给定的dryrun信息运行应用程序。 当需要覆盖调度程序请求中不可通过对象API配置的参数时,这很有用。

警告

请谨慎使用此方法,因为滥用此方法覆盖原始调度程序请求中的许多参数可能会导致长期使用TorchX不符合规定。此方法旨在短期内让用户能够尝试某些特定于调度程序的功能,而无需等待TorchX在其API中公开调度程序功能。

注意

建议Session的子类实现此方法,而不是直接实现run方法。

用法:

dryrun_info = session.dryrun(app, scheduler="default", cfg)

# overwrite parameter "foo" to "bar"
dryrun_info.request.foo = "bar"

app_handle = session.submit(dryrun_info)
scheduler_backends() List[str][source]

返回所有支持的调度器后端列表。

scheduler_run_opts(scheduler: str) runopts[source]

返回支持的调度器后端的runopts

用法:

local_runopts = session.scheduler_run_opts("local_cwd")
print("local scheduler run options: {local_runopts}")
Returns:

指定调度器类型的runopts

status(app_handle: str) Optional[AppStatus][source]
Returns:

应用程序的状态,如果应用程序不再存在(例如,过去已停止并从调度程序的后端移除),则为None

stop(app_handle: str) None[source]

参见方法 cancel

警告

此方法将在未来被弃用。它已被cancel取代,后者提供了相同的功能。这一更改是为了与CLI和调度器API保持一致。

wait(app_handle: str, wait_interval: float = 10) Optional[AppStatus][source]

块等待(无限期)应用程序完成。 可能的实现:

while(True):
    app_status = status(app)
    if app_status.is_terminal():
        return
    sleep(10)
Parameters:
  • app_handle – 等待完成的应用句柄

  • wait_interval – 轮询状态前的最小等待间隔

Returns:

应用程序的终端状态,如果应用程序不再存在,则为None