Source code for torch_frame.gbdt.tuned_xgboost

from __future__ import annotations

import copy
from typing import Any

import numpy as np
import torch
from torch import Tensor

from torch_frame import Metric, TaskType, TensorFrame, stype
from torch_frame.gbdt import GBDT


def neg_to_nan(x: Tensor) -> Tensor:
    r"""Convert -1 category back to NaN that can be handled by GBDT.

    Args:
        x (Tensor): Input categ. feature, where `-1` represents `NaN`.

    Returns:
        x (Tensor): Output categ. feature, where `-1` is replaced with `NaN`
    """
    is_neg = x == -1
    if is_neg.any():
        x = copy.copy(x).to(torch.float32)
        x[is_neg] = torch.nan
    return x
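
# A quick illustration of `neg_to_nan` (a sketch; the input values are made
# up). Note that the output is promoted to `float32`, since integer tensors
# cannot hold `NaN`:
#
#     >>> neg_to_nan(torch.tensor([0, -1, 2]))
#     tensor([0., nan, 2.])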


class XGBoost(GBDT):
    r"""An XGBoost model implementation with hyperparameter tuning using
    Optuna.

    This implementation extends :class:`GBDT` and aims to find optimal
    hyperparameters by optimizing the given objective function.
    """
    def _to_xgboost_input(
        self,
        tf: TensorFrame,
    ) -> tuple[np.ndarray, np.ndarray | None, list[str]]:
        r"""Convert :class:`TensorFrame` into XGBoost-compatible input format:
        :obj:`(feat, y, feat_types)`.

        Args:
            tf (TensorFrame): Input :class:`TensorFrame` object.

        Returns:
            feat (numpy.ndarray): Output :obj:`numpy.ndarray` obtained by
                concatenating tensors of numerical and categorical features
                of the input :class:`TensorFrame`.
            y (numpy.ndarray, optional): Prediction target.
            feat_types (List[str]): List of feature types: :obj:`"q"` for
                numerical features and :obj:`"c"` for categorical features.
                The abbreviations align with the XGBoost tutorial:
                <https://github.com/dmlc/xgboost/blob/master/doc/tutorials/categorical.rst#using-native-interface>
        """
        tf = tf.cpu()
        y = tf.y
        if y is not None:
            y: np.ndarray = y.numpy()

        feats: list[Tensor] = []
        types: list[str] = []
        if stype.categorical in tf.feat_dict:
            feats.append(neg_to_nan(tf.feat_dict[stype.categorical]))
            types.extend(['c'] * len(tf.col_names_dict[stype.categorical]))
        if stype.numerical in tf.feat_dict:
            feats.append(tf.feat_dict[stype.numerical])
            types.extend(['q'] * len(tf.col_names_dict[stype.numerical]))
        if stype.embedding in tf.feat_dict:
            feat = tf.feat_dict[stype.embedding]
            feat = feat.values
            feat = feat.view(feat.size(0), -1)
            feats.append(feat)
            types.extend(['q'] * feat.size(-1))
        # TODO Add support for other stypes.
        if len(feats) == 0:
            raise ValueError("The input TensorFrame object is empty.")
        feat = torch.cat(feats, dim=-1).numpy()
        return feat, y, types
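
    # Illustrative output of `_to_xgboost_input` for a hypothetical
    # TensorFrame with two categorical and three numerical columns
    # (categorical features are concatenated before numerical ones, as
    # above):
    #
    #     feat.shape -> (num_rows, 5)
    #     y.shape    -> (num_rows,), or y is None
    #     types      -> ['c', 'c', 'q', 'q', 'q']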
    def objective(
        self,
        trial: Any,  # optuna.trial.Trial
        dtrain: Any,  # xgboost.DMatrix
        dvalid: Any,  # xgboost.DMatrix
        num_boost_round: int,
        early_stopping_rounds: int,
    ) -> float:
        r"""Objective function to be optimized.

        Args:
            trial (optuna.trial.Trial): Optuna trial object.
            dtrain (xgboost.DMatrix): Train data.
            dvalid (xgboost.DMatrix): Validation data.
            num_boost_round (int): Number of boosting rounds.
            early_stopping_rounds (int): Number of early stopping rounds.

        Returns:
            float: Best objective value. Root mean squared error for
                regression task and accuracy for classification task.
        """
        import optuna
        import xgboost

        self.params = {
            "booster":
            trial.suggest_categorical("booster", ["gbtree", "dart"]),
            "lambda":
            (0.0 if not trial.suggest_categorical('use_lambda', [True, False])
             else trial.suggest_float('lambda', 1e-8, 1e2, log=True)),
            "alpha":
            (0.0 if not trial.suggest_categorical('use_alpha', [True, False])
             else trial.suggest_float('alpha', 1e-8, 1e2, log=True)),
        }

        if self.params["booster"] in ("gbtree", "dart"):
            self.params["max_depth"] = trial.suggest_int("max_depth", 3, 11)
            self.params["min_child_weight"] = trial.suggest_float(
                "min_child_weight", 1e-8, 1e5, log=True)
            self.params["subsample"] = trial.suggest_float(
                "subsample", 0.5, 1.0)
            self.params["colsample_bytree"] = trial.suggest_float(
                "colsample_bytree", 0.5, 1.0)
            self.params["colsample_bylevel"] = trial.suggest_float(
                "colsample_bylevel", 0.5, 1.0)
            self.params["gamma"] = (0.0 if not trial.suggest_categorical(
                'use_gamma', [True, False]) else trial.suggest_float(
                    'gamma', 1e-8, 1e2, log=True))
            # Note: sampled under the name "learning_rate" but passed to
            # XGBoost as "eta".
            self.params["eta"] = trial.suggest_float('learning_rate', 1e-6,
                                                     1.0, log=True)
        if self.params["booster"] == "dart":
            self.params["sample_type"] = trial.suggest_categorical(
                "sample_type", ["uniform", "weighted"])
            self.params["normalize_type"] = trial.suggest_categorical(
                "normalize_type", ["tree", "forest"])
            self.params["rate_drop"] = trial.suggest_float(
                "rate_drop", 1e-8, 1.0, log=True)
            self.params["skip_drop"] = trial.suggest_float(
                "skip_drop", 1e-8, 1.0, log=True)

        if self.task_type == TaskType.MULTICLASS_CLASSIFICATION:
            self.params["objective"] = "multi:softmax"
            self.params["eval_metric"] = "merror"
        elif self.task_type == TaskType.REGRESSION:
            if self.metric == Metric.RMSE:
                self.params["objective"] = "reg:squarederror"
                self.params["eval_metric"] = "rmse"
            elif self.metric == Metric.MAE:
                self.params["objective"] = "reg:absoluteerror"
                self.params["eval_metric"] = "mae"
        elif self.task_type == TaskType.BINARY_CLASSIFICATION:
            self.params["objective"] = "binary:logistic"
            if self.metric == Metric.ROCAUC:
                self.params["eval_metric"] = "auc"
            elif self.metric == Metric.ACCURACY:
                self.params["eval_metric"] = "error"
        else:
            raise ValueError(f"{self.__class__.__name__} is not supported "
                             f"for {self.task_type}.")

        pruning_callback = optuna.integration.XGBoostPruningCallback(
            trial, f"validation-{self.params['eval_metric']}")

        if self.task_type == TaskType.MULTICLASS_CLASSIFICATION:
            self.params["num_class"] = self._num_classes or len(
                np.unique(dtrain.get_label()))

        boost = xgboost.train(self.params, dtrain,
                              num_boost_round=num_boost_round,
                              early_stopping_rounds=early_stopping_rounds,
                              verbose_eval=False,
                              evals=[(dvalid, 'validation')],
                              callbacks=[pruning_callback])
        if boost.best_iteration:
            iteration_range = (0, boost.best_iteration + 1)
        else:
            iteration_range = None
        pred = boost.predict(dvalid, iteration_range=iteration_range)
        # If XGBoost early stops on a multiclass classification task,
        # then the output shape would be (batch_size, num_classes).
        # We need to take argmax to get the final prediction output.
        if (boost.best_iteration
                and self.task_type == TaskType.MULTICLASS_CLASSIFICATION):
            assert pred.shape[1] == self.params["num_class"]
            pred = torch.argmax(torch.from_numpy(pred), dim=1)
        else:
            pred = torch.from_numpy(pred)
        score = self.compute_metric(torch.from_numpy(dvalid.get_label()),
                                    pred)
        return score
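
    # Example of a sampled `self.params` dict for a single "gbtree" trial on
    # a regression task (values are illustrative only):
    #
    #     {'booster': 'gbtree', 'lambda': 0.0, 'alpha': 3.2e-05,
    #      'max_depth': 7, 'min_child_weight': 0.12, 'subsample': 0.83,
    #      'colsample_bytree': 0.95, 'colsample_bylevel': 0.60, 'gamma': 0.0,
    #      'eta': 0.01, 'objective': 'reg:squarederror',
    #      'eval_metric': 'rmse'}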
    def _tune(
        self,
        tf_train: TensorFrame,
        tf_val: TensorFrame,
        num_trials: int,
        num_boost_round: int = 2000,
        early_stopping_rounds: int = 50,
    ):
        import optuna
        import xgboost

        if self.task_type == TaskType.REGRESSION:
            study = optuna.create_study(direction="minimize")
        else:
            study = optuna.create_study(direction="maximize")

        train_feat, train_y, train_feat_type = self._to_xgboost_input(
            tf_train)
        val_feat, val_y, val_feat_type = self._to_xgboost_input(tf_val)
        assert train_y is not None
        assert val_y is not None
        dtrain = xgboost.DMatrix(train_feat, label=train_y,
                                 feature_types=train_feat_type,
                                 enable_categorical=True)
        dvalid = xgboost.DMatrix(val_feat, label=val_y,
                                 feature_types=val_feat_type,
                                 enable_categorical=True)
        study.optimize(
            lambda trial: self.objective(trial, dtrain, dvalid,
                                         num_boost_round,
                                         early_stopping_rounds), num_trials)
        self.params.update(study.best_params)

        self.model = xgboost.train(
            self.params, dtrain, num_boost_round=num_boost_round,
            early_stopping_rounds=early_stopping_rounds, verbose_eval=False,
            evals=[(dvalid, 'validation')])

    def _predict(self, tf_test: TensorFrame) -> Tensor:
        import xgboost

        device = tf_test.device
        test_feat, test_y, test_feat_type = self._to_xgboost_input(tf_test)
        dtest = xgboost.DMatrix(test_feat, label=test_y,
                                feature_types=test_feat_type,
                                enable_categorical=True)
        if self.model.best_iteration is not None:
            iteration_range = (0, self.model.best_iteration + 1)
        else:
            iteration_range = None
        pred = self.model.predict(dtest, iteration_range=iteration_range)
        # If XGBoost early stops on a multiclass classification task,
        # then the output shape would be (batch_size, num_classes).
        # We need to take argmax to get the final prediction output.
        if (self.model.best_iteration
                and self.task_type == TaskType.MULTICLASS_CLASSIFICATION):
            assert pred.shape[1] == self._num_classes
            pred = torch.argmax(torch.from_numpy(pred), dim=1)
        else:
            pred = torch.from_numpy(pred)
        return pred.to(device)

    def _load(self, path: str) -> None:
        import xgboost

        self.model = xgboost.Booster(model_file=path)
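

if __name__ == "__main__":
    # A minimal end-to-end sketch, not part of the library: tune on a tiny
    # random regression TensorFrame and predict on it again. It assumes
    # `xgboost` and `optuna` are installed and relies on the public
    # `tune()` / `predict()` wrappers of the `GBDT` base class; the column
    # names and data below are made up.
    torch.manual_seed(0)
    tf = TensorFrame(
        feat_dict={
            stype.numerical: torch.randn(100, 3),
            stype.categorical: torch.randint(0, 4, (100, 2)),
        },
        col_names_dict={
            stype.numerical: ['num_a', 'num_b', 'num_c'],
            stype.categorical: ['cat_a', 'cat_b'],
        },
        y=torch.randn(100),
    )
    model = XGBoost(task_type=TaskType.REGRESSION, metric=Metric.RMSE)
    model.tune(tf_train=tf, tf_val=tf, num_trials=2)
    pred = model.predict(tf_test=tf)
    print(pred.shape)  # Expected: torch.Size([100])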