Horovod on Spark

horovod.spark 包提供了一个便捷的 Horovod 封装器,使得在 Spark 集群中运行分布式训练任务变得简单。

在训练数据源自Spark的情况下,这实现了紧密的模型设计循环,其中数据处理、模型训练和模型评估都在Spark中完成。

我们提供两个API用于在Spark上运行Horovod:一个高级的Estimator API和一个低级的Run API。两者 使用相同的底层机制在Spark执行器上启动Horovod,但Estimator API抽象了数据 处理(从Spark DataFrames到深度学习数据集)、模型训练循环、模型检查点、指标 收集和分布式训练。

我们推荐使用 Horovod Spark Estimators,如果您:

  • 正在使用Keras (tf.keraskeras) 或 PyTorch 进行训练。

  • 想要直接在来自 pyspark 的 Spark DataFrame 上进行训练。

  • 正在使用标准梯度下降优化过程作为您的训练循环。

如果由于任何原因,Estimator API 无法满足您的需求,Run API 提供了更细粒度的控制。

安装

当安装 Horovod 用于 Spark 时,使用额外的 [spark] 来一并安装所有 Spark 依赖项:

$ ... pip install horovod[spark]

请注意,Horovod Spark Estimators 需要满足以下条件:

  • horovod >= 0.19.0

  • pyspark >= 2.3.2

未包含在[spark]依赖项列表中的是深度学习框架(TensorFlow或PyTorch)。 Horovod Spark Estimators 还需要至少以下组合之一:

  • tensorflow-gpu >= 1.12.0tensorflow >= 1.12.0 (用于 KerasEstimator)

  • torch >= 1.0.0tensorboard >= 1.14.0 (用于 TorchEstimator)

  • torch >= 1.4.0pytorch_lightning >= 1.3.8 (用于 LightningEstimator)

Horovod Spark Estimators

Horovod Spark Estimators 允许您直接在现有的 Spark DataFrame 上训练您的深度神经网络,利用 Horovod 跨多个工作节点扩展的能力,无需任何专门的分布式训练代码:

from tensorflow import keras
import tensorflow as tf
import horovod.spark.keras as hvd

model = keras.models.Sequential()
    .add(keras.layers.Dense(8, input_dim=2))
    .add(keras.layers.Activation('tanh'))
    .add(keras.layers.Dense(1))
    .add(keras.layers.Activation('sigmoid'))

# NOTE: unscaled learning rate
optimizer = keras.optimizers.SGD(lr=0.1)
loss = 'binary_crossentropy'

store = HDFSStore('/user/username/experiments')
keras_estimator = hvd.KerasEstimator(
    num_proc=4,
    store=store,
    model=model,
    optimizer=optimizer,
    loss=loss,
    feature_cols=['features'],
    label_cols=['y'],
    batch_size=32,
    epochs=10)


keras_model = keras_estimator.fit(train_df) \
    .setOutputCols(['predict'])
predict_df = keras_model.transform(test_df)

Estimator隐藏了将Spark数据帧与深度学习训练脚本粘合的复杂性,将数据读取为训练框架可解释的格式,并使用Horovod分发训练。用户只需提供一个Keras或PyTorch模型,Estimator将完成将其适配到数据帧的工作。

训练完成后,Estimator(评估器)会返回训练模型的Transformer(转换器)表示。该模型转换器可以像任何Spark ML转换器一样使用,对输入DataFrame进行预测,并将预测结果作为新列写入输出DataFrame中。

Estimators 可以通过模型检查点、热启动重新训练和指标记录(用于 Tensorboard)来跟踪实验历史,使用 Estimator Store 抽象。存储用于持久化所有训练产物,包括训练数据的中间表示。Horovod 原生支持 HDFS 和本地文件系统的存储。

Petastorm 默认使用基于数据加载器, 但用户可以通过重写 BaseDataLoader 接口来自定义数据加载器。异步数据加载器混合类也可以 添加到数据加载器之上。此外,KerasEstimator 和 TorchEstimator 都支持一个可选的 DataModule 参数,类似于 Lightning DataModule,它抽象了数据加载过程并允许替代实现。 例如,NVTabularDataModule 集成了来自 NVTabular 的 KerasSequenceLoader 以实现 GPU 加速的数据加载。

有一个示例 Dockerfile 用于构建支持 NVTabular 的 Horovod。

from horovod.spark.keras.datamodule import NVTabularDataModule

keras_estimator = hvd.KerasEstimator(
    data_module=NVTabularDataModule,   # default: PetastormDataModule
    num_proc=4,
    store=store,
    model=model,
    optimizer=optimizer,
    loss=loss,
    feature_cols=['features'],
    label_cols=['y'],
    continuous_cols=CONTINUOUS_COLS,
    categorical_cols=CATEGORICAL_COLS,
    batch_size=32,
    epochs=10)

