Horovod 与 TensorFlow

要使用 Horovod 与 TensorFlow,请对您的训练脚本进行以下修改:

  1. 运行 hvd.init()

  1. 将每个GPU固定分配给单个进程。

    在典型的每个进程一个GPU的设置中,将其设置为本地排名。服务器上的第一个进程将被分配第一个GPU,第二个进程将被分配第二个GPU,依此类推。

    对于TensorFlow v1

    config = tf.ConfigProto()
    config .gpu_options.visible_device_list = str(hvd.local_rank())
    

    对于TensorFlow v2

    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
    

  1. 根据工作节点数量缩放学习率。

    在同步分布式训练中,有效批次大小会随工作节点数量成比例增加。 提高学习率可以补偿增大的批次大小。

  1. 将优化器包装在hvd.DistributedOptimizer中。

    分布式优化器将梯度计算委托给原始优化器,使用allreduceallgather对梯度进行平均,然后应用这些平均后的梯度。

    对于TensorFlow v2,当使用tf.GradientTape时,将tape包装在hvd.DistributedGradientTape中,而不是包装优化器。

    注意:对于模型并行使用场景,存在不需要同步梯度(通过allreduce或allgather)的局部变量(层)。您可以通过调用返回的包装优化器的register_local_var() API来注册这些变量,或者,您可以使用horovod.keras.PartialDistributedOptimizer API并将局部层传递给此API以注册它们的局部变量。此外,当使用tf.GradientTape时,将tape包装在hvd.PartialDistributedGradientTape中而不是DistributedGradientTape,并将局部层传递给它以注册它们的局部变量。

  1. 将初始变量状态从等级0广播到所有其他进程。

    当使用随机权重开始训练或从检查点恢复时,这对于确保所有工作节点的一致初始化是必要的。

    对于TensorFlow v1,在使用MonitoredTrainingSession时添加hvd.BroadcastGlobalVariablesHook(0)。 当不使用MonitoredTrainingSession时,在全局变量初始化后执行hvd.broadcast_global_variables操作。

    对于TensorFlow v2,在模型和优化器初始化后使用hvd.broadcast_variables

  1. 修改你的代码,仅在worker 0上保存检查点,以防止其他worker损坏它们。

    对于TensorFlow v1,如果hvd.rank() != 0,通过传递checkpoint_dir=Nonetf.train.MonitoredTrainingSession来实现这一点。

    对于TensorFlow v2,构造一个tf.train.Checkpoint,并且仅在hvd.rank() == 0时调用checkpoint.save()

TensorFlow v1 示例(完整训练示例请参见 examples 目录):

import tensorflow as tf
import horovod.tensorflow as hvd


# Initialize Horovod
hvd.init()

# Pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.visible_device_list = str(hvd.local_rank())

# Build model...
loss = ...
opt = tf.train.AdagradOptimizer(0.01 * hvd.size())

# Add Horovod Distributed Optimizer
opt = hvd.DistributedOptimizer(opt)

# Add hook to broadcast variables from rank 0 to all other processes during
# initialization.
hooks = [hvd.BroadcastGlobalVariablesHook(0)]

# Make training operation
train_op = opt.minimize(loss)

# Save checkpoints only on worker 0 to prevent other workers from corrupting them.
checkpoint_dir = '/tmp/train_logs' if hvd.rank() == 0 else None

# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                       config=config,
                                       hooks=hooks) as mon_sess:
  while not mon_sess.should_stop():
    # Perform synchronous training.
    mon_sess.run(train_op)

TensorFlow v2 示例(来自 MNIST 示例):

import tensorflow as tf
import horovod.tensorflow as hvd

# Initialize Horovod
hvd.init()

# Pin GPU to be used to process local rank (one GPU per process)
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

# Build model and dataset
dataset = ...
model = ...
loss = tf.losses.SparseCategoricalCrossentropy()
opt = tf.optimizers.Adam(0.001 * hvd.size())

checkpoint_dir = './checkpoints'
checkpoint = tf.train.Checkpoint(model=model, optimizer=opt)

@tf.function
def training_step(images, labels, first_batch):
    with tf.GradientTape() as tape:
        probs = mnist_model(images, training=True)
        loss_value = loss(labels, probs)

    # Horovod: add Horovod Distributed GradientTape.
    tape = hvd.DistributedGradientTape(tape)

    grads = tape.gradient(loss_value, mnist_model.trainable_variables)
    opt.apply_gradients(zip(grads, mnist_model.trainable_variables))

    # Horovod: broadcast initial variable states from rank 0 to all other processes.
    # This is necessary to ensure consistent initialization of all workers when
    # training is started with random weights or restored from a checkpoint.
    #
    # Note: broadcast should be done after the first gradient step to ensure optimizer
    # initialization.
    if first_batch:
        hvd.broadcast_variables(mnist_model.variables, root_rank=0)
        hvd.broadcast_variables(opt.variables(), root_rank=0)

    return loss_value

