Horovod on Ray

horovod.ray allows users to leverage Horovod on top of Ray clusters.

Currently, the Ray + Horovod integration provides a RayExecutor API.

Note

The Ray + Horovod integration currently only supports the Gloo backend.

Installation

Use the extra [ray] option to install Ray alongside Horovod.

$ HOROVOD_WITH_GLOO=1 ... pip install 'horovod[ray]'
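
For example, a build with Gloo and PyTorch support might look like the following (the exact set of HOROVOD_WITH_* build flags depends on which frameworks you need; this command is only one possibility):

$ HOROVOD_WITH_GLOO=1 HOROVOD_WITH_PYTORCH=1 pip install 'horovod[ray]'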

See the Ray documentation for advanced installation instructions.

Horovod Ray Executor

The Horovod Ray integration offers a RayExecutor abstraction (docs), which is a wrapper over a group of Ray actors (stateful processes).

import ray
from horovod.ray import RayExecutor

# Start the Ray cluster or attach to an existing Ray cluster
ray.init()

# Configure the executor (30s worker startup timeout) and pick the
# number of workers to launch (4 is just an example value)
settings = RayExecutor.create_settings(timeout_s=30)
num_workers = 4

# Start num_workers actors on the cluster
executor = RayExecutor(
    settings, num_workers=num_workers, use_gpu=True)

# This will launch `num_workers` actors on the Ray Cluster.
executor.start()

All the actors will be part of the Horovod ring, so RayExecutor invocations will be able to support arbitrary Horovod collective operations.

Note that there is an implicit assumption here that the cluster is homogeneous in shape (i.e., all machines have the same number of slots available). This is simply an implementation detail and is not a fundamental limitation.

To actually execute a function, you can run the following:

import horovod.torch as hvd

# Using the stateless `run` method, a function can take in any args or kwargs
def simple_fn():
    hvd.init()
    print("hvd rank", hvd.rank())
    return hvd.rank()

# Execute the function on all workers at once
result = executor.run(simple_fn)
# Check that the rank of all workers is unique
assert len(set(result)) == num_workers

executor.shutdown()
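
Because every worker joins the Horovod ring, the same run method can also drive Horovod collective operations. A minimal sketch (run before executor.shutdown(); the allreduce_fn name and the use of a PyTorch tensor are illustrative assumptions, not part of the documented API surface):

import torch
import horovod.torch as hvd

def allreduce_fn():
    hvd.init()
    # Each worker contributes its own rank; the default op averages across workers
    value = torch.tensor([float(hvd.rank())])
    return hvd.allreduce(value, name="rank_average").item()

# Every worker returns the same averaged value
averaged = executor.run(allreduce_fn)
assert len(set(averaged)) == 1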

Stateful Execution

A unique feature of Ray is its support for stateful actors. This means you can start arbitrary Python classes on each worker, easily supporting operations and calls where data is cached in memory.

import torch
import horovod.torch as hvd
from horovod.ray import RayExecutor

class MyModel:
    def __init__(self, learning_rate):
        self.model = NeuralNet()
        optimizer = torch.optim.SGD(
            self.model.parameters(),
            lr=learning_rate,
        )
        self.optimizer = hvd.DistributedOptimizer(optimizer)

    def get_weights(self):
        return dict(self.model.named_parameters())

    def train(self):
        return self._train(self.model, self.optimizer)


ray.init()
executor = RayExecutor(...)
executor.start(executable_cls=MyModel)

# Run 5 training steps
for i in range(5):
    # Stateful `execute` method takes the current worker executable as a parameter
    executor.execute(lambda worker: worker.train())

# Obtain the trained weights from each model replica
result = executor.execute(lambda worker: worker.get_weights())

# `result` will be N copies of the model weights
assert all(isinstance(res, dict) for res in result)
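
Because the executable class lives inside a long-running actor, anything it loads stays in memory between calls. For example (load_data and evaluate are hypothetical methods on MyModel, shown only to illustrate the caching behavior):

# Data loaded in one call remains cached in the actor for later calls
executor.execute(lambda worker: worker.load_data("/tmp/dataset"))
scores = executor.execute(lambda worker: worker.evaluate())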

Elastic Ray Executor

Ray also supports elastic execution via the RayExecutor. As with default Horovod, the difference between the non-elastic and elastic versions is that the hosts and the number of workers are determined dynamically at runtime.

You must first set up a Ray cluster. Ray clusters can support autoscaling on any cloud provider (AWS, GCP, Azure).

# First, run `pip install boto3` and `aws configure`
#
# Create or update the cluster. When the command finishes, it will print
# out the command that can be used to SSH into the cluster head node.
$ ray up ray/python/ray/autoscaler/aws/example-full.yaml

After you have set up your Ray cluster, you will need to move parts of your existing elastic Horovod training script into a training function. Specifically, the instantiation of your model and the invocation of the hvd.elastic.run call should be done inside this function.

import torch
import horovod.torch as hvd

# Put the Horovod concepts into a single function
# This function will be serialized with Cloudpickle
def training_fn():
    hvd.init()
    model = Model()
    torch.cuda.set_device(hvd.local_rank())

    @hvd.elastic.run
    def train(state):
        for state.epoch in range(state.epoch, epochs):
            ...
            state.commit()


    state = hvd.elastic.TorchState(model, optimizer, batch=0, epoch=0)
    state.register_reset_callbacks([on_state_reset])
    train(state)
    return
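
The on_state_reset callback referenced above is a user-defined hook that Horovod Elastic invokes whenever the worker set changes. A minimal sketch, assuming an optimizer and a base_lr constant are defined alongside the model inside training_fn (rescaling the learning rate to the new world size is a common pattern, not the documented implementation):

def on_state_reset():
    # Called after the Horovod ring is re-formed; rescale the learning
    # rate to reflect the new number of workers.
    for param_group in optimizer.param_groups:
        param_group["lr"] = base_lr * hvd.size()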

You can then attach to the underlying Ray cluster and execute the training function:

import ray
from horovod.ray import RayExecutor

ray.init(address="auto")  # attach to the Ray cluster
settings = RayExecutor.create_settings(verbose=True)
executor = RayExecutor(
    settings, min_workers=1, use_gpu=True, cpus_per_slot=2)
executor.start()
executor.run(training_fn)

Ray will automatically start remote actors and execute training_fn on nodes as they become available. Note that the executor.run call will terminate whenever any one of the training functions terminates successfully, or when all workers fail.

AWS: Cluster Launcher

You can also easily leverage the Ray cluster launcher to launch cloud instances.

# Save as `ray_cluster.yaml`

cluster_name: horovod-cluster
provider: {type: aws, region: us-west-2}
auth: {ssh_user: ubuntu}
min_workers: 3
max_workers: 3

# Deep Learning AMI (Ubuntu) Version 21.0
head_node: {InstanceType: p3.2xlarge, ImageId: ami-0b294f219d14e6a82}
worker_nodes: {InstanceType: p3.2xlarge, ImageId: ami-0b294f219d14e6a82}
setup_commands: # Set up each node.
    - HOROVOD_WITH_GLOO=1 HOROVOD_GPU_OPERATIONS=NCCL pip install horovod[ray]

You can start the specified Ray cluster and monitor its status with:

$ ray up ray_cluster.yaml  # starts the head node
$ ray monitor ray_cluster.yaml  # wait for worker nodes

Then, in your Python script, make sure you add ray.init(address="auto") to connect to the distributed Ray cluster.

-ray.init()
+ray.init(address="auto")

Then you can execute Ray scripts on the cluster:

$ ray submit ray_cluster.yaml <your_script.py>

# the above is equivalent to
$ ray attach ray_cluster.yaml  # ssh
ubuntu@ip-172-31-24-53:~$ python <your_script.py>