Horovod with Keras

Horovod supports Keras and regular TensorFlow in similar ways. To use Horovod with Keras, make the following modifications to your training script:

  1. Run hvd.init().

  1. Pin each GPU to a single process.

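    With the typical setup of one GPU per process, set this to the local rank. The first process on the server will be allocated the first GPU, the second process will be allocated the second GPU, and so forth.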

    For TensorFlow v1:

    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    config.gpu_options.visible_device_list = str(hvd.local_rank())
    K.set_session(tf.Session(config=config))
    

    For TensorFlow v2:

    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
    

  1. Scale the learning rate by the number of workers.

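    The effective batch size in synchronous distributed training scales with the number of workers. Increasing the learning rate compensates for the larger batch size.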

  1. Wrap the optimizer in hvd.DistributedOptimizer.

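    The distributed optimizer delegates gradient computation to the original optimizer, averages the gradients using allreduce or allgather, and then applies those averaged gradients.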

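    Note: In model-parallel use cases, some local variables (layers) have gradients that do not need to be synchronized by allreduce or allgather. You can register these variables by calling the register_local_var() API on the returned wrapper optimizer. Alternatively, you can use the horovod.keras.PartialDistributedOptimizer API and pass it the local layers so that their local variables are registered.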

  1. Add hvd.callbacks.BroadcastGlobalVariablesCallback(0) to broadcast initial variable states from rank 0 to all other processes.

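    This is necessary to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint.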

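    Note: In model-parallel use cases, some local variables (layers) have weights that do not need to be broadcast. You can pass these local variables to the callback as hvd.callbacks.BroadcastGlobalVariablesCallback(0, local_variables=[list of local variables]).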

  1. Modify your code to save checkpoints only on worker 0 to prevent other workers from corrupting them.

    Accomplish this by guarding the model checkpointing code so that it is skipped whenever hvd.rank() != 0.
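The effect of steps 2 to 6 can be illustrated without Horovod at all. The following is a minimal pure-Python simulation, not Horovod code: `assign_gpu`, `scale_lr`, `broadcast_from_root`, `allreduce_average` and `should_save_checkpoint` are hypothetical names that only mimic the behaviour described above, with plain floats standing in for tensors and a dict per worker standing in for each process's variables.

```python
# Pure-Python simulation of the Horovod recipe: no Horovod, TensorFlow, or GPUs
# are involved. Every function name below is hypothetical and only mimics the
# behaviour described in the steps; it is not part of the Horovod API.


def assign_gpu(local_rank, num_local_gpus):
    """Pin one GPU per process: the process with local rank i gets GPU i."""
    if not 0 <= local_rank < num_local_gpus:
        raise ValueError("need at least one GPU per local process")
    return local_rank


def scale_lr(base_lr, size):
    """Scale the learning rate by the number of workers."""
    return base_lr * size


def broadcast_from_root(weights_per_worker, root=0, local_vars=frozenset()):
    """Copy the root worker's weights to every worker, except local variables,
    which each worker keeps as-is (like passing local_variables=[...])."""
    root_weights = weights_per_worker[root]
    return [
        {name: value if name in local_vars else root_weights[name]
         for name, value in weights.items()}
        for weights in weights_per_worker
    ]


def allreduce_average(grads_per_worker, local_vars=frozenset()):
    """Average each gradient across workers (what an averaging allreduce
    computes), leaving gradients of registered local variables unsynchronized."""
    size = len(grads_per_worker)
    averaged = []
    for grads in grads_per_worker:
        averaged.append({
            name: g if name in local_vars
            else sum(w[name] for w in grads_per_worker) / size
            for name, g in grads.items()
        })
    return averaged


def should_save_checkpoint(rank):
    """Only rank 0 writes checkpoints; every other rank skips this code."""
    return rank == 0


if __name__ == "__main__":
    size = 4
    local = {"local_head"}  # a hypothetical model-parallel layer
    print([assign_gpu(r, num_local_gpus=4) for r in range(size)])
    print(scale_lr(1.0, size))

    # Initial weights differ per worker until rank 0 broadcasts its own.
    weights = [{"dense": float(r), "local_head": float(r)} for r in range(size)]
    print(broadcast_from_root(weights, root=0, local_vars=local))

    grads = [{"dense": float(r + 1), "local_head": 10.0 * r} for r in range(size)]
    print(allreduce_average(grads, local_vars=local))
    print([should_save_checkpoint(r) for r in range(size)])
```

With four simulated workers, the shared `dense` gradient becomes (1 + 2 + 3 + 4) / 4 = 2.5 on every worker, while the registered local variable `local_head` keeps each worker's own gradient and weight, which is the distinction the two model-parallel notes draw.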

Note

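  • Keras 2.0.9 has a known issue that causes each worker to allocate all GPUs on the server, instead of only the GPU assigned by the local rank. If you have multiple GPUs per server, upgrade to Keras 2.1.2 or downgrade to Keras 2.0.8.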

  • To use the keras bundled with tensorflow, you must use from tensorflow import keras instead of import keras, and import horovod.tensorflow.keras as hvd instead of import horovod.keras as hvd, in your import statements.
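One way to act on the first note is to fail fast at start-up. The sketch below is a hypothetical guard, not part of Horovod or Keras: `check_keras_version` is an illustrative name, it assumes you pass in the installed version string (for example `keras.__version__`), and it only parses plain "X.Y.Z" strings.

```python
# Hypothetical start-up guard for the Keras 2.0.9 issue described in the note.
# `version` would normally come from keras.__version__; only plain "X.Y.Z"
# strings are handled by this sketch.


def check_keras_version(version, gpus_per_server):
    """Raise if the affected Keras release is used on multi-GPU servers."""
    major, minor, patch = (int(part) for part in version.split(".")[:3])
    if (major, minor, patch) == (2, 0, 9) and gpus_per_server > 1:
        raise RuntimeError(
            "Keras 2.0.9 makes every worker allocate all GPUs on the server; "
            "upgrade to Keras 2.1.2 or downgrade to Keras 2.0.8."
        )


if __name__ == "__main__":
    check_keras_version("2.1.2", gpus_per_server=4)  # fixed release: passes
    check_keras_version("2.0.9", gpus_per_server=1)  # one GPU per server: passes
```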

See the full simple (shown below) and advanced training examples.

from __future__ import print_function
import keras
from keras.datasets import mnist
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras import backend as K
import math
import tensorflow as tf
import horovod.keras as hvd

# Horovod: initialize Horovod.
hvd.init()

# Horovod: pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
config.gpu_options.visible_device_list = str(hvd.local_rank())
K.set_session(tf.Session(config=config))

batch_size = 128
num_classes = 10

# Horovod: adjust number of epochs based on number of GPUs.
epochs = int(math.ceil(12.0 / hvd.size()))

# Input image dimensions
img_rows, img_cols = 28, 28

# The data, shuffled and split between train and test sets
(x_train, y_train), (x_test, y_test) = mnist.load_data()

if K.image_data_format() == 'channels_first':
    x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols)
    x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols)
    input_shape = (1, img_rows, img_cols)
else:
    x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
    x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
    input_shape = (img_rows, img_cols, 1)

x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255
x_test /= 255
print('x_train shape:', x_train.shape)
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')

# Convert class vectors to binary class matrices
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)

model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3),
                activation='relu',
                input_shape=input_shape))
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(num_classes, activation='softmax'))

# Horovod: adjust learning rate based on number of GPUs.
opt = keras.optimizers.Adadelta(1.0 * hvd.size())

# Horovod: add Horovod Distributed Optimizer.
opt = hvd.DistributedOptimizer(opt)

model.compile(loss=keras.losses.categorical_crossentropy,
              optimizer=opt,
              metrics=['accuracy'])

callbacks = [
    # Horovod: broadcast initial variable states from rank 0 to all other processes.
    # This is necessary to ensure consistent initialization of all workers when
    # training is started with random weights or restored from a checkpoint.
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]

# Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
if hvd.rank() == 0:
    callbacks.append(keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5'))

model.fit(x_train, y_train,
          batch_size=batch_size,
          callbacks=callbacks,
          epochs=epochs,
          verbose=1,
          validation_data=(x_test, y_test))
score = model.evaluate(x_test, y_test, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])
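The simple example above makes two settings depend on the number of workers: `epochs = int(math.ceil(12.0 / hvd.size()))` and the Adadelta learning rate `1.0 * hvd.size()`. The sketch below works through that arithmetic for a few cluster sizes; `simple_example_settings` is a hypothetical helper and `size` merely stands in for `hvd.size()`. Because each step averages one batch of 128 from every worker, the effective batch is 128 * size, and the product of epochs and workers stays at 12 or slightly above (exactly 12 when size divides 12).

```python
import math

# Worked arithmetic for the size-dependent settings in the simple example.
# `size` stands in for hvd.size(); no Horovod is imported here.


def simple_example_settings(size, base_epochs=12.0, base_lr=1.0, batch_size=128):
    return {
        "epochs": int(math.ceil(base_epochs / size)),  # same formula as the script
        "lr": base_lr * size,                          # Adadelta(1.0 * hvd.size())
        "effective_batch": batch_size * size,          # one batch per worker per step
    }


if __name__ == "__main__":
    for size in (1, 2, 4, 8):
        print(size, simple_example_settings(size))
```

For example, with 8 workers the script trains for ceil(12 / 8) = 2 epochs at learning rate 8.0 with an effective batch of 1024.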

TensorFlow v2 Keras example (from the MNIST example):

import tensorflow as tf
import horovod.tensorflow.keras as hvd

# Initialize Horovod
hvd.init()

# Pin GPU to be used to process local rank (one GPU per process)
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

# Build model and dataset
dataset = ...
model = ...
opt = tf.optimizers.Adam(0.001 * hvd.size())

# Horovod: add Horovod DistributedOptimizer.
opt = hvd.DistributedOptimizer(opt)

# Horovod: Specify `experimental_run_tf_function=False` to ensure TensorFlow
# uses hvd.DistributedOptimizer() to compute gradients.
model.compile(loss=tf.losses.SparseCategoricalCrossentropy(),
              optimizer=opt,
              metrics=['accuracy'],
              experimental_run_tf_function=False)

callbacks = [
    # Horovod: broadcast initial variable states from rank 0 to all other processes.
    # This is necessary to ensure consistent initialization of all workers when
    # training is started with random weights or restored from a checkpoint.
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]

# Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
if hvd.rank() == 0:
    callbacks.append(tf.keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5'))

model.fit(dataset,
          steps_per_epoch=500 // hvd.size(),
          callbacks=callbacks,
          epochs=24,
          verbose=1 if hvd.rank() == 0 else 0)
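In the TF v2 example the per-worker settings depend on `hvd.rank()` and `hvd.size()`: the Adam learning rate is `0.001 * hvd.size()`, `steps_per_epoch` is `500 // hvd.size()`, and only rank 0 prints progress and saves checkpoints. The sketch below tabulates those values; `tf2_example_settings` is a hypothetical helper, and `rank` and `size` stand in for the Horovod calls. Since floor division drops the remainder, the number of batches processed per epoch across all workers (steps_per_epoch * size) is 500 or slightly below.

```python
# Worked arithmetic for the per-worker settings in the TF v2 example.
# `rank` and `size` stand in for hvd.rank() and hvd.size(); nothing here
# calls Horovod or TensorFlow.


def tf2_example_settings(rank, size, base_lr=0.001, base_steps=500):
    steps = base_steps // size  # steps_per_epoch=500 // hvd.size()
    return {
        "lr": base_lr * size,               # Adam(0.001 * hvd.size())
        "steps_per_epoch": steps,
        "steps_all_workers": steps * size,  # floor division can drop a few
        "verbose": 1 if rank == 0 else 0,   # only rank 0 prints progress
        "saves_checkpoints": rank == 0,     # only rank 0 gets ModelCheckpoint
    }


if __name__ == "__main__":
    for size in (1, 3, 4):
        print(size, tf2_example_settings(rank=0, size=size))
```

With 3 workers, for instance, each runs 166 steps per epoch, so 498 batches are processed per epoch in total.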