使用分布式XGBoost训练模型#

本指南将向您展示如何使用分布式XGBoost训练模型并执行批量推理。您将使用Ray Train和Ray Data。

让我们开始安装我们的依赖项:

!pip install -qU "ray[data,train]"
[notice] A new release of pip available: 22.3.1 -> 23.1.2
[notice] To update, run: pip install --upgrade pip

然后我们需要一些导入:

from typing import Tuple

import ray
from ray.data import Dataset, Preprocessor
from ray.data.preprocessors import StandardScaler
from ray.train.xgboost import XGBoostTrainer
from ray.train import Result, ScalingConfig
import xgboost

接下来我们定义一个函数来加载我们的训练、验证和测试数据集。

def prepare_data() -> Tuple[Dataset, Dataset, Dataset]:
    dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
    train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)
    test_dataset = valid_dataset.drop_columns(["target"])
    return train_dataset, valid_dataset, test_dataset

以下函数将创建一个XGBoost训练器,训练它,并返回结果。

def train_xgboost(num_workers: int, use_gpu: bool = False) -> Result:
    train_dataset, valid_dataset, _ = prepare_data()

    # 缩放一些随机列
    columns_to_scale = ["mean radius", "mean texture"]
    preprocessor = StandardScaler(columns=columns_to_scale)
    train_dataset = preprocessor.fit_transform(train_dataset)
    valid_dataset = preprocessor.transform(valid_dataset)

    # XGBoost 特定参数
    params = {
        "tree_method": "approx",
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
    }

    trainer = XGBoostTrainer(
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
        label_column="target",
        params=params,
        datasets={"train": train_dataset, "valid": valid_dataset},
        num_boost_round=100,
        metadata = {"preprocessor_pkl": preprocessor.serialize()}
    )
    result = trainer.fit()
    print(result.metrics)

    return result

一旦我们获得结果,就可以对得到的模型进行批量推理。让我们为此定义一个工具函数。

import pandas as pd
from ray.train import Checkpoint


class Predict:

    def __init__(self, checkpoint: Checkpoint):
        self.model = XGBoostTrainer.get_model(checkpoint)
        self.preprocessor = Preprocessor.deserialize(checkpoint.get_metadata()["preprocessor_pkl"])

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        preprocessed_batch = self.preprocessor.transform_batch(batch)
        dmatrix = xgboost.DMatrix(preprocessed_batch)
        return {"predictions": self.model.predict(dmatrix)}


def predict_xgboost(result: Result):
    _, _, test_dataset = prepare_data()

    scores = test_dataset.map_batches(
        Predict, 
        fn_constructor_args=[result.checkpoint], 
        concurrency=1, 
        batch_format="pandas"
    )
    
    predicted_labels = scores.map_batches(lambda df: (df > 0.5).astype(int), batch_format="pandas")
    print(f"PREDICTED LABELS")
    predicted_labels.show()

现在我们可以开始训练了:

result = train_xgboost(num_workers=2, use_gpu=False)

Tune Status

Current time:2023-07-06 18:33:25
Running for: 00:00:06.19
Memory: 14.9/64.0 GiB

System Info

Using FIFO scheduling algorithm.
Logical resource usage: 2.0/10 CPUs, 0/0 GPUs

Trial Status

Trial name status loc iter total time (s) train-logloss train-error valid-logloss
XGBoostTrainer_40fed_00000TERMINATED127.0.0.1:40725 101 4.90132 0.00587595 0 0.06215
(XGBoostTrainer pid=40725) The `preprocessor` arg to Trainer is deprecated. Apply preprocessor transformations ahead of time by calling `preprocessor.transform(ds)`. Support for the preprocessor arg will be dropped in a future release.
(XGBoostTrainer pid=40725) Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
(XGBoostTrainer pid=40725) Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Aggregate]
(XGBoostTrainer pid=40725) Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
(XGBoostTrainer pid=40725) Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`






(pid=40725) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory:   0%|          | 0/14 [00:00<?, ?it/s]

                                                              

                                                                      


(XGBoostTrainer pid=40725) Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(StandardScaler._transform_pandas)]



(pid=40725) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory:   0%|          | 0/14 [00:01<?, ?it/s]
                                                              

                                                                       


(XGBoostTrainer pid=40725) Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)



(pid=40725) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory:   0%|          | 0/14 [00:01<?, ?it/s]
                                                              

                                                                       


(XGBoostTrainer pid=40725) Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`



(pid=40725) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory:   0%|          | 0/14 [00:01<?, ?it/s]
                                                              

                                                                       


(XGBoostTrainer pid=40725) Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(StandardScaler._transform_pandas)]



(pid=40725) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory:   0%|          | 0/14 [00:01<?, ?it/s]
                                                              

                                                                       


(XGBoostTrainer pid=40725) Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)



(pid=40725) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory:   0%|          | 0/14 [00:01<?, ?it/s]
                                                              

                                                                       


(XGBoostTrainer pid=40725) Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`



(pid=40725) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory:   0%|          | 0/14 [00:01<?, ?it/s]


                                                                                                                                     




(_RemoteRayXGBoostActor pid=40741) [18:33:23] task [xgboost.ray]:5022217360 got new rank 1                                   
2023-07-06 18:33:25,975	INFO tune.py:1148 -- Total run time: 6.20 seconds (6.19 seconds for the tuning loop).
{'train-logloss': 0.00587594546605992, 'train-error': 0.0, 'valid-logloss': 0.06215000962556052, 'valid-error': 0.02941176470588235, 'time_this_iter_s': 0.0101318359375, 'should_checkpoint': True, 'done': True, 'training_iteration': 101, 'trial_id': '40fed_00000', 'date': '2023-07-06_18-33-25', 'timestamp': 1688693605, 'time_total_s': 4.901317834854126, 'pid': 40725, 'hostname': 'Balajis-MacBook-Pro-16', 'node_ip': '127.0.0.1', 'config': {}, 'time_since_restore': 4.901317834854126, 'iterations_since_restore': 101, 'experiment_tag': '0'}

并对获得的模型进行推理:

predict_xgboost(result)
2023-07-06 18:33:27,259	INFO read_api.py:374 -- To satisfy the requested parallelism of 20, each read task output will be split into 20 smaller blocks.
2023-07-06 18:33:28,112	INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> ActorPoolMapOperator[MapBatches(<lambda>)->MapBatches(Predict)] -> TaskPoolMapOperator[MapBatches(<lambda>)]
2023-07-06 18:33:28,112	INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-07-06 18:33:28,114	INFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
2023-07-06 18:33:28,150	INFO actor_pool_map_operator.py:117 -- MapBatches(<lambda>)->MapBatches(Predict): Waiting for 1 pool actors to start...
PREDICTED LABELS
                                                                                                                        
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}