端到端示例

keras_spark_rossmann_estimator.py script 提供了一个 端到端数据准备和模型训练的示例,用于 Rossmann Store Sales Kaggle 竞赛。它受到文章An Introduction to Deep Learning for Tabular Data的启发, 并利用了文章中引用的笔记本代码。该示例分为三个部分:

  1. 第一部分对竞赛提供并由社区收集的初始CSV文件集执行复杂的数据预处理。

  2. 第二部分定义了一个Keras模型,并使用Horovod在Spark上执行模型的分布式训练。

  3. 第三部分使用最佳模型进行预测并创建提交文件。

要运行该示例,请确保安装带有 [spark] 的 Horovod,然后:

$ wget https://raw.githubusercontent.com/horovod/horovod/master/examples/spark/keras/keras_spark_rossmann_estimator.py
$ wget http://files.fast.ai/part2/lesson14/rossmann.tgz
$ tar zxvf rossmann.tgz
$ python keras_spark_rossmann_estimator.py

对于pytorch,你可以查看pytorch_lightning_spark_mnist.py脚本了解如何使用带有horovod后端的lightning估计器在spark上训练mnist模型。

在现有Parquet数据集上进行训练

如果你的数据已经是Parquet格式,并且希望使用Horovod Spark Estimators进行训练, 你可以无需在Spark中重新处理数据。使用Estimator.fit_on_parquet(),你可以直接 在现有的Parquet数据集上进行训练:

store = HDFSStore(train_path='/user/username/training_dataset', val_path='/user/username/val_dataset')
keras_estimator = hvd.KerasEstimator(
    num_proc=4,
    store=store,
    model=model,
    optimizer=optimizer,
    loss=loss,
    feature_cols=['features'],
    label_cols=['y'],
    batch_size=32,
    epochs=10)

keras_model = keras_estimator.fit_on_parquet()

生成的 keras_model 可以像任何 Spark Transformer 一样使用,或者您可以提取底层的 Keras 模型并在 Spark 之外使用:

model = keras_model.getModel()
pred = model.predict([np.ones([1, 2], dtype=np.float32)])

此方法适用于使用horovod.spark.common.util.prepare_data创建的数据集。它同样适用于 任何不包含Spark用户自定义数据类型(如DenseVectorSparseVector)的Parquet文件。建议使用 prepare_data以确保数据已为训练做好适当准备,即使您已有 Parquet格式的现有数据集。使用prepare_data可以让您根据计划使用的 训练进程数量正确分区数据集,并压缩大型稀疏数据列:

store = HDFSStore(train_path='/user/username/training_dataset', val_path='/user/username/val_dataset')
with util.prepare_data(num_processes=4,
                       store=store,
                       df=df,
                       feature_columns=['features'],
                       label_columns=['y'],
                       validation=0.1,
                       compress_sparse=True):
    keras_estimator = hvd.KerasEstimator(
        num_proc=4,
        store=store,
        model=model,
        optimizer=optimizer,
        loss=loss,
        feature_cols=['features'],
        label_cols=['y'],
        batch_size=32,
        epochs=10)

    keras_model = keras_estimator.fit_on_parquet()

一旦数据准备就绪,您可以在未来的Spark应用程序中重复使用它,无需再次调用 util.prepare_data

Horovod Spark 运行

你也可以在Spark上使用Horovod来运行与普通训练脚本相同的代码,支持Horovod所支持的任何框架。为此,只需将你的训练逻辑写在一个函数中,然后使用horovod.spark.run在Spark之上通过MPI并行执行该函数。

由于 Spark 上的 Horovod 使用 cloudpickle 将训练函数发送给工作节点执行,您可以在训练函数中捕获来自训练脚本或笔记本的局部变量,类似于在 PySpark 中使用用户自定义函数。

以下提供了一个在Spark中运行Horovod作业的简单示例:

$ pyspark
[PySpark welcome message]

