使用 XGBoost 和 LightGBM 开始分布式训练#

Ray Train 内置支持 XGBoost 和 LightGBM。

快速入门#

import ray
from ray.train import ScalingConfig
from ray.train.xgboost import XGBoostTrainer

# Load data.
dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")

# Split data into train and validation.
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)

trainer = XGBoostTrainer(
    scaling_config=ScalingConfig(
        # Number of workers to use for data parallelism.
        num_workers=2,
        # Whether to use GPU acceleration. Set to True to schedule GPU workers.
        use_gpu=False,
    ),
    label_column="target",
    num_boost_round=20,
    params={
        # XGBoost specific params (see the `xgboost.train` API reference)
        "objective": "binary:logistic",
        # uncomment this and set `use_gpu=True` to use GPU for training
        # "tree_method": "gpu_hist",
        "eval_metric": ["logloss", "error"],
    },
    datasets={"train": train_dataset, "valid": valid_dataset},
    # If running in a multi-node cluster, this is where you
    # should configure the run's persistent storage that is accessible
    # across all worker nodes.
    # run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result = trainer.fit()
print(result.metrics)
import ray
from ray.train import ScalingConfig
from ray.train.lightgbm import LightGBMTrainer

# Load data.
dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")

# Split data into train and validation.
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)

trainer = LightGBMTrainer(
    scaling_config=ScalingConfig(
        # Number of workers to use for data parallelism.
        num_workers=2,
        # Whether to use GPU acceleration. Set to True to schedule GPU workers.
        use_gpu=False,
    ),
    label_column="target",
    num_boost_round=20,
    params={
        # LightGBM specific params
        "objective": "binary",
        "metric": ["binary_logloss", "binary_error"],
    },
    datasets={"train": train_dataset, "valid": valid_dataset},
    # If running in a multi-node cluster, this is where you
    # should configure the run's persistent storage that is accessible
    # across all worker nodes.
    # run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result = trainer.fit()
print(result.metrics)

使用基于树模型的基本训练#

正如在原始的 xgboost.train()lightgbm.train() 函数中一样,训练参数作为 params 字典传递。

import ray
from ray.train import ScalingConfig
from ray.train.xgboost import XGBoostTrainer

# Load data.
dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")

# Split data into train and validation.
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)

trainer = XGBoostTrainer(
    scaling_config=ScalingConfig(
        # Number of workers to use for data parallelism.
        num_workers=2,
        # Whether to use GPU acceleration. Set to True to schedule GPU workers.
        use_gpu=False,
    ),
    label_column="target",
    num_boost_round=20,
    params={
        # XGBoost specific params (see the `xgboost.train` API reference)
        "objective": "binary:logistic",
        # uncomment this and set `use_gpu=True` to use GPU for training
        # "tree_method": "gpu_hist",
        "eval_metric": ["logloss", "error"],
    },
    datasets={"train": train_dataset, "valid": valid_dataset},
    # If running in a multi-node cluster, this is where you
    # should configure the run's persistent storage that is accessible
    # across all worker nodes.
    # run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result = trainer.fit()
print(result.metrics)
import ray
from ray.train import ScalingConfig
from ray.train.lightgbm import LightGBMTrainer

# Load data.
dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")

# Split data into train and validation.
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)

