使用AWS Step Functions调度Metaflow流程
AWS Step Functions 是一个通用的 工作流编排工具 - 你可以 阅读 AWS Step Functions 文档以了解所有 信息。如果你 只想将你的流投入生产,这份文档包含了你需要知道的一切。
从Metaflow的角度来看,AWS Step Functions的主要好处如下:
- AWS Step Functions 协调作为状态机表达的工作流,这些状态机是有向图的超集。这意味着我们可以完全自动地将 Metaflow 流映射到相应的 AWS Step Functions 状态机。这给你关于执行内容和方式提供了更多细节,相比于将 Metaflow 脚本视为黑箱。
- AWS Step Functions 提供了在生产环境中运行工作流所需的工具。您可以利用 AWS 提供的经过实战检验的解决方案进行警报、监控和调度。通过使用 AWS Step Functions,您的 Metaflow 流可以与更广泛的 AWS 产品无缝集成。
在AWS Step Functions上运行时,Metaflow代码的运行方式与本地完全相同:不需要对代码进行任何更改。所有在AWS Step Functions上运行的步骤生成的数据工件都可以通过Client API访问。所有任务都在AWS Batch上运行,遵循@resources装饰器,就像对所有步骤添加了@batch装饰器一样,详见执行远程任务。
本文件描述了AWS Step Functions调度的基础知识。如果您的项目涉及多人、多工作流,或者变得对业务至关重要,请查看关于协调更大Metaflow项目的部分。
您可以通过 Deployer API 以编程方式与步骤函数交互 - 在这里了解更多。
将流程推送到生产环境
我们来使用关于参数部分的流程作为一个例子:
from metaflow import FlowSpec, Parameter, step
class ParameterFlow(FlowSpec):
alpha = Parameter('alpha',
help='Learning rate',
default=0.01)
@step
def start(self):
print('alpha is %f' % self.alpha)
self.next(self.end)
@step
def end(self):
print('alpha is still %f' % self.alpha)
if __name__ == '__main__':
ParameterFlow()
将此脚本保存为文件 parameter_flow.py。要将版本部署到 AWS Step Functions,只需运行
python parameter_flow.py --with retry step-functions create
此命令会捕捉您在工作目录中的代码快照,以及使用的Metaflow版本,并将整个包导出到AWS Step Functions以进行调度。
强烈建议在将应用程序部署到AWS Step Functions时启用重试,您可以通过如上所示的--with retry轻松实现。然而,在这样做之前,请确保您的所有步骤都是安全的,可以进行重试。如果您的某些步骤以无法容忍自动重试的方式与外部服务交互,请使用重试,并将次数设置为零 (times=0),如如何防止重试中所述。
该命令将把您的工作流导出到AWS Step Functions。您还可以在AWS Step Functions UI中按名称搜索该流。您应该能够看到导出流的可视化,如下所示:


您可以点击橙色的开始执行按钮在AWS Step Functions上执行流程。它会弹出一个对话框询问输入。您可以将您的参数指定为带有Parameters作为键的转义JSON字符串 - ****
{
"Parameters" : "{\"alpha\": 0.5}"
}
Metaflow 会自动将您流程的参数映射到 AWS Step Functions 上的相应参数。
在输入对话框中点击开始执行后,AWS Step Functions 开始运行流程:

