核心


%load_ext autoreload
%autoreload 2

NeuralForecast 包含两个主要组成部分:基于 PyTorch 实现的深度学习预测模型,以及并行化和分布式计算工具。第一个组成部分包括低级 PyTorch 模型估计类,如 `models.NBEATS` 和 `models.RNN`。第二个组成部分是一个高级的 `core.NeuralForecast` 包装类,它处理存储在 pandas DataFrame 中的一组时间序列数据。


import shutil
import sys

import git
import s3fs
from fastcore.test import test_eq, test_fail
from nbdev.showdoc import show_doc
from neuralforecast.utils import generate_series
from pathlib import Path

import os
import pickle
import warnings
from copy import deepcopy
from itertools import chain
from typing import Any, Dict, List, Optional, Sequence, Union

import fsspec
import numpy as np
import pandas as pd
import pytorch_lightning as pl
import torch
import utilsforecast.processing as ufp
from coreforecast.grouped_array import GroupedArray
from coreforecast.scalers import (
    LocalBoxCoxScaler,
    LocalMinMaxScaler,
    LocalRobustScaler,
    LocalStandardScaler,
)
from utilsforecast.compat import DataFrame, Series, pl_DataFrame, pl_Series
from utilsforecast.validation import validate_freq

from neuralforecast.common._base_model import DistributedConfig
from neuralforecast.compat import SparkDataFrame
from neuralforecast.tsdataset import _FilesDataset, TimeSeriesDataset, LocalFilesTimeSeriesDataset
from neuralforecast.models import (
    GRU, LSTM, RNN, TCN, DeepAR, DilatedRNN,
    MLP, NHITS, NBEATS, NBEATSx, DLinear, NLinear,
    TFT, VanillaTransformer,
    Informer, Autoformer, FEDformer,
    StemGNN, PatchTST, TimesNet, TimeLLM, TSMixer, TSMixerx,
    MLPMultivariate, iTransformer,
    BiTCN, TiDE, DeepNPTS, SOFTS,
    TimeMixer, KAN
)
from neuralforecast.common._base_auto import BaseAuto, MockTrial

# This disables the warnings about the number of workers in the dataloaders,
# which the user can't control
warnings.filterwarnings("ignore", category=pl.utilities.warnings.PossibleUserWarning)

def _insample_times(
    times: np.ndarray,
    uids: Series,
    indptr: np.ndarray,
    h: int,
    freq: Union[int, str, pd.offsets.BaseOffset],
    step_size: int = 1,
    id_col: str = 'unique_id',
    time_col: str = 'ds',
) -> DataFrame:
    """Build the id / time / cutoff frame of every in-sample window.

    Each serie is split into windows of `h` consecutive entries whose starting
    positions advance by `step_size`.

    Parameters
    ----------
    times : np.ndarray
        Times of all series laid out back to back; it is indexed with
        positions derived from `indptr`.
    uids : pandas or polars Series
        Serie identifiers; a polars Series yields a polars DataFrame,
        anything else a pandas DataFrame.
    indptr : np.ndarray
        Boundaries of each serie inside `times` (consecutive differences
        are the serie sizes).
    h : int
        Number of entries in each window.
    freq : int, str or pandas offset
        Frequency used to move each cutoff one step back.
    step_size : int (default=1)
        Distance between the starts of consecutive windows.
    id_col : str (default='unique_id')
        Name of the identifier column in the output.
    time_col : str (default='ds')
        Name of the time column in the output.

    Returns
    -------
    pandas or polars DataFrame
        Frame with the columns `id_col`, `time_col` and 'cutoff'.

    Raises
    ------
    ValueError
        If a serie has fewer than `h` entries or if `size - h` isn't a
        multiple of `step_size` for some serie.
    """
    serie_sizes = np.diff(indptr)
    if np.any(serie_sizes < h):
        raise ValueError('`sizes` should be greater or equal to `h`.')
    # TODO: we can just truncate here instead of raising an error
    full_steps, leftovers = np.divmod(serie_sizes - h, step_size)
    if np.any(leftovers != 0):
        raise ValueError('`sizes - h` should be multiples of `step_size`')
    n_windows = full_steps + 1

    # offset of every window start within its serie,
    # e.g. 2 means the 3rd training date starts a window
    within_serie_offsets = np.concatenate([np.arange(n) for n in n_windows]) * step_size
    # start position of each serie, repeated once per window of that serie,
    # e.g. starts [0, 17] with n_windows = [2, 3] yield [0, 0, 17, 17, 17]
    serie_starts = np.repeat(indptr[:-1], n_windows)
    # position of every window start, repeated across the whole horizon,
    # e.g. with two series and h=2 this could be [0, 0, 1, 1, 17, 17, 18, 18]
    window_starts = np.repeat(serie_starts + within_serie_offsets, h)
    # position of every timestamp: the steps [0, ..., h - 1] are added to each window start,
    # e.g. with two series and h=2 this could be [0, 1, 1, 2, 17, 18, 18, 19]
    horizon_steps = np.tile(np.arange(h), n_windows.sum())
    window_times = times[window_starts + horizon_steps]

    make_frame = pl_DataFrame if isinstance(uids, pl_Series) else pd.DataFrame
    result = make_frame(
        {
            id_col: ufp.repeat(uids, h * n_windows),
            time_col: window_times,
            'cutoff': times[window_starts],
        }
    )
    # the cutoff is one step before the first date of its window,
    # so the first cutoff of a serie is before its first train date
    shifted_cutoffs = ufp.offset_times(result['cutoff'], freq, -1)
    return ufp.assign_columns(result, 'cutoff', shifted_cutoffs)

# Sanity checks for `_insample_times` using two series whose sizes
# (the differences of `indptr`) are 4 and 6.
uids = pd.Series(['id_0', 'id_1'])
indptr = np.array([0, 4, 10], dtype=np.int32)
h = 2
# each case pairs a step size with a frequency and the expected gap (in days)
# between consecutive cutoffs of the same serie
for step_size, freq, days in zip([1, 2], ['D', 'W-THU'], [1, 14]):
    times = np.hstack([
        pd.date_range('2000-01-01', freq=freq, periods=4),
        pd.date_range('2000-10-10', freq=freq, periods=10),
    ])    
    times_df = _insample_times(times, uids, indptr, h, freq, step_size=step_size)
    # the earliest time of each serie must be its first entry in `times`
    pd.testing.assert_frame_equal(
        times_df.groupby('unique_id')['ds'].min().reset_index(),
        pd.DataFrame({
            'unique_id': uids,
            'ds': times[indptr[:-1]],
        })
    )
    # the latest time of each serie must be its last entry in `times`
    pd.testing.assert_frame_equal(
        times_df.groupby('unique_id')['ds'].max().reset_index(),
        pd.DataFrame({
            'unique_id': uids,
            'ds': times[indptr[1:] - 1],
        })
    )
    # differences between consecutive distinct cutoffs within each serie
    cutoff_deltas = (
        times_df
        .drop_duplicates(['unique_id', 'cutoff'])
        .groupby('unique_id')
        ['cutoff']
        .diff()
        .dropna()
    )
    # every gap must be identical and match the expected number of days
    assert cutoff_deltas.nunique() == 1
    assert cutoff_deltas.unique()[0] == pd.Timedelta(f'{days}D')

# Lookup from a lowercase model name to its model class. Every model is
# registered under its plain name and, except for 'timellm', under an
# 'auto'-prefixed name that points to the same class.
# NOTE(review): presumably used to resolve saved model files back to classes;
# the consumer isn't visible in this part of the file -- confirm.
MODEL_FILENAME_DICT = {
    'autoformer': Autoformer, 'autoautoformer': Autoformer,
    'deepar': DeepAR, 'autodeepar': DeepAR,
    'dlinear': DLinear, 'autodlinear': DLinear,
    'nlinear': NLinear, 'autonlinear': NLinear,    
    'dilatedrnn': DilatedRNN , 'autodilatedrnn': DilatedRNN,
    'fedformer': FEDformer, 'autofedformer': FEDformer,
    'gru': GRU, 'autogru': GRU,
    'informer': Informer, 'autoinformer': Informer,
    'lstm': LSTM, 'autolstm': LSTM,
    'mlp': MLP, 'automlp': MLP,
    'nbeats': NBEATS, 'autonbeats': NBEATS,
    'nbeatsx': NBEATSx, 'autonbeatsx': NBEATSx,
    'nhits': NHITS, 'autonhits': NHITS,
    'patchtst': PatchTST, 'autopatchtst': PatchTST,
    'rnn': RNN, 'autornn': RNN,
    'stemgnn': StemGNN, 'autostemgnn': StemGNN,
    'tcn': TCN, 'autotcn': TCN, 
    'tft': TFT, 'autotft': TFT,
    'timesnet': TimesNet, 'autotimesnet': TimesNet,
    'vanillatransformer': VanillaTransformer, 'autovanillatransformer': VanillaTransformer,
    'timellm': TimeLLM,
    'tsmixer': TSMixer, 'autotsmixer': TSMixer,
    'tsmixerx': TSMixerx, 'autotsmixerx': TSMixerx,
    'mlpmultivariate': MLPMultivariate, 'automlpmultivariate': MLPMultivariate,
    'itransformer': iTransformer, 'autoitransformer': iTransformer,
    'bitcn': BiTCN, 'autobitcn': BiTCN,
    'tide': TiDE, 'autotide': TiDE,
    'deepnpts': DeepNPTS, 'autodeepnpts': DeepNPTS,
    'softs': SOFTS, 'autosofts': SOFTS,
    'timemixer': TimeMixer, 'autotimemixer': TimeMixer,
    'kan': KAN, 'autokan': KAN
}

# Maps each supported `local_scaler_type` name to a zero-argument factory
# that builds the scaler. Entries that need constructor arguments are wrapped
# in a lambda so that every value can be called the same way.
_type2scaler = {
    'standard': LocalStandardScaler,
    'robust': lambda: LocalRobustScaler(scale='mad'),
    'robust-iqr': lambda: LocalRobustScaler(scale='iqr'),
    'minmax': LocalMinMaxScaler,
    'boxcox': lambda: LocalBoxCoxScaler(method='loglik', lower=0.0)
}

def _id_as_idx() -> bool:
    return not bool(os.getenv("NIXTLA_ID_AS_COL", ""))

def _warn_id_as_idx():
    warnings.warn(
        "In a future version the predictions will have the id as a column. "
        "You can set the `NIXTLA_ID_AS_COL` environment variable "
        "to adopt the new behavior and to suppress this warning.",
        category=FutureWarning,
    )

