Python API#
SkyPilot 提供了一个用 Python 编写的编程 API,该 API 由 CLI 在底层使用。
注意
Python API 包含比 CLI 更多的实验性函数/类。也就是说,它已被用户用于开发多个 Python 库。
如有问题或需要支持,请联系开发团队。 您的反馈对于改进此API非常宝贵!
核心API#
sky.launch#
- sky.launch(task, cluster_name=None, retry_until_up=False, idle_minutes_to_autostop=None, dryrun=False, down=False, stream_logs=True, backend=None, optimize_target=OptimizeTarget.COST, detach_setup=False, detach_run=False, no_setup=False, clone_disk_from=None, fast=False, _is_launched_by_jobs_controller=False, _is_launched_by_sky_serve_controller=False, _disable_controller_check=False)[source]#
启动一个集群或任务。
任务的设置和运行命令在任务的工作目录下执行(当指定时,它会同步到远程集群)。任务在集群上经历作业队列调度。
目前,第一个参数必须是 sky.Task,或者(实验性高级用法)sky.Dag。在后一种情况下,目前它必须包含一个单一任务;对管道/通用 DAG 的支持仍在实验分支中。
- Parameters:
cluster_name (
Optional[str]) – 要创建/重用的集群的名称。如果为 None,则自动生成一个名称。retry_until_up (
bool) – 是否重试启动集群直到它启动。idle_minutes_to_autostop (
Optional[int]) – 在集群空闲这么多分钟后自动停止,即集群的任务队列中没有正在运行或挂起的任务。每当在任务队列中发现设置/运行/挂起的任务时,空闲时间会被重置。设置此标志等同于运行sky.launch(..., detach_run=True, ...)然后运行sky.autostop(idle_minutes=。如果未设置,集群将不会自动停止。) down (
bool) – 在所有作业完成后(无论成功或异常)关闭集群。如果同时设置了–idle-minutes-to-autostop,集群将在指定的空闲时间后关闭。请注意,如果在配置/数据同步/设置过程中发生错误,集群将不会关闭以便进行调试。dryrun (
bool) – 如果为True,则不实际启动集群。stream_logs (
bool) – 如果为True,则在终端显示日志。backend (
Optional[Backend]) – 要使用的后端。如果为None,则使用默认后端 (CloudVMRayBackend)。optimize_target (
OptimizeTarget) – 优化的目标。选项:OptimizeTarget.COST, OptimizeTarget.TIME.detach_setup (
bool) – 如果为True,作为作业本身的一部分在非交互模式下运行设置。您可以安全地使用ctrl-c来脱离日志记录,它不会中断设置过程。要在脱离后再次查看日志,请使用sky logs。要取消设置,请通过sky cancel取消作业。对于长时间运行的设置命令非常有用。detach_run (
bool) – 如果为True,一旦作业提交,立即从此函数返回,并且不流式传输执行日志。no_setup (
bool) – 如果为True,则不重新运行设置命令。clone_disk_from (
Optional[str]) – [实验性] 如果设置,从指定的集群克隆磁盘。这对于将集群迁移到不同的可用区或区域非常有用。快速 (
bool) – [实验性] 如果集群已经启动并可用,跳过配置和设置步骤。
示例
import sky task = sky.Task(run='echo hello SkyPilot') task.set_resources( sky.Resources(cloud=sky.AWS(), accelerators='V100:4')) sky.launch(task, cluster_name='my-cluster')
- Raises:
exceptions.ClusterOwnerIdentityMismatchError – 如果集群由其他用户拥有。
exceptions.InvalidClusterNameError – 如果集群名称无效。
exceptions.ResourcesMismatchError – 如果请求的资源与现有集群不匹配。
exceptions.NotSupportedError – 如果后端/云/集群不支持所需功能。
exceptions.ResourcesUnavailableError – 如果请求的资源无法满足。异常的failover_history将被设置为: 1. 空:如果第一次sky.optimize()未能找到可行的资源;没有进行预检查或实际启动尝试。 2. 非空:如果至少有一个异常来自我们的预检查(例如,集群名称无效)或某个区域/区域抛出资源不可用。
exceptions.CommandError – 任何ssh命令错误。
exceptions.NoCloudAccessError – 如果所有云都被禁用。
根据后端的不同,可能会引发其他异常。
- Returns:
- Optional[int]; 提交作业的作业ID。如果
后端不是CloudVmRayBackend,或者没有作业提交到集群,则为None。
- handle: Optional[backends.ResourceHandle]; 集群的句柄。如果
是dryrun,则为None。
- Return type:
任务ID
sky.exec#
- sky.exec(task, cluster_name, dryrun=False, down=False, stream_logs=True, backend=None, detach_run=False)[源代码]#
在现有集群上执行任务。
此函数执行两个操作:
工作目录同步,如果任务指定了工作目录;
执行任务的
run命令。
所有其他步骤(配置、设置命令、文件挂载同步)都将被跳过。如果任务中的任何这些规范发生了变化,此函数将不会反映这些变化。要确保集群的设置是最新的,请使用
sky.launch()代替。执行和调度行为:
任务将经过作业队列调度,尊重任何指定的资源需求。它可以在集群中任何具有足够资源的节点上执行。
任务在工作目录下运行(如果指定了)。
任务以非交互方式运行(没有伪终端或pty),因此像
htop这样的交互式命令不起作用。请改用ssh my_cluster。
- Parameters:
任务 (
Union[Task,Dag]) – sky.Task,或包含要执行任务的sky.Dag(实验性;仅限1个任务)。cluster_name (
str) – 用于执行任务的现有集群的名称。down (
bool) – 在所有作业完成后(无论成功或异常)关闭集群。如果同时设置了–idle-minutes-to-autostop,集群将在指定的空闲时间后关闭。请注意,如果在配置/数据同步/设置过程中发生错误,集群将不会关闭以便进行调试。dryrun (
bool) – 如果为True,则不实际执行任务。stream_logs (
bool) – 如果为True,则在终端显示日志。backend (
Optional[Backend]) – 要使用的后端。如果为None,则使用默认后端 (CloudVMRayBackend)。detach_run (
bool) – 如果为True,任务提交后将从日志记录中分离。
- Raises:
ValueError – 如果指定的集群未处于UP状态。
sky.exceptions.ClusterDoesNotExist – 如果指定的集群不存在。
sky.exceptions.NotSupportedError – 如果指定的集群是一个不支持此操作的控制器。
- Returns:
- Optional[int]; 提交作业的作业ID。如果
后端不是CloudVmRayBackend,或者没有作业提交到集群,则为None。
- handle: Optional[backends.ResourceHandle]; 集群的句柄。如果
是dryrun,则为None。
- Return type:
任务ID
sky.stop#
- sky.stop(cluster_name, purge=False)[source]#
停止一个集群。
当集群停止时,附加磁盘上的数据不会丢失。实例的计费将停止,而磁盘仍将收费。这些磁盘在重新启动集群时将重新附加。
目前,spot实例集群无法停止(除了GCP,它确实允许在停止spot虚拟机时保留磁盘内容)。
- Parameters:
- Raises:
sky.exceptions.ClusterDoesNotExist – 指定的集群不存在。
RuntimeError – 无法停止集群。
sky.exceptions.NotSupportedError – 如果指定的集群是一个spot集群,或者是一个TPU VM Pod集群,或者是托管作业控制器。
- Return type:
sky.start#
- sky.start(cluster_name, idle_minutes_to_autostop=None, retry_until_up=False, down=False, force=False)[source]#
重启一个集群。
如果集群之前已停止(状态为STOPPED)或在配置/运行时安装过程中失败(状态为INIT),此函数将尝试启动集群。在后一种情况下,将重新尝试配置和运行时安装。
在重新启动已停止的集群时,不会使用自动故障转移配置。它将在之前选择的相同云、区域和区域上启动。
如果集群已经处于UP状态,此函数将无效。
- Parameters:
cluster_name (
str) – 要启动的集群的名称。idle_minutes_to_autostop (
Optional[int]) – 在集群空闲这么多分钟后自动停止,即集群的任务队列中没有正在运行或挂起的任务。每当在任务队列中发现设置/运行/挂起的任务时,空闲时间会被重置。设置此标志等同于运行sky.launch(..., detach_run=True, ...)然后运行sky.autostop(idle_minutes=。如果未设置,集群将不会自动停止。) retry_until_up (
bool) – 是否重试启动集群直到它启动。down (
bool) – 自动关闭集群:在所有作业完成后(无论是成功还是异常),在指定的空闲时间后自动关闭集群。需要设置idle_minutes_to_autostop。force (
bool) – 是否强制启动集群,即使它已经启动。 对于升级SkyPilot运行时非常有用。
- Raises:
ValueError – 参数值无效:(1) 如果
down设置为 True 但idle_minutes_to_autostop为 None;(2) 如果指定的 集群是托管作业控制器,并且idle_minutes_to_autostop不是 None 或down为 True(省略 它们以使用默认的自动停止设置)。sky.exceptions.ClusterDoesNotExist – 指定的集群不存在。
sky.exceptions.NotSupportedError – 如果要重启的集群是使用不支持此操作的非默认后端启动的。
sky.exceptions.ClusterOwnerIdentitiesMismatchError – 如果要重启的集群是由其他用户启动的。
- Return type:
CloudVmRayResourceHandle
天空下降#
- sky.down(cluster_name, purge=False)[source]#
拆除一个集群。
拆除集群将删除所有相关资源(所有计费停止),并且附加磁盘上的任何数据都将丢失。集群中的加速器(例如,TPU)也将被删除。
- Parameters:
- Raises:
sky.exceptions.ClusterDoesNotExist – 指定的集群不存在。
RuntimeError – 未能成功关闭集群。
sky.exceptions.NotSupportedError – 指定的集群是受管理的作业控制器。
- Return type:
sky.status#
- sky.status(cluster_names=None, refresh=False)[source]#
获取集群状态。
如果提供了cluster_names,则返回这些集群。否则,返回所有集群。
每个返回值都有以下字段:
{ 'name': (str) cluster name, 'launched_at': (int) timestamp of last launch on this cluster, 'handle': (ResourceHandle) an internal handle to the cluster, 'last_use': (str) the last command/entrypoint that affected this cluster, 'status': (sky.ClusterStatus) cluster status, 'autostop': (int) idle time before autostop, 'to_down': (bool) whether autodown is used instead of autostop, 'metadata': (dict) metadata of the cluster, }
每个集群可以有以下状态之一:
INIT: 集群可能处于运行或关闭状态。它可能发生在以下情况:正在进行的配置或运行时设置。(
sky.launch()已启动但尚未完成。)或者,集群处于异常状态,例如,一些集群节点宕机,或者SkyPilot运行时不健康。(要恢复集群,请尝试在其上再次运行
sky launch。)
UP: 配置和运行时设置已成功,集群已启动。(最近的sky.launch()已成功完成。)STOPPED: 集群已停止,存储已持久化。使用sky.start()来重启集群。
自动停止列:
自动停止列表示集群在空闲(没有作业运行)多少分钟后将自动停止。如果
to_down为 True,集群将被自动关闭,而不是自动停止。
获取最新的集群状态:
在正常情况下,当集群完全由SkyPilot管理(即没有在云控制台中进行手动操作)且未使用自动停止功能时,此命令返回的表将准确反映集群状态。
在集群在SkyPilot之外发生变化的情况下(例如,在云控制台中进行手动操作;未管理的Spot集群被抢占)或对于启用了自动停止的集群,使用
refresh=True从云提供商查询最新的集群状态。
sky.autostop#
- sky.autostop(cluster_name, idle_minutes, down=False)[source]#
为集群安排自动停止/自动关闭。
自动停止/自动关闭功能将在集群空闲达到指定时长后自动停止或拆除集群。空闲意味着集群的任务队列中没有进行中(待处理/运行中)的任务。
集群的空闲时间重置为零,当:
一个作业被提交(
sky.launch()或sky.exec())。集群已重新启动。
当没有活动设置时,会设置自动停止。(即从未设置过任何自动停止设置,或者之前的自动停止设置已被取消。)这对于重新启动自动停止计时器非常有用。
示例:假设一个没有任何自动停止设置的集群已经空闲了1小时,然后设置了30分钟的自动停止。集群不会立即自动停止。相反,空闲计时器只有在设置了自动停止后才会开始计数。
当为同一集群指定了多个自动停止设置时,最后一个设置将优先。
- Parameters:
- Raises:
sky.exceptions.ClusterDoesNotExist – 如果集群不存在。
sky.exceptions.ClusterNotUpError – 如果集群未启动。
sky.exceptions.NotSupportedError – 如果集群不是基于 CloudVmRayBackend 或者集群是 TPU VM Pod。
sky.exceptions.ClusterOwnerIdentityMismatchError – 如果当前用户与创建集群的用户不同。
sky.exceptions.CloudUserIdentityError – 如果我们无法获取当前用户身份。
- Return type:
任务#
- class sky.Task(name=None, *, setup=None, run=None, envs=None, workdir=None, num_nodes=None, docker_image=None, event_callback=None, blocked_resources=None)[源代码]#
任务:要在云上运行的计算。
- __init__(name=None, *, setup=None, run=None, envs=None, workdir=None, num_nodes=None, docker_image=None, event_callback=None, blocked_resources=None)[source]#
初始化一个任务。
所有字段都是可选的。
Task.run是实际的程序:可以是要运行的 shell 命令(str)或用于不同节点的命令生成器(lambda;见下文)。可选地,调用
Task.set_resources()来设置此任务的资源需求。如果未设置,则假定为默认的仅CPU需求(与sky launch相同)。这个类的所有设置器,
Task.set_*(),返回self,即它们是流畅的API,可以链式调用。示例
# A Task that will sync up local workdir '.', containing # requirements.txt and train.py. sky.Task(setup='pip install requirements.txt', run='python train.py', workdir='.') # An empty Task for provisioning a cluster. task = sky.Task(num_nodes=n).set_resources(...) # Chaining setters. sky.Task().set_resources(...).set_file_mounts(...)
- Parameters:
setup (
Optional[str]) – 一个设置命令,将在执行运行命令run之前运行,并在workdir下执行。run (
Union[str,Callable[[int,List[str]],Optional[str]],None]) – 任务的实际命令。如果不为None,则可以是shell命令(str)或命令生成器(callable)。如果是后者,它必须接受节点排名和节点地址列表作为输入,并返回一个shell命令(str)(对于某些节点可以返回None,在这种情况下不会在这些节点上运行任何命令)。运行命令将在workdir下运行。注意命令生成器应该是一个自包含的lambda函数。workdir (
Optional[str]) – 本地工作目录。该目录将同步到远程虚拟机上的某个位置,并且setup和run命令将在该位置下运行(因此,它们在调用二进制文件时可以依赖相对路径)。num_nodes (
Optional[int]) – 为此任务配置的节点数量。如果为None,则视为1个节点。如果大于1,每个节点将执行自己的设置/运行命令,其中run可以是一个字符串,意味着所有节点都获得相同的命令,或者是一个lambda函数,其语义如上所述。docker_image (
Optional[str]) – (实验性:仅在LocalDockerBackend使用时有效。)此任务将基于的基础Docker镜像。默认为‘gpuci/miniforge-cuda:11.4-devel-ubuntu18.04’。blocked_resources (
Optional[Iterable[Resources]]) – 此任务无法运行的一组资源。
- static from_yaml(yaml_path)[source]#
从任务YAML初始化任务。
示例
task = sky.Task.from_yaml('/path/to/task.yaml')
- Parameters:
yaml_path (
str) – 有效任务yaml文件的文件路径。- Raises:
ValueError – 如果路径被加载为str而不是dict;或者 如果有任何其他解析错误。
- Return type:
- set_service(service)[source]#
设置此任务的服务规范。
- Parameters:
service (
Optional[SkyServiceSpec]) – 一个 SkyServiceSpec 对象。- Returns:
当前任务,带有服务集。
- Return type:
self
- set_file_mounts(file_mounts)[source]#
设置此任务的文件挂载。
用于同步数据集、dotfiles等。
文件挂载是一个字典:
{remote_path: local_path/cloud URI}。 本地(或云)文件/目录将被同步到远程虚拟机上的指定路径,该任务将在这些虚拟机上运行。源路径或目标路径都不能以斜杠结尾。
示例
task.set_file_mounts({ '~/.dotfile': '/local/.dotfile', # /remote/dir/ will contain the contents of /local/dir/. '/remote/dir': '/local/dir', })
- Parameters:
file_mounts (
Optional[Dict[str,str]]) – 一个可选的字典,包含{remote_path: local_path/cloud URI},其中 remote 表示此任务最终将在其上运行的虚拟机,local 表示启动任务的节点。- Returns:
当前任务,已设置文件挂载。
- Return type:
self
- Raises:
ValueError – 如果输入路径无效。
- update_file_mounts(file_mounts)[source]#
更新此任务的文件挂载。
与 set_file_mounts() 不同,此函数更新到现有的 file_mounts(调用
dict.update()),而不是覆盖它。这应该在配置之前调用以生效。
示例
task.update_file_mounts({ '~/.config': '~/Documents/config', '/tmp/workdir': '/local/workdir/cnn-cifar10', })
- Parameters:
file_mounts (
Dict[str,str]) – 一个字典,格式为{remote_path: local_path/cloud URI},其中 remote 表示最终运行此任务的虚拟机, local 表示启动任务的节点。- Returns:
当前任务,文件挂载已更新。
- Return type:
self
- Raises:
ValueError – 如果输入路径无效。
- set_storage_mounts(storage_mounts)[source]#
设置此任务的存储挂载。
存储挂载是一个字典:
{mount_path: sky.Storage object}, 每个挂载将一个 sky.Storage 对象(一个云对象存储桶) 挂载到远程集群内的路径。可以通过从本地目录上传(设置
source)或由现有的云存储桶支持(将name设置为存储桶名称;或将source设置为存储桶URI)来创建一个sky.Storage对象。示例
task.set_storage_mounts({ '/remote/imagenet/': sky.Storage(name='my-bucket', source='/local/imagenet'), })
- Parameters:
storage_mounts (
Optional[Dict[str,Storage]]) – 一个可选的字典{mount_path: sky.Storage object},其中 mount_path 是远程虚拟机内部的路径,Storage 对象将挂载在该路径上。- Returns:
当前任务,已设置存储挂载。
- Return type:
self
- Raises:
ValueError – 如果输入路径无效。
- update_storage_mounts(storage_mounts)[来源]#
更新此任务的存储挂载。
与 set_storage_mounts() 不同,此函数更新现有的 storage_mounts(调用
dict.update()),而不是覆盖它。这应该在配置之前调用以生效。
- Parameters:
storage_mounts (
Dict[str,Storage]) – 一个可选的字典,包含{mount_path: sky.Storage object},其中 mount_path 是远程虚拟机内部的路径,Storage 对象将挂载在该路径上。- Returns:
当前任务,已更新存储挂载。
- Return type:
self
- Raises:
ValueError – 如果输入路径无效。
资源#
- class sky.Resources(cloud=None, instance_type=None, cpus=None, memory=None, accelerators=None, accelerator_args=None, use_spot=None, job_recovery=None, region=None, zone=None, image_id=None, disk_size=None, disk_tier=None, ports=None, labels=None, _docker_login_config=None, _is_image_managed=None, _requires_fuse=None, _cluster_config_overrides=None)[source]#
资源:计算任务的需求。
此类一旦创建就是不可变的(以确保在属性更改时进行一些验证)。要更新Resources实例的属性,请使用resources.copy(**new_properties)。
已使用:
用于表示任务/应用程序的资源请求
作为“过滤器”以获取可启动的具体实例
用于计算账单
用于在云上进行配置
- __init__(cloud=None, instance_type=None, cpus=None, memory=None, accelerators=None, accelerator_args=None, use_spot=None, job_recovery=None, region=None, zone=None, image_id=None, disk_size=None, disk_tier=None, ports=None, labels=None, _docker_login_config=None, _is_image_managed=None, _requires_fuse=None, _cluster_config_overrides=None)[source]#
初始化一个资源对象。
所有字段都是可选的。
Resources.is_launchable决定资源是否完全指定以启动实例。示例
# Fully specified cloud and instance type (is_launchable() is True). sky.Resources(clouds.AWS(), 'p3.2xlarge') sky.Resources(clouds.GCP(), 'n1-standard-16') sky.Resources(clouds.GCP(), 'n1-standard-8', 'V100') # Specifying required resources; the system decides the # cloud/instance type. The below are equivalent: sky.Resources(accelerators='V100') sky.Resources(accelerators='V100:1') sky.Resources(accelerators={'V100': 1}) sky.Resources(cpus='2+', memory='16+', accelerators='V100')
- Parameters:
cloud (
Optional[Cloud]) – 要使用的云。cpus (
Union[None,int,float,str]) – 任务所需的CPU数量。 如果是字符串,必须是'2'或'2+'的形式,其中+表示任务至少需要2个CPU。memory (
Union[None,int,float,str]) – 所需的内存量,单位为GiB。如果是一个字符串,必须是'16'或'16+'的形式,其中+表示任务至少需要16 GB的内存。加速器 (
Union[None,str,Dict[str,int]]) – 所需的加速器。如果是字符串,必须是'V100'或'V100:2'的形式,其中:2表示任务需要2个V100 GPU。如果是字典,必须是{'V100': 2}或{'tpu-v2-8': 1}的形式。accelerator_args (
Optional[Dict[str,str]]) – 特定于加速器的参数。例如,{'tpu_vm': True, 'runtime_version': 'tpu-vm-base'}用于TPU。job_recovery (
Union[Dict[str,Union[str,int]],str,None]) –用于管理作业的作业恢复策略,以便从抢占中恢复集群。有关更多详细信息,请参阅 recovery_strategy 模块 # pylint: disable=line-too-long 当提供字典时,它可以包含以下字段:
strategy: 要使用的恢复策略。
max_restarts_on_errors: 用户代码错误时的最大重启次数。
image_id (
Union[Dict[str,str],str,None]) –要使用的镜像ID。如果是字符串,必须是来自云的镜像ID字符串,例如AWS:
'ami-1234567890abcdef0', GCP:'projects/my-project-id/global/images/my-image-name'; 或者,由SkyPilot提供的镜像标签,例如AWS:'skypilot:gpu-ubuntu-2004'。如果是字典,必须是从区域到镜像ID的映射字典,例如:{ 'us-west1': 'ami-1234567890abcdef0', 'us-east1': 'ami-1234567890abcdef0' }
disk_tier (
Union[str,DiskTier,None]) – 要使用的磁盘性能层级。如果为 None,则默认为'medium'。ports (
Union[int,str,List[str],Tuple[str],None]) – 要在实例上打开的端口。labels (
Optional[Dict[str,str]]) – 应用于实例的标签。这些标签对于分配可能被外部工具使用的元数据非常有用。 具体实现取决于所选的云平台 - 在AWS上,标签映射到实例标签。在GCP上,标签映射到实例标签。在 Kubernetes上,标签映射到pod标签。在其他云平台上,标签不受支持,将被忽略。_docker_login_config (
Optional[DockerLoginConfig]) – 要使用的docker配置。这包括 docker用户名、密码和注册服务器。如果为None,则跳过 docker登录。_requires_fuse (
Optional[bool]) – 任务是否需要FUSE挂载支持。某些云实现内部使用此选项来进行FUSE挂载的额外设置。此标志还可以防止在不支持FUSE挂载的现有集群上使用FUSE挂载。如果为None,则默认为False。
- Raises:
ValueError – 如果某些属性无效。
exceptions.NoCloudAccessError – 如果没有启用公共云。