跳过主要内容

分布式计算

Metaflow的 foreach 结构 允许你并发运行任务。在foreach的情况下,任务独立执行。当工作负载是 毫不相关的并行时,这种模式效果很好,也就是说,任务之间不相互通信,并且它们不必同时执行。

还有其他工作负载,例如大型模型的分布式训练,这需要任务之间相互交互。 Metaflow 提供了另一种机制,即 @parallel 装饰器,它协调整个相互依赖的任务。实际上,装饰器在运行时启动一个临时计算集群,作为 Metaflow 流的一部分,利用了 Metaflow 的特性,比如 依赖管理版本控制生产部署

通常,这种模式是通过以下某个特定框架的装饰器使用的,如 @torchrun@deepspeed,下面进行了描述,这使得使用特定框架进行分布式训练变得简单。如果您需要对集群进行低级访问,例如与还没有相应高级装饰器的框架一起使用,请查看本页面末尾的低级访问文档。

info

要使用分布式计算,请遵循此处的设置说明。如果您需要帮助入门,联系Metaflow Slack

高级装饰器

开始的最简单方法是使用其中一个高级装饰器 - 在这篇博客文章中查看概述

装饰器实现用户体验描述PyPi 发布示例
@torchrunUse current.torch.run to submit your torch.distributed program. No need to log into each node, call the code once in @step.A torchrun command that runs @step function code on each node. Torch distributed is used under the hood to handle communication between nodes.metaflow-torchrunMinGPT
@deepspeedExposes current.deepspeed.run
Requires OpenSSH and OpenMPI installed in the Metaflow task container.
Form MPI cluster with passwordless SSH configured at task runtime (to reduce the risk of leaking private keys). Submit the Deepspeed program and run.metaflow-deepspeedBert & Dolly
@metaflow_rayWrite a Ray program locally or call script from @step function, @metaflow_ray takes care of forming the Ray cluster.Forms a Ray cluster dynamically. Runs the @step function code on the control task as Ray’s “head node”.metaflow-rayGPT-J & Distributed XGBoost
@tensorflowPut TensorFlow code in a distributed strategy scope, and call it from step function.Run the @step function code on each node. This means the user picks the appropriate strategy in their code.metaflow-tensorflowKeras Distributed
@mpiExposes current.mpi.cc, current.mpi.broadcast_file, current.mpi.run, current.mpi.exec. Cluster SSH config is handled automatically inside the decorator. Requires OpenSSH and an MPI implementation are installed in the Metaflow task container. It was tested against OpenMPI, which you can find a sample Dockerfile for here.Forms an MPI cluster with passwordless SSH configured at task runtime. Users can submit a mpi4py program or compile, broadcast, and submit a C program.metaflow-mpiLibgrape
info

请注意,这些装饰器不包含在metaflow包中,但它们是作为Metaflow扩展实现的。您需要在开发环境中单独安装它们,但它们会被Metaflow自动打包,因此您不需要将它们包含在Docker镜像或@conda/@pypi中。还要注意,这些扩展不是稳定的Metaflow API的一部分,因此可能会发生变化。

tip

在运行要求高的训练工作负载时,建议使用 @checkpoint 装饰器 以确保即使任务遇到虚假的失败也不会丢失进度。

低级访问

在底层,Metaflow 确保您获得所需类型和数量的计算节点同时运行,以便它们能够相互通信和协调。

您可以使用此计算集群实现您自己的任何分布式计算算法。为了说明这一点,考虑一个简单的例子,设置一个任务集群,这些任务通过 MPI 彼此通信。从技术上讲,MPI 不是必需的 - 您可以使用任何您想要的协议进行通信 - 但 MPI 是一个流行的选择。

MPI 示例

让我们创建一个基于 这个例子的简单 Hello World MPI 程序。 该程序识别主节点 (rank == 0),向所有工作节点发送消息,这些节点接收并打印出来。我们使用 mpi4py 作为 MPI 协议的 Python 封装。

首先,让我们创建一个 MPI 脚本, mpi_hello.py

import mpi4py
from mpi4py import MPI

if __name__ == "__main__":

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
print(f"Cluster has {size} processes")
for i in range(1, size):
msg = "Main node says hi! 👋"
comm.send(msg, dest=i)
else:
msg = comm.recv()
print(f"👷 Worker node {rank} received message: {msg}")

接下来,让我们创建一个启动四个节点集群的流程,感谢 num_parallel=4,并在集群中运行我们之前定义的MPI脚本,在每个节点上启动两个工作进程。

from metaflow import FlowSpec, step, batch, mpi, current

N_CPU = 2
N_NODES = 4

class MPI4PyFlow(FlowSpec):

@step
def start(self):
self.next(self.multinode, num_parallel=N_NODES)

@batch(image="eddieob/mpi-base:1", cpu=N_CPU)
@mpi
@step
def multinode(self):
current.mpi.exec(
args=["-n", str(N_CPU * N_NODES), "--allow-run-as-root"],
program="python mpi_hello.py",
)
self.next(self.join)

@step
def join(self, inputs):
self.next(self.end)

@step
def end(self):
pass

if __name__ == "__main__":
MPI4PyFlow()

要运行流程,请确保您的AWS Batch环境已 配置为支持多节点 作业。然后,安装 Metaflow的MPI扩展

pip install metaflow-mpi

并运行流程

python mpiflow.py run

该示例使用了一个镜像,eddieob/mpi-base,在这个Dockerfile中定义。该镜像包括用于通信的MPI和ssh。请注意,Metaflow会自动打包mpi_hello.py,因此不必将其包含在镜像中。