跳过主要内容

创建流程

本文介绍了Metaflow的基本概念。如果您迫不及待想要在实践中体验Metaflow,您可以从教程开始。在完成教程后,您可以返回本文档以了解更多关于Metaflow如何工作的知识。

tip

下面的例子展示了如何在命令行中执行流程。如果你想在笔记本中开发流程,可以快速查看如何在笔记本中运行流程

Metaflow代码的结构

Metaflow遵循数据流范式,将程序建模为操作的有向图。这是表达数据处理管道,特别是机器学习的自然范式。

我们称操作的图为 一个流。您定义操作,称为 步骤,它们是图的节点,并包含到下一个步骤的转换,这些转换作为边。

Metaflow 对图的结构设置了一些约束。首先,每个流程需要一个叫做 start 的步骤和一个叫做 end 的步骤。流程的执行,我们称之为 a run,从 start 开始。如果最终的 end 步骤成功完成,则运行是成功的。

startend之间发生的事情由你决定。你可以使用Metaflow支持的以下三种类型的过渡的任意组合来构建图形:

线性

最基本的过渡类型是 线性 过渡。它从一个步骤移动到另一个步骤。

这里是一个包含两个线性转换的图表:

对应的 Metaflow 脚本如下:

from metaflow import FlowSpec, step

class LinearFlow(FlowSpec):

@step
def start(self):
self.my_var = 'hello world'
self.next(self.a)

@step
def a(self):
print('the data artifact is: %s' % self.my_var)
self.next(self.end)

@step
def end(self):
print('the data artifact is still: %s' % self.my_var)

if __name__ == '__main__':
LinearFlow()

将这个代码片段保存到一个文件中,linear.py。你可以像其他Python脚本一样在命令行中执行Metaflow流程。试试这个:

python linear.py run

每当您在文档中看到这样的流程时,只需将其保存在文件中并像上面一样执行。

工件

除了按顺序执行步骤 startaend,这个流程还创建了一个叫 数据工件my_var。在 Metaflow 中,通过将值赋给实例变量(如 my_var)来简单地创建数据工件。

工件是Metaflow的核心概念。它们有许多用途:

  • 它们允许您管理数据流,而无需手动加载和存储数据。

  • 所有工件都会被持久化,以便于以后使用Client API进行分析,使用Cards进行可视化,甚至可在不同流程中使用。

  • 工件在各种环境中一致工作,因此您可以在本地运行某些步骤,并且可以在云中运行某些步骤,而无需担心显式传输数据。

  • 访问过去的工件可以极大地帮助 调试,因为 你可以在故障发生前查看数据,甚至可以 恢复过去的执行 在修复错误之后。

数据工件在创建后在所有步骤中都可用,因此它们的行为类似于任何正常的实例变量。对此规则的一个例外是分支,如下所述。

分支

您可以使用一个分支来表示并行步骤。在下图中,start过渡到两个并行步骤,ab。允许任意数量的并行步骤。这样的分支的一个好处是性能:Metaflow可以在多个CPU核心或多个云实例上执行ab

from metaflow import FlowSpec, step

class BranchFlow(FlowSpec):

@step
def start(self):
self.next(self.a, self.b)

@step
def a(self):
self.x = 1
self.next(self.join)

@step
def b(self):
self.x = 2
self.next(self.join)

@step
def join(self, inputs):
print('a is %s' % inputs.a.x)
print('b is %s' % inputs.b.x)
print('total is %d' % sum(input.x for input in inputs))
self.next(self.end)

@step
def end(self):
pass

if __name__ == '__main__':
BranchFlow()

每个分支必须被合并。合并步骤不需要像上面那样被称为 join,但它必须带有一个额外的参数,就像上面的 inputs 一样。

在上面的例子中,x的值是不明确的:a将其设置为1,而b设置为2。为了消除歧义,合并步骤可以引用分支中的特定步骤,如上面的inputs.a.x。为了方便,您还可以使用inputs迭代分支中的所有步骤,就像在上面的join步骤中的最后打印语句中所做的那样。有关更多详细信息,请参见关于数据流通过图表的部分。