# Horovod: adjust number of steps based on number of GPUs.
for batch, (images, labels) in enumerate(dataset.take(10000 // hvd.size())):
    loss_value = training_step(images, labels, batch == 0)

    if batch % 10 == 0 and hvd.local_rank() == 0:
        print('Step #%d\tLoss: %.6f' % (batch, loss_value))

# Horovod: save checkpoints only on worker 0 to prevent other workers from
# corrupting it.
if hvd.rank() == 0:
    checkpoint.save(checkpoint_dir)

Horovod 与 TensorFlow 数据服务

TensorFlow Data Service 允许将数据集的CPU密集型处理从训练过程中转移到拥有丰富CPU资源的进程集群中。

使用 Horovod,可以轻松地在您的 Horovod 集群上启动 TensorFlow 数据服务,并将您的 Horovod 训练任务连接到该服务。

运行以下命令通过 Horovod 启动 TensorFlow 数据服务:

horovodrun -np 4 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json

这将启动一个TensorFlow数据服务(此处称为计算作业),包含一个调度器和四个工作器。

注意

配置文件由计算作业写入,必须位于运行该计算作业的所有节点均可访问的路径上,例如分布式文件系统。

然后,您的训练任务可以通过在TensorFlow数据集上调用.send_to_data_service(…),将CPU密集型数据集操作转移到该数据服务:

from horovod.tensorflow.data.compute_service import TfDataServiceConfig

hvd.init()
rank = hvd.rank()
size = hvd.size()

compute_config = TfDataServiceConfig.read('/tmp/compute.json', wait_for_file_creation=True)

dataset = dataset.repeat() \
    .shuffle(10000) \
    .batch(128) \
    .send_to_data_service(compute_config, rank, size) \
    .prefetch(tf.data.experimental.AUTOTUNE)

在调用send_to_data_service之前的所有转换将由数据服务执行,而在此之后的所有转换则由训练脚本在本地执行。

你可以在示例目录中找到 tensorflow2_mnist_data_service.py 示例。

首先如上所示启动数据服务。在数据服务运行期间,启动示例训练脚本:

horovodrun -np 2 python tensorflow2_mnist_data_service.py /tmp/compute.json

计算任务通常在CPU节点上运行,而训练任务则在GPU节点上运行。这样可以在CPU节点上运行CPU密集型数据集转换,同时在GPU节点上运行GPU密集型训练。可以有多个CPU专门用于一个GPU任务。

使用 --hosts 参数分别在 CPU 节点(此处为 cpu-node-1cpu-node-2) 和 GPU 节点(此处为 gpu-node-1gpu-node-2)上运行计算和训练任务:

horovodrun -np 4 --hosts cpu-node-1:2,cpu-node-2:2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json
horovodrun -np 2 --hosts gpu-node-1:1,gpu-node-2:1 python tensorflow2_mnist_data_service.py /tmp/compute.json

注意

请确保您理解TensorFlow数据服务如何分发数据集转换: 参见distribute转换。

多个调度器

数据服务允许多个调度器,每个训练任务一个。每个调度器获得相同数量的工作器。 由于工作器专用于单个调度器,工作器专用于单个训练任务。 您的计算作业大小(-np 4)必须是调度器数量(--dispatchers 2)的倍数:

horovodrun -np 4 python -m horovod.tensorflow.data.compute_worker --dispatchers 2 /tmp/compute.json

这要求调度器数量 (--dispatchers 2) 与训练任务规模 (-np 2) 相匹配:

horovodrun -np 2 python tensorflow2_mnist_data_service.py /tmp/compute.json

单调度器

使用单个调度器,TensorFlow 允许在所有训练任务中复用数据集。这是基于先到先得或轮询机制实现的。唯一支持的处理模式是 "distributed_epoch"

训练端调度器

默认情况下,调度器在计算作业内部运行。然而,您也可以让它们在训练作业内部运行。 添加 --dispatcher-side training 来告知计算作业,调度器是由训练作业启动的。

horovodrun -np 4 python -m horovod.tensorflow.data.compute_worker --dispatcher-side training /tmp/compute.json

训练脚本随后通过with tf_data_service(…)启动调度器并分发数据集本身:

hvd.init()
rank = hvd.rank()
size = hvd.size()

compute_config = TfDataServiceConfig.read('/tmp/compute.json', wait_for_file_creation=True)

with tf_data_service(compute_config, rank) as dispatcher_address:

    dataset = dataset.repeat() \
        .shuffle(10000) \
        .batch(128) \
        .apply(tf.data.experimental.service.distribute(
            processing_mode="distributed_epoch",
            service=dispatcher_address,
            job_name='job' if reuse_dataset else None,
            consumer_index=rank if round_robin else None,
            num_consumers=size if round_robin else None)) \
        .prefetch(tf.data.experimental.AUTOTUNE)

要查看使训练任务运行调度器所需的具体更改,只需对比训练端示例与计算端示例:

diff -w examples/tensorflow2/tensorflow2_mnist_data_service_train_fn_*

在Spark集群上计算任务

计算任务可以使用 spark-submit 在 Spark 集群上启动:

worker_py=$(python -c "import horovod.spark.tensorflow.compute_worker as worker; print(worker.__file__)")
spark-submit --master "local[4]" "$worker_py" /tmp/compute.json

在计算作业运行期间,启动训练作业:

cd examples/spark/tensorflow2 spark-submit –master “local[2]” –py-files tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher.py,tensorflow2_mnist_data_service_train_fn_training_side_dispatcher.py tensorflow2_mnist_data_service.py /tmp/compute.json

与往常一样,配置文件必须位于运行计算作业的所有节点均可访问的路径上。