跳过主要内容

以编程方式部署流程

除了通过 API 运行流,您还可以将流程序性地部署到 Metaflow 支持的其中一个生产协调器。例如,您可以使用此功能创建一个部署脚本,作为 您 CI/CD 系统的一部分 运行,例如在 GitHub Actions 上,在拉取请求获得批准后自动将流部署到生产环境。

tip

要查看Deployer实际应用的示例,请参阅 Config-Driven Experimentation中的sweep示例

使用Deployer API 部署到生产环境

部署通过 Deployer API 进行处理,该API与用于将流推送到诸如 Argo WorkflowsStep Functions 的生产协调器的命令行界面紧密相连。

该图概述了部署过程及相关对象:

  1. 实例化一个指向您要部署的流程文件的 Deployer 类:
from metaflow import Deployer
deployer = Deployer('helloflow.py')
  1. 选择一个调度程序 - 这里是 Argo Workflows - 然后调用 create() 来部署流程
deployed_flow = deployer.argo_workflows().create()

流程现在已被安排执行!如果您使用a @schedule 装饰器注解了该流程,它将会在预定的时间自动运行。
如果您使用@trigger@trigger_on_finish对其进行注解,它将在指定事件到达时自动运行。

显式触发流程

您可以通过调用 trigger() 显式触发已部署的流程

triggered_run = deployed_flow.trigger()

您可以在 trigger 中指定任何 Parameters,例如:

triggered_run = deployed_flow.trigger(alpha=5, algorithm='cubic')

触发将返回一个 TriggeredRun 对象,表示一个即将被调度的运行。当 start 步骤开始执行时,相应的 Run 对象 将变得可访问。这可能需要一些时间,例如,如果需要启动一个新的云实例来执行任务:

# wait for the run object to be available, timeout None means wait forever
run_obj = triggered_run.wait_for_run(timeout=None)
print('Run started', run_obj)

终止一个触发的运行

您可以通过调用随时终止一个触发的运行

triggered_run.terminate()

访问先前部署的流程

您可以使用from_deployment方法检索现有的deployed_flow对象,而不是创建新的部署。这使您可以处理之前已部署的流程,而无需再次调用create()。

一旦你拥有了 deployed_flow 对象,你可以使用它的 trigger() 方法来创建一个 triggered_run 对象并执行该流。这种方法在你需要引用和运行现有部署而不是创建新的部署时特别有用。

from metaflow import Deployer

deployer = Deployer('helloflow.py')
deployed_flow = deployer.argo_workflows().create()

# save this for later use...
identifier = deployed_flow.name
from metaflow import DeployedFlow

# use the identifier saved above..
deployed_flow = DeployedFlow.from_deployment(identifier=identifier)
triggered_run = deployed_flow.trigger()
note

目前,from_deployment 方法仅在 argo-workflows 中可用。

特定于Orchestrator的方法

除了上述常见的方法,每个调度器还提供额外的方法来管理部署和触发运行。有关详细信息,请参见 Deployer 的 API 文档

note

目前,Deployer 不支持部署到 Apache Airflow,因为 Airflow 不提供用于部署的 API。相反,您应该手动 复制生成的 Airflow dag 到您的 Airflow 服务器。