超参数优化

%load_ext autoreload
%autoreload 2

机器学习预测方法由许多超参数定义,这些超参数控制着它们的行为,从速度和内存需求到预测性能都有影响。长期以来,手动超参数调优占主导地位。这种方法耗时,自动化超参数优化方法的出现被证明比手动调优、网格搜索和随机搜索更高效。

BaseAuto 类通过 ray 提供与超参数优化算法的共享 API 连接,如 OptunaHyperOptDragonfly 等,给您提供网格搜索、贝叶斯优化以及其他先进工具(如 hyperband)的访问权限。

理解超参数的影响仍然是一项宝贵的技能,因为它可以帮助指导信息丰富的超参数空间设计,从而更快地进行自动探索。

图 1. 数据集划分示例(左),验证集(黄色)和测试集(橙色)。超参数优化指导信号来自验证集。
from fastcore.test import test_eq
from nbdev.showdoc import show_doc
import warnings
from copy import deepcopy
from os import cpu_count

import torch
import pytorch_lightning as pl

from ray import air, tune
from ray.tune.integration.pytorch_lightning import TuneReportCallback
from ray.tune.search.basic_variant import BasicVariantGenerator
class MockTrial:
    def suggest_int(*args, **kwargs):
        return 'int'
    def suggest_categorical(self, name, choices):
        return choices
    def suggest_uniform(*args, **kwargs):
        return 'uniform'
    def suggest_loguniform(*args, **kwargs):
        return 'loguniform'
    def suggest_float(*args, **kwargs):
        if 'log' in kwargs:
            return 'quantized_log'
        elif 'step' in kwargs:
            return 'quantized_loguniform'
        return 'float'
