Horovod 与 Keras¶
Horovod 支持 Keras 和常规 TensorFlow 的方式类似。要将 Horovod 与 Keras 一起使用,请对您的训练脚本进行以下修改:
运行
hvd.init()。
将每个GPU固定分配给单个进程。
在典型的每个进程对应一个GPU的设置中,将此设置为本地排名。服务器上的第一个进程将被分配第一个GPU,第二个进程将被分配第二个GPU,依此类推。
对于TensorFlow v1:
config = tf.ConfigProto() config .gpu_options.visible_device_list = str(hvd.local_rank()) K.set_session(tf.Session(config=config))
对于TensorFlow v2:
gpus = tf.config.experimental.list_physical_devices('GPU') for gpu in gpus: tf.config.experimental.set_memory_growth(gpu, True) if gpus: tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
根据工作节点数量缩放学习率。
在同步分布式训练中,有效批次大小会随工作节点数量成比例增加。 提高学习率可以补偿增大的批次大小。
将优化器包装在
hvd.DistributedOptimizer中。分布式优化器将梯度计算委托给原始优化器,使用 allreduce 或 allgather 对梯度进行平均,然后应用这些平均后的梯度。
注意: 对于模型并行使用场景,存在一些局部变量(层),它们的梯度不需要通过 allreduce 或 allgather 进行同步。您可以通过调用返回的包装优化器的 register_local_var() API 来注册这些变量。或者,您可以使用
horovod.keras.PartialDistributedOptimizerAPI,并将局部层传递给此 API 以注册它们的局部变量。
添加
hvd.callbacks.BroadcastGlobalVariablesCallback(0)以将初始变量状态从 rank 0 广播到所有其他进程。在使用随机权重开始训练或从检查点恢复时,这对于确保所有工作节点的一致初始化是必要的。
注意: 对于模型并行使用场景,存在局部变量(层),其权重不需要广播。您可以通过添加
hvd.callbacks.BroadcastGlobalVariablesCallback(0, local_variables=[局部变量列表])来将这些局部变量传递给此回调。
修改你的代码,仅在工作者0上保存检查点,以防止其他工作者损坏它们。
通过使用
hvd.rank() != 0来保护模型检查点代码实现这一点。
注意
Keras 2.0.9 存在一个已知问题,会导致每个工作进程分配服务器上的所有GPU,而不是由本地排名分配的GPU。如果每个服务器有多个GPU,请升级到Keras 2.1.2或降级到Keras 2.0.8。
要使用与
tensorflow捆绑的keras,你必须在导入语句中使用from tensorflow import keras而不是import keras,以及使用import horovod.tensorflow.keras as hvd而不是import horovod.keras as hvd。
from __future__ import print_function
import keras
from keras.datasets import mnist
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras import backend as K
import math
import tensorflow as tf
import horovod.keras as hvd
# Horovod: initialize Horovod.
hvd.init()
# Horovod: pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
config.gpu_options.visible_device_list = str(hvd.local_rank())
K.set_session(tf.Session(config=config))
batch_size = 128
num_classes = 10
# Horovod: adjust number of epochs based on number of GPUs.
epochs = int(math.ceil(12.0 / hvd.size()))
# Input image dimensions
img_rows, img_cols = 28, 28
# The data, shuffled and split between train and test sets
(x_train, y_train), (x_test, y_test) = mnist.load_data()
if K.image_data_format() == 'channels_first':
x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols)
x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols)
input_shape = (1, img_rows, img_cols)
else:
x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
input_shape = (img_rows, img_cols, 1)
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')
x_train /= 255
x_test /= 255
print('x_train shape:', x_train.shape)
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')
# Convert class vectors to binary class matrices
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)
model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3),
activation='relu',
input_shape=input_shape))
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(num_classes, activation='softmax'))
# Horovod: adjust learning rate based on number of GPUs.
opt = keras.optimizers.Adadelta(1.0 * hvd.size())
# Horovod: add Horovod Distributed Optimizer.
opt = hvd.DistributedOptimizer(opt)
model.compile(loss=keras.losses.categorical_crossentropy,
optimizer=opt,
metrics=['accuracy'])
callbacks = [
# Horovod: broadcast initial variable states from rank 0 to all other processes.
# This is necessary to ensure consistent initialization of all workers when
# training is started with random weights or restored from a checkpoint.
hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]
# Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
if hvd.rank() == 0:
callbacks.append(keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5'))
model.fit(x_train, y_train,
batch_size=batch_size,
callbacks=callbacks,
epochs=epochs,
verbose=1,
validation_data=(x_test, y_test))
score = model.evaluate(x_test, y_test, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])
TensorFlow v2 Keras 示例(来自 MNIST 示例):
import tensorflow as tf
import horovod.tensorflow.keras as hvd
# Initialize Horovod
hvd.init()
# Pin GPU to be used to process local rank (one GPU per process)
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
# Build model and dataset
dataset = ...
model = ...
opt = tf.optimizers.Adam(0.001 * hvd.size())
# Horovod: add Horovod DistributedOptimizer.
opt = hvd.DistributedOptimizer(opt)
# Horovod: Specify `experimental_run_tf_function=False` to ensure TensorFlow
# uses hvd.DistributedOptimizer() to compute gradients.
mnist_model.compile(loss=tf.losses.SparseCategoricalCrossentropy(),
optimizer=opt,
metrics=['accuracy'],
experimental_run_tf_function=False)
callbacks = [
# Horovod: broadcast initial variable states from rank 0 to all other processes.
# This is necessary to ensure consistent initialization of all workers when
# training is started with random weights or restored from a checkpoint.
hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]
# Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
if hvd.rank() == 0:
callbacks.append(keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5'))
model.fit(dataset,
steps_per_epoch=500 // hvd.size(),
callbacks=callbacks,
epochs=24,
verbose=1 if hvd.rank() == 0 else 0)