class NeuralForecast:
    
    def __init__(self, 
                 models: List[Any],
                 freq: Union[str, int],
                 local_scaler_type: Optional[str] = None):
        """
        The `core.StatsForecast` class allows you to efficiently fit multiple `NeuralForecast` models 
        for large sets of time series. It operates with pandas DataFrame `df` that identifies series 
        and datestamps with the `unique_id` and `ds` columns. The `y` column denotes the target 
        time series variable.

        Parameters
        ----------
        models : List[typing.Any]
            Instantiated `neuralforecast.models` 
            see [collection here](https://nixtla.github.io/neuralforecast/models.html).
        freq : str or int
            Frequency of the data. Must be a valid pandas or polars offset alias, or an integer.
        local_scaler_type : str, optional (default=None)
            Scaler to apply per-serie to all features before fitting, which is inverted after predicting.
            Can be 'standard', 'robust', 'robust-iqr', 'minmax' or 'boxcox'
        
        Returns
        -------
        self : NeuralForecast
            Returns instantiated `NeuralForecast` class.
        """
        assert all(model.h == models[0].h for model in models), 'All models should have the same horizon'

        self.h = models[0].h
        self.models_init = models
        self.freq = freq
        if local_scaler_type is not None and local_scaler_type not in _type2scaler:
            raise ValueError(f'scaler_type must be one of {_type2scaler.keys()}')
        self.local_scaler_type = local_scaler_type
        self.scalers_: Dict

        # 标志和属性
        self._fitted = False
        self._reset_models()

    def _scalers_fit_transform(self, dataset: TimeSeriesDataset) -> None:
        self.scalers_ = {}        
        if self.local_scaler_type is None:
            return None
        for i, col in enumerate(dataset.temporal_cols):
            if col == 'available_mask':
                continue
            ga = GroupedArray(dataset.temporal[:, i].numpy(), dataset.indptr)                
            self.scalers_[col] = _type2scaler[self.local_scaler_type]().fit(ga)
            dataset.temporal[:, i] = torch.from_numpy(self.scalers_[col].transform(ga))

    def _scalers_transform(self, dataset: TimeSeriesDataset) -> None:
        if not self.scalers_:
            return None
        for i, col in enumerate(dataset.temporal_cols):
            scaler = self.scalers_.get(col, None)
            if scaler is None:
                continue
            ga = GroupedArray(dataset.temporal[:, i].numpy(), dataset.indptr)
            dataset.temporal[:, i] = torch.from_numpy(scaler.transform(ga))

    def _scalers_target_inverse_transform(self, data: np.ndarray, indptr: np.ndarray) -> np.ndarray:
        if not self.scalers_:
            return data
        for i in range(data.shape[1]):
            ga = GroupedArray(data[:, i], indptr)
            data[:, i] = self.scalers_[self.target_col].inverse_transform(ga)
        return data

    def _prepare_fit(self, df, static_df, sort_df, predict_only, id_col, time_col, target_col):
        """Validate `df`, build the dataset from it and apply the local scalers.

        The column names are stored on the instance. When `predict_only` is
        true the existing scalers are applied, otherwise they are fitted on
        the new dataset. Returns the tuple `(dataset, uids, last_dates, ds)`
        produced by `TimeSeriesDataset.from_df`.
        """
        # TODO: uids, last_dates and ds should be attributes of the dataset class. See github issue.
        self.id_col, self.time_col, self.target_col = id_col, time_col, target_col
        self._check_nan(df, static_df, id_col, time_col, target_col)

        dataset, uids, last_dates, ds = TimeSeriesDataset.from_df(
            df=df,
            static_df=static_df,
            sort_df=sort_df,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
        apply_scalers = self._scalers_transform if predict_only else self._scalers_fit_transform
        apply_scalers(dataset)
        return dataset, uids, last_dates, ds


    def _check_nan(self, df, static_df, id_col, time_col, target_col):
        """Raise a ValueError listing every column that has missing values.

        Temporal columns (the target plus everything in `df` besides the id
        and time columns) are only inspected on the rows kept by
        'available_mask', or on all rows when that column is absent. Static
        columns, if `static_df` is given, are inspected in full.
        """
        key_cols = (id_col, time_col, target_col)
        temporal_cols = [target_col]
        temporal_cols.extend(name for name in df.columns if name not in key_cols)

        if "available_mask" in temporal_cols:
            available_mask = df["available_mask"].to_numpy().astype(bool)
        else:
            available_mask = np.ones(df.shape[0], dtype=bool)
        observed_rows = ufp.filter_with_mask(df, available_mask)

        cols_with_nans = [
            name
            for name in temporal_cols
            if ufp.is_nan_or_none(observed_rows[name]).any()
        ]
        if static_df is not None:
            cols_with_nans.extend(
                name
                for name in static_df.columns
                if name != id_col and ufp.is_nan_or_none(static_df[name]).any()
            )

        if cols_with_nans:
            raise ValueError(f"Found missing values in {cols_with_nans}.")

    def _prepare_fit_distributed(
        self,
        df: SparkDataFrame,
        static_df: Optional[SparkDataFrame],
        sort_df: bool,
        id_col: str,
        time_col: str,
        target_col: str,
        distributed_config: Optional[DistributedConfig],
    ):
        if distributed_config is None:
            raise ValueError(
                "Must set `distributed_config` when using a spark dataframe"
            )
        if self.local_scaler_type is not None:
            raise ValueError(
                "Historic scaling isn't supported in distributed. "
                "Please open an issue if this would be valuable to you."
            )
        temporal_cols = [c for c in df.columns if c not in (id_col, time_col)]
        if static_df is not None:
            static_cols = [c for c in static_df.columns if c != id_col]
            df = df.join(static_df, on=[id_col], how="left")
        else:
            static_cols = None
        self.id_col = id_col
        self.time_col = time_col
        self.target_col = target_col
        self.scalers_ = {}
        self.sort_df = sort_df
        num_partitions = distributed_config.num_nodes * distributed_config.devices
        df = df.repartitionByRange(num_partitions, id_col)
        df.write.parquet(path=distributed_config.partitions_path, mode="overwrite")
        fs, _, _ = fsspec.get_fs_token_paths(distributed_config.partitions_path)
        protocol = fs.protocol 
        if isinstance(protocol, tuple):
            protocol = protocol[0]
        files = [
            f'{protocol}://{file}'
            for file in fs.ls(distributed_config.partitions_path)
            if file.endswith("parquet")
        ]
        return _FilesDataset(
            files=files,
            temporal_cols=temporal_cols,
            static_cols=static_cols,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
            min_size=df.groupBy(id_col).count().agg({"count": "min"}).first()[0],
        )
    
    def _prepare_fit_for_local_files(
            self, 
            files_list: Sequence[str], 
            static_df: Optional[DataFrame], 
            sort_df: bool, 
            id_col: str, 
            time_col: str, 
            target_col: str
        ):
        if self.local_scaler_type is not None:
            raise ValueError(
                "Historic scaling isn't supported when the dataset is split between files. "
                "Please open an issue if this would be valuable to you."
            )
        
        self.id_col = id_col
        self.time_col = time_col
        self.target_col = target_col   
        self.scalers_ = {}   
        self.sort_df = sort_df   

        exogs = self._get_needed_exog() 
        return LocalFilesTimeSeriesDataset.from_data_directories(
            directories=files_list,
            static_df=static_df,
            sort_df=sort_df,
            exogs=exogs,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )


    def fit(self,
        df: Optional[Union[DataFrame, SparkDataFrame, Sequence[str]]] = None,
        static_df: Optional[Union[DataFrame, SparkDataFrame]] = None,
        val_size: Optional[int] = 0,
        sort_df: bool = True,
        use_init_models: bool = False,
        verbose: bool = False,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
        distributed_config: Optional[DistributedConfig] = None,
    ) -> None:
        """Fit the core.NeuralForecast.

        Fit `models` to a large set of time series from DataFrame `df`.
        and store fitted models for later inspection.

        Parameters
        ----------
        df : pandas, polars or spark DataFrame, or a list of parquet files containing the series, optional (default=None)
            DataFrame with columns [`unique_id`, `ds`, `y`] and exogenous variables.
            If None, a previously stored dataset is required.
        static_df : pandas, polars or spark DataFrame, optional (default=None)
            DataFrame with columns [`unique_id`] and static exogenous.
        val_size : int, optional (default=0)
            Size of validation set.
        sort_df : bool, optional (default=False)
            Sort `df` before fitting.
        use_init_models : bool, optional (default=False)
            Use initial model passed when NeuralForecast object was instantiated.
        verbose : bool (default=False)
            Print processing steps.
        id_col : str (default='unique_id')
            Column that identifies each serie.
        time_col : str (default='ds')
            Column that identifies each timestep, its values can be timestamps or integers.
        target_col : str (default='y')
            Column that contains the target.
        distributed_config : neuralforecast.DistributedConfig
            Configuration to use for DDP training. Currently only spark is supported.

        Returns
        -------
        self : NeuralForecast
            Returns `NeuralForecast` class with fitted `models`.
        """
        if (df is None) and not (hasattr(self, 'dataset')):
            raise Exception('You must pass a DataFrame or have one stored.')

        # 模型与数据集交互保护
        if (
            any(model.early_stop_patience_steps > 0 for model in self.models)
            and val_size == 0
        ):
                raise Exception('Set val_size>0 if early stopping is enabled.')

        # 处理并保存新数据集(在自身中)
        if isinstance(df, (pd.DataFrame, pl_DataFrame)):
            validate_freq(df[time_col], self.freq)
            self.dataset, self.uids, self.last_dates, self.ds = self._prepare_fit(
                df=df,
                static_df=static_df,
                sort_df=sort_df,
                predict_only=False,
                id_col=id_col,
                time_col=time_col,
                target_col=target_col,
            )
            self.sort_df = sort_df
        elif isinstance(df, SparkDataFrame):
            if static_df is not None and not isinstance(static_df, SparkDataFrame):
                raise ValueError(
                    "`static_df` must be a spark dataframe when `df` is a spark dataframe."
                )
            self.dataset = self._prepare_fit_distributed(
                df=df,
                static_df=static_df,
                sort_df=sort_df,
                id_col=id_col,
                time_col=time_col,
                target_col=target_col,
                distributed_config=distributed_config,
            )
        elif isinstance(df, Sequence):
            if not all(isinstance(val, str) for val in df):
                raise ValueError("All entries in the list of files must be of type string")        
            self.dataset = self._prepare_fit_for_local_files(
                files_list=df,
                static_df=static_df,
                sort_df=sort_df,
                id_col=id_col,
                time_col=time_col,
                target_col=target_col,
            )
            self.uids = self.dataset.indices
            self.last_dates = self.dataset.last_times
        elif df is None:
            if verbose:
                print("Using stored dataset.")
        else:
            raise ValueError(
                f"`df` must be a pandas, polars or spark DataFrame, or a list of parquet files containing the series, or `None`, got: {type(df)}"
            )

        if val_size is not None:
            if self.dataset.min_size < val_size:
                warnings.warn('Validation set size is larger than the shorter time-series.')

        # 如果使用初始模型,则恢复初始模型
        if use_init_models:
            self._reset_models()

        for i, model in enumerate(self.models):
            self.models[i] = model.fit(
                self.dataset, val_size=val_size, distributed_config=distributed_config
            )

        self._fitted = True

    def make_future_dataframe(self, df: Optional[DataFrame] = None) -> DataFrame:
        """Create a dataframe with all ids and future times in the forecasting horizon.

        Parameters
        ----------
        df : pandas or polars DataFrame, optional (default=None)
            DataFrame with columns [`unique_id`, `ds`, `y`] and exogenous variables.
            Only required if this is different than the one used in the fit step.
        """
        if not self._fitted:
            raise Exception('You must fit the model first.')
        if df is not None:
            df = ufp.sort(df, by=[self.id_col, self.time_col])
            last_times_by_id = ufp.group_by_agg(
                df,
                by=self.id_col,
                aggs={self.time_col: 'max'},
                maintain_order=True,
            )
            uids = last_times_by_id[self.id_col]
            last_times = last_times_by_id[self.time_col]
        else:
            uids = self.uids
            last_times = self.last_dates
        return ufp.make_future_dataframe(
            uids=uids,
            last_times=last_times,
            freq=self.freq,
            h=self.h,
            id_col=self.id_col,
            time_col=self.time_col,
        )

    def get_missing_future(
        self, futr_df: DataFrame, df: Optional[DataFrame] = None
    ) -> DataFrame:
        """Get the missing ids and times combinations in `futr_df`.
        
        Parameters
        ----------
        futr_df : pandas or polars DataFrame
            DataFrame with [`unique_id`, `ds`] columns and `df`'s future exogenous.
        df : pandas or polars DataFrame, optional (default=None)
            DataFrame with columns [`unique_id`, `ds`, `y`] and exogenous variables.
            Only required if this is different than the one used in the fit step.
        """
        expected = self.make_future_dataframe(df)
        ids = [self.id_col, self.time_col]
        return ufp.anti_join(expected, futr_df[ids], on=ids)

    def _get_needed_futr_exog(self):
        futr_exogs = []
        for m in self.models:
            if isinstance(m, BaseAuto):
                if isinstance(m.config, dict):  # ray
                    exogs = m.config.get('futr_exog_list', [])
                    if hasattr(exogs, 'categories'):  # features are being tuned, get possible values
                        exogs = exogs.categories
                else:   # optuna
                    exogs = m.config(MockTrial()).get('futr_exog_list', [])
            else:  # regular model, extract them directly
                exogs = getattr(m, 'futr_exog_list', [])
            
            for exog in exogs:
                if isinstance(exog, str):
                    futr_exogs.append(exog)
                else:
                    futr_exogs.extend(exog)

        return set(futr_exogs)
    
    def _get_needed_exog(self):
        futr_exog = self._get_needed_futr_exog()

        hist_exog = []
        for m in self.models:
            if isinstance(m, BaseAuto):
                if isinstance(m.config, dict):  # ray
                    exogs = m.config.get('hist_exog_list', [])
                    if hasattr(exogs, 'categories'):  # features are being tuned, get possible values
                        exogs = exogs.categories
                else:   # optuna
                    exogs = m.config(MockTrial()).get('hist_exog_list', [])
            else:  # regular model, extract them directly
                exogs = getattr(m, 'hist_exog_list', [])
            
            for exog in exogs:
                if isinstance(exog, str):
                    hist_exog.append(exog)
                else:
                    hist_exog.extend(exog)

        return futr_exog | set(hist_exog)
    
    def _get_model_names(self) -> List[str]:
        names: List[str] = []
        count_names = {'model': 0}
        for model in self.models:
            model_name = repr(model)
            count_names[model_name] = count_names.get(model_name, -1) + 1
            if count_names[model_name] > 0:
                model_name += str(count_names[model_name])
            names.extend(model_name + n for n in model.loss.output_names)
        return names

    def _predict_distributed(
        self,
        df: Optional[SparkDataFrame],
        static_df: Optional[SparkDataFrame],
        futr_df: Optional[SparkDataFrame],
        engine,
    ):
        """Compute the forecasts partition by partition through `fugue.api.transform`.

        Parameters
        ----------
        df : spark DataFrame, optional
            Series to forecast. If it isn't a spark dataframe, the files of the
            stored dataset are read with `engine.read.parquet`.
        static_df : spark DataFrame, optional
            Static features, left-joined to `df` on the id column.
        futr_df : spark DataFrame, optional
            Future exogenous features, appended to `df` as extra rows.
        engine : spark session
            Only required when `df` isn't a spark dataframe.

        Returns
        -------
        The output of `fugue.api.transform`, with the id and time columns plus
        one float column per model output.

        Raises
        ------
        ValueError
            If `engine` is needed but missing, if `static_df` or `futr_df` aren't
            spark dataframes, if static columns are missing from `static_df` or if
            `futr_df` contains the target column.
        """
        import fugue.api as fa

        def _predict(
            df: pd.DataFrame,
            static_cols,
            futr_exog_cols,
            models,
            freq,
            id_col,
            time_col,
            target_col,
        ) -> pd.DataFrame:
            # Only the arguments are used in here (never `self`), so this function
            # doesn't hold a reference to the enclosing NeuralForecast instance.
            from neuralforecast import NeuralForecast

            nf = NeuralForecast(models=models, freq=freq)
            nf.id_col = id_col
            nf.time_col = time_col
            nf.target_col = target_col
            nf.scalers_ = {}
            nf._fitted = True
            if futr_exog_cols:
                # if we have futr_exog we'll have extra rows with the future values
                futr_rows = df[target_col].isnull()
                futr_df = df.loc[futr_rows, [id_col, time_col] + futr_exog_cols].copy()
                df = df[~futr_rows].copy()
            else:
                futr_df = None
            if static_cols:
                # the statics are repeated on every row, keep one row per serie
                static_df = df[[id_col] + static_cols].groupby(id_col, observed=True).head(1)
                df = df.drop(columns=static_cols)
            else:
                static_df = None
            preds = nf.predict(df=df, static_df=static_df, futr_df=futr_df)
            if preds.index.name == id_col:
                preds = preds.reset_index()
            return preds

        # df
        if isinstance(df, SparkDataFrame):
            repartition = True
        else:
            if engine is None:
                raise ValueError("engine is required for distributed inference")
            df = engine.read.parquet(*self.dataset.files)
            # we save the dataset with partitioning
            repartition = False

        # static
        static_cols = set(chain.from_iterable(getattr(m, 'stat_exog_list', []) for m in self.models))
        if static_df is not None:
            if not isinstance(static_df, SparkDataFrame):
                raise ValueError(
                    "`static_df` must be a spark dataframe when `df` is a spark dataframe "
                    "or the models were trained in a distributed setting.\n"
                    "You can also provide local dataframes (pandas or polars) as `df` and `static_df`."
                )
            missing_static = static_cols - set(static_df.columns)
            if missing_static:
                raise ValueError(
                    f"The following static columns are missing from the static_df: {missing_static}"
                )
            # join is supposed to preserve the partitioning
            df = df.join(static_df, on=[self.id_col], how="left")

        # exog
        if futr_df is not None:
            if not isinstance(futr_df, SparkDataFrame):
                raise ValueError(
                    "`futr_df` must be a spark dataframe when `df` is a spark dataframe "
                    "or the models were trained in a distributed setting.\n"
                    "You can also provide local dataframes (pandas or polars) as `df` and `futr_df`."
                )
            if self.target_col in futr_df.columns:
                raise ValueError("`futr_df` must not contain the target column.")
            # df has the statics, historic exog and target at this point, futr_df doesn't
            df = df.unionByName(futr_df, allowMissingColumns=True)
            # union doesn't guarantee preserving the partitioning
            repartition = True

        if repartition:
            df = df.repartitionByRange(df.rdd.getNumPartitions(), self.id_col)    

        # predict
        base_schema = fa.get_schema(df).extract([self.id_col, self.time_col])
        models_schema = {model: 'float' for model in self._get_model_names()}
        return fa.transform(
            df=df,
            using=_predict,
            schema=base_schema.append(models_schema),
            params=dict(
                static_cols=list(static_cols),
                futr_exog_cols=list(self._get_needed_futr_exog()),
                models=self.models,
                freq=self.freq,
                id_col=self.id_col,
                time_col=self.time_col,
                target_col=self.target_col,
            ),
        )

    def predict(
        self,
        df: Optional[Union[DataFrame, SparkDataFrame]] = None,
        static_df: Optional[Union[DataFrame, SparkDataFrame]] = None,
        futr_df: Optional[Union[DataFrame, SparkDataFrame]] = None,
        sort_df: bool = True,
        verbose: bool = False,
        engine = None,
        **data_kwargs
    ):
        """Predict with core.NeuralForecast.

        Use stored fitted `models` to predict large set of time series from DataFrame `df`.        

        Parameters
        ----------
        df : pandas, polars or spark DataFrame, optional (default=None)
            DataFrame with columns [`unique_id`, `ds`, `y`] and exogenous variables.
            If a DataFrame is passed, it is used to generate forecasts.
        static_df : pandas, polars or spark DataFrame, optional (default=None)
            DataFrame with columns [`unique_id`] and static exogenous.
        futr_df : pandas, polars or spark DataFrame, optional (default=None)
            DataFrame with [`unique_id`, `ds`] columns and `df`'s future exogenous.
        sort_df : bool (default=True)
            Sort `df` before fitting.
        verbose : bool (default=False)
            Print processing steps.
        engine : spark session
            Distributed engine for inference. Only used if df is a spark dataframe or if fit was called on a spark dataframe.
        data_kwargs : kwargs
            Extra arguments to be passed to the dataset within each model.

        Returns
        -------
        fcsts_df : pandas or polars DataFrame
            DataFrame with insample `models` columns for point predictions and probabilistic
            predictions for all fitted `models`.    
        """
        if df is None and not hasattr(self, 'dataset'):
            raise Exception('You must pass a DataFrame or have one stored.')

        if not self._fitted:
            raise Exception("You must fit the model before predicting.")

        needed_futr_exog = self._get_needed_futr_exog()
        if needed_futr_exog:
            if futr_df is None:
                raise ValueError(
                    f'Models require the following future exogenous features: {needed_futr_exog}. '
                    'Please provide them through the `futr_df` argument.'
                )
            else:
                missing = needed_futr_exog - set(futr_df.columns)
                if missing:
                    raise ValueError(f'The following features are missing from `futr_df`: {missing}')

        # 分布式df或NeuralForecast实例已使用分布式输入进行训练,且未提供df
        # 我们假设用户也希望执行分布式推理。
        is_files_dataset = isinstance(getattr(self, 'dataset', None), _FilesDataset)
        is_dataset_local_files = isinstance(getattr(self, 'dataset', None), LocalFilesTimeSeriesDataset)
        if isinstance(df, SparkDataFrame) or (df is None and is_files_dataset):
            return self._predict_distributed(
                df=df,
                static_df=static_df,
                futr_df=futr_df,
                engine=engine,
            )
        
        if is_dataset_local_files and df is None:
            raise ValueError(
                "When the model has been trained on a dataset that is split between multiple files, you must pass in a specific dataframe for prediciton."
            )
        
        # 处理新数据集但不存储。
        if df is not None:
            validate_freq(df[self.time_col], self.freq)
            dataset, uids, last_dates, _ = self._prepare_fit(
                df=df,
                static_df=static_df,
                sort_df=sort_df,
                predict_only=True,
                id_col=self.id_col,
                time_col=self.time_col,
                target_col=self.target_col,
            )
        else:
            dataset = self.dataset
            uids = self.uids
            last_dates = self.last_dates
            if verbose: print('Using stored dataset.')
  

        cols = self._get_model_names()

        # 用于预测的占位数据框,包含unique_id和ds列
        fcsts_df = ufp.make_future_dataframe(
            uids=uids,
            last_times=last_dates,
            freq=self.freq,
            h=self.h,
            id_col=self.id_col,
            time_col=self.time_col,
        )

        # 更新并定义新的预测数据集
        if futr_df is None:
            futr_df = fcsts_df
        else:
            futr_orig_rows = futr_df.shape[0]
            futr_df = ufp.join(futr_df, fcsts_df, on=[self.id_col, self.time_col])
            if futr_df.shape[0] < fcsts_df.shape[0]:
                if df is None:
                    expected_cmd = 'make_future_dataframe()'
                    missing_cmd = 'get_missing_future(futr_df)'
                else:
                    expected_cmd = 'make_future_dataframe(df)'
                    missing_cmd = 'get_missing_future(futr_df, df)'
                raise ValueError(
                    'There are missing combinations of ids and times in `futr_df`.\n'
                    f'You can run the `{expected_cmd}` method to get the expected combinations or '
                    f'the `{missing_cmd}` method to get the missing combinations.'
                )
            if futr_orig_rows > futr_df.shape[0]:
                dropped_rows = futr_orig_rows - futr_df.shape[0]
                warnings.warn(
                    f'Dropped {dropped_rows:,} unused rows from `futr_df`.'
                )
            if any(ufp.is_none(futr_df[col]).any() for col in needed_futr_exog):
                raise ValueError('Found null values in `futr_df`')
        futr_dataset = dataset.align(
            futr_df,
            id_col=self.id_col,
            time_col=self.time_col,
            target_col=self.target_col,
        )
        self._scalers_transform(futr_dataset)
        dataset = dataset.append(futr_dataset)

        col_idx = 0
        fcsts = np.full((self.h * len(uids), len(cols)), fill_value=np.nan, dtype=np.float32)
        for model in self.models:
            old_test_size = model.get_test_size()
            model.set_test_size(self.h) # 预测h步之后
            model_fcsts = model.predict(dataset=dataset, **data_kwargs)
            # 在内存占位符中附加预测
            output_length = len(model.loss.output_names)
            fcsts[:, col_idx : col_idx + output_length] = model_fcsts
            col_idx += output_length
            model.set_test_size(old_test_size) # 恢复为原始值
        if self.scalers_:
            indptr = np.append(0, np.full(len(uids), self.h).cumsum())
            fcsts = self._scalers_target_inverse_transform(fcsts, indptr)

        # 声明预测为 pd.DataFrame
        cols = self._get_model_names()  # 在调用.predict()时,列名可能已发生变化,因此需要用于IQLoss。
        if isinstance(fcsts_df, pl_DataFrame):
            fcsts = pl_DataFrame(dict(zip(cols, fcsts.T)))
        else:
            fcsts = pd.DataFrame(fcsts, columns=cols)
        fcsts_df = ufp.horizontal_concat([fcsts_df, fcsts])
        if isinstance(fcsts_df, pd.DataFrame) and _id_as_idx():
            _warn_id_as_idx()
            fcsts_df = fcsts_df.set_index(self.id_col)
        return fcsts_df

    def _reset_models(self):
        """Replace `self.models` with fresh deep copies of `self.models_init`.

        When the instance is flagged as fitted, a notice is printed because the
        models held so far are discarded by the reset.
        """
        pristine_models = []
        for initial_model in self.models_init:
            pristine_models.append(deepcopy(initial_model))
        self.models = pristine_models
        if not self._fitted:
            return
        print('WARNING: Deleting previously fitted models.')
    
    def _no_refit_cross_validation(
        self,
        df: Optional[DataFrame],
        static_df: Optional[DataFrame],
        n_windows: int,
        step_size: int,
        val_size: Optional[int], 
        test_size: int,
        sort_df: bool,
        verbose: bool,
        id_col: str,
        time_col: str,
        target_col: str,
        **data_kwargs
    ) -> DataFrame:
        """Cross-validate fitting every model once and predicting all windows.

        Each model is fitted a single time on the dataset (holding out
        `val_size` and `test_size`) and then predicts every window in one call.

        Parameters
        ----------
        df : pandas or polars DataFrame, optional
            New data to process and store on the instance. If None, the
            previously stored dataset is used.
        static_df : pandas or polars DataFrame, optional
            Static exogenous features, forwarded to `_prepare_fit`.
        n_windows : int
            Number of cross-validation windows.
        step_size : int
            Step size between windows.
        val_size : int, optional
            Validation size forwarded to each model's `fit`.
        test_size : int
            Test size forwarded to each model's `fit` and to the window
            generation.
        sort_df : bool
            Forwarded to `_prepare_fit` and stored on the instance.
        verbose : bool
            Print a notice when the stored dataset is used.
        id_col, time_col, target_col : str
            Column names for the series id, the timestamp and the target.
        data_kwargs : kwargs
            Extra arguments forwarded to each model's `predict`.

        Returns
        -------
        fcsts_df : pandas or polars DataFrame
            One row per (id, cutoff, time) with one column per model output
            and the observed target joined in.

        Raises
        ------
        Exception
            If `df` is None and no dataset is stored.
        """
        if df is None and not hasattr(self, 'dataset'):
            raise Exception('You must pass a DataFrame or have one stored.')

        # Process and store the new dataset (on self).
        if df is not None:
            validate_freq(df[time_col], self.freq)
            self.dataset, self.uids, self.last_dates, self.ds = self._prepare_fit(
                df=df,
                static_df=static_df,
                sort_df=sort_df,
                predict_only=False,
                id_col=id_col,
                time_col=time_col,
                target_col=target_col,
            )
            self.sort_df = sort_df
        elif verbose:
            print('Using stored dataset.')

        if val_size is not None:
            if self.dataset.min_size < (val_size+test_size):
                warnings.warn('Validation and test sets are larger than the shorter time-series.')

        # Output column names: repeated model names get a numeric suffix.
        cols = []
        count_names = {'model': 0}
        for model in self.models:
            model_name = repr(model)
            count_names[model_name] = count_names.get(model_name, -1) + 1
            if count_names[model_name] > 0:
                model_name += str(count_names[model_name])
            cols += [model_name + n for n in model.loss.output_names]

        fcsts_df = ufp.cv_times(
            times=self.ds,
            uids=self.uids,
            indptr=self.dataset.indptr,
            h=self.h,
            test_size=test_size,
            step_size=step_size,
            id_col=id_col,
            time_col=time_col,
        )
        # cv_times is sorted by window and then by id; reorder by id first.
        fcsts_df = ufp.sort(fcsts_df, [id_col, 'cutoff', time_col])

        col_idx = 0
        fcsts = np.full((self.dataset.n_groups * self.h * n_windows, len(cols)),
                         np.nan, dtype=np.float32)
        
        for model in self.models:
            model.fit(dataset=self.dataset,
                        val_size=val_size, 
                        test_size=test_size)
            model_fcsts = model.predict(self.dataset, step_size=step_size, **data_kwargs)

            # Write this model's outputs into the preallocated placeholder.
            output_length = len(model.loss.output_names)
            fcsts[:,col_idx:(col_idx + output_length)] = model_fcsts
            col_idx += output_length
        # We may have allocated more space than needed:
        # each series can produce at most (serie.size - 1) // self.h CV windows.
        effective_sizes = ufp.counts_by_id(fcsts_df, id_col)['counts'].to_numpy()
        needs_trim = effective_sizes.sum() != fcsts.shape[0]
        if self.scalers_ or needs_trim:
            indptr = np.arange(
                0,
                n_windows * self.h * (self.dataset.n_groups + 1),
                n_windows * self.h,
                dtype=np.int32,
            )
            if self.scalers_:
                fcsts = self._scalers_target_inverse_transform(fcsts, indptr)
            if needs_trim:
                # Keep only the valid samples of each series in the CV results.
                trimmed = np.empty_like(
                    fcsts, shape=(effective_sizes.sum(), fcsts.shape[1])
                )
                cv_indptr = np.append(0, effective_sizes).cumsum(dtype=np.int32)
                for i in range(fcsts.shape[1]):
                    ga = GroupedArray(fcsts[:, i], indptr)
                    trimmed[:, i] = ga._tails(cv_indptr)
                fcsts = trimmed

        self._fitted = True

        # Attach the predictions to the forecasts frame.
        if isinstance(self.uids, pl_Series):
            fcsts = pl_DataFrame(dict(zip(cols, fcsts.T)))
        else:
            fcsts = pd.DataFrame(fcsts, columns=cols)
        fcsts_df = ufp.horizontal_concat([fcsts_df, fcsts])

        # Observed target to join in. With `df=None` there is no input frame to
        # slice, so rebuild it from the stored dataset (same construction as
        # `predict_insample`) instead of failing on `None[...]`.
        if df is not None:
            target_df = df[[id_col, time_col, target_col]]
        else:
            # Copy so the inverse transform cannot write into the stored tensor.
            stored_y = self.dataset.temporal[:, 0].numpy().copy()
            if self.scalers_:
                # The stored dataset holds scaled values; bring them back to the
                # original scale so they are comparable with the forecasts.
                stored_y = self._scalers_target_inverse_transform(
                    stored_y.reshape(-1, 1), self.dataset.indptr
                )[:, 0]
            stored_target = {
                id_col: ufp.repeat(self.uids, np.diff(self.dataset.indptr)),
                time_col: self.ds,
                target_col: stored_y,
            }
            if isinstance(self.uids, pl_Series):
                target_df = pl_DataFrame(stored_target)
            else:
                target_df = pd.DataFrame(stored_target).reset_index(drop=True)

        fcsts_df = ufp.join(
            fcsts_df,
            target_df,
            how='left',
            on=[id_col, time_col],
        )
        if isinstance(fcsts_df, pd.DataFrame) and _id_as_idx():
            _warn_id_as_idx()
            fcsts_df = fcsts_df.set_index(id_col)
        return fcsts_df

    def cross_validation(
        self,
        df: Optional[DataFrame] = None,
        static_df: Optional[DataFrame] = None,
        n_windows: int = 1,
        step_size: int = 1,
        val_size: Optional[int] = 0, 
        test_size: Optional[int] = None,
        sort_df: bool = True,
        use_init_models: bool = False,
        verbose: bool = False,
        refit: Union[bool, int] = False,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
        **data_kwargs
    ) -> DataFrame:
        """Temporal Cross-Validation with core.NeuralForecast.

        Evaluates the models over `n_windows` windows of `h` steps each. Without
        `refit` the models are fitted once and predict every window; with
        `refit` the data is split per window and the models are re-fitted on
        the schedule described below.

        Parameters
        ----------
        df : pandas or polars DataFrame, optional (default=None)
            DataFrame with columns [`unique_id`, `ds`, `y`] and exogenous variables.
            If None, a previously stored dataset is required. Mandatory when
            `refit` is truthy.
        static_df : pandas or polars DataFrame, optional (default=None)
            DataFrame with columns [`unique_id`] and static exogenous.
        n_windows : int (default=1)
            Number of windows used for cross validation. Must be None when
            `test_size` is given.
        step_size : int (default=1)
            Step size between each window.
        val_size : int, optional (default=0)
            Length of validation size.
        test_size : int, optional (default=None)
            Length of test size. If passed, set `n_windows=None`;
            `test_size - h` must then be divisible by `step_size`.
        sort_df : bool (default=True)
            Sort `df` before fitting.
        use_init_models : bool, optional (default=False)
            Reset to the models passed when the object was instantiated.
        verbose : bool (default=False)
            Print processing steps.
        refit : bool or int (default=False)
            Retrain model for each cross validation window.
            If False, the models are trained at the beginning and then used to predict each window.
            If positive int, the models are retrained every `refit` windows.
        id_col : str (default='unique_id')
            Column that identifies each serie.
        time_col : str (default='ds')
            Column that identifies each timestep, its values can be timestamps or integers.
        target_col : str (default='y')
            Column that contains the target.
        data_kwargs : kwargs
            Extra arguments to be passed to the dataset within each model.

        Returns
        -------
        fcsts_df : pandas or polars DataFrame
            DataFrame with the id, time and `cutoff` columns, one column per
            model output, and the target column.

        Raises
        ------
        Exception
            If neither or both of `n_windows` and `test_size` are given, or if
            `test_size - h` is not divisible by `step_size`.
        ValueError
            If `refit` is truthy and `df` is None.
        """
        horizon = self.h
        has_windows = n_windows is not None
        has_test_size = test_size is not None
        if not has_windows and not has_test_size:
            raise Exception('you must define `n_windows` or `test_size`.')
        if has_windows and has_test_size:
            raise Exception('you must define `n_windows` or `test_size` but not both')
        if has_test_size:
            # Derive the number of windows from the requested test span.
            if (test_size - horizon) % step_size:
                raise Exception('`test_size - h` should be module `step_size`')
            n_windows = int((test_size - horizon) / step_size) + 1
        else:
            # Derive the test span from the requested number of windows.
            test_size = horizon + step_size * (n_windows - 1)

        # Restore the initial models when requested.
        if use_init_models:
            self._reset_models()

        id_is_index = isinstance(df, pd.DataFrame) and df.index.name == id_col
        if id_is_index:
            warnings.warn(
                "Passing the id as index is deprecated, please provide it as a column instead.",
                FutureWarning,
            )
            df = df.reset_index(id_col)

        if not refit:
            # Single fit, all windows predicted at once.
            return self._no_refit_cross_validation(
                df=df,
                static_df=static_df,
                n_windows=n_windows,
                step_size=step_size,
                val_size=val_size,
                test_size=test_size,
                sort_df=sort_df,
                verbose=verbose,
                id_col=id_col,
                time_col=time_col,
                target_col=target_col,
                **data_kwargs
            )

        if df is None:
            raise ValueError('Must specify `df` with `refit!=False`.')
        validate_freq(df[time_col], self.freq)
        window_splits = ufp.backtest_splits(
            df,
            n_windows=n_windows,
            h=horizon,
            id_col=id_col,
            time_col=time_col,
            freq=self.freq,
            step_size=step_size,
            input_size=None,
        )

        folds = []
        for window_idx, (cutoffs, train, test) in enumerate(window_splits):
            # Always fit on the first window; afterwards every `refit` windows.
            refit_now = window_idx == 0 or (refit > 0 and window_idx % refit == 0)
            if refit_now:
                self.fit(
                    df=train,
                    static_df=static_df,
                    val_size=val_size,
                    sort_df=sort_df,
                    use_init_models=False,
                    verbose=verbose,
                    id_col=id_col,
                    time_col=time_col,
                    target_col=target_col,
                )
            # Right after a fit the stored dataset already is `train`;
            # otherwise the window's training data is passed explicitly.
            predict_df = None if refit_now else train
            # The test split doubles as future exogenous input when needed.
            futr_df = test if self._get_needed_futr_exog() else None
            window_preds = self.predict(
                df=predict_df,
                static_df=static_df,
                futr_df=futr_df,
                sort_df=sort_df,
                verbose=verbose,
                **data_kwargs
            )
            window_preds = ufp.join(window_preds, cutoffs, on=id_col, how='left')
            actuals = test[[id_col, time_col, target_col]]
            folds.append(ufp.join(window_preds, actuals, on=[id_col, time_col]))

        stacked = ufp.vertical_concat(folds, match_categories=False)
        stacked = ufp.drop_index_if_pandas(stacked)
        # Match the column order of cross-validation without refit.
        leading_cols = [id_col, time_col, "cutoff"]
        reserved_cols = leading_cols + [target_col]
        model_cols = [name for name in stacked.columns if name not in reserved_cols]
        ordered = stacked[leading_cols + model_cols + [target_col]]
        stacked = ufp.sort(ordered, by=[id_col, 'cutoff', time_col])
        if isinstance(stacked, pd.DataFrame) and _id_as_idx():
            _warn_id_as_idx()
            stacked = stacked.set_index(id_col)
        return stacked

    def predict_insample(self, step_size: int = 1):
        """Predict insample with core.NeuralForecast.

        `core.NeuralForecast`'s `predict_insample` uses stored fitted `models`
        to predict historic values of a time series from the stored dataframe.

        Parameters
        ----------
        step_size : int (default=1)
            Step size between each window.

        Returns
        -------
        fcsts_df : pandas or polars DataFrame
            DataFrame with insample predictions for all fitted `models`,
            together with the stored target values.

        Raises
        ------
        Exception
            If the models have not been fitted.
        """
        if not self._fitted:
            raise Exception('The models must be fitted first with `fit` or `cross_validation`.')

        for model in self.models:
            if model.SAMPLING_TYPE == 'recurrent':
                # Build the message from adjacent literals: a backslash
                # continuation inside the string would embed the source
                # indentation in the emitted text.
                message = (
                    'Predict insample might not provide accurate predictions for '
                    f'recurrent model {repr(model)} class yet due to scaling.'
                )
                warnings.warn(message)
                print(f'WARNING: {message}')
        
        # Output column names: repeated model names get a numeric suffix.
        cols = []
        count_names = {'model': 0}
        for model in self.models:
            model_name = repr(model)
            count_names[model_name] = count_names.get(model_name, -1) + 1
            if count_names[model_name] > 0:
                model_name += str(count_names[model_name])
            cols += [model_name + n for n in model.loss.output_names]

        # Remove test set from dataset and last dates
        test_size = self.models[0].get_test_size()

        # Trim the start of each series so that the remaining length minus `h`
        # is a multiple of `step_size`.
        # Note: current constraint imposes that all series lengths are equal, so we can take the first series length as sample
        series_length = self.dataset.indptr[1] - self.dataset.indptr[0]
        _, forefront_offset = np.divmod((series_length - test_size - self.h), step_size)

        if test_size>0 or forefront_offset>0:
            trimmed_dataset = TimeSeriesDataset.trim_dataset(dataset=self.dataset,
                                                     right_trim=test_size,
                                                     left_trim=forefront_offset)
            # Positions of the rows kept in every series after trimming.
            new_idxs = np.hstack(
                [
                    np.arange(self.dataset.indptr[i] + forefront_offset, self.dataset.indptr[i + 1] - test_size)
                    for i in range(self.dataset.n_groups)
                ]
            )
            times = self.ds[new_idxs]
        else:
            trimmed_dataset = self.dataset
            times = self.ds

        # Generate dates
        fcsts_df = _insample_times(
            times=times,
            uids=self.uids,
            indptr=trimmed_dataset.indptr,
            h=self.h,
            freq=self.freq,
            step_size=step_size,
            id_col=self.id_col,
            time_col=self.time_col,
        )

        col_idx = 0
        fcsts = np.full((len(fcsts_df), len(cols)), np.nan, dtype=np.float32)

        for model in self.models:
            # Test size is the number of periods to forecast (full size of trimmed dataset)
            model.set_test_size(test_size=trimmed_dataset.max_size)
            try:
                model_fcsts = model.predict(trimmed_dataset, step_size=step_size)
            finally:
                # Set original test_size back even when the prediction fails,
                # so the model is not left with the temporary value.
                model.set_test_size(test_size=test_size)
            # Write this model's outputs into the preallocated placeholder.
            output_length = len(model.loss.output_names)
            fcsts[:,col_idx:(col_idx + output_length)] = model_fcsts
            col_idx += output_length

        # original y
        original_y = {
            self.id_col: ufp.repeat(self.uids, np.diff(self.dataset.indptr)),
            self.time_col: self.ds,
            self.target_col: self.dataset.temporal[:, 0].numpy(),
        }

        # Attach the predictions to the forecasts frame.
        if isinstance(self.uids, pl_Series):
            fcsts = pl_DataFrame(dict(zip(cols, fcsts.T)))
            Y_df = pl_DataFrame(original_y)
        else:
            fcsts = pd.DataFrame(fcsts, columns=cols)
            Y_df = pd.DataFrame(original_y).reset_index(drop=True)
        fcsts_df = ufp.horizontal_concat([fcsts_df, fcsts])

        # Add original input df's y to forecasts DataFrame
        fcsts_df = ufp.join(fcsts_df, Y_df, how='left', on=[self.id_col, self.time_col])
        if self.scalers_:
            # Predictions and stored target are both inverse-transformed here.
            sizes = ufp.counts_by_id(fcsts_df, self.id_col)['counts'].to_numpy()
            indptr = np.append(0, sizes.cumsum())
            invert_cols = cols + [self.target_col]
            fcsts_df[invert_cols] = self._scalers_target_inverse_transform(
                fcsts_df[invert_cols].to_numpy(),
                indptr
            )
        if isinstance(fcsts_df, pd.DataFrame) and _id_as_idx():
            _warn_id_as_idx()
            fcsts_df = fcsts_df.set_index(self.id_col)            
        return fcsts_df
        
    # Save the list of models using PyTorch Lightning's save_checkpoint function
    def save(self, path: str, model_index: Optional[List]=None, save_dataset: bool=True, overwrite: bool=False):
        """Save NeuralForecast core class.

        `core.NeuralForecast`'s method to save current status of models, dataset, and configuration.
        Note that by default the `models` are not saving training checkpoints to save disk memory,
        to get them change the individual model `**trainer_kwargs` to include `enable_checkpointing=True`.

        Parameters
        ----------
        path : str
            Directory to save current status.
        model_index : list, optional (default=None)
            List to specify which models from list of self.models to save.
        save_dataset : bool (default=True)
            Whether to save dataset or not.
        overwrite : bool (default=False)
            Whether to overwrite files or not.

        Raises
        ------
        Exception
            If `save_dataset=True` but no dataset is stored, or if the target
            directory is not empty and `overwrite=False`.
        ValueError
            If `save_dataset=True` and the stored dataset is a `_FilesDataset`.
        """
        # Validate the dataset request before touching the filesystem, so a
        # failing call neither wipes an existing directory (`overwrite=True`)
        # nor leaves a partially written one behind.
        if save_dataset:
            if not hasattr(self, 'dataset'):
                raise Exception(
                    'You need to have a stored dataset to save it, '
                    'set `save_dataset=False` to skip saving dataset.'
                )
            if isinstance(self.dataset, _FilesDataset):
                raise ValueError(
                    "Cannot save distributed dataset.\n"
                    "You can set `save_dataset=False` and use the `df` argument in the predict method after loading "
                    "this model to use it for inference."
                )

        # Standardize path without trailing '/'
        if path.endswith('/'):
            path = path[:-1]

        # Model index list
        if model_index is None:
            model_index = list(range(len(self.models)))

        fs, _, _ = fsspec.get_fs_token_paths(path)
        if not fs.exists(path):
            fs.makedirs(path)
        elif fs.ls(path):
            # A non-empty directory is only replaced on explicit request.
            if not overwrite:
                raise Exception('Directory is not empty. Set `overwrite=True` to overwrite files.')
            fs.rm(path, recursive=True)
            fs.mkdir(path)

        # Save models
        count_names = {'model': 0}
        alias_to_model = {}
        for i, model in enumerate(self.models):
            # Skip model if not in list
            if i not in model_index:
                continue

            model_name = repr(model)
            model_class_name = model.__class__.__name__.lower()
            # Maps the model's repr to its lowercased class name; `load` uses
            # this mapping to pick the class for each checkpoint.
            alias_to_model[model_name] = model_class_name
            count_names[model_name] = count_names.get(model_name, -1) + 1
            model.save(f"{path}/{model_name}_{count_names[model_name]}.ckpt")
        with fsspec.open(f"{path}/alias_to_model.pkl", "wb") as f:
            pickle.dump(alias_to_model, f)

        # Save dataset (preconditions were validated above)
        if save_dataset:
            with fsspec.open(f"{path}/dataset.pkl", "wb") as f:
                pickle.dump(self.dataset, f)

        # Save configuration and parameters
        config_dict = {
            "h": self.h,
            "freq": self.freq,
            "sort_df": self.sort_df,
            "_fitted": self._fitted,
            "local_scaler_type": self.local_scaler_type,
            "scalers_": self.scalers_,
            "id_col": self.id_col,
            "time_col": self.time_col,
            "target_col": self.target_col,
        }
        if save_dataset:
            # Dataset-level attributes are only stored alongside the dataset.
            config_dict.update(
                {
                    "uids": self.uids,
                    "last_dates": self.last_dates,
                    "ds": self.ds,
                }
            )

        with fsspec.open(f"{path}/configuration.pkl", "wb") as f:
            pickle.dump(config_dict, f)

    @staticmethod
    def load(path, verbose=False, **kwargs):
        """Load NeuralForecast

        `core.NeuralForecast`'s method to load checkpoint from path.

        Parameters
        -----------
        path : str
            Directory with stored artifacts.
        verbose : bool (default=False)
            Print progress messages while loading.
        kwargs
            Additional keyword arguments forwarded to each model class's
            `load` call.

        Returns
        -------
        result : NeuralForecast
            Instantiated `NeuralForecast` class.

        Raises
        ------
        Exception
            If the directory holds no `.ckpt` file or no `configuration.pkl`.
        """
        # NOTE(review): the artifacts are read with `pickle.load`, which can
        # execute arbitrary code; only load directories from trusted sources.

        # Standardize path without trailing '/'
        if path[-1] == '/':
            path = path[:-1]

        fs, _, _ = fsspec.get_fs_token_paths(path)
        file_names = []
        for entry in fs.ls(path):
            if fs.isfile(entry):
                file_names.append(entry.split('/')[-1])

        # Load models
        checkpoints = [name for name in file_names if name.endswith('.ckpt')]
        if not checkpoints:
            raise Exception('No model found in directory.')

        rule = 10 * '-'
        if verbose:
            print(rule + ' Loading models ' + rule)
        try:
            with fsspec.open(f'{path}/alias_to_model.pkl', 'rb') as f:
                alias_to_model = pickle.load(f)
        except FileNotFoundError:
            # Without the mapping file, the checkpoint's name prefix is used
            # directly as the model class key below.
            alias_to_model = {}

        models = []
        for checkpoint in checkpoints:
            # Checkpoints are named '<alias>_<counter>.ckpt'; drop the last part.
            model_name = '_'.join(checkpoint.split('_')[:-1])
            model_class_name = alias_to_model.get(model_name, model_name)
            model_class = MODEL_FILENAME_DICT[model_class_name]
            restored_model = model_class.load(f'{path}/{checkpoint}', **kwargs)
            restored_model.alias = model_name
            models.append(restored_model)
            if verbose:
                print(f"Model {model_name} loaded.")

        if verbose:
            print(rule + ' Loading dataset ' + rule)
        # Load dataset (optional artifact)
        try:
            with fsspec.open(f"{path}/dataset.pkl", "rb") as f:
                dataset = pickle.load(f)
            if verbose:
                print('Dataset loaded.')
        except FileNotFoundError:
            dataset = None
            if verbose:
                print('No dataset found in directory.')

        if verbose:
            print(rule + ' Loading configuration ' + rule)
        # Load configuration (mandatory artifact)
        try:
            with fsspec.open(f"{path}/configuration.pkl", "rb") as f:
                config_dict = pickle.load(f)
            if verbose:
                print('Configuration loaded.')
        except FileNotFoundError:
            raise Exception('No configuration found in directory.')

        # Create the NeuralForecast object
        restored = NeuralForecast(
            models=models,
            freq=config_dict['freq'],
            local_scaler_type=config_dict['local_scaler_type'],
        )
        restored.id_col = config_dict['id_col']
        restored.time_col = config_dict['time_col']
        restored.target_col = config_dict['target_col']

        # Dataset and the attributes that are only meaningful alongside it
        if dataset is not None:
            restored.dataset = dataset
            for attr in ('uids', 'last_dates', 'ds', 'sort_df'):
                setattr(restored, attr, config_dict[attr])

        # Fitted flag
        restored._fitted = config_dict['_fitted']

        restored.scalers_ = config_dict['scalers_']

        return restored

import logging
import warnings

# Only show PyTorch Lightning log records at ERROR level or above, and ignore
# every warning from here on.
logging.getLogger("pytorch_lightning").setLevel(logging.ERROR)
warnings.filterwarnings("ignore")
# Call nbdev's `show_doc` for each public NeuralForecast method.
show_doc(NeuralForecast.fit, title_level=3)
show_doc(NeuralForecast.predict, title_level=3)
show_doc(NeuralForecast.cross_validation, title_level=3)
show_doc(NeuralForecast.predict_insample, title_level=3)
show_doc(NeuralForecast.save, title_level=3)
show_doc(NeuralForecast.load, title_level=3)

import tempfile

import matplotlib.pyplot as plt
import pytorch_lightning as pl

import neuralforecast
import optuna
from ray import tune

from neuralforecast.auto import (
    AutoMLP, AutoNBEATS, AutoNBEATSx,
    AutoRNN, AutoTCN, AutoDilatedRNN,
)

from neuralforecast.models.rnn import RNN
from neuralforecast.models.tcn import TCN
from neuralforecast.models.deepar import DeepAR
from neuralforecast.models.dilated_rnn import DilatedRNN

from neuralforecast.models.mlp import MLP
from neuralforecast.models.nhits import NHITS
from neuralforecast.models.nbeats import NBEATS
from neuralforecast.models.nbeatsx import NBEATSx

from neuralforecast.models.tft import TFT
from neuralforecast.models.vanillatransformer import VanillaTransformer
from neuralforecast.models.informer import Informer
from neuralforecast.models.autoformer import Autoformer

from neuralforecast.models.stemgnn import StemGNN
from neuralforecast.models.tsmixer import TSMixer
from neuralforecast.models.tsmixerx import TSMixerx

from neuralforecast.losses.pytorch import MQLoss, MAE, MSE
from neuralforecast.utils import AirPassengersDF, AirPassengersPanel, AirPassengersStatic

from datetime import date

# Split the panel at the 12th-from-last `ds` value: earlier rows are used for training,
# the remaining rows are held out.
AirPassengersPanel_train = AirPassengersPanel[AirPassengersPanel['ds'] < AirPassengersPanel['ds'].values[-12]].reset_index(drop=True)
AirPassengersPanel_test = AirPassengersPanel[AirPassengersPanel['ds'] >= AirPassengersPanel['ds'].values[-12]].reset_index(drop=True)
# Blank out the target and its lag in the held-out rows.
AirPassengersPanel_test['y'] = np.nan
AirPassengersPanel_test['y_[lag12]'] = np.nan

# warnings raised when the id is passed as the index
df_with_idx = AirPassengersPanel_train.set_index('unique_id')
models = [
    NHITS(h=12, input_size=12, max_steps=1)
]
nf = NeuralForecast(models=models, freq='M')
# Record every FutureWarning issued by the calls below so they can be counted afterwards.
with warnings.catch_warnings(record=True) as issued_warnings:
    warnings.simplefilter('always', category=FutureWarning)
    nf.fit(df=df_with_idx)
    nf.predict()
    nf.predict_insample()
    nf.cross_validation(df=df_with_idx)
# Two warnings about the input format are expected
# (presumably one per call that receives `df_with_idx`: fit and cross_validation).
input_id_warnings = [
    w for w in issued_warnings if 'Passing the id as index is deprecated' in str(w.message)
]
assert len(input_id_warnings) == 2
# Three warnings about the output format are expected
# (presumably one per call that returns predictions: predict, predict_insample and cross_validation).
output_id_warnings = [
    w for w in issued_warnings if 'the predictions will have the id as a column' in str(w.message)
]
assert len(output_id_warnings) == 3
# From here on the environment requests the id as a column for the rest of the tests.
os.environ['NIXTLA_ID_AS_COL'] = '1'

# unit test: early stopping is guarded against a missing validation set
models = [
    NHITS(h=12, input_size=12, max_steps=1, early_stop_patience_steps=5)
]
nf = NeuralForecast(models=models, freq='M')
# `fit` is called with the training frame only (no `val_size`), which must fail with this message.
test_fail(nf.fit,
          contains='Set val_size>0 if early stopping is enabled.',
          args=(AirPassengersPanel_train,))

# test fit + cross_validation behaviour
models = [NHITS(h=12, input_size=24, max_steps=10)]
nf = NeuralForecast(models=models, freq='M')
nf.fit(AirPassengersPanel_train)
init_fcst = nf.predict()
# Repeating cross_validation and fit with `use_init_models=True` must reproduce the first results.
init_cv = nf.cross_validation(AirPassengersPanel_train, use_init_models=True)
after_cv = nf.cross_validation(AirPassengersPanel_train, use_init_models=True)
nf.fit(AirPassengersPanel_train, use_init_models=True)
after_fcst = nf.predict()
test_eq(init_cv, after_cv)
test_eq(init_fcst, after_fcst)

# test cross_validation with refit
models = [
    NHITS(
        h=12,
        input_size=24,
        max_steps=2,
        futr_exog_list=['trend'],
        stat_exog_list=['airline1', 'airline2']
    )
]
nf = NeuralForecast(models=models, freq='M')
cv_kwargs = dict(
    df=AirPassengersPanel_train,
    static_df=AirPassengersStatic,
    n_windows=4,
    use_init_models=True,
)
# Baseline without refitting; its cutoffs are used to select windows below.
cv_res_norefit = nf.cross_validation(refit=False, **cv_kwargs)
cutoffs = cv_res_norefit['cutoff'].unique()
for refit in [True, 2]:
    cv_res = nf.cross_validation(refit=refit, **cv_kwargs)
    # `True` becomes 1, so the first `refit` cutoffs are the ones compared against the baseline.
    refit = int(refit)
    fltr = lambda df: df['cutoff'].isin(cutoffs[:refit])
    expected = cv_res_norefit[fltr]
    actual = cv_res[fltr]
    # predictions for the windows that were not refit should be the same
    pd.testing.assert_frame_equal(
        actual.reset_index(drop=True),
        expected.reset_index(drop=True)
    )
    # predictions after refitting should be different
    test_fail(
        lambda: pd.testing.assert_frame_equal(
            cv_res_norefit.drop(expected.index).reset_index(drop=True),
            cv_res.drop(actual.index).reset_index(drop=True),
        ),
        contains='(column name="NHITS") are different',
    )

# test scaling
models = [NHITS(h=12, input_size=24, max_steps=10)]
models_exog = [NHITS(h=12, input_size=12, max_steps=10, hist_exog_list=['trend'], futr_exog_list=['trend'])]

# fit + predict
nf = NeuralForecast(models=models, freq='M', local_scaler_type='standard')
nf.fit(AirPassengersPanel_train)
scaled_fcst = nf.predict()
# check that the forecasts are similar to the ones without scaling
np.testing.assert_allclose(
    init_fcst['NHITS'].values,
    scaled_fcst['NHITS'].values,
    rtol=0.3,
)
# with exogenous variables
nf = NeuralForecast(models=models_exog, freq='M', local_scaler_type='standard')
nf.fit(AirPassengersPanel_train)
scaled_exog_fcst = nf.predict(futr_df=AirPassengersPanel_test)
# check that the forecasts are similar to the ones without exogenous variables
np.testing.assert_allclose(
    scaled_fcst['NHITS'].values,
    scaled_exog_fcst['NHITS'].values,
    rtol=0.3,
)

# cross-validation (CV)
nf = NeuralForecast(models=models, freq='M', local_scaler_type='robust')
cv_res = nf.cross_validation(AirPassengersPanel)
# check that the forecasts are similar to the original values (which are restored directly from the dataframe)
np.testing.assert_allclose(
    cv_res['NHITS'].values,
    cv_res['y'].values,
    rtol=0.3,
)
# with exogenous variables
nf = NeuralForecast(models=models_exog, freq='M', local_scaler_type='robust-iqr')
cv_res_exog = nf.cross_validation(AirPassengersPanel)
# check that the forecasts are similar to the original values (which are restored directly from the dataframe)
np.testing.assert_allclose(
    cv_res_exog['NHITS'].values,
    cv_res_exog['y'].values,
    rtol=0.2,
)

# fit + predict_insample
nf = NeuralForecast(models=models, freq='M', local_scaler_type='minmax')
nf.fit(AirPassengersPanel_train)
# Join the in-sample predictions with the original targets; after the merge,
# `y_actual` comes from predict_insample and `y_expected` from the training frame.
insample_res = (
    nf.predict_insample()
    .groupby('unique_id').tail(-12) # first values aren't reliable
    .merge(
        AirPassengersPanel_train[['unique_id', 'ds', 'y']],
        on=['unique_id', 'ds'],
        how='left',
        suffixes=('_actual', '_expected'),
    )
)
# y is correctly inverted
np.testing.assert_allclose(
    insample_res['y_actual'].values,
    insample_res['y_expected'].values,
    rtol=1e-5,
)
# predictions are in the same scale
np.testing.assert_allclose(
    insample_res['NHITS'].values,
    insample_res['y_expected'].values,
    rtol=0.7,
)
# with exogenous variables
nf = NeuralForecast(models=models_exog, freq='M', local_scaler_type='minmax')
nf.fit(AirPassengersPanel_train)
insample_res_exog = (
    nf.predict_insample()
    .groupby('unique_id').tail(-12) # first values aren't reliable
    .merge(
        AirPassengersPanel_train[['unique_id', 'ds', 'y']],
        on=['unique_id', 'ds'],
        how='left',
        suffixes=('_actual', '_expected'),
    )
)
# y is correctly inverted
np.testing.assert_allclose(
    insample_res_exog['y_actual'].values,
    insample_res_exog['y_expected'].values,
    rtol=1e-5,
)
# predictions are similar to the ones without exogenous variables
np.testing.assert_allclose(
    insample_res['NHITS'].values,
    insample_res_exog['NHITS'].values,
    rtol=0.2,
)

# test the Box-Cox transformation
nf = NeuralForecast(models=models, freq='M', local_scaler_type='boxcox')
nf.fit(AirPassengersPanel_train)
# Join the in-sample predictions with the original targets; after the merge,
# `y_actual` comes from predict_insample and `y_expected` from the training frame.
insample_res = (
    nf.predict_insample()
    .groupby('unique_id').tail(-12) # first values aren't reliable
    .merge(
        AirPassengersPanel_train[['unique_id', 'ds', 'y']],
        on=['unique_id', 'ds'],
        how='left',
        suffixes=('_actual', '_expected'),
    )
)
# y is correctly inverted
np.testing.assert_allclose(
    insample_res['y_actual'].values,
    insample_res['y_expected'].values,
    rtol=1e-5,
)
# predictions are in the same scale
np.testing.assert_allclose(
    insample_res['NHITS'].values,
    insample_res['y_expected'].values,
    rtol=0.7,
)

# test futr_df contents
models = [NHITS(h=6, input_size=24, max_steps=10, hist_exog_list=['trend'], futr_exog_list=['trend'])]
nf = NeuralForecast(models=models, freq='M')
nf.fit(AirPassengersPanel_train)
# not enough rows in futr_df raises an error
test_fail(lambda: nf.predict(futr_df=AirPassengersPanel_test.head()), contains='There are missing combinations')
# extra rows issue a warning
with warnings.catch_warnings(record=True) as issued_warnings:
    warnings.simplefilter('always', UserWarning)
    nf.predict(futr_df=AirPassengersPanel_test)
assert any('Dropped 12 unused rows' in str(w.message) for w in issued_warnings)
# models that require futr_df raise an error when it is not provided
test_fail(lambda: nf.predict(), contains="Models require the following future exogenous features: {'trend'}")
# missing feature in futr_df raises an error
test_fail(lambda: nf.predict(futr_df=AirPassengersPanel_test.drop(columns='trend')), contains="missing from `futr_df`: {'trend'}")
# null values in futr_df raise an error
test_fail(lambda: nf.predict(futr_df=AirPassengersPanel_test.assign(trend=np.nan)), contains='Found null values in `futr_df`')

# test in-place model fitting
models = [MLP(h=12, input_size=12, max_steps=1, scaler_type='robust')]
# Snapshot the first layer's weights before fitting.
initial_weights = models[0].mlp[0].weight.detach().clone()
fcst = NeuralForecast(models=models, freq='M')
fcst.fit(df=AirPassengersPanel_train, static_df=AirPassengersStatic, use_init_models=True)
# The copies kept in `models_init` must still hold the initial weights after fitting.
after_weights = fcst.models_init[0].mlp[0].weight.detach().clone()
assert np.allclose(initial_weights, after_weights), 'init models should not be modified'
assert len(fcst.models[0].train_trajectories)>0, 'models stored trajectories should not be empty'

# test predict_insample
test_size = 12
n_series = 2
h = 12

config = {'input_size': tune.choice([12, 24]),
          'hidden_size': 128,
          'max_steps': 1,
          'val_check_steps': 1,
          'step_size': 12}

models = [
    NHITS(h=h, input_size=24, loss=MQLoss(level=[80]), max_steps=1, alias='NHITS', scaler_type=None),
    AutoMLP(h=12, config=config, cpus=1, num_samples=1),
    RNN(h=h, input_size=-1, loss=MAE(), max_steps=1, alias='RNN', scaler_type=None),
    ]

nf = NeuralForecast(models=models, freq='M')
cv = nf.cross_validation(df=AirPassengersPanel_train, static_df=AirPassengersStatic, val_size=0, test_size=test_size, n_windows=None)

forecasts = nf.predict_insample(step_size=1)

# Expected rows: per series, one window of `h` rows for each of the
# (series length - test_size - h + 1) start positions.
expected_size = n_series*((len(AirPassengersPanel_train)//n_series-test_size)-h+1)*h
assert len(forecasts) == expected_size, f'Shape mismatch in predict_insample: {len(forecasts)=}, {expected_size=}'

# test predict_insample step_size

h = 12
train_end = AirPassengersPanel_train['ds'].max()
sizes = AirPassengersPanel_train['unique_id'].value_counts().to_numpy()
for step_size, test_size in [(7, 0), (9, 0), (7, 5), (9, 5)]:
    models = [NHITS(h=h, input_size=12, max_steps=1)]
    nf = NeuralForecast(models=models, freq='M')
    nf.fit(AirPassengersPanel_train)
    # Note: only apply set_test_size() upon nf.fit(), otherwise it would have set the test_size = 0
    nf.models[0].set_test_size(test_size)

    forecasts = nf.predict_insample(step_size=step_size)
    # The last cutoff sits `test_size + h` month ends before the end of the training data.
    last_cutoff = train_end - test_size * pd.offsets.MonthEnd() - h * pd.offsets.MonthEnd()
    n_expected_cutoffs = (sizes[0] - test_size - nf.h + step_size) // step_size

    # compare cutoff values
    expected_cutoffs = np.flip(np.array([last_cutoff - step_size * i * pd.offsets.MonthEnd() for i in range(n_expected_cutoffs)]))
    actual_cutoffs = np.array([pd.Timestamp(x) for x in forecasts[forecasts['unique_id']==nf.uids[1]]['cutoff'].unique()])
    np.testing.assert_array_equal(expected_cutoffs, actual_cutoffs, err_msg=f"{step_size=},{expected_cutoffs=},{actual_cutoffs=}")

    # check the number of forecast points per cutoff is the same for both series
    cutoffs_by_series = forecasts.groupby(['unique_id', 'cutoff']).size().unstack('unique_id')
    pd.testing.assert_series_equal(cutoffs_by_series['Airline1'], cutoffs_by_series['Airline2'], check_names=False)

# test aliases
config_drnn = {'input_size': tune.choice([-1]),
               'encoder_hidden_size': tune.choice([5, 10]),
               'max_steps': 1,
               'val_check_steps': 1,
               'step_size': 1}
models = [
    # test Auto
    AutoDilatedRNN(h=12, config=config_drnn, cpus=1, num_samples=2, alias='AutoDIL'),
    # test BaseWindows
    NHITS(h=12, input_size=24, loss=MQLoss(level=[80]), max_steps=1, alias='NHITSMQ'),
    # test BaseRecurrent
    RNN(h=12, input_size=-1, encoder_hidden_size=10, max_steps=1,
            stat_exog_list=['airline1'],
            futr_exog_list=['trend'], hist_exog_list=['y_[lag12]'], alias='MyRNN'),
    # test BaseMultivariate
    StemGNN(h=12, input_size=24, n_series=2, max_steps=1, scaler_type='robust', alias='StemMulti'),
    # test a model without an alias
    NHITS(h=12, input_size=24, max_steps=1),
]
nf = NeuralForecast(models=models, freq='M')
nf.fit(df=AirPassengersPanel_train, static_df=AirPassengersStatic)
forecasts = nf.predict(futr_df=AirPassengersPanel_test)
# The output columns must use the aliases (and the class name for the model without one).
test_eq(
    forecasts.columns.to_list(),
    ['unique_id', 'ds', 'AutoDIL', 'NHITSMQ-median', 'NHITSMQ-lo-80', 'NHITSMQ-hi-80', 'MyRNN', 'StemMulti', 'NHITS']
)

# unit test: core/model interactions
config = {'input_size': tune.choice([12, 24]),
          'hidden_size': 256,
          'max_steps': 1,
          'val_check_steps': 1,
          'step_size': 12}

config_drnn = {'input_size': tune.choice([-1]),
               'encoder_hidden_size': tune.choice([5, 10]),
               'max_steps': 1,
               'val_check_steps': 1,
               'step_size': 1}

# Fit and predict with many model classes at once, each trained for a single step.
fcst = NeuralForecast(
    models=[
        AutoDilatedRNN(h=12, config=config_drnn, cpus=1, num_samples=2),
        DeepAR(h=12, input_size=24, max_steps=1,
               stat_exog_list=['airline1'], futr_exog_list=['trend']),
        DilatedRNN(h=12, input_size=-1, encoder_hidden_size=10, max_steps=1,
                   stat_exog_list=['airline1'],
                   futr_exog_list=['trend'], hist_exog_list=['y_[lag12]']),
        RNN(h=12, input_size=-1, encoder_hidden_size=10, max_steps=1,
            inference_input_size=24,
            stat_exog_list=['airline1'],
            futr_exog_list=['trend'], hist_exog_list=['y_[lag12]']),
        TCN(h=12, input_size=-1, encoder_hidden_size=10, max_steps=1,
            stat_exog_list=['airline1'],
            futr_exog_list=['trend'], hist_exog_list=['y_[lag12]']),
        AutoMLP(h=12, config=config, cpus=1, num_samples=2),
        NBEATSx(h=12, input_size=12, max_steps=1,
                stat_exog_list=['airline1'],
                futr_exog_list=['trend'], hist_exog_list=['y_[lag12]']),
        NHITS(h=12, input_size=24, loss=MQLoss(level=[80]), max_steps=1),
        NHITS(h=12, input_size=12, max_steps=1,
              stat_exog_list=['airline1'],
              futr_exog_list=['trend'], hist_exog_list=['y_[lag12]']),
        DLinear(h=12, input_size=24, max_steps=1),
        MLP(h=12, input_size=12, max_steps=1,
            stat_exog_list=['airline1'],
            futr_exog_list=['trend'], hist_exog_list=['y_[lag12]']),
        TFT(h=12, input_size=24, max_steps=1),
        VanillaTransformer(h=12, input_size=24, max_steps=1),
        Informer(h=12, input_size=24, max_steps=1),
        Autoformer(h=12, input_size=24, max_steps=1),
        FEDformer(h=12, input_size=24, max_steps=1),
        PatchTST(h=12, input_size=24, max_steps=1),
        TimesNet(h=12, input_size=24, max_steps=1),
        StemGNN(h=12, input_size=24, n_series=2, max_steps=1, scaler_type='robust'),
        TSMixer(h=12, input_size=24, n_series=2, max_steps=1, scaler_type='robust'),
        TSMixerx(h=12, input_size=24, n_series=2, max_steps=1, scaler_type='robust'),
    ],
    freq='M'
)
fcst.fit(df=AirPassengersPanel_train, static_df=AirPassengersStatic)
forecasts = fcst.predict(futr_df=AirPassengersPanel_test)
forecasts

# Plot the training data together with every model's forecast, one figure per series.
# `forecasts` carries `unique_id` as a column once NIXTLA_ID_AS_COL is set; calling
# `reset_index()` on it would then add a spurious `index` column that ends up being
# plotted as a series. Only reset the index when the id actually lives there.
if 'unique_id' in forecasts.columns:
    forecasts_flat = forecasts
else:
    forecasts_flat = forecasts.reset_index()
plot_df = pd.concat([AirPassengersPanel_train, forecasts_flat]).set_index('ds')

for airline in ['Airline1', 'Airline2']:
    fig, ax = plt.subplots(1, 1, figsize=(20, 7))

    # Keep only the target and the model columns of the current series.
    series_df = plot_df[plot_df['unique_id'] == airline]
    series_df.drop(['unique_id', 'trend', 'y_[lag12]'], axis=1).plot(ax=ax, linewidth=2)

    ax.set_title('AirPassengers Forecast', fontsize=22)
    ax.set_ylabel('Monthly Passengers', fontsize=20)
    ax.set_xlabel('Timestamp [t]', fontsize=20)
    ax.legend(prop={'size': 15})
    ax.grid()
def config_optuna(trial):
    """Build a model configuration from an optuna-style trial.

    `input_size` and `hist_exog_list` are sampled through
    `trial.suggest_categorical` (in that order); the remaining entries are fixed.

    Args:
        trial: object exposing `suggest_categorical(name, choices)`.

    Returns:
        dict with the keys `input_size`, `hist_exog_list`, `futr_exog_list`,
        `max_steps` and `val_check_steps`.
    """
    sampled_input_size = trial.suggest_categorical('input_size', [12, 24])
    hist_exog_choices = [['trend'], ['y_[lag12]'], ['trend', 'y_[lag12]']]
    sampled_hist_exog = trial.suggest_categorical('hist_exog_list', hist_exog_choices)
    search_space = dict(
        input_size=sampled_input_size,
        hist_exog_list=sampled_hist_exog,
        futr_exog_list=['trend'],
        max_steps=10,
        val_check_steps=5,
    )
    return search_space

# Ray Tune search space with the same entries as `config_optuna`.
config_ray = {'input_size': tune.choice([12, 24]),
          'hist_exog_list': tune.choice([['trend'], ['y_[lag12]'], ['trend', 'y_[lag12]']]),
          'futr_exog_list': ['trend'],
          'max_steps': 10,
          'val_check_steps': 5}

# test that training with an iterative dataset produces the same results as passing the dataset directly as a pandas dataframe
AirPassengersPanel_train['id'] = AirPassengersPanel_train['unique_id']
AirPassengersPanel_test['id'] = AirPassengersPanel_test['unique_id']

models = [
    NHITS(h=12, input_size=12, max_steps=10, futr_exog_list=['trend'], random_seed=1),
    AutoMLP(h=12, config=config_optuna, num_samples=2, backend='optuna', search_alg=optuna.samplers.TPESampler(seed=0)), # type: ignore
    AutoNBEATSx(h=12, config=config_ray, cpus=1, num_samples=2)
]
nf = NeuralForecast(models=models, freq='M')

# fit and predict with a pandas dataframe
nf.fit(df=AirPassengersPanel_train.drop(columns='unique_id'), use_init_models=True, id_col='id')
pred_dataframe = nf.predict(futr_df=AirPassengersPanel_test.drop(columns='unique_id')).reset_index()

# fit and predict with a data directory
with tempfile.TemporaryDirectory() as tmpdir:
    # Write one parquet partition per `unique_id` and pass the sorted partition paths to fit.
    AirPassengersPanel_train.to_parquet(tmpdir, partition_cols=['unique_id'], index=False)
    data_directory = sorted([str(path) for path in Path(tmpdir).iterdir()])
    nf.fit(df=data_directory, use_init_models=True, id_col='id')

# Predict for a single series and compare with the dataframe-based forecasts of that series.
pred_df = AirPassengersPanel_train[AirPassengersPanel_train['unique_id'] == 'Airline2'].drop(columns='unique_id')
futr_df = AirPassengersPanel_test[AirPassengersPanel_test['unique_id'] == 'Airline2'].drop(columns='unique_id')

pred_iterative = nf.predict(df=pred_df, futr_df=futr_df)
pred_airline2 = pred_dataframe[pred_dataframe['id'] == 'Airline2']
np.testing.assert_allclose(pred_iterative['NHITS'], pred_airline2['NHITS'], rtol=0, atol=1)
np.testing.assert_allclose(pred_iterative['AutoMLP'], pred_airline2['AutoMLP'], rtol=0, atol=1)
np.testing.assert_allclose(pred_iterative['AutoNBEATSx'], pred_airline2['AutoNBEATSx'], rtol=0, atol=1)

# remove the id column so it does not affect later tests
AirPassengersPanel_train = AirPassengersPanel_train.drop(columns='id')
AirPassengersPanel_test = AirPassengersPanel_test.drop(columns='id')

# Search space used by AutoMLP below.
config = {'input_size': tune.choice([12, 24]),
          'hidden_size': 256,
          'max_steps': 1,
          'val_check_steps': 1,
          'step_size': 12}

# NOTE(review): `config_drnn` is not referenced by the models in this cell.
config_drnn = {'input_size': tune.choice([-1]),
               'encoder_hidden_size': tune.choice([5, 10]),
               'max_steps': 1,
               'val_check_steps': 1,
               'step_size': 1}

fcst = NeuralForecast(
    models=[
        DilatedRNN(h=12, input_size=-1, encoder_hidden_size=10, max_steps=1),
        AutoMLP(h=12, config=config, cpus=1, num_samples=1),
        NHITS(h=12, input_size=12, max_steps=1)
    ],
    freq='M'
)
# Cross-validate on the full panel with three windows, moving one step between windows.
cv_df = fcst.cross_validation(df=AirPassengersPanel, static_df=AirPassengersStatic, n_windows=3, step_size=1)

# test cross validation has no leakage
def test_cross_validation(df, static_df, h, test_size):
    """Check that `cross_validation` matches a manual fit + predict on the same split.

    The last `test_size` rows of every series are held out. One set of models is
    fitted on the remaining rows and used to predict the held-out rows; a second,
    freshly built set of models is run through `cross_validation` on the full
    data with the same `test_size`. Both result frames must be equal (up to
    `atol=1e-5`).

    Args:
        df: pandas DataFrame with `unique_id`, `ds`, `y` and the exogenous
            columns used by the models (`trend`, `y_[lag12]`).
        static_df: DataFrame with the static features (`airline1`).
        h: forecasting horizon, only used by the consistency check on
            `test_size`; the models themselves are built with a fixed horizon of 12.
        test_size: number of trailing rows per series that are held out.

    Raises:
        ValueError: if `test_size - h` is not a multiple of the step size.
        AssertionError: if the two result frames differ.
    """
    step_size = 1  # step between windows assumed by this check
    if (test_size - h) % step_size:
        raise ValueError("`test_size - h` should be a multiple of `step_size`")

    Y_test_df = df.groupby('unique_id').tail(test_size)
    Y_train_df = df.drop(Y_test_df.index)
    config = {'input_size': tune.choice([12, 24]),
              'step_size': 12, 'hidden_size': 256, 'max_steps': 1, 'val_check_steps': 1}
    config_drnn = {'input_size': tune.choice([-1]), 'encoder_hidden_size': tune.choice([5, 10]),
                   'max_steps': 1, 'val_check_steps': 1}

    def _build_models():
        # Return fresh, unfitted model instances so both runs start from the same state.
        return [
            AutoDilatedRNN(h=12, config=config_drnn, cpus=1, num_samples=1),
            DilatedRNN(h=12, input_size=-1, encoder_hidden_size=5, max_steps=1),
            RNN(h=12, input_size=-1, encoder_hidden_size=5, max_steps=1,
                stat_exog_list=['airline1'], futr_exog_list=['trend'], hist_exog_list=['y_[lag12]']),
            TCN(h=12, input_size=-1, encoder_hidden_size=5, max_steps=1,
                stat_exog_list=['airline1'], futr_exog_list=['trend'], hist_exog_list=['y_[lag12]']),
            AutoMLP(h=12, config=config, cpus=1, num_samples=1),
            MLP(h=12, input_size=12, max_steps=1, scaler_type='robust'),
            NBEATSx(h=12, input_size=12, max_steps=1,
                    stat_exog_list=['airline1'], futr_exog_list=['trend'], hist_exog_list=['y_[lag12]']),
            NHITS(h=12, input_size=12, max_steps=1, scaler_type='robust'),
            NHITS(h=12, input_size=12, loss=MQLoss(level=[80]), max_steps=1),
            TFT(h=12, input_size=24, max_steps=1, scaler_type='robust'),
            DLinear(h=12, input_size=24, max_steps=1),
            VanillaTransformer(h=12, input_size=12, max_steps=1, scaler_type=None),
            Informer(h=12, input_size=12, max_steps=1, scaler_type=None),
            Autoformer(h=12, input_size=12, max_steps=1, scaler_type=None),
            FEDformer(h=12, input_size=12, max_steps=1, scaler_type=None),
            PatchTST(h=12, input_size=24, max_steps=1, scaler_type=None),
            TimesNet(h=12, input_size=24, max_steps=1, scaler_type='standard'),
            StemGNN(h=12, input_size=12, n_series=2, max_steps=1, scaler_type='robust'),
            TSMixer(h=12, input_size=12, n_series=2, max_steps=1, scaler_type='robust'),
            TSMixerx(h=12, input_size=12, n_series=2, max_steps=1, scaler_type='robust'),
            DeepAR(h=12, input_size=24, max_steps=1,
                   stat_exog_list=['airline1'], futr_exog_list=['trend']),
        ]

    # Manual split: fit on the training rows and predict the held-out rows.
    fcst = NeuralForecast(models=_build_models(), freq='M')
    fcst.fit(df=Y_train_df, static_df=static_df)
    Y_hat_df = fcst.predict(futr_df=Y_test_df)
    Y_hat_df = Y_hat_df.merge(Y_test_df, how='left', on=['unique_id', 'ds'])
    # The cutoff of the manual run is the last training timestamp of each series.
    last_dates = Y_train_df.groupby('unique_id').tail(1)
    last_dates = last_dates[['unique_id', 'ds']].rename(columns={'ds': 'cutoff'})
    Y_hat_df = Y_hat_df.merge(last_dates, how='left', on='unique_id')

    # cross validation
    fcst = NeuralForecast(models=_build_models(), freq='M')
    Y_hat_df_cv = fcst.cross_validation(df, static_df=static_df, test_size=test_size,
                                        n_windows=None)
    # Normalize the time columns of both frames to the same datetime dtype before comparing.
    for col in ['ds', 'cutoff']:
        Y_hat_df_cv[col] = pd.to_datetime(Y_hat_df_cv[col].astype(str))
        Y_hat_df[col] = pd.to_datetime(Y_hat_df[col].astype(str))
    pd.testing.assert_frame_equal(
        Y_hat_df[Y_hat_df_cv.columns],
        Y_hat_df_cv,
        check_dtype=False,
        atol=1e-5,
    )

# Run the leakage check on the AirPassengers panel with a single 12-step window.
test_cross_validation(AirPassengersPanel, AirPassengersStatic, h=12, test_size=12)

# test cross-validation (CV) with series of different sizes
series = pd.DataFrame({
    'unique_id': np.repeat([0, 1], [10, 15]),
    'ds': np.arange(25),
    'y': np.random.rand(25),
})
nf = NeuralForecast(
    freq=1,
    models=[MLP(input_size=5, h=5, max_steps=0, enable_progress_bar=False)]
)
cv_df = nf.cross_validation(df=series, n_windows=3, step_size=5)
# Expected layout: one window (cutoff 4) for the series with id 0 and
# two windows (cutoffs 14 and 19) for the series with id 1.
expected = pd.DataFrame({
    'unique_id': np.repeat([0, 1], [5, 10]),
    'ds': np.hstack([np.arange(5, 10), np.arange(15, 25)]),
    'cutoff': np.repeat([4, 14, 19], 5)
})
expected = expected.merge(series, on=['unique_id', 'ds'])
pd.testing.assert_frame_equal(expected, cv_df.drop(columns='MLP'))

# test save and load
config = {'input_size': tune.choice([12, 24]),
          'hidden_size': 256,
          'max_steps': 1,
          'val_check_steps': 1,
          'step_size': 12}

config_drnn = {'input_size': tune.choice([-1]),
               'encoder_hidden_size': tune.choice([5, 10]),
               'max_steps': 1,
               'val_check_steps': 1}

fcst = NeuralForecast(
    models=[
        AutoRNN(h=12, config=config_drnn, cpus=1, num_samples=2, refit_with_val=True),
        DilatedRNN(h=12, input_size=-1, encoder_hidden_size=5, max_steps=1),
        AutoMLP(h=12, config=config, cpus=1, num_samples=2),
        NHITS(h=12, input_size=12, max_steps=1,
              futr_exog_list=['trend'], hist_exog_list=['y_[lag12]'], alias='Model1'),
        StemGNN(h=12, input_size=12, n_series=2, max_steps=1, scaler_type='robust')
    ],
    freq='M'
)
fcst.fit(AirPassengersPanel_train)
forecasts1 = fcst.predict(futr_df=AirPassengersPanel_test)
# Always test a local path; additionally test an S3 path when the bucket can be listed.
save_paths = ['./examples/debug_run/']
try:
    s3fs.S3FileSystem().ls('s3://nixtla-tmp')
    # Build a directory name from the platform, the Python version and the current commit hash.
    pyver = f'{sys.version_info.major}_{sys.version_info.minor}'
    sha = git.Repo(search_parent_directories=True).head.object.hexsha
    save_dir = f'{sys.platform}-{pyver}-{sha}'
    save_paths.append(f's3://nixtla-tmp/neural/{save_dir}')
except Exception as e:
    # Best effort: any failure here only means the S3 path is not tested.
    print(e)

# The loaded object must reproduce the forecasts of the saved one.
for path in save_paths:
    fcst.save(path=path, model_index=None, overwrite=True, save_dataset=True)
    fcst2 = NeuralForecast.load(path=path)
    forecasts2 = fcst2.predict(futr_df=AirPassengersPanel_test)
    pd.testing.assert_frame_equal(forecasts1, forecasts2[forecasts1.columns])

# test save and load without the dataset
shutil.rmtree('examples/debug_run')
fcst = NeuralForecast(
    models=[DilatedRNN(h=12, input_size=-1, encoder_hidden_size=5, max_steps=1)],
    freq='M',
)
fcst.fit(AirPassengersPanel_train)
forecasts1 = fcst.predict(futr_df=AirPassengersPanel_test)
fcst.save(path='./examples/debug_run/', model_index=None, overwrite=True, save_dataset=False)
fcst2 = NeuralForecast.load(path='./examples/debug_run/')
# The dataset was not saved, so the training data is passed explicitly to predict.
forecasts2 = fcst2.predict(df=AirPassengersPanel_train, futr_df=AirPassengersPanel_test)
np.testing.assert_allclose(forecasts1['DilatedRNN'], forecasts2['DilatedRNN'])

# test `enable_checkpointing=True` should generate checkpoints
# Start from a clean log directory; tolerate it not existing yet.
if os.path.isdir('lightning_logs'):
    shutil.rmtree('lightning_logs')
fcst = NeuralForecast(
    models=[
        MLP(h=12, input_size=12, max_steps=10, val_check_steps=5, enable_checkpointing=True),
        RNN(h=12, input_size=-1, max_steps=10, val_check_steps=5, enable_checkpointing=True)
    ],
    freq='M'
)
fcst.fit(AirPassengersPanel_train)
# os.listdir returns entries in arbitrary order, so sort before taking the last one.
last_log = f"lightning_logs/{sorted(os.listdir('lightning_logs'))[-1]}"
no_chkpt_found = not any(file.endswith('checkpoints') for file in os.listdir(last_log))
test_eq(no_chkpt_found, False)

# test `enable_checkpointing=False` should not generate checkpoints
# Start from a clean log directory; tolerate it not existing yet.
if os.path.isdir('lightning_logs'):
    shutil.rmtree('lightning_logs')
fcst = NeuralForecast(
    models=[
        MLP(h=12, input_size=12, max_steps=10, val_check_steps=5),
        RNN(h=12, input_size=-1, max_steps=10, val_check_steps=5)
    ],
    freq='M'
)
fcst.fit(AirPassengersPanel_train)
# os.listdir returns entries in arbitrary order, so sort before taking the last one.
last_log = f"lightning_logs/{sorted(os.listdir('lightning_logs'))[-1]}"
no_chkpt_found = not any(file.endswith('checkpoints') for file in os.listdir(last_log))
test_eq(no_chkpt_found, True)

# test short time series
config = {'input_size': tune.choice([12, 24]),
          'max_steps': 1,
          'val_check_steps': 1}

fcst = NeuralForecast(
    models=[
        AutoNBEATS(h=12, config=config, cpus=1, num_samples=2)],
    freq='M'
)

# Keep only the last 36+144 rows of the panel.
AirPassengersShort = AirPassengersPanel.tail(36+144).reset_index(drop=True)
forecasts = fcst.cross_validation(AirPassengersShort, val_size=48, n_windows=1)

# test validation scale BaseWindows

# With the robust scaler, the last recorded validation loss must fall between 10 and 40.
models = [NHITS(h=12, input_size=24, max_steps=50, scaler_type='robust')]
nf = NeuralForecast(models=models, freq='M')
nf.fit(AirPassengersPanel_train,val_size=12)
valid_losses = nf.models[0].valid_trajectories
assert valid_losses[-1][1] < 40, 'Validation loss is too high'
assert valid_losses[-1][1] > 10, 'Validation loss is too low'

# Without a scaler, the same bounds must hold.
models = [NHITS(h=12, input_size=24, max_steps=50, scaler_type=None)]
nf = NeuralForecast(models=models, freq='M')
nf.fit(AirPassengersPanel_train,val_size=12)
valid_losses = nf.models[0].valid_trajectories
assert valid_losses[-1][1] < 40, 'Validation loss is too high'
assert valid_losses[-1][1] > 10, 'Validation loss is too low'

# test validation scale BaseRecurrent

nf = NeuralForecast(
    models=[LSTM(h=12,
                 input_size=-1,
                 loss=MAE(),
                 scaler_type='robust',
                 encoder_n_layers=2,
                 encoder_hidden_size=128,
                 context_size=10,
                 decoder_hidden_size=128,
                 decoder_layers=2,
                 max_steps=50,
                 val_check_steps=10,
                 )
    ],
    freq='M'
)
nf.fit(AirPassengersPanel_train,val_size=12)
# The last recorded validation loss must fall between 30 and 100.
valid_losses = nf.models[0].valid_trajectories
assert valid_losses[-1][1] < 100, 'Validation loss is too high'
assert valid_losses[-1][1] > 30, 'Validation loss is too low'

# test that the order of the variables does not affect the validation loss

# Add extra columns and reorder the frame so that `y` is no longer the first value column.
AirPassengersPanel_train['zeros'] = 0
AirPassengersPanel_train['large_number'] = 100000
AirPassengersPanel_train['available_mask'] = 1
AirPassengersPanel_train = AirPassengersPanel_train[['unique_id','ds','zeros','y','available_mask','large_number']]

models = [NHITS(h=12, input_size=24, max_steps=50, scaler_type='robust')]
nf = NeuralForecast(models=models, freq='M')
nf.fit(AirPassengersPanel_train,val_size=12)
valid_losses = nf.models[0].valid_trajectories
assert valid_losses[-1][1] < 40, 'Validation loss is too high'
assert valid_losses[-1][1] > 10, 'Validation loss is too low'

models = [NHITS(h=12, input_size=24, max_steps=50, scaler_type=None)]
nf = NeuralForecast(models=models, freq='M')
nf.fit(AirPassengersPanel_train,val_size=12)
valid_losses = nf.models[0].valid_trajectories
assert valid_losses[-1][1] < 40, 'Validation loss is too high'
assert valid_losses[-1][1] > 10, 'Validation loss is too low'

# test that fit fails if a variable is not in the dataframe

# BaseWindows
models = [NHITS(h=12, input_size=24, max_steps=1, hist_exog_list=['not_included'], scaler_type='robust')]
nf = NeuralForecast(models=models, freq='M')
test_fail(nf.fit,
          contains='historical exogenous variables not found in input dataset',
          args=(AirPassengersPanel_train,))

models = [NHITS(h=12, input_size=24, max_steps=1, futr_exog_list=['not_included'], scaler_type='robust')]
nf = NeuralForecast(models=models, freq='M')
test_fail(nf.fit,
          contains='future exogenous variables not found in input dataset',
          args=(AirPassengersPanel_train,))

models = [NHITS(h=12, input_size=24, max_steps=1, stat_exog_list=['not_included'], scaler_type='robust')]
nf = NeuralForecast(models=models, freq='M')
test_fail(nf.fit,
          contains='static exogenous variables not found in input dataset',
          args=(AirPassengersPanel_train,))

# BaseRecurrent
models = [LSTM(h=12, input_size=24, max_steps=1, hist_exog_list=['not_included'], scaler_type='robust')]
nf = NeuralForecast(models=models, freq='M')
test_fail(nf.fit,
          contains='historical exogenous variables not found in input dataset',
          args=(AirPassengersPanel_train,))

models = [LSTM(h=12, input_size=24, max_steps=1, futr_exog_list=['not_included'], scaler_type='robust')]
nf = NeuralForecast(models=models, freq='M')
test_fail(nf.fit,
          contains='future exogenous variables not found in input dataset',
          args=(AirPassengersPanel_train,))

models = [LSTM(h=12, input_size=24, max_steps=1, stat_exog_list=['not_included'], scaler_type='robust')]
nf = NeuralForecast(models=models, freq='M')
test_fail(nf.fit,
          contains='static exogenous variables not found in input dataset',
          args=(AirPassengersPanel_train,))

# test that passing unused variables in the dataframe does not affect the forecasts

models = [NHITS(h=12, input_size=24, max_steps=5, hist_exog_list=['zeros'], scaler_type='robust')]
nf = NeuralForecast(models=models, freq='M')
nf.fit(AirPassengersPanel_train)

# Predict with and without the extra `large_number` column; both results must match.
Y_hat1 = nf.predict(df=AirPassengersPanel_train[['unique_id','ds','y','zeros','large_number']])
Y_hat2 = nf.predict(df=AirPassengersPanel_train[['unique_id','ds','y','zeros']])

pd.testing.assert_frame_equal(
    Y_hat1,
    Y_hat2,
    check_dtype=False,
)

# Same check for a recurrent model.
models = [LSTM(h=12, input_size=24, max_steps=5, hist_exog_list=['zeros'], scaler_type='robust')]
nf = NeuralForecast(models=models, freq='M')
nf.fit(AirPassengersPanel_train)

Y_hat1 = nf.predict(df=AirPassengersPanel_train[['unique_id','ds','y','zeros','large_number']])
Y_hat2 = nf.predict(df=AirPassengersPanel_train[['unique_id','ds','y','zeros']])

pd.testing.assert_frame_equal(
    Y_hat1,
    Y_hat2,
    check_dtype=False,
)

#| polars
import polars
from polars.testing import assert_frame_equal

#| polars
models = [LSTM(h=12, input_size=24, max_steps=5, hist_exog_list=['zeros'], scaler_type='robust')]

# Reference run: pandas inputs with the default column names.
nf = NeuralForecast(models=models, freq='M')
nf.fit(AirPassengersPanel_train, static_df=AirPassengersStatic)
insample_preds = nf.predict_insample()
preds = nf.predict()
cv_res = nf.cross_validation(df=AirPassengersPanel_train, static_df=AirPassengersStatic)

# Same data as polars frames, with the id / time / target columns renamed.
renamer = {'unique_id': 'uid', 'ds': 'time', 'y': 'target'}
inverse_renamer = dict(zip(renamer.values(), renamer.keys()))
AirPassengers_pl = polars.from_pandas(AirPassengersPanel_train).rename(renamer)
AirPassengersStatic_pl = polars.from_pandas(AirPassengersStatic).rename({'unique_id': 'uid'})

# Second run: polars inputs, custom column names passed explicitly.
pl_column_names = {'id_col': 'uid', 'time_col': 'time', 'target_col': 'target'}
nf = NeuralForecast(models=models, freq='1mo')
nf.fit(AirPassengers_pl, static_df=AirPassengersStatic_pl, **pl_column_names)
insample_preds_pl = nf.predict_insample()
preds_pl = nf.predict()
cv_res_pl = nf.cross_validation(
    df=AirPassengers_pl,
    static_df=AirPassengersStatic_pl,
    **pl_column_names,
)

def assert_equal_dfs(pandas_df, polars_df):
    """Assert that a polars result equals its pandas counterpart.

    Columns of `polars_df` that appear as keys of the module-level
    `inverse_renamer` mapping are renamed through it, the frame is converted
    to pandas, and the result is compared against `pandas_df` with
    `pd.testing.assert_frame_equal`.
    """
    # Only rename the columns that are actually present in the polars frame.
    columns_to_restore = {
        pl_name: pd_name
        for pl_name, pd_name in inverse_renamer.items()
        if pl_name in polars_df
    }
    converted = polars_df.rename(columns_to_restore).to_pandas()
    pd.testing.assert_frame_equal(pandas_df, converted)

# Forecasts, in-sample predictions and cross-validation results from the
# polars run are each compared against the pandas run.
for pandas_result, polars_result in (
    (preds, preds_pl),
    (insample_preds, insample_preds_pl),
    (cv_res, cv_res_pl),
):
    assert_equal_dfs(pandas_result, polars_result)

# Test predict_insample with different step_size / test_size combinations

h = 12
train_end = AirPassengers_pl['time'].max()
sizes = AirPassengers_pl['uid'].value_counts().to_numpy()

for step_size, test_size in [(7, 0), (9, 0), (7, 5), (9, 5)]:
    models = [NHITS(h=h, input_size=12, max_steps=1)]
    nf = NeuralForecast(models=models, freq='1mo')
    nf.fit(AirPassengers_pl, id_col='uid', time_col='time', target_col='target')
    # Note: set_test_size() is applied only after nf.fit(); per the original
    # note, doing it earlier would leave test_size set to 0.
    nf.models[0].set_test_size(test_size)

    forecasts = nf.predict_insample(step_size=step_size)
    n_expected_cutoffs = (sizes[0][1] - test_size - nf.h + step_size) // step_size

    # compare cutoff values (expected cutoffs are built oldest-first)
    last_cutoff = train_end - test_size * pd.offsets.MonthEnd() - h * pd.offsets.MonthEnd()
    expected_cutoffs = np.array(
        [last_cutoff - step_size * i * pd.offsets.MonthEnd() for i in reversed(range(n_expected_cutoffs))]
    )
    is_second_series = polars.col('uid') == nf.uids[1]
    pl_cutoffs = forecasts.filter(is_second_series).select('cutoff').unique(maintain_order=True)
    actual_cutoffs = np.array([pd.Timestamp(x['cutoff']) for x in pl_cutoffs.rows(named=True)])
    np.testing.assert_array_equal(expected_cutoffs, actual_cutoffs, err_msg=f"{step_size=},{expected_cutoffs=},{actual_cutoffs=}")

    # check that both series have the same number of forecast points per cutoff
    cutoffs_by_series = forecasts.group_by(['uid', 'cutoff']).count()
    airline1_counts = cutoffs_by_series.filter(polars.col('uid') == "Airline1").select(['cutoff', 'count'])
    airline2_counts = cutoffs_by_series.filter(polars.col('uid') == "Airline2").select(['cutoff', 'count'])
    assert_frame_equal(airline1_counts, airline2_counts, check_row_order=False)

# Test that fit raises an error if any input contains NaNs where available_mask = 1.
# Input type is pandas.DataFrame.
# available_mask is given explicitly.

n_static_features = 2
n_temporal_features = 4
temporal_df, static_df = generate_series(
    n_series=4,
    min_length=50,
    max_length=50,
    n_static_features=n_static_features,
    n_temporal_features=n_temporal_features,
    equal_ends=False,
)
temporal_df["available_mask"] = 1
temporal_df.loc[10:20, "available_mask"] = 0
models = [NHITS(h=12, input_size=24, max_steps=20)]
nf = NeuralForecast(models=models, freq='D')

# test case 1: target has NaN values
test_df1 = temporal_df.copy()
test_df1.loc[5:7, "y"] = np.nan
test_fail(nf.fit, args=(test_df1,), contains="Found missing values in ['y']")

# test case 2: exogenous variables with NaN values are correctly flagged with an exception
test_df2 = temporal_df.copy()
nan_positions = [
    (slice(15, 18), "temporal_0"),  # won't raise ValueError as available_mask = 0 on these rows
    (5, "temporal_1"),
    (25, "temporal_2"),
]
for rows, column in nan_positions:
    test_df2.loc[rows, column] = np.nan
test_fail(nf.fit, args=(test_df2,), contains="Found missing values in ['temporal_1', 'temporal_2']")

# test case 3: static column has NaN values
test_df3 = static_df.copy()
test_df3.loc[3, "static_1"] = np.nan
test_fail(
    nf.fit,
    args=(temporal_df,),
    kwargs={"static_df": test_df3},
    contains="Found missing values in ['static_1']",
)

#| polars
# Test that fit raises an error if any input contains NaNs where available_mask = 1.
# Input type is polars.DataFrame.
# Note that available_mask is not given explicitly in this test.

pl_df = polars.DataFrame(
    {
        'unique_id': [1]*50,
        'y': list(range(50)),
        'temporal_0': list(range(100,150)),
        'temporal_1': list(range(200,250)),
        'ds': polars.date_range(start=date(2022, 1, 1), end=date(2022, 2, 19), interval="1d", eager=True),
    }
)

pl_static_df = polars.DataFrame(
    {
        'unique_id': [1],
        'static_0': [1.2],
        'static_1': [10.9],
    }
)

models = [NHITS(h=12, input_size=24, max_steps=20)]
nf = NeuralForecast(models=models, freq='1d')

# test case 1: target has NaN values (one NaN and one null entry)
test_pl_df1 = pl_df.clone()
test_pl_df1[3, 'y'] = np.nan
test_pl_df1[4, 'y'] = None
test_fail(nf.fit, args=(test_pl_df1,), contains="Found missing values in ['y']")

# test case 2: exogenous variables with NaN values are correctly flagged with an exception
test_pl_df2 = pl_df.clone()
test_pl_df2[15, "temporal_0"] = np.nan
test_pl_df2[5, "temporal_1"] = np.nan
test_fail(nf.fit, args=(test_pl_df2,), contains="Found missing values in ['temporal_0', 'temporal_1']")

# test case 3: static column has NaN values
test_pl_df3 = pl_static_df.clone()
test_pl_df3[0, "static_1"] = np.nan
test_fail(
    nf.fit,
    args=(pl_df,),
    kwargs={"static_df": test_pl_df3},
    contains="Found missing values in ['static_1']",
)

# Test customized optimizer behavior: a user-defined optimizer must give results that differ from the default.
# The test covers models implemented on different base classes (BaseWindows, BaseRecurrent, BaseMultivariate).

for nf_model in [NHITS, RNN, StemGNN]:
    model_name = nf_model.__name__

    # baseline run; the default optimizer is based on Adam
    params = {"h": 12, "input_size": 24, "max_steps": 1}
    if model_name == "StemGNN":
        params["n_series"] = 2
    models = [nf_model(**params)]
    nf = NeuralForecast(models=models, freq='M')
    nf.fit(AirPassengersPanel_train)
    default_optimizer_predict = nf.predict()
    mean = default_optimizer_predict[model_name].mean()

    # same configuration, but with a customized optimizer
    params["optimizer"] = torch.optim.Adadelta
    params["optimizer_kwargs"] = {"rho": 0.45}
    models2 = [nf_model(**params)]
    nf2 = NeuralForecast(models=models2, freq='M')
    nf2.fit(AirPassengersPanel_train)
    customized_optimizer_predict = nf2.predict()
    mean2 = customized_optimizer_predict[model_name].mean()

    # the two runs are expected to produce different average forecasts
    assert mean2 != mean

# Test that model construction fails with an exception if the user-defined optimizer
# is not a subclass of torch.optim.Optimizer.
# The test covers different base classes (BaseWindows, BaseRecurrent, BaseMultivariate).
for model_cls, extra_kwargs in [(NHITS, {}), (RNN, {}), (StemGNN, {"n_series": 2})]:
    test_fail(
        model_cls,
        kwargs={"h": 12, "input_size": 24, "max_steps": 10, **extra_kwargs, "optimizer": torch.nn.Module},
        contains="optimizer is not a valid subclass of torch.optim.Optimizer",
    )

# Test that passing "lr" inside optimizer_kwargs triggers a warning saying the value is ignored
# in favor of the model's learning rate.
# The test covers models implemented on different base classes (BaseWindows, BaseRecurrent, BaseMultivariate).

for nf_model in [NHITS, RNN, StemGNN]:
    params = dict(
        h=12,
        input_size=24,
        max_steps=1,
        optimizer=torch.optim.Adadelta,
        optimizer_kwargs={"lr": 0.8, "rho": 0.45},
    )
    if nf_model.__name__ == "StemGNN":
        params["n_series"] = 2
    models = [nf_model(**params)]
    nf = NeuralForecast(models=models, freq='M')

    expected_warning = "ignoring learning rate passed in optimizer_kwargs, using the model's learning rate"
    with warnings.catch_warnings(record=True) as issued_warnings:
        # record every UserWarning, even ones that were already shown once
        warnings.simplefilter('always', UserWarning)
        nf.fit(AirPassengersPanel_train)
    assert any(expected_warning in str(w.message) for w in issued_warnings)

# Test that passing "optimizer_kwargs" without "optimizer" triggers a warning.
# The test covers models implemented on different base classes (BaseWindows, BaseRecurrent, BaseMultivariate).

for nf_model in [NHITS, RNN, StemGNN]:
    params = dict(
        h=12,
        input_size=24,
        max_steps=1,
        optimizer_kwargs={"lr": 0.8, "rho": 0.45},
    )
    if nf_model.__name__ == "StemGNN":
        params["n_series"] = 2
    models = [nf_model(**params)]
    nf = NeuralForecast(models=models, freq='M')

    expected_warning = "ignoring optimizer_kwargs as the optimizer is not specified"
    with warnings.catch_warnings(record=True) as issued_warnings:
        # record every UserWarning, even ones that were already shown once
        warnings.simplefilter('always', UserWarning)
        nf.fit(AirPassengersPanel_train)
    assert any(expected_warning in str(w.message) for w in issued_warnings)

# Test customized lr_scheduler behavior: a user-defined lr_scheduler must give results that differ from the default.
# The test covers models implemented on different base classes (BaseWindows, BaseRecurrent, BaseMultivariate).

for nf_model in [NHITS, RNN, StemGNN]:
    model_name = nf_model.__name__

    # baseline run with the default lr_scheduler
    params = {"h": 12, "input_size": 24, "max_steps": 1}
    if model_name == "StemGNN":
        params["n_series"] = 2
    models = [nf_model(**params)]
    nf = NeuralForecast(models=models, freq='M')
    nf.fit(AirPassengersPanel_train)
    default_optimizer_predict = nf.predict()
    mean = default_optimizer_predict[model_name].mean()

    # same configuration with a customized lr_scheduler; the default is StepLR
    params["lr_scheduler"] = torch.optim.lr_scheduler.ConstantLR
    params["lr_scheduler_kwargs"] = {"factor": 0.78}
    models2 = [nf_model(**params)]
    nf2 = NeuralForecast(models=models2, freq='M')
    nf2.fit(AirPassengersPanel_train)
    customized_optimizer_predict = nf2.predict()
    mean2 = customized_optimizer_predict[model_name].mean()

    # the two runs are expected to produce different average forecasts
    assert mean2 != mean

# Test that model construction fails with an exception if the user-defined lr_scheduler
# is not a subclass of torch.optim.lr_scheduler.LRScheduler.
# The test covers different base classes (BaseWindows, BaseRecurrent, BaseMultivariate).
for model_cls, extra_kwargs in [(NHITS, {}), (RNN, {}), (StemGNN, {"n_series": 2})]:
    test_fail(
        model_cls,
        kwargs={"h": 12, "input_size": 24, "max_steps": 10, **extra_kwargs, "lr_scheduler": torch.nn.Module},
        contains="lr_scheduler is not a valid subclass of torch.optim.lr_scheduler.LRScheduler",
    )

# Test that passing "optimizer" inside lr_scheduler_kwargs triggers a warning saying the value is ignored
# in favor of the model's optimizer.
# The test covers models implemented on different base classes (BaseWindows, BaseRecurrent, BaseMultivariate).

for nf_model in [NHITS, RNN, StemGNN]:
    params = dict(
        h=12,
        input_size=24,
        max_steps=1,
        lr_scheduler=torch.optim.lr_scheduler.ConstantLR,
        lr_scheduler_kwargs={"optimizer": torch.optim.Adadelta, "factor": 0.22},
    )
    if nf_model.__name__ == "StemGNN":
        params["n_series"] = 2
    models = [nf_model(**params)]
    nf = NeuralForecast(models=models, freq='M')

    expected_warning = "ignoring optimizer passed in lr_scheduler_kwargs, using the model's optimizer"
    with warnings.catch_warnings(record=True) as issued_warnings:
        # record every UserWarning, even ones that were already shown once
        warnings.simplefilter('always', UserWarning)
        nf.fit(AirPassengersPanel_train)
    assert any(expected_warning in str(w.message) for w in issued_warnings)

# Test that passing "lr_scheduler_kwargs" without "lr_scheduler" triggers a warning.
# The test covers models implemented on different base classes (BaseWindows, BaseRecurrent, BaseMultivariate).

for nf_model in [NHITS, RNN, StemGNN]:
    params = dict(
        h=12,
        input_size=24,
        max_steps=1,
        lr_scheduler_kwargs={"optimizer": torch.optim.Adadelta, "factor": 0.22},
    )
    if nf_model.__name__ == "StemGNN":
        params["n_series"] = 2
    models = [nf_model(**params)]
    nf = NeuralForecast(models=models, freq='M')

    expected_warning = "ignoring lr_scheduler_kwargs as the lr_scheduler is not specified"
    with warnings.catch_warnings(record=True) as issued_warnings:
        # record every UserWarning, even ones that were already shown once
        warnings.simplefilter('always', UserWarning)
        nf.fit(AirPassengersPanel_train)
    assert any(expected_warning in str(w.message) for w in issued_warnings)

Give us a ⭐ on GitHub