PyTorch-Ignite PyTorch-Ignite

使用Ignite进行集体通信

在本教程中,我们将了解如何使用高级分布式函数,如all_reduce()all_gather()broadcast()barrier()。我们将讨论它们各自的独特用例,并以可视化的方式展示它们。

必需的依赖项

!pip install pytorch-ignite

导入

import torch

import ignite.distributed as idist

所有减少

All Reduce Diagram

all_reduce() 方法用于从每个进程中收集指定的张量,并使它们在每个节点上可用,然后对它们执行指定的操作(求和、乘积、最小值、最大值等)。让我们生成3个进程,分别具有等级0、1和2,并在所有这些进程上定义一个tensor。如果我们对tensor执行all_reduce操作并使用SUM操作,那么所有等级上的tensor将被收集、相加并存储在tensor中,如下所示:

def all_reduce_example(local_rank):
    tensor = torch.arange(2, dtype=torch.int64) + 1 + 2 * local_rank
    print(f"Rank {local_rank}, Initial value: {tensor}")

    idist.all_reduce(tensor, op="SUM")
    print(f"Rank {local_rank}, After performing all_reduce: {tensor}")

我们可以使用 idist.spawn 来生成3个进程(nproc_per_node)并执行上述函数。

idist.spawn(backend="gloo", fn=all_reduce_example, args=(), nproc_per_node=3)
Rank 0, Initial value: tensor([1, 2])
Rank 2, Initial value: tensor([5, 6])
Rank 1, Initial value: tensor([3, 4])
Rank 0, After performing all_reduce: tensor([ 9, 12])
Rank 1, After performing all_reduce: tensor([ 9, 12])
Rank 2, After performing all_reduce: tensor([ 9, 12])

现在让我们假设一个更现实的场景 - 您需要找到不同进程中所有可用梯度的平均值。

首先,我们使用get_world_size方法获取可用的GPU数量。然后,对于每个模型参数,我们执行以下操作:

  1. 在每个进程上收集梯度
  2. 对梯度应用求和操作
  3. 除以世界大小以进行平均

最后,我们可以继续使用平均梯度来更新模型参数!

分布式深度学习 101: 简介

你可以使用另一个辅助方法idist.get_world_size()来获取可用的GPU数量(进程),然后使用all_reduce()来收集梯度并应用SUM操作。

def average_gradients(model):
    num_processes = idist.get_world_size()
    for param in model.parameters():
        idist.all_reduce(param.grad.data, op="SUM")
        param.grad.data = param.grad.data / num_processes

所有收集

All Gather Diagram

当您只想在所有参与的进程中收集一个张量、数字或字符串时,可以使用all_gather()方法。作为一个基本示例,假设您必须收集所有等级上存储在num中的不同值。您可以通过如下方式使用all_gather来实现这一点:

def all_gather_example(local_rank):
    num = 2.0 * idist.get_rank()
    print(f"Rank {local_rank}, Initial value: {num}")

    all_nums = idist.all_gather(num)
    print(f"Rank {local_rank}, After performing all_gather: {all_nums}")
idist.spawn(backend="gloo", fn=all_gather_example, args=(), nproc_per_node=3)
Rank 0, Initial value: 0.0
Rank 2, Initial value: 4.0
Rank 1, Initial value: 2.0
Rank 2, After performing all_gather: [0.0, 2.0, 4.0]
Rank 0, After performing all_gather: [0.0, 2.0, 4.0]
Rank 1, After performing all_gather: [0.0, 2.0, 4.0]

现在假设你需要收集分布在所有进程上的预测值到主进程上,以便你可以将它们存储到文件中。以下是你可以如何做到这一点:

def write_preds_to_file(predictions, filename):
    prediction_tensor = torch.tensor(predictions)
    prediction_tensor = idist.all_gather(prediction_tensor)

    if idist.get_rank() == 0:
        torch.save(prediction_tensor, filename)

注意: 在上面的例子中,只有主进程需要收集的值,而不是所有进程。这也可以通过gather()方法来完成。

广播

Broadcast Diagram

方法broadcast()将张量、浮点数或字符串从源进程复制到所有其他进程。例如,您需要从等级0发送消息到所有其他等级。您可以通过在等级0上创建实际消息并在所有其他等级上创建占位符,然后广播消息并指定源等级来实现这一点。如果占位符未在所有等级上定义,您还可以使用safe_mode=True

def broadcast_example(local_rank):
    message = f"hello from rank {idist.get_rank()}"
    print(f"Rank {local_rank}, Initial value: {message}")

    message = idist.broadcast(message, src=0)
    print(f"Rank {local_rank}, After performing broadcast: {message}")
idist.spawn(backend="gloo", fn=broadcast_example, args=(), nproc_per_node=3)
Rank 1, Initial value: hello from rank 1
Rank 2, Initial value: hello from rank 2
Rank 0, Initial value: hello from rank 0
Rank 2, After performing broadcast: hello from rank 0
Rank 0, After performing broadcast: hello from rank 0
Rank 1, After performing broadcast: hello from rank 0

对于一个实际的使用案例,假设您需要从所有进程的排名0中收集预测值和实际值,以计算指标并避免内存错误。您可以通过首先使用all_gather(),然后计算指标,最后使用broadcast()与所有进程共享结果来实现这一点。src下面指的是源进程的排名。

def compute_metric(prediction_tensor, target_tensor):

    prediction_tensor = idist.all_gather(prediction_tensor)
    target_tensor = idist.all_gather(target_tensor)

    result = 0.0
    if idist.get_rank() == 0:
        result = compute_fn(prediction_tensor, target_tensor)

    result = idist.broadcast(result, src=0)

    return result

屏障

barrier() 方法有助于同步所有进程。例如 - 在训练期间下载数据时,我们必须确保只有主进程(rank = 0)下载数据集,以防止子进程(rank > 0)同时将相同文件下载到相同路径。这样,所有子进程都会获得已下载数据集的副本。这时我们可以利用 barrier() 让子进程等待,直到主进程下载完数据集。完成后,所有子进程实例化数据集,而主进程等待。最后,所有进程同步完成。

def get_datasets(config):
    if idist.get_local_rank() > 0:
        idist.barrier()

    train_dataset, test_dataset = get_train_test_datasets(config["data_path"])

    if idist.get_local_rank() == 0:
        idist.barrier()

    return train_dataset, test_dataset