class BaseAuto(pl.LightningModule):
    """
    Class for Automatic Hyperparameter Optimization, it builds on top of `ray` to 
    give access to a wide variety of hyperparameter optimization tools ranging 
    from classic grid search, to Bayesian optimization and HyperBand algorithm.

    The validation loss to be optimized is defined by the `config['loss']` dictionary
    value, the config also contains the rest of the hyperparameter search space.

    It is important to note that the success of this hyperparameter optimization
    heavily relies on a strong correlation between the validation and test periods.

    Parameters
    ----------
    cls_model : PyTorch/PyTorchLightning model
        See `neuralforecast.models` [collection here](https://nixtla.github.io/neuralforecast/models.html).
    h : int
        Forecast horizon
    loss : PyTorch module
        Instantiated train loss class from [losses collection](https://nixtla.github.io/neuralforecast/losses.pytorch.html).
    valid_loss : PyTorch module
        Instantiated valid loss class from [losses collection](https://nixtla.github.io/neuralforecast/losses.pytorch.html).
    config : dict or callable
        Dictionary with ray.tune defined search space or function that takes an optuna trial and returns a configuration dict.
    search_alg : ray.tune.search variant or optuna.sampler
        For ray see https://docs.ray.io/en/latest/tune/api_docs/suggestion.html
        For optuna see https://optuna.readthedocs.io/en/stable/reference/samplers/index.html.
    num_samples : int
        Number of hyperparameter optimization steps/samples.
    cpus : int (default=os.cpu_count())
        Number of cpus to use during optimization. Only used with ray tune.
    gpus : int (default=torch.cuda.device_count())
        Number of gpus to use during optimization, default all available. Only used with ray tune.
    refit_with_val : bool
        Refit of best model should preserve val_size.
    verbose : bool
        Track progress.
    alias : str, optional (default=None)
        Custom name of the model.
    backend : str (default='ray')
        Backend to use for searching the hyperparameter space, can be either 'ray' or 'optuna'.
    callbacks : list of callable, optional (default=None)
        List of functions to call during the optimization process.
        ray reference: https://docs.ray.io/en/latest/tune/tutorials/tune-metrics.html
        optuna reference: https://optuna.readthedocs.io/en/stable/tutorial/20_recipes/007_optuna_callback.html
    """
    def __init__(self, 
                 cls_model,
                 h,
                 loss,
                 valid_loss,
                 config, 
                 search_alg=BasicVariantGenerator(random_state=1),
                 num_samples=10,
                 cpus=cpu_count(),
                 gpus=torch.cuda.device_count(),
                 refit_with_val=False,
                 verbose=False,
                 alias=None,
                 backend='ray',
                 callbacks=None,
                ):
        super(BaseAuto, self).__init__()
        with warnings.catch_warnings(record=False):
            warnings.filterwarnings('ignore')
            # 以下行发出警告,提示正在保存loss属性
            # 但我们确实想拯救它。
            self.save_hyperparameters() # 允许从检查点类进行实例化

        if backend == 'ray':
            if not isinstance(config, dict):
                raise ValueError(
                    "You have to provide a dict as `config` when using `backend='ray'`"
                )
            config_base = deepcopy(config)
        elif backend == 'optuna':
            if not callable(config):
                raise ValueError(
                    "You have to provide a function that takes a trial and returns a dict as `config` when using `backend='optuna'`"
                )
            # 从配置函数中提取常量值以进行验证
            config_base = config(MockTrial())
        else:
            raise ValueError(f"Unknown backend {backend}. The supported backends are 'ray' and 'optuna'.")
        if config_base.get('h', None) is not None:
            raise Exception("Please use `h` init argument instead of `config['h']`.")
        if config_base.get('loss', None) is not None:
            raise Exception("Please use `loss` init argument instead of `config['loss']`.")
        if config_base.get('valid_loss', None) is not None:
            raise Exception("Please use `valid_loss` init argument instead of `config['valid_loss']`.")
        # 此属性有助于保护 
        # 模型与数据集交互保护
        if 'early_stop_patience_steps' in config_base.keys():
            self.early_stop_patience_steps = 1
        else:
            self.early_stop_patience_steps = -1

        if callable(config):
            # 在此重置config_base,以便在配置函数中保存参数以进行覆盖
            config_base = {}

        # 在配置中添加损失,并保护有效损失的默认值
        config_base['h'] = h
        config_base['loss'] = loss
        if valid_loss is None:
            valid_loss = loss
        config_base['valid_loss'] = valid_loss

        if isinstance(config, dict):
            self.config = config_base            
        else:
            def config_f(trial):
                return {**config(trial), **config_base}
            self.config = config_f            
        
        self.h = h
        self.cls_model = cls_model
        self.loss = loss
        self.valid_loss = valid_loss

        self.num_samples = num_samples
        self.search_alg = search_alg
        self.cpus = cpus
        self.gpus = gpus
        self.refit_with_val = refit_with_val or self.early_stop_patience_steps > 0
        self.verbose = verbose
        self.alias = alias
        self.backend = backend
        self.callbacks = callbacks

        # 基类属性
        self.SAMPLING_TYPE = cls_model.SAMPLING_TYPE

    def __repr__(self):
        return type(self).__name__ if self.alias is None else self.alias
    
    def _train_tune(self, config_step, cls_model, dataset, val_size, test_size):
        """ BaseAuto._train_tune

        Internal function that instantiates a NF class model, then automatically
        explores the validation loss (ptl/val_loss) on which the hyperparameter 
        exploration is based.

        **Parameters:**<br>
        `config_step`: Dict, initialization parameters of a NF model.<br>
        `cls_model`: NeuralForecast model class, yet to be instantiated.<br>
        `dataset`: NeuralForecast dataset, to fit the model.<br>
        `val_size`: int, validation size for temporal cross-validation.<br>
        `test_size`: int, test size for temporal cross-validation.<br>
        """
        metrics = {"loss": "ptl/val_loss", "train_loss": "train_loss"}
        callbacks = [TuneReportCallback(metrics, on="validation_end")]
        if 'callbacks' in config_step.keys():
            callbacks.extend(config_step['callbacks'])
        config_step = {**config_step, **{'callbacks': callbacks}}

        # 保护数据类型免受调优采样器的影响
        if 'batch_size' in config_step.keys():
            config_step['batch_size'] = int(config_step['batch_size'])
        if 'windows_batch_size' in config_step.keys():
            config_step['windows_batch_size'] = int(config_step['windows_batch_size'])

        # 调谐会话接收验证信号
        # 从专门的PL TuneReportCallback
        _ = self._fit_model(cls_model=cls_model,
                                config=config_step,
                                dataset=dataset,
                                val_size=val_size,
                                test_size=test_size)

    def _tune_model(self, cls_model, dataset, val_size, test_size,
                cpus, gpus, verbose, num_samples, search_alg, config):
        train_fn_with_parameters = tune.with_parameters(
            self._train_tune,
            cls_model=cls_model,
            dataset=dataset,
            val_size=val_size,
            test_size=test_size,
        )

        # 设备
        if gpus > 0:
            device_dict = {'gpu':gpus}
        else:
            device_dict = {'cpu':cpus}

        # 在Windows系统中,避免使用过长的试用目录名称
        import platform
        trial_dirname_creator=(lambda trial: f"{trial.trainable_name}_{trial.trial_id}") if platform.system() == 'Windows' else None

        tuner = tune.Tuner(
            tune.with_resources(train_fn_with_parameters, device_dict),
            run_config=air.RunConfig(callbacks=self.callbacks, verbose=verbose),
            tune_config=tune.TuneConfig(
                metric="loss",
                mode="min",
                num_samples=num_samples, 
                search_alg=search_alg,
                trial_dirname_creator=trial_dirname_creator,
            ),
            param_space=config,
        )
        results = tuner.fit()
        return results

    @staticmethod
    def _ray_config_to_optuna(ray_config):
        def optuna_config(trial):
            out = {}
            for k, v in ray_config.items():
                if hasattr(v, 'sampler'):
                    sampler = v.sampler
                    if isinstance(sampler, tune.search.sample.Integer.default_sampler_cls):
                        v = trial.suggest_int(k, v.lower, v.upper)
                    elif isinstance(sampler, tune.search.sample.Categorical.default_sampler_cls):
                        v = trial.suggest_categorical(k, v.categories)                    
                    elif isinstance(sampler, tune.search.sample.Uniform):
                        v = trial.suggest_uniform(k, v.lower, v.upper)
                    elif isinstance(sampler, tune.search.sample.LogUniform):
                        v = trial.suggest_loguniform(k, v.lower, v.upper)
                    elif isinstance(sampler, tune.search.sample.Quantized):
                        if isinstance(sampler.get_sampler(), tune.search.sample.Float._LogUniform):
                            v = trial.suggest_float(k, v.lower, v.upper, log=True)
                        elif isinstance(sampler.get_sampler(), tune.search.sample.Float._Uniform):
                            v = trial.suggest_float(k, v.lower, v.upper, step=sampler.q)
                    else:
                        raise ValueError(f"Couldn't translate {type(v)} to optuna.")
                out[k] = v
            return out
        return optuna_config

    def _optuna_tune_model(
        self,
        cls_model,
        dataset,
        val_size,
        test_size,
        verbose,
        num_samples,
        search_alg,
        config,
        distributed_config,
    ):
        import optuna

        def objective(trial):
            user_cfg = config(trial)
            cfg = deepcopy(user_cfg)
            model = self._fit_model(
                cls_model=cls_model,
                config=cfg,
                dataset=dataset,
                val_size=val_size,
                test_size=test_size,
                distributed_config=distributed_config,
            )
            trial.set_user_attr('ALL_PARAMS', user_cfg)
            metrics = model.metrics
            trial.set_user_attr('METRICS', {
                "loss": metrics["ptl/val_loss"],
                "train_loss": metrics["train_loss"],
            })
            return trial.user_attrs['METRICS']['loss']

        if isinstance(search_alg, optuna.samplers.BaseSampler):
            sampler = search_alg
        else:
            sampler = None

        study = optuna.create_study(sampler=sampler, direction='minimize')
        study.optimize(
            objective,
            n_trials=num_samples,
            show_progress_bar=verbose,
            callbacks=self.callbacks,
        )
        return study

    def _fit_model(self, cls_model, config,
                   dataset, val_size, test_size, distributed_config=None):
        model = cls_model(**config)
        model = model.fit(
            dataset,
            val_size=val_size, 
            test_size=test_size,
            distributed_config=distributed_config,
        )
        return model

    def fit(self, dataset, val_size=0, test_size=0, random_seed=None, distributed_config=None):
        """ BaseAuto.fit

        Perform the hyperparameter optimization as specified by the BaseAuto configuration 
        dictionary `config`.

        The optimization is performed on the `TimeSeriesDataset` using temporal cross validation with 
        the validation set that sequentially precedes the test set.

        **Parameters:**<br>
        `dataset`: NeuralForecast's `TimeSeriesDataset` see details [here](https://nixtla.github.io/neuralforecast/tsdataset.html)<br>
        `val_size`: int, size of temporal validation set (needs to be bigger than 0).<br>
        `test_size`: int, size of temporal test set (default 0).<br>
        `random_seed`: int=None, random_seed for hyperparameter exploration algorithms, not yet implemented.<br>
        **Returns:**<br>
        `self`: fitted instance of `BaseAuto` with best hyperparameters and results<br>.
        """
        #我们需要val_size > 0才能执行
        #超参数选择。
        search_alg = deepcopy(self.search_alg)
        val_size = val_size if val_size > 0 else self.h
        if self.backend == 'ray':
            if distributed_config is not None:
                raise ValueError('distributed training is not supported for the ray backend.')
            results = self._tune_model(
                cls_model=self.cls_model,
                dataset=dataset,
                val_size=val_size,
                test_size=test_size, 
                cpus=self.cpus,
                gpus=self.gpus,
                verbose=self.verbose,
                num_samples=self.num_samples, 
                search_alg=search_alg, 
                config=self.config,
            )            
            best_config = results.get_best_result().config            
        else:
            results = self._optuna_tune_model(
                cls_model=self.cls_model,
                dataset=dataset,
                val_size=val_size, 
                test_size=test_size, 
                verbose=self.verbose,
                num_samples=self.num_samples, 
                search_alg=search_alg, 
                config=self.config,
                distributed_config=distributed_config,
            )
            best_config = results.best_trial.user_attrs['ALL_PARAMS']
        self.model = self._fit_model(
            cls_model=self.cls_model,
            config=best_config,
            dataset=dataset,
            val_size=val_size * self.refit_with_val,
            test_size=test_size,
            distributed_config=distributed_config,
        )
        self.results = results

         # 增加了与NeuralForecast核心兼容的属性
        self.futr_exog_list = self.model.futr_exog_list
        self.hist_exog_list = self.model.hist_exog_list
        self.stat_exog_list = self.model.stat_exog_list
        return self

    def predict(self, dataset, step_size=1, **data_kwargs):
        """ BaseAuto.predict

        Predictions of the best performing model on validation.

        **Parameters:**<br>
        `dataset`: NeuralForecast's `TimeSeriesDataset` see details [here](https://nixtla.github.io/neuralforecast/tsdataset.html)<br>
        `step_size`: int, steps between sequential predictions, (default 1).<br>
        `**data_kwarg`: additional parameters for the dataset module.<br>
        `random_seed`: int=None, random_seed for hyperparameter exploration algorithms (not implemented).<br>
        **Returns:**<br>
        `y_hat`: numpy predictions of the `NeuralForecast` model.<br>
        """
        return self.model.predict(dataset=dataset, 
                                  step_size=step_size, **data_kwargs)

    def set_test_size(self, test_size):
        self.model.set_test_size(test_size)

    def get_test_size(self):
        return self.model.test_size
    
    def save(self, path):
        """ BaseAuto.save

        将拟合好的模型保存到磁盘。

        **参数:**<br>
        `path`: str, 保存模型的路径。<br>
        """
        self.model.save(path)
