Process Sets: Concurrently Running Collective Operations

Most Horovod operations in TensorFlow, PyTorch, and MXNet accept a process_set argument: by setting up different process sets you can have multiple subsets of the world of Horovod processes run distinct collective operations in parallel. Besides Horovod's fundamental operations such as hvd.allgather, hvd.allreduce, hvd.alltoall, hvd.broadcast, and hvd.grouped_allreduce, many high-level utility objects such as hvd.DistributedOptimizer also support process sets.
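
For instance, gradient averaging in the optimizer wrapper can be restricted to such a subset of ranks. A minimal sketch, assuming a Horovod version whose hvd.DistributedOptimizer accepts a process_set keyword (the two-rank subset below is purely illustrative):

import tensorflow as tf
import horovod.tensorflow as hvd

subset = hvd.ProcessSet([0, 1])      # illustrative two-rank subset
hvd.init(process_sets=[subset])

# On ranks 0 and 1, gradients are then averaged only within the subset
# rather than across all Horovod processes (legacy tf.compat.v1 optimizer API).
opt = hvd.DistributedOptimizer(tf.compat.v1.train.GradientDescentOptimizer(0.01),
                               process_set=subset)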

As an example, consider a Horovod model trained on four worker processes with two parallel allreduce operations running on the "even" and "odd" subsets of ranks. In the following we will see three ways to configure Horovod with an even and an odd process set, offering the flexibility you need. The code examples are presented for TensorFlow, but the interface for the other supported frameworks is equivalent.

1) Static process sets

# on all ranks
import horovod.tensorflow as hvd

even_set = hvd.ProcessSet([0,2])
odd_set = hvd.ProcessSet([1,3])
hvd.init(process_sets=[even_set, odd_set])

for p in [hvd.global_process_set, even_set, odd_set]:
  print(p)
# ProcessSet(process_set_id=0, ranks=[0, 1, 2, 3], mpi_comm=None)
# ProcessSet(process_set_id=1, ranks=[0, 2], mpi_comm=None)
# ProcessSet(process_set_id=2, ranks=[1, 3], mpi_comm=None)

# on ranks 0 and 2
result = hvd.allreduce(tensor_for_even_ranks, process_set=even_set)

# on ranks 1 and 3
result = hvd.allreduce(tensor_for_odd_ranks, process_set=odd_set)

Having initialized Horovod like this, the configuration of process sets cannot be changed without restarting the program. If you only use the default global process set (hvd.global_process_set), there is no impact on performance.
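
Since every rank runs the same program, a common pattern is to select the set (and the data) by rank parity instead of maintaining separate code paths per rank. A self-contained sketch of the same four-rank setup (the constant tensor is a stand-in for real data):

import tensorflow as tf
import horovod.tensorflow as hvd

even_set = hvd.ProcessSet([0, 2])
odd_set = hvd.ProcessSet([1, 3])
hvd.init(process_sets=[even_set, odd_set])

# Each rank picks the process set it belongs to by its parity.
my_set = even_set if hvd.rank() % 2 == 0 else odd_set
tensor = tf.constant([1.0, 2.0]) * hvd.rank()   # stand-in for real data
result = hvd.allreduce(tensor, process_set=my_set)
print(f"rank {hvd.rank()}: {result.numpy()}")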

2) Static process sets from MPI communicators

# on all ranks
import horovod.tensorflow as hvd
from mpi4py import MPI
comm = MPI.COMM_WORLD
subcomm = MPI.COMM_WORLD.Split(color=MPI.COMM_WORLD.rank % 2,
                               key=MPI.COMM_WORLD.rank)

split_process_set = hvd.ProcessSet(subcomm)

hvd.init(comm, process_sets=[split_process_set])

for p in [hvd.global_process_set, split_process_set]:
    print(p)
# ProcessSet(process_set_id=0, ranks=[0, 1, 2, 3], mpi_comm=<mpi4py.MPI.Intracomm object at 0x7fb817323dd0>)
# ProcessSet(process_set_id=1, ranks=[0, 2], mpi_comm=<mpi4py.MPI.Intracomm object at 0x7fb87e2ddfb0>)
## (split_process_set differs by rank)

# on ranks 0 and 2
result = hvd.allreduce(tensor_for_even_ranks, process_set=split_process_set)

# on ranks 1 and 3
result = hvd.allreduce(tensor_for_odd_ranks, process_set=split_process_set)

If you are already using multiple MPI communicators in your distributed program, you can plug them in directly.
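
As a sketch of such a setup end to end (same split as above), note that the mpi4py communicator remains usable alongside Horovod, for example to query a rank's position inside its subset:

from mpi4py import MPI
import tensorflow as tf
import horovod.tensorflow as hvd

comm = MPI.COMM_WORLD
subcomm = comm.Split(color=comm.rank % 2, key=comm.rank)

split_process_set = hvd.ProcessSet(subcomm)
hvd.init(comm, process_sets=[split_process_set])

# Horovod runs the collective within the subset; mpi4py still reports
# the local rank and size of that subset.
tensor = tf.constant([float(comm.rank)])        # stand-in for real data
result = hvd.allreduce(tensor, process_set=split_process_set)
print(f"global rank {hvd.rank()}: subset rank {subcomm.rank}/{subcomm.size}, "
      f"result {result.numpy()}")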

3) Dynamic process sets

# on all ranks
import horovod.tensorflow as hvd

hvd.init(process_sets="dynamic")  # alternatively set HOROVOD_DYNAMIC_PROCESS_SETS=1
even_set = hvd.add_process_set([0,2])
odd_set = hvd.add_process_set([1,3])

for p in [hvd.global_process_set, even_set, odd_set]:
  print(p)
# ProcessSet(process_set_id=0, ranks=[0, 1, 2, 3], mpi_comm=None)
# ProcessSet(process_set_id=1, ranks=[0, 2], mpi_comm=None)
# ProcessSet(process_set_id=2, ranks=[1, 3], mpi_comm=None)

# on ranks 0 and 2
result = hvd.allreduce(tensor_for_even_ranks, process_set=even_set)

# on ranks 1 and 3
result = hvd.allreduce(tensor_for_odd_ranks, process_set=odd_set)

The most flexible setup is obtained with "dynamic" process sets. Process sets can be registered and deregistered dynamically at any time after initializing Horovod via hvd.add_process_set() and hvd.remove_process_set(). Calls to these functions must be made identically and in the same order by all processes.
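
A minimal sketch of changing the grouping mid-run under these rules (the switch from an even/odd to a front/back split is purely illustrative, and hvd.remove_process_set is assumed here to take the previously returned ProcessSet object):

import horovod.tensorflow as hvd

hvd.init(process_sets="dynamic")
even_set = hvd.add_process_set([0, 2])
odd_set = hvd.add_process_set([1, 3])

# ... run some collectives on the even/odd layout ...

# All ranks deregister the old sets and register the new ones,
# making the same calls in the same order.
hvd.remove_process_set(even_set)
hvd.remove_process_set(odd_set)
front_set = hvd.add_process_set([0, 1])
back_set = hvd.add_process_set([2, 3])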

Note that dynamic process sets come with a slight extra synchronization overhead.