torchx.schedulers¶
TorchX 调度器定义了现有调度器的插件。与runner一起使用时,它们将组件作为作业提交到相应的调度器后端。TorchX 支持一些开箱即用的调度器。您可以通过实现 .. py:class::torchx.schedulers 并在入口点注册它来添加自己的调度器。
所有调度器¶
调度器函数¶
- torchx.schedulers.get_scheduler_factories() Dict[str, SchedulerFactory][source]¶
get_scheduler_factories 返回所有可用的调度器名称及其实例化方法。
字典中的第一个调度程序被用作默认调度程序。
调度器类¶
- class torchx.schedulers.Scheduler(backend: str, session_name: str)[source]¶
一个抽象调度器功能的接口。 实现者只需要实现那些用
@abc.abstractmethod注解的方法。- cancel(app_id: str) None[source]¶
取消/终止应用程序。此方法在同一线程内是幂等的,并且可以安全地在同一应用程序上多次调用。然而,当从同一应用程序的多个线程/进程调用时,此方法的确切语义取决于底层调度器API的幂等性保证。
注意
此方法不会阻塞应用程序以达到取消状态。要确保应用程序达到终止状态,请使用
waitAPI。
- close() None[source]¶
仅适用于具有本地状态的调度程序!关闭调度程序,释放任何已分配的资源。一旦关闭,调度程序对象被视为不再有效,并且对该对象调用的任何方法都会导致未定义的行为。
此方法不应引发异常,并且允许在同一对象上多次调用。
注意
仅适用于具有本地状态的调度程序实现 (
torchx/schedulers/local_scheduler.py)。 仅包装远程调度程序客户端的调度程序不需要 实现此方法。
- abstract describe(app_id: str) Optional[DescribeAppResponse][source]¶
描述指定的应用程序。
- Returns:
AppDef 描述或如果应用程序不存在则为
None。
- abstract list() List[ListAppResponse][source]¶
对于在调度程序上启动的应用程序,此API返回一个ListAppResponse对象列表,每个对象都包含应用程序ID及其状态。 注意:此API处于原型阶段,可能会发生变化。
- log_iter(app_id: str, role_name: str, k: int = 0, regex: Optional[str] = None, since: Optional[datetime] = None, until: Optional[datetime] = None, should_tail: bool = False, streams: Optional[Stream] = None) Iterable[str][source]¶
返回一个迭代器,用于遍历
k``th replica of the ``role的日志行。 当所有符合条件的日志行都被读取后,迭代器结束。如果调度程序支持基于时间的光标获取自定义时间范围内的日志行,则
since和until字段会被尊重,否则它们会被忽略。不指定since和until等同于获取所有可用的日志行。如果until为空,则迭代器的行为类似于tail -f,跟随日志输出直到作业达到终止状态。日志的确切定义取决于调度程序。一些调度程序可能将stderr或stdout视为日志,而其他调度程序可能从日志文件中读取日志。
行为和假设:
如果在一个不存在的应用程序上调用,会产生未定义行为 调用者应在调用此方法之前使用
exists(app_id)检查应用程序是否存在。不是有状态的,使用相同参数调用此方法两次会返回一个新的迭代器。之前的迭代进度会丢失。
并不总是支持日志尾部查看。并非所有调度器都支持实时日志迭代(例如,在应用程序运行时查看日志尾部)。请参考特定调度器的文档以了解迭代器的行为。
- 3.1 If the scheduler supports log-tailing, it should be controlled
通过
should_tail参数。
不保证日志的保留。当调用此方法时,底层调度程序可能已经清除了此应用程序的日志记录。如果是这样,此方法会引发任意异常。
如果
should_tail为 True,该方法仅在可访问的日志行完全耗尽且应用程序达到最终状态时引发StopIteration异常。例如,如果应用程序卡住并且不产生任何日志行,则迭代器会阻塞,直到应用程序最终被终止(无论是通过超时还是手动),此时它会引发StopIteration。如果
should_tail为 False,当没有更多日志时,该方法会引发StopIteration。不需要所有调度程序都支持。
一些调度器可能通过支持
__getitem__来支持行光标 (例如iter[50]跳转到第50条日志行)。- Whitespace is preserved, each new line should include
\n. To 支持交互式进度条,返回的行不需要包含
\n,但应在打印时不换行以正确处理\r回车符。
- Whitespace is preserved, each new line should include
- Parameters:
streams – 要选择的IO输出流。 选项之一:combined, stdout, stderr。 如果调度程序不支持所选的流,它将抛出一个ValueError。
- Returns:
一个
Iterator,用于遍历指定角色副本的日志行- Raises:
NotImplementedError – 如果调度程序不支持日志迭代
- abstract schedule(dryrun_info: AppDryRunInfo) str[source]¶
与
submit相同,只是它接受一个AppDryRunInfo。 鼓励实现者实现此方法,而不是直接实现submit,因为submit可以通过以下方式轻松实现:dryrun_info = self.submit_dryrun(app, cfg) return schedule(dryrun_info)
- class torchx.schedulers.api.DescribeAppResponse(app_id: str = '<NOT_SET>', state: ~torchx.specs.api.AppState = AppState.UNSUBMITTED, num_restarts: int = -1, msg: str = '<NONE>', structured_error_msg: str = '<NONE>', ui_url: ~typing.Optional[str] = None, roles_statuses: ~typing.List[~torchx.specs.api.RoleStatus] = <factory>, roles: ~typing.List[~torchx.specs.api.Role] = <factory>)[source]¶
由
Scheduler.describe(app)API返回的响应对象。包含调度器已知的应用程序状态和描述。对于某些调度器实现,此响应对象具有重新创建AppDef对象所需且足够的信息。对于这些类型的调度器,用户可以重新run()重新创建的应用程序。否则,用户只能调用非创建方法(例如wait(),status()等)。由于这个类是一个数据类并且包含许多成员变量,我们保持使用简单,提供了一个无参数构造函数,并选择直接访问成员变量而不是提供访问器。
如果调度程序返回任意消息,则应填充
msg字段。 如果调度程序返回结构化的json,则应填充structured_error_msg字段。
- class torchx.schedulers.api.ListAppResponse(app_id: str, state: AppState, app_handle: str = '<NOT_SET>')[source]¶
由
scheduler.list()和runner.list()API返回的响应对象。 包含应用程序的app_id、app_handle和状态。 App ID:标识在调度程序上提交的应用程序的唯一标识符 App handle:以URL格式运行的应用程序的标识符,如 {scheduler_backend}://{session_name}/{app_id},由运行程序在调度程序上提交作业时创建。ListAppResponse中的句柄信息由runner.list()填充。此句柄可用于通过torchx CLI或torchx运行程序实例进一步描述应用程序。由于这个类是一个包含一些成员变量的数据类,我们保持使用简单,并选择直接访问成员变量,而不是提供访问器。