show_doc(BaseAuto, title_level=3)
show_doc(BaseAuto.fit, title_level=3)
show_doc(BaseAuto.predict, title_level=3)
import logging
import warnings

import pytorch_lightning as pl
logging.getLogger("pytorch_lightning").setLevel(logging.ERROR)
warnings.filterwarnings("ignore")
import optuna
import pandas as pd
from neuralforecast.models.mlp import MLP
from neuralforecast.utils import AirPassengersDF as Y_df
from neuralforecast.tsdataset import TimeSeriesDataset
from neuralforecast.losses.numpy import mae
from neuralforecast.losses.pytorch import MAE, MSE
Y_train_df = Y_df[Y_df.ds<='1959-12-31'] # 132次列车
Y_test_df = Y_df[Y_df.ds>'1959-12-31']   # 12项测试

dataset, *_ = TimeSeriesDataset.from_df(Y_train_df)
class RayLogLossesCallback(tune.Callback):
    def on_trial_complete(self, iteration, trials, trial, **info):
        result = trial.last_result
        print(40 * '-' + 'Trial finished' + 40 * '-')
        print(f'Train loss: {result["train_loss"]:.2f}. Valid loss: {result["loss"]:.2f}')
        print(80 * '-')
config = {
    "hidden_size": tune.choice([512]),
    "num_layers": tune.choice([3, 4]),
    "input_size": 12,
    "max_steps": 10,
    "val_check_steps": 5
}
auto = BaseAuto(h=12, loss=MAE(), valid_loss=MSE(), cls_model=MLP, config=config, num_samples=2, cpus=1, gpus=0, callbacks=[RayLogLossesCallback()])
auto.fit(dataset=dataset)
y_hat = auto.predict(dataset=dataset)
assert mae(Y_test_df['y'].values, y_hat[:, 0]) < 200
def config_f(trial):
    return {
        "hidden_size": trial.suggest_categorical('hidden_size', [512]),
        "num_layers": trial.suggest_categorical('num_layers', [3, 4]),
        "input_size": 12,
        "max_steps": 10,
        "val_check_steps": 5
    }