请注意,您可以任意嵌套分支,也就是说,您可以在一个分支内部进行分支。 只需记住要合并您创建的所有分支。

遍历

像上面这样的静态分支在开发时知道分支时非常有用。或者,您可能希望根据数据动态分支。这是a foreach分支的用例。

Foreach的工作原理类似于上面的branch,但是它不是创建命名的步骤方法,而是在foreach循环内执行许多步骤的并行副本。

一个 foreach 循环可以遍历下面的任何列表 titles

from metaflow import FlowSpec, step

class ForeachFlow(FlowSpec):

@step
def start(self):
self.titles = ['Stranger Things',
'House of Cards',
'Narcos']
self.next(self.a, foreach='titles')

@step
def a(self):
self.title = '%s processed' % self.input
self.next(self.join)

@step
def join(self, inputs):
self.results = [input.title for input in inputs]
self.next(self.end)

@step
def end(self):
print('\n'.join(self.results))

if __name__ == '__main__':
ForeachFlow()

foreach循环通过在self.next()中指定关键字参数foreach来初始化。foreach参数接受一个字符串,它是存储在实例变量中的列表的名称,例如上面的titles

在foreach循环中的步骤创建独立的 任务 来处理列表中的每个项目。 在这里,Metaflow为步骤 a 创建了三个并行任务,以并行处理 titles 列表中的三个项目。您可以通过一个名为 input 的实例变量访问分配给任务的特定项目。

foreach 循环必须像静态分支一样连接。请注意,foreach 循环中的任务没有命名,因此您只能使用 inputs 进行迭代。如果需要,您可以在 foreach 步骤中为实例变量分配一个值,这有助于您识别该任务。

您可以任意嵌套foreach并将它们与分支和线性步骤结合。

步骤应该是什么?

没有一种绝对正确的方式将代码结构化为步骤图,但这里有一些最佳实践供您参考。

Metaflow将步骤视为不可分割的执行单元。也就是说,一个步骤要么整体成功,要么整体失败。步骤成功完成后,Metaflow会保存所有在步骤代码中创建的实例变量,因此即使后续步骤失败,该步骤也不必重新执行。换句话说,您可以检查步骤完成时存在的数据工件,但无法检查在步骤中处理的数据。

这使得一个步骤a 检查点。你的步骤越细致,你就能越好地控制检查结果和恢复失败的 运行。

将步骤分得过于细小的一个缺点是,检查点会增加一些开销。逐行执行代码作为单独步骤没有意义。保持您的步骤简洁但不要过小。一个好的经验法则是,单个步骤的运行时间不应超过一小时,最好远低于这个时间。

另一个重要的考虑是代码的可读性。尝试运行

python myflow.py show

这将打印出您的流程步骤。概述是否让您对您的代码有一个好的了解?如果步骤过于宽泛,可能需要将它们分开,以使整体流程更具描述性。

如何为流程定义参数?

这是一个定义参数 alpha 的流程示例:

from metaflow import FlowSpec, Parameter, step

class ParameterFlow(FlowSpec):
alpha = Parameter('alpha',
help='Learning rate',
default=0.01)

@step
def start(self):
print('alpha is %f' % self.alpha)
self.next(self.end)

@step
def end(self):
print('alpha is still %f' % self.alpha)

if __name__ == '__main__':
ParameterFlow()

参数是通过将一个 Parameter 对象分配给类变量来定义的。参数变量在所有步骤中都可用,像上面的 alpha 一样。

您可以在命令行中设置参数值,如下所示:

python parameter_flow.py run --alpha 0.6

您可以查看可用的参数:

python parameter_flow.py run --help

参数的类型基于其默认值的类型。如果参数没有有意义的默认值,可以按照如下方式定义:

num_components = Parameter('num_components',
help='Number of components',
required=True,
type=int)

现在在未将 --num_components 设置为整数值的情况下,无法运行该流程。

有关更多信息,请参见Parameter类的API参考

高级参数

tip

要了解更多灵活配置流程的方法,请查看配置流程

在上面的例子中, Parameters 采用了简单的标量值,例如整数或浮点值。为了支持 Parameter 的更复杂值,Metaflow 允许你将值指定为 JSON。如果你的 Parameter 是值的列表、映射或更复杂的数据结构,这个功能非常方便。