>>> def fn(magic_number):
...   import horovod.torch as hvd
...   hvd.init()
...   print('Hello, rank = %d, local_rank = %d, size = %d, local_size = %d, magic_number = %d' % (hvd.rank(), hvd.local_rank(), hvd.size(), hvd.local_size(), magic_number))
...   return hvd.rank()
...
>>> import horovod.spark
>>> horovod.spark.run(fn, args=(42,))
Running 16 processes...
[Stage 0:>                                                        (0 + 16) / 16]
Hello, rank = 15, local_rank = 3, size = 16, local_size = 4, magic_number = 42
Hello, rank = 13, local_rank = 1, size = 16, local_size = 4, magic_number = 42
Hello, rank = 8, local_rank = 0, size = 16, local_size = 4, magic_number = 42
Hello, rank = 9, local_rank = 1, size = 16, local_size = 4, magic_number = 42
Hello, rank = 10, local_rank = 2, size = 16, local_size = 4, magic_number = 42
Hello, rank = 11, local_rank = 3, size = 16, local_size = 4, magic_number = 42
Hello, rank = 6, local_rank = 2, size = 16, local_size = 4, magic_number = 42
Hello, rank = 4, local_rank = 0, size = 16, local_size = 4, magic_number = 42
Hello, rank = 0, local_rank = 0, size = 16, local_size = 4, magic_number = 42
Hello, rank = 1, local_rank = 1, size = 16, local_size = 4, magic_number = 42
Hello, rank = 2, local_rank = 2, size = 16, local_size = 4, magic_number = 42
Hello, rank = 5, local_rank = 1, size = 16, local_size = 4, magic_number = 42
Hello, rank = 3, local_rank = 3, size = 16, local_size = 4, magic_number = 42
Hello, rank = 12, local_rank = 0, size = 16, local_size = 4, magic_number = 42
Hello, rank = 7, local_rank = 3, size = 16, local_size = 4, magic_number = 42
Hello, rank = 14, local_rank = 2, size = 16, local_size = 4, magic_number = 42
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]
>>>

一个更完整的示例可以在 keras_spark_rossmann_run.py 中找到,它展示了如何使用底层的 horovod.spark.run API 通过以下步骤端到端地训练模型:

$ wget https://raw.githubusercontent.com/horovod/horovod/master/examples/spark/keras/keras_spark_rossmann_run.py
$ wget http://files.fast.ai/part2/lesson14/rossmann.tgz
$ tar zxvf rossmann.tgz
$ python keras_spark_rossmann_run.py

Spark集群设置

由于深度学习工作负载的资源需求通常与典型的数据处理工作负载有很大不同,因此在DL Spark集群设置时需要考虑一些特定因素。

GPU 训练

对于GPU训练,一种方法是设置一个独立的GPU Spark集群,并将每个执行器配置为# of CPU cores = # of GPUs。这可以在独立模式下按如下方式实现:

$ echo "export SPARK_WORKER_CORES=<# of GPUs>" >> /path/to/spark/conf/spark-env.sh
$ /path/to/spark/sbin/start-all.sh

该方法将spark.task.cpus设置转换为控制每个进程请求的GPU数量(默认为1)。

正在进行的 SPARK-24615 工作旨在 在未来的 Spark 版本中引入 GPU 感知的资源调度。

CPU 训练

对于CPU训练,一种方法是在训练会话创建期间指定spark.task.cpus设置:

conf = SparkConf().setAppName('training') \
    .setMaster('spark://training-cluster:7077') \
    .set('spark.task.cpus', '16')
spark = SparkSession.builder.config(conf=conf).getOrCreate()

这种方法允许您重复使用同一个Spark集群进行数据准备和训练。

安全

Horovod on Spark 使用 Open MPI 在 Spark 中运行 Horovod 作业,因此其安全性与 Open MPI 实现本身相同。

由于Open MPI不使用加密通信,并且能够启动新进程,建议使用网络级安全措施将Horovod作业与潜在攻击者隔离

环境调节旋钮

  • HOROVOD_SPARK_START_TIMEOUT - 设置Spark任务生成、注册和开始运行代码的默认超时时间。如果Spark任务的执行器是按需调度的,并且可能需要很长时间才能启动,那么在系统级别增加此超时时间可能会很有用。

Horovod on Databricks

要在Databricks上的Spark中运行Horovod,请使用以下任一模式中的DBFS路径创建一个Store实例:

  • /dbfs/...

  • dbfs:/...

  • file:///dbfs/...

store = Store.create(dbfs_path)
# or explicitly using DBFSLocalStore
store = DBFSLocalStore(dbfs_path)

DBFSLocalStore 使用 Databricks 文件系统 (DBFS) 本地文件 API (AWS | Azure) 作为中间数据和训练产物的存储。

Databricks 在 Databricks Runtime 7.0 ML GPU 及以上版本中预配置了 GPU 感知调度。详情请参阅 GPU 调度说明 (AWS | Azure) 。

使用Estimator API,horovod将启动# 每个工作节点上的任务数 = 每个工作节点上的GPU数量,每个任务将从spark固定GPU到分配的GPU。

通过运行API,来自horovod.spark.task的函数get_available_devices()将返回调用get_available_devices()的spark任务所分配的GPU列表。 请参阅keras_spark3_rossmann.py以获取使用运行API与get_available_devices()的示例。

在某些情况下,您可能希望忽略由Spark分配的GPU设备,并始终使用本地排名作为GPU索引。 您可以设置环境变量HOROVOD_SPARK_USE_LOCAL_RANK_GPU_INDEX1,以使Horovod使用本地排名 作为每个任务的GPU索引。