class OptunaLogLossesCallback:
    def __call__(self, study, trial):
        metrics = trial.user_attrs['METRICS']
        print(40 * '-' + 'Trial finished' + 40 * '-')
        print(f'Train loss: {metrics["train_loss"]:.2f}. Valid loss: {metrics["loss"]:.2f}')
        print(80 * '-')
auto2 = BaseAuto(h=12, loss=MAE(), valid_loss=MSE(), cls_model=MLP, config=config_f, search_alg=optuna.samplers.RandomSampler(), num_samples=2, backend='optuna', callbacks=[OptunaLogLossesCallback()])
auto2.fit(dataset=dataset)
assert isinstance(auto2.results, optuna.Study)
y_hat2 = auto2.predict(dataset=dataset)
assert mae(Y_test_df['y'].values, y_hat2[:, 0]) < 200
Y_test_df['AutoMLP'] = y_hat

pd.concat([Y_train_df, Y_test_df]).drop('unique_id', axis=1).set_index('ds').plot()
# 单元测试确保损失正确实例化
import pandas as pd
from neuralforecast.models.mlp import MLP
from neuralforecast.utils import AirPassengersDF as Y_df
from neuralforecast.tsdataset import TimeSeriesDataset
from neuralforecast.losses.pytorch import MAE, MSE
# 单元测试确保损失正确实例化
Y_train_df = Y_df[Y_df.ds<='1959-12-31'] # 132次列车
Y_test_df = Y_df[Y_df.ds>'1959-12-31']   # 12项测试

dataset, *_ = TimeSeriesDataset.from_df(Y_train_df)
config = {
    "hidden_size": tune.choice([512]),
    "num_layers": tune.choice([3, 4]),
    "input_size": 12,
    "max_steps": 1,
    "val_check_steps": 1
}

# 测试实例化
auto = BaseAuto(h=12, loss=MAE(), valid_loss=MSE(), 
                cls_model=MLP, config=config, num_samples=2, cpus=1, gpus=0)
test_eq(str(type(auto.loss)), "<class 'neuralforecast.losses.pytorch.MAE'>")
test_eq(str(type(auto.valid_loss)), "<class 'neuralforecast.losses.pytorch.MSE'>")

# 测试验证默认值
auto = BaseAuto(h=12, loss=MSE(), valid_loss=None,
                cls_model=MLP, config=config, num_samples=2, cpus=1, gpus=0)
test_eq(str(type(auto.loss)), "<class 'neuralforecast.losses.pytorch.MSE'>")
test_eq(str(type(auto.valid_loss)), "<class 'neuralforecast.losses.pytorch.MSE'>")

参考文献

Give us a ⭐ on Github