此示例允许用户定义一个按国家映射的GDP作为一个 Parameter:

from metaflow import FlowSpec, Parameter, step, JSONType

class JSONParameterFlow(FlowSpec):
gdp = Parameter('gdp',
help='Country-GDP Mapping',
type=JSONType,
default='{"US": 1939}')

country = Parameter('country',
help='Choose a country',
default='US')

@step
def start(self):
print('The GDP of %s is $%dB' % (self.country, self.gdp[self.country]))
self.next(self.end)

@step
def end(self):
pass

if __name__ == '__main__':
JSONParameterFlow()

按照如下方式执行代码:

python parameter_flow.py run --gdp '{"US": 1}'

数据流通过图

如前所述,对于线性步骤,数据工件会被传播,任何线性步骤都可以使用实例变量访问先前步骤创建的数据工件。在这种情况下,Metaflow可以通过简单地获取先前步骤结束时该工件的值来轻松确定每个工件的值。

然而,在连接步骤中,工件的值可能在传入分支上被设置为不同的值;工件的值被认为是模糊的。

为了在foreach或branch之后更容易实现一个join步骤,Metaflow提供了一个工具函数,merge_artifacts,来帮助传播明确的值。

from metaflow import FlowSpec, step

class MergeArtifactsFlow(FlowSpec):

@step
def start(self):
self.pass_down = 'a'
self.next(self.a, self.b)

@step
def a(self):
self.common = 5
self.x = 1
self.y = 3
self.from_a = 6
self.next(self.join)

@step
def b(self):
self.common = 5
self.x = 2
self.y = 4
self.next(self.join)

@step
def join(self, inputs):
self.x = inputs.a.x
self.merge_artifacts(inputs, exclude=['y'])
print('x is %s' % self.x)
print('pass_down is %s' % self.pass_down)
print('common is %d' % self.common)
print('from_a is %d' % self.from_a)
self.next(self.c)

@step
def c(self):
self.next(self.d, self.e)

@step
def d(self):
self.conflicting = 7
self.next(self.join2)

@step
def e(self):
self.conflicting = 8
self.next(self.join2)

@step
def join2(self, inputs):
self.merge_artifacts(inputs, include=['pass_down', 'common'])
print('Only pass_down and common exist here')
self.next(self.end)

@step
def end(self):
pass

if __name__ == '__main__':
MergeArtifactsFlow()

在上面的例子中,merge_artifacts 函数的表现如下:

  • join 中,pass_down 被传播,因为它在 ab 中都是未修改的。
  • join 中,common 也会被传播,因为它在两个分支中被设置为相同的值。请记住,在确定一个工件是否模棱两可时,重要的是工件的值;Metaflow 使用 基于内容的去重 来存储工件,因此可以确定两个工件的值是否相同。
  • join 中,x 在调用 merge_artifacts 之前由代码明确处理,这导致 merge_artifacts 在传播工件时忽略 x。这种模式允许您手动解决希望传播的工件中的任何歧义。
  • join 中,y 不会被传播,因为它列在 exclude 列表中。这个模式允许你防止传播不再相关的工件。请记住,merge_artifacts 的默认行为是传播所有传入的工件。
  • join 中,from_a 被传播,因为它只在一个分支中设置,因此是明确的。merge_artifacts 将传播所有值,即使它们只在一个输入分支中存在。
  • join2 中,include 关键字被使用,允许您显式指定在合并时考虑的文档。这在排除的文档列表大于要包含的文档列表时非常有用。您不能在同一个 merge_artifacts 调用中同时使用 includeexclude 列表。还要注意,如果在 include 中指定了一个文档,如果它在当前步骤中不存在或在输入之一中不存在(换句话说,它是“缺失的”),将抛出错误。include 参数仅在版本 2.2.1 或更高版本中可用。

如果要合并的工件具有模糊值,则 merge_artifacts 函数会引发异常。请记住,merge_artifacts 会尝试合并所有传入的工件,除非它们已经在步骤中存在或已在 exclude 列表中被明确排除。