Horovod 与 TensorFlow¶
要使用 Horovod 与 TensorFlow,请对您的训练脚本进行以下修改:
运行
hvd.init()。
将每个GPU固定分配给单个进程。
在典型的每个进程一个GPU的设置中,将其设置为本地排名。服务器上的第一个进程将被分配第一个GPU,第二个进程将被分配第二个GPU,依此类推。
对于TensorFlow v1:
config = tf.ConfigProto() config .gpu_options.visible_device_list = str(hvd.local_rank())
对于TensorFlow v2:
gpus = tf.config.experimental.list_physical_devices('GPU') for gpu in gpus: tf.config.experimental.set_memory_growth(gpu, True) if gpus: tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
根据工作节点数量缩放学习率。
在同步分布式训练中,有效批次大小会随工作节点数量成比例增加。 提高学习率可以补偿增大的批次大小。
将优化器包装在
hvd.DistributedOptimizer中。分布式优化器将梯度计算委托给原始优化器,使用allreduce或allgather对梯度进行平均,然后应用这些平均后的梯度。
对于TensorFlow v2,当使用
tf.GradientTape时,将tape包装在hvd.DistributedGradientTape中,而不是包装优化器。注意:对于模型并行使用场景,存在不需要同步梯度(通过allreduce或allgather)的局部变量(层)。您可以通过调用返回的包装优化器的
register_local_var()API来注册这些变量,或者,您可以使用horovod.keras.PartialDistributedOptimizerAPI并将局部层传递给此API以注册它们的局部变量。此外,当使用tf.GradientTape时,将tape包装在hvd.PartialDistributedGradientTape中而不是DistributedGradientTape,并将局部层传递给它以注册它们的局部变量。
将初始变量状态从等级0广播到所有其他进程。
当使用随机权重开始训练或从检查点恢复时,这对于确保所有工作节点的一致初始化是必要的。
对于TensorFlow v1,在使用
MonitoredTrainingSession时添加hvd.BroadcastGlobalVariablesHook(0)。 当不使用MonitoredTrainingSession时,在全局变量初始化后执行hvd.broadcast_global_variables操作。对于TensorFlow v2,在模型和优化器初始化后使用
hvd.broadcast_variables。
修改你的代码,仅在worker 0上保存检查点,以防止其他worker损坏它们。
对于TensorFlow v1,如果
hvd.rank() != 0,通过传递checkpoint_dir=None给tf.train.MonitoredTrainingSession来实现这一点。对于TensorFlow v2,构造一个
tf.train.Checkpoint,并且仅在hvd.rank() == 0时调用checkpoint.save()。
TensorFlow v1 示例(完整训练示例请参见 examples 目录):
import tensorflow as tf
import horovod.tensorflow as hvd
# Initialize Horovod
hvd.init()
# Pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.visible_device_list = str(hvd.local_rank())
# Build model...
loss = ...
opt = tf.train.AdagradOptimizer(0.01 * hvd.size())
# Add Horovod Distributed Optimizer
opt = hvd.DistributedOptimizer(opt)
# Add hook to broadcast variables from rank 0 to all other processes during
# initialization.
hooks = [hvd.BroadcastGlobalVariablesHook(0)]
# Make training operation
train_op = opt.minimize(loss)
# Save checkpoints only on worker 0 to prevent other workers from corrupting them.
checkpoint_dir = '/tmp/train_logs' if hvd.rank() == 0 else None
# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
config=config,
hooks=hooks) as mon_sess:
while not mon_sess.should_stop():
# Perform synchronous training.
mon_sess.run(train_op)
TensorFlow v2 示例(来自 MNIST 示例):
import tensorflow as tf
import horovod.tensorflow as hvd
# Initialize Horovod
hvd.init()
# Pin GPU to be used to process local rank (one GPU per process)
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
# Build model and dataset
dataset = ...
model = ...
loss = tf.losses.SparseCategoricalCrossentropy()
opt = tf.optimizers.Adam(0.001 * hvd.size())
checkpoint_dir = './checkpoints'
checkpoint = tf.train.Checkpoint(model=model, optimizer=opt)
@tf.function
def training_step(images, labels, first_batch):
with tf.GradientTape() as tape:
probs = mnist_model(images, training=True)
loss_value = loss(labels, probs)
# Horovod: add Horovod Distributed GradientTape.
tape = hvd.DistributedGradientTape(tape)
grads = tape.gradient(loss_value, mnist_model.trainable_variables)
opt.apply_gradients(zip(grads, mnist_model.trainable_variables))
# Horovod: broadcast initial variable states from rank 0 to all other processes.
# This is necessary to ensure consistent initialization of all workers when
# training is started with random weights or restored from a checkpoint.
#
# Note: broadcast should be done after the first gradient step to ensure optimizer
# initialization.
if first_batch:
hvd.broadcast_variables(mnist_model.variables, root_rank=0)
hvd.broadcast_variables(opt.variables(), root_rank=0)
return loss_value
# Horovod: adjust number of steps based on number of GPUs.
for batch, (images, labels) in enumerate(dataset.take(10000 // hvd.size())):
loss_value = training_step(images, labels, batch == 0)
if batch % 10 == 0 and hvd.local_rank() == 0:
print('Step #%d\tLoss: %.6f' % (batch, loss_value))
# Horovod: save checkpoints only on worker 0 to prevent other workers from
# corrupting it.
if hvd.rank() == 0:
checkpoint.save(checkpoint_dir)
Horovod 与 TensorFlow 数据服务¶
TensorFlow Data Service 允许将数据集的CPU密集型处理从训练过程中转移到拥有丰富CPU资源的进程集群中。
使用 Horovod,可以轻松地在您的 Horovod 集群上启动 TensorFlow 数据服务,并将您的 Horovod 训练任务连接到该服务。
运行以下命令通过 Horovod 启动 TensorFlow 数据服务:
horovodrun -np 4 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json
这将启动一个TensorFlow数据服务(此处称为计算作业),包含一个调度器和四个工作器。
注意
配置文件由计算作业写入,必须位于运行该计算作业的所有节点均可访问的路径上,例如分布式文件系统。
然后,您的训练任务可以通过在TensorFlow数据集上调用.send_to_data_service(…),将CPU密集型数据集操作转移到该数据服务:
from horovod.tensorflow.data.compute_service import TfDataServiceConfig
hvd.init()
rank = hvd.rank()
size = hvd.size()
compute_config = TfDataServiceConfig.read('/tmp/compute.json', wait_for_file_creation=True)
dataset = dataset.repeat() \
.shuffle(10000) \
.batch(128) \
.send_to_data_service(compute_config, rank, size) \
.prefetch(tf.data.experimental.AUTOTUNE)
在调用send_to_data_service之前的所有转换将由数据服务执行,而在此之后的所有转换则由训练脚本在本地执行。
你可以在示例目录中找到 tensorflow2_mnist_data_service.py 示例。
首先如上所示启动数据服务。在数据服务运行期间,启动示例训练脚本:
horovodrun -np 2 python tensorflow2_mnist_data_service.py /tmp/compute.json
计算任务通常在CPU节点上运行,而训练任务则在GPU节点上运行。这样可以在CPU节点上运行CPU密集型数据集转换,同时在GPU节点上运行GPU密集型训练。可以有多个CPU专门用于一个GPU任务。
使用 --hosts 参数分别在 CPU 节点(此处为 cpu-node-1 和 cpu-node-2)
和 GPU 节点(此处为 gpu-node-1 和 gpu-node-2)上运行计算和训练任务:
horovodrun -np 4 --hosts cpu-node-1:2,cpu-node-2:2 python -m horovod.tensorflow.data.compute_worker /tmp/compute.json
horovodrun -np 2 --hosts gpu-node-1:1,gpu-node-2:1 python tensorflow2_mnist_data_service.py /tmp/compute.json
注意
请确保您理解TensorFlow数据服务如何分发数据集转换: 参见distribute转换。
多个调度器¶
数据服务允许多个调度器,每个训练任务一个。每个调度器获得相同数量的工作器。
由于工作器专用于单个调度器,工作器专用于单个训练任务。
您的计算作业大小(-np 4)必须是调度器数量(--dispatchers 2)的倍数:
horovodrun -np 4 python -m horovod.tensorflow.data.compute_worker --dispatchers 2 /tmp/compute.json
这要求调度器数量 (--dispatchers 2) 与训练任务规模 (-np 2) 相匹配:
horovodrun -np 2 python tensorflow2_mnist_data_service.py /tmp/compute.json
单调度器¶
使用单个调度器,TensorFlow 允许在所有训练任务中复用数据集。这是基于先到先得或轮询机制实现的。唯一支持的处理模式是 "distributed_epoch"。
训练端调度器¶
默认情况下,调度器在计算作业内部运行。然而,您也可以让它们在训练作业内部运行。
添加 --dispatcher-side training 来告知计算作业,调度器是由训练作业启动的。
horovodrun -np 4 python -m horovod.tensorflow.data.compute_worker --dispatcher-side training /tmp/compute.json
训练脚本随后通过with tf_data_service(…)启动调度器并分发数据集本身:
hvd.init()
rank = hvd.rank()
size = hvd.size()
compute_config = TfDataServiceConfig.read('/tmp/compute.json', wait_for_file_creation=True)
with tf_data_service(compute_config, rank) as dispatcher_address:
dataset = dataset.repeat() \
.shuffle(10000) \
.batch(128) \
.apply(tf.data.experimental.service.distribute(
processing_mode="distributed_epoch",
service=dispatcher_address,
job_name='job' if reuse_dataset else None,
consumer_index=rank if round_robin else None,
num_consumers=size if round_robin else None)) \
.prefetch(tf.data.experimental.AUTOTUNE)
要查看使训练任务运行调度器所需的具体更改,只需对比训练端示例与计算端示例:
diff -w examples/tensorflow2/tensorflow2_mnist_data_service_train_fn_*
在Spark集群上计算任务¶
计算任务可以使用 spark-submit 在 Spark 集群上启动:
worker_py=$(python -c "import horovod.spark.tensorflow.compute_worker as worker; print(worker.__file__)")
spark-submit --master "local[4]" "$worker_py" /tmp/compute.json
在计算作业运行期间,启动训练作业:
cd examples/spark/tensorflow2 spark-submit –master “local[2]” –py-files tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher.py,tensorflow2_mnist_data_service_train_fn_training_side_dispatcher.py tensorflow2_mnist_data_service.py /tmp/compute.json
与往常一样,配置文件必须位于运行计算作业的所有节点均可访问的路径上。