trainer = LightGBMTrainer(
    scaling_config=ScalingConfig(
        # Number of workers to use for data parallelism.
        num_workers=2,
        # Whether to use GPU acceleration. Set to True to schedule GPU workers.
        use_gpu=False,
    ),
    label_column="target",
    num_boost_round=20,
    params={
        # LightGBM specific params
        "objective": "binary",
        "metric": ["binary_logloss", "binary_error"],
    },
    datasets={"train": train_dataset, "valid": valid_dataset},
    # If running in a multi-node cluster, this is where you
    # should configure the run's persistent storage that is accessible
    # across all worker nodes.
    # run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result = trainer.fit()
print(result.metrics)

训练器构造函数传递 Ray 特定的参数。

保存和加载 XGBoost 和 LightGBM 检查点#

当你在每次提升轮次中训练一棵新树时,你可以保存一个检查点来快照到目前为止的训练进度。XGBoostTrainerLightGBMTrainer 都实现了开箱即用的检查点功能。这些检查点可以使用静态方法 XGBoostTrainer.get_modelLightGBMTrainer.get_model 加载到内存中。

唯一需要的更改是配置 CheckpointConfig 以设置检查点频率。例如,以下配置在每次提升轮次时保存一个检查点,并且只保留最新的检查点:

from ray.train import RunConfig, CheckpointConfig

run_config = RunConfig(
    checkpoint_config=CheckpointConfig(
        # Checkpoint every iteration.
        checkpoint_frequency=1,
        # Only keep the latest checkpoint and delete the others.
        num_to_keep=1,
    )
)

# from ray.train.xgboost import XGBoostTrainer
# trainer = XGBoostTrainer(..., run_config=run_config)

小技巧

一旦你启用了检查点,你可以按照 这个指南 来启用容错功能。

如何扩展训练?#

使用 Ray Train 的好处是,你可以通过调整 ScalingConfig 无缝扩展你的训练。

备注

Ray Train 不会修改或改变底层 XGBoost 或 LightGBM 分布式训练算法的运行方式。Ray 仅提供编排、数据摄取和容错功能。有关 GBDT 分布式训练的更多信息,请参阅 XGBoost 文档LightGBM 文档

以下是一些常见用例的示例:

设置:4个节点,每个节点8个CPU。

用例:在多节点训练中利用所有资源。

scaling_config = ScalingConfig(
    num_workers=4,
    resources_per_worker={"CPU": 8},
)

设置:1个节点,配备8个CPU和4个GPU。

用例:如果你有一个包含多个GPU的单节点,你需要使用分布式训练来利用所有GPU。

scaling_config = ScalingConfig(
    num_workers=4,
    use_gpu=True,
)

设置:4个节点,每个节点有8个CPU和4个GPU。

用例:如果你有多个节点且每个节点有多个GPU,你需要为每个GPU调度一个工作进程。

scaling_config = ScalingConfig(
    num_workers=16,
    use_gpu=True,
)

请注意,您只需调整工作者的数量。Ray 会自动处理其他所有事情。

警告

指定一个*共享存储位置*(如云存储或NFS)对于单节点集群是*可选的*,但对于多节点集群是**必需的**。对于多节点集群,使用本地路径在检查点期间会:ref:引发错误 <multinode-local-storage-warning>

trainer = XGBoostTrainer(
    ..., run_config=ray.train.RunConfig(storage_path="s3://...")
)

你应该使用多少个远程执行者?#

这取决于你的工作量和集群设置。通常情况下,对于仅使用CPU的训练,每个节点运行多个远程执行器并没有固有的好处。这是因为XGBoost已经可以通过线程利用多个CPU。

然而,在某些情况下,您应该考虑每个节点启动多个参与者:

  • 对于 多GPU训练,每个GPU应该有一个单独的远程执行者。因此,如果你的机器有24个CPU和4个GPU,你希望启动4个远程执行者,每个执行者分配6个CPU和1个GPU。

  • 异构集群 中,你可能需要找到 CPU 数量的 最大公约数 。例如,对于一个分别有 4、8 和 12 个 CPU 的三个节点的集群,你应该将 actor 数量设置为 6,每个 actor 的 CPU 数量设置为 4。

如何使用GPU进行训练?#

Ray Train 为 XGBoost 和 LightGBM 提供了多 GPU 训练功能。核心后端自动利用 NCCL2 进行跨设备通信。您只需为每个 GPU 启动一个 actor 并设置兼容 GPU 的参数。例如,将 XGBoost 的 tree_method 设置为 gpu_hist。更多详情请参阅 XGBoost 文档。

例如,如果你有两台机器,每台有4个GPU,你想要启动8个工作进程,并设置 use_gpu=True。通常,每个执行者分配少于一个(例如,0.5)或多于一个GPU是没有好处的。

你应该在每台机器的演员之间均匀分配CPU,所以如果你的机器除了4个GPU之外还有16个CPU,每个演员应该有4个CPU可以使用。

trainer = XGBoostTrainer(
    scaling_config=ScalingConfig(
        # Number of workers to use for data parallelism.
        num_workers=2,
        # Whether to use GPU acceleration.
        use_gpu=True,
    ),
    params={
        # XGBoost specific params
        "tree_method": "gpu_hist",
        "eval_metric": ["logloss", "error"],
    },
    label_column="target",
    num_boost_round=20,
    datasets={"train": train_dataset, "valid": valid_dataset},
)

如何预处理数据以进行训练?#

特别是对于表格数据,Ray Data 自带开箱即用的 预处理器,这些预处理器实现了常见的特征预处理操作。你可以通过在将数据集传递给 Trainer 之前对其应用这些预处理器来与 Ray Train Trainer 一起使用。例如:

import ray

from ray.data.preprocessors import MinMaxScaler
from ray.train.xgboost import XGBoostTrainer
from ray.train import ScalingConfig

train_dataset = ray.data.from_items([{"x": x, "y": 2 * x} for x in range(0, 32, 3)])
valid_dataset = ray.data.from_items([{"x": x, "y": 2 * x} for x in range(1, 32, 3)])

preprocessor = MinMaxScaler(["x"])
preprocessor.fit(train_dataset)
train_dataset = preprocessor.transform(train_dataset)
valid_dataset = preprocessor.transform(valid_dataset)

trainer = XGBoostTrainer(
    label_column="y",
    params={"objective": "reg:squarederror"},
    scaling_config=ScalingConfig(num_workers=2),
    datasets={"train": train_dataset, "valid": valid_dataset},
)
result = trainer.fit()

如何优化XGBoost的内存使用?#

XGBoost 使用一种计算优化的数据结构,即 DMatrix,来存储训练数据。当将数据集转换为 DMatrix 时,XGBoost 会创建中间副本,并最终持有完整数据的完整副本。XGBoost 将数据转换为本地数据格式。在64位系统上,该格式为64位浮点数。根据系统和原始数据集的数据类型,此矩阵占用的内存可能比原始数据集更多。

基于CPU的训练的**峰值内存使用量**至少是数据集大小的**3倍**,假设在64位系统上使用dtype float32,再加上大约**400,000 KiB**用于其他资源,如操作系统需求和存储中间结果。

示例

  • 机器类型:AWS m5.xlarge(4 个 vCPU,16 GiB 内存)

  • 可用RAM: ~15,350,000 KiB

  • 数据集:1,250,000 行,每行有 1024 个特征,数据类型为 float32。总大小:5,000,000 KiB

  • XGBoost DMatrix 大小: ~10,000,000 KiB

此数据集正好适合在此节点上进行训练。

请注意,在32位系统上,DMatrix的大小可能会降低。

GPUs

通常,基于GPU的训练存在相同的内存需求。此外,GPU必须有足够的内存来容纳数据集。

在前面的示例中,GPU必须至少有10,000,000 KiB(约9.6 GiB)的内存。然而,经验数据显示,使用 DeviceQuantileDMatrix 似乎会导致更高的峰值GPU内存使用量,可能用于加载数据时的中间存储(约10%)。

最佳实践

为了减少峰值内存使用,请考虑以下建议:

  • 将数据存储为 float32 或更低。通常情况下,你不需要更高的精度,并且将数据保持在较小的格式有助于减少初始数据加载时的峰值内存使用。

  • 从CSV加载数据时传递 dtype。否则,浮点值默认加载为 np.float64,这将使峰值内存使用量增加33%。