在这种情况下,运行应该没有问题地成功。如果有错误,您可以按照使用Metaflow进行调试中解释的内容,在本地重现它们。
您也可以通过命令行触发工作流:
python parameter_flow.py step-functions trigger --alpha 0.5
如果您再次运行 step-functions create,它将在 AWS Step Functions 上创建您流程的新版本。最新版本将自动成为生产版本
(由于 AWS Step Functions 提供的一致性保证,可能需要几秒钟才能发生这个变化)。如果您想在不干扰生产流程的情况下测试 AWS Step Functions,您可以更改类的名称,例如从 ParameterFlow 更改为 ParameterFlowStaging,然后在新名称下 step-functions create 流,或者使用
@project
装饰器。
请注意,step-functions create 为您的生产流程创建一个新的隔离的 生产命名空间。请阅读 组织结果 以了解命名空间行为的所有信息。
如果您的流程不接受任何参数,并且您想从AWS Step Functions UI中执行它,您需要在输入对话框中传入以下内容:
{
"Parameters" : "{}"
}
限制并发任务的数量
默认情况下,Metaflow 配置 AWS Step Functions 在 foreach 步骤中最多同时执行 100 个任务。这应该确保大多数工作流快速完成,而不会使您的 AWS Batch 队列(执行后端)不堪重负。
如果您的工作流程包含一个大规模的foreach并且您需要更快的结果,您可以通过 --max-workers 选项来增加默认值。例如,step-functions create --max-workers 500 允许每个foreach步骤同时执行500个任务。
此选项类似于 run
--max-workers,用于限制 AWS Step Functions 之外的并发。
部署时参数
自 Metaflow 2.13 以来,Metaflow 提供了一种新构造,Config,它
比部署时参数提供了更灵活的配置方法。
更多信息请查看 Configuring Flows。
您可以通过在部署时评估的参数自定义AWS步骤函数部署,即在执行step-functions create时。
例如,您可以根据谁部署了工作流或在哪个 Git 分支上执行了部署来更改参数的默认值。关键是,参数中的函数只在 step-functions create 期间评估一次,而不是在流的执行期间。
您可以像往常一样在本地运行流程。参数内的函数仅在执行开始时调用一次。
from metaflow import FlowSpec, Parameter, step, JSONType
from datetime import datetime
import json
def deployment_info(context):
return json.dumps({'who': context.user_name,
'when': datetime.now().isoformat()})
class DeploymentInfoFlow(FlowSpec):
info = Parameter('deployment_info',
type=JSONType,
default=deployment_info)
@step
def start(self):
print('This flow was deployed at %s by %s'\
% (self.info['when'], self.info['who']))
self.next(self.end)
@step
def end(self):
pass
if __name__ == '__main__':
DeploymentInfoFlow()
当 step-functions create 被调用时, deployment_info 被评估,这会捕捉到你的用户名和部署时间。此信息在 AWS Step Functions 上保持不变,尽管用户可以覆盖默认值。
参数中定义的任何函数都会接收到context对象。它包含与正在部署的流相关的各种字段。通过依赖于传递给上下文的值,您可以创建通用的部署时函数,这些函数可以被多个流重用。
调度一个流程
默认情况下,AWS Step Functions上的流程不会自动运行。您需要设置触发器,以便在事件发生时启动流程。
在步骤函数中,Metaflow 提供了对通过基于时间的 (cron) 触发器触发 Metaflow 流的内置支持,正如名称所示,它在特定时间触发工作流。截至今天,[基于事件的触发] (/production/event-triggering) 在步骤函数中不受支持。
基于时间的触发器在 FlowSpec 级别使用 @schedule 装饰器实现。这个流程每小时触发:
from metaflow import FlowSpec, schedule, step
from datetime import datetime
@schedule(hourly=True)
class HourlyFlow(FlowSpec):
@step
def start(self):
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print('time is %s' % now)
self.next(self.end)
@step
def end(self):
pass
if __name__ == '__main__':
HourlyFlow()
你可以用 @schedule 以以下方式定义时间表:
@schedule(weekly=True)在周日午夜运行工作流。@schedule(daily=True)每天在午夜运行工作流。@schedule(hourly=True)每小时运行一次工作流。@schedule(cron='0 10 * * ? *')在给定的 Cron 时间表下运行工作流,在这种情况下为每天上午10点UTC。您可以使用 这里 定义的规则来定义cron选项的时间表。
重现失败的生产运行
让我们使用 DebugFlow 来自调试部分 作为示例。该流程在步骤 b 中存在一个错误。当您运行它时,失败的运行将在 AWS Step Functions 界面上显示如下:

-0fa449301236d6773f122388d74743a0.png)
请注意5ca85f96-8508-409d-a5f5-b567db1040c5的执行ID。当在AWS Step Functions上运行时,Metaflow使用AWS Step Functions执行ID(以sfn-为前缀)作为运行ID。
图形可视化显示步骤 b 失败了,正如预期的那样。首先,您应该检查失败步骤的日志,以了解它失败的原因。您可以通过在 AWS Step Functions 界面中查找 JobId,访问 AWS Batch 步骤日志,方法是点击界面右侧的 Exception 面板中的 Error 组块来访问。您可以在 AWS Batch 控制台中使用此 JobId 来检查作业日志。此 JobId 也是该步骤的 metaflow 任务 ID。
接下来,我们想要在本地重现上述错误。我们通过恢复特定的 AWS Step Functions 运行来执行此操作,该运行失败了:
python debug.py resume --origin-run-id sfn-5ca85f96-8508-409d-a5f5-b567db1040c5
这将重用来自AWS Step Functions运行的start和a步骤的结果。 它将尝试在本地重新运行步骤b,这会出现与生产环境中相同的错误。
您可以像上面那样在本地修复错误。在这个简单的流程中,您可以在本地运行整个流程以确认修复有效。在验证结果后,您可以使用 step-functions create 部署新版本到生产环境。
然而,这对于复杂的生产流程可能并不是一个可行的方法。例如,流程可能处理大量无法在本地实例中处理的数据。我们有更好的方法来为生产准备流程:
用于生产的暂存流程
测试一个要求严格的流程最简单的方法是使用AWS Batch运行它。这即使在恢复时也有效:
python debug.py resume --origin-run-id sfn-5ca85f96-8508-409d-a5f5-b567db1040c5 --with batch
这将恢复您的流程并在AWS Batch上运行每一步。当您准备好进行端到端的固定流程测试时,只需按如下方式运行:
python debug.py run --with batch
或者,您可以临时更改流程的名称,例如从 DebugFlow 更改为 DebugFlowStaging。然后,您可以使用新名称运行 step-functions create,这将在 AWS Step Functions 上创建一个单独的暂存流程。您还可以使用 @project 装饰器。
您可以自由测试临时流程,而不会干扰生产流程。一旦临时流程成功运行,您可以自信地将新版本部署到生产环境。