基础递归网络

%load_ext autoreload
%autoreload 2

BaseRecurrent 类包含在递归神经网络中共享的标准方法;这些模型能够通过它们的内部记忆状态处理可变长度的输入序列。该类由 LSTMGRURNN 以及其他更复杂的架构如 MQCNN 组成。

标准方法包括 TemporalNorm 预处理、优化工具如参数初始化、training_stepvalidation_step 和共享的 fitpredict 方法。这些共享的方法使得所有 neuralforecast.modelscore.NeuralForecast 封装类的兼容性得以实现。

from fastcore.test import test_eq
from nbdev.showdoc import show_doc
import numpy as np
import torch
import torch.nn as nn
import pytorch_lightning as pl
import neuralforecast.losses.pytorch as losses

from neuralforecast.common._base_model import BaseModel
from neuralforecast.common._scalers import TemporalNorm
from neuralforecast.tsdataset import TimeSeriesDataModule
from neuralforecast.utils import get_indexer_raise_missing
class BaseRecurrent(BaseModel):
    """ Base Recurrent
    
    Base class for all recurrent-based models. The forecasts are produced sequentially between 
    windows.
    
    This class implements the basic functionality for all windows-based models, including:
    - PyTorch Lightning's methods training_step, validation_step, predict_step. <br>
    - fit and predict methods used by NeuralForecast.core class. <br>
    - sampling and wrangling methods to sequential windows. <br>
    """
    def __init__(self,
                 h,
                 input_size,
                 inference_input_size,
                 loss,
                 valid_loss,
                 learning_rate,
                 max_steps,
                 val_check_steps,
                 batch_size,
                 valid_batch_size,
                 scaler_type='robust',
                 num_lr_decays=0,
                 early_stop_patience_steps=-1,
                 futr_exog_list=None,
                 hist_exog_list=None,
                 stat_exog_list=None,
                 num_workers_loader=0,
                 drop_last_loader=False,
                 random_seed=1, 
                 alias=None,
                 optimizer=None,
                 optimizer_kwargs=None,
                 lr_scheduler=None,
                 lr_scheduler_kwargs=None,
                 **trainer_kwargs):
        super().__init__(
            random_seed=random_seed,
            loss=loss,
            valid_loss=valid_loss,
            optimizer=optimizer,
            optimizer_kwargs=optimizer_kwargs,
            lr_scheduler=lr_scheduler,
            lr_scheduler_kwargs=lr_scheduler_kwargs,
            futr_exog_list=futr_exog_list,
            hist_exog_list=hist_exog_list,
            stat_exog_list=stat_exog_list,
            max_steps=max_steps,
            early_stop_patience_steps=early_stop_patience_steps,            
            **trainer_kwargs,
        )

        # Padder to complete train windows, 
        # example y=[1,2,3,4,5] h=3 -> last y_output = [5,0,0]
        self.h = h
        self.input_size = input_size
        self.inference_input_size = inference_input_size
        self.padder = nn.ConstantPad1d(padding=(0, self.h), value=0)

        unsupported_distributions = ['Bernoulli', 'ISQF']
        if isinstance(self.loss, losses.DistributionLoss) and\
            self.loss.distribution in unsupported_distributions:
                raise Exception(f'Distribution {self.loss.distribution} not available for Recurrent-based models. Please choose another distribution.')

        # 有效的批量大小
        self.batch_size = batch_size
        if valid_batch_size is None:
            self.valid_batch_size = batch_size
        else:
            self.valid_batch_size = valid_batch_size

        # 优化
        self.learning_rate = learning_rate
        self.max_steps = max_steps
        self.num_lr_decays = num_lr_decays
        self.lr_decay_steps = max(max_steps // self.num_lr_decays, 1) if self.num_lr_decays > 0 else 10e7
        self.early_stop_patience_steps = early_stop_patience_steps
        self.val_check_steps = val_check_steps

        # 标量
        self.scaler = TemporalNorm(
            scaler_type=scaler_type,
            dim=-1,  # 时间维度是 -1。
            num_features=1+len(self.hist_exog_list)+len(self.futr_exog_list)
        )

        # 适合的论点
        self.val_size = 0
        self.test_size = 0

        # DataModule参数
        self.num_workers_loader = num_workers_loader
        self.drop_last_loader = drop_last_loader
        # 用于on_validation_epoch_end钩子
        self.validation_step_outputs = []
        self.alias = alias

    def _normalization(self, batch, val_size=0, test_size=0):
        temporal = batch['temporal'] # B、C、T
        temporal_cols = batch['temporal_cols'].copy()
        y_idx = batch['y_idx']

        # 分离数据与掩码
        temporal_data_cols = self._get_temporal_exogenous_cols(temporal_cols=temporal_cols)
        temporal_idxs = get_indexer_raise_missing(temporal_cols, temporal_data_cols)
        temporal_idxs = np.append(y_idx, temporal_idxs)
        temporal_data = temporal[:, temporal_idxs, :]
        temporal_mask = temporal[:, temporal_cols.get_loc('available_mask'), :].clone()

        # 移除验证集和测试集以防止数据泄露
        if val_size + test_size > 0:
            cutoff = val_size + test_size
            temporal_mask[:, -cutoff:] = 0

        # 标准化。self.scaler 存储了逆变换所需的偏移量和缩放量。
        temporal_mask = temporal_mask.unsqueeze(1) # 为scaler.transform添加通道维度。
        temporal_data = self.scaler.transform(x=temporal_data, mask=temporal_mask)

        # 替换windows字典中的值
        temporal[:, temporal_idxs, :] = temporal_data
        batch['temporal'] = temporal

        return batch

    def _inv_normalization(self, y_hat, temporal_cols, y_idx):
        # 接收窗口预测 [B, seq_len, H, 输出]
        # 广播输出并反转归一化

        # Get 'y' scale and shift, and add W dimension
        y_loc = self.scaler.x_shift[:, [y_idx], 0].flatten() #[B,C,T] -> [B]        
        y_scale = self.scaler.x_scale[:, [y_idx], 0].flatten() #[B,C,T] -> [B]

        # 扩展规模并转换为y_hat维度
        y_loc = y_loc.view(*y_loc.shape, *(1,)*(y_hat.ndim-1))#.expand(y_hat)        
        y_scale = y_scale.view(*y_scale.shape, *(1,)*(y_hat.ndim-1))#.expand(y_hat)

        y_hat = self.scaler.inverse_transform(z=y_hat, x_scale=y_scale, x_shift=y_loc)

        return y_hat, y_loc, y_scale

    def _create_windows(self, batch, step):
        temporal = batch['temporal']
        temporal_cols = batch['temporal_cols']

        if step == 'train':
            if self.val_size + self.test_size > 0:
                cutoff = -self.val_size - self.test_size
                temporal = temporal[:, :, :cutoff]
            temporal = self.padder(temporal)

            # 将批次截断为较短的时间序列 
            av_condition = torch.nonzero(torch.min(temporal[:, temporal_cols.get_loc('available_mask')], axis=0).values)
            min_time_stamp = int(av_condition.min())
            
            available_ts = temporal.shape[-1] - min_time_stamp
            if available_ts < 1 + self.h:
                raise Exception(
                    'Time series too short for given input and output size. \n'
                    f'Available timestamps: {available_ts}'
                )

            temporal = temporal[:, :, min_time_stamp:]

        if step == 'val':
            if self.test_size > 0:
                temporal = temporal[:, :, :-self.test_size]
            temporal = self.padder(temporal)

        if step == 'predict':
            if (self.test_size == 0) and (len(self.futr_exog_list)==0):
                temporal = self.padder(temporal)

            # 测试集大小涵盖所有数据,左侧用零填充一个时间步长。
            if temporal.shape[-1] == self.test_size:
                padder_left = nn.ConstantPad1d(padding=(1, 0), value=0)
                temporal = padder_left(temporal)

        # 解析批次
        window_size = 1 + self.h # 1 表示当前的 t,h 表示未来的
        windows = temporal.unfold(dimension=-1,
                                  size=window_size,
                                  step=1)

        # 截断反向传播/推理(缩短RNN展开的序列)
        n_windows = windows.shape[2]
        input_size = -1
        if (step == 'train') and (self.input_size>0):
            input_size = self.input_size
            if (input_size > 0) and (n_windows > input_size):
                max_sampleable_time = n_windows-self.input_size+1
                start = np.random.choice(max_sampleable_time)
                windows = windows[:, :, start:(start+input_size), :]

        if (step == 'val') and (self.inference_input_size>0):
            cutoff = self.inference_input_size + self.val_size
            windows = windows[:, :, -cutoff:, :]

        if (step == 'predict') and (self.inference_input_size>0):
            cutoff = self.inference_input_size + self.test_size
            windows = windows[:, :, -cutoff:, :]
        
        # [B, C, 输入尺寸, 1+H]
        windows_batch = dict(temporal=windows,
                             temporal_cols=temporal_cols,
                             static=batch.get('static', None),
                             static_cols=batch.get('static_cols', None))

        return windows_batch

    def _parse_windows(self, batch, windows):
        # [B, C, seq_len, 1+H]
        # 从样本外预测期中过滤掉样本内滞后期
        mask_idx = batch['temporal_cols'].get_loc('available_mask')
        y_idx = batch['y_idx']        
        insample_y = windows['temporal'][:, y_idx, :, :-self.h]
        insample_mask = windows['temporal'][:, mask_idx, :, :-self.h]
        outsample_y = windows['temporal'][:, y_idx, :, -self.h:].contiguous()
        outsample_mask = windows['temporal'][:, mask_idx, :, -self.h:].contiguous()

        # 过滤历史外生变量
        if len(self.hist_exog_list):
            hist_exog_idx = get_indexer_raise_missing(windows['temporal_cols'], self.hist_exog_list)
            hist_exog = windows['temporal'][:, hist_exog_idx, :, :-self.h]
        else:
            hist_exog = None
        
        # 过滤未来的外生变量
        if len(self.futr_exog_list):
            futr_exog_idx = get_indexer_raise_missing(windows['temporal_cols'], self.futr_exog_list)
            futr_exog = windows['temporal'][:, futr_exog_idx, :, :]
        else:
            futr_exog = None
        # 过滤静态变量
        if len(self.stat_exog_list):
            static_idx = get_indexer_raise_missing(windows['static_cols'], self.stat_exog_list)
            stat_exog = windows['static'][:, static_idx]
        else:
            stat_exog = None

        return insample_y, insample_mask, outsample_y, outsample_mask, \
               hist_exog, futr_exog, stat_exog

    def training_step(self, batch, batch_idx):
        # 创建并标准化窗口 [Ws, L+H, C]
        batch = self._normalization(batch, val_size=self.val_size, test_size=self.test_size)
        windows = self._create_windows(batch, step='train')

        # 解析窗口
        insample_y, insample_mask, outsample_y, outsample_mask, \
               hist_exog, futr_exog, stat_exog = self._parse_windows(batch, windows)

        windows_batch = dict(insample_y=insample_y, # [B, 序列长度, 1]
                             insample_mask=insample_mask, # [B, 序列长度, 1]
                             futr_exog=futr_exog, # [B, F, seq_len, 1+H]
                             hist_exog=hist_exog, # [B, C, seq_len]
                             stat_exog=stat_exog) # [B, S]

        # 模型预测
        output = self(windows_batch) # tuple([B, seq_len, H, output])
        if self.loss.is_distribution_output:
            outsample_y, y_loc, y_scale = self._inv_normalization(y_hat=outsample_y,
                                            temporal_cols=batch['temporal_cols'],
                                            y_idx=batch['y_idx'])
            B = output[0].size()[0]
            T = output[0].size()[1]
            H = output[0].size()[2]
            output = [arg.view(-1, *(arg.size()[2:])) for arg in output]
            outsample_y = outsample_y.view(B*T,H)
            outsample_mask = outsample_mask.view(B*T,H)
            y_loc = y_loc.repeat_interleave(repeats=T, dim=0).squeeze(-1)
            y_scale = y_scale.repeat_interleave(repeats=T, dim=0).squeeze(-1)
            distr_args = self.loss.scale_decouple(output=output, loc=y_loc, scale=y_scale)
            loss = self.loss(y=outsample_y, distr_args=distr_args, mask=outsample_mask)
        else:
            loss = self.loss(y=outsample_y, y_hat=output, mask=outsample_mask)

        if torch.isnan(loss):
            print('Model Parameters', self.hparams)
            print('insample_y', torch.isnan(insample_y).sum())
            print('outsample_y', torch.isnan(outsample_y).sum())
            print('output', torch.isnan(output).sum())
            raise Exception('Loss is NaN, training stopped.')

        self.log(
            'train_loss',
            loss.item(),
            batch_size=outsample_y.size(0),
            prog_bar=True,
            on_epoch=True,
        )
        self.train_trajectories.append((self.global_step, loss.item()))
        return loss

    def validation_step(self, batch, batch_idx):
        if self.val_size == 0:
            return np.nan

        # 创建并标准化窗口 [Ws, L+H, C]
        batch = self._normalization(batch, val_size=self.val_size, test_size=self.test_size)
        windows = self._create_windows(batch, step='val')
        y_idx = batch['y_idx']

        # 解析窗口
        insample_y, insample_mask, outsample_y, outsample_mask, \
               hist_exog, futr_exog, stat_exog = self._parse_windows(batch, windows)

        windows_batch = dict(insample_y=insample_y, # [B, 序列长度, 1]
                             insample_mask=insample_mask, # [B, 序列长度, 1]
                             futr_exog=futr_exog, # [B, F, seq_len, 1+H]
                             hist_exog=hist_exog, # [B, C, seq_len]
                             stat_exog=stat_exog) # [B, S]

        # 移除训练集中的y_hat(对于用零填充的最后一个窗口,分别标记为+1和-1)
        # tuple([B, seq_len, H, output]) -> tuple([B, validation_size, H, output])
        val_windows = (self.val_size) + 1
        outsample_y = outsample_y[:, -val_windows:-1, :]
        outsample_mask = outsample_mask[:, -val_windows:-1, :]        

        # 模型预测
        output = self(windows_batch) # tuple([B, seq_len, H, output])
        if self.loss.is_distribution_output:
            output = [arg[:, -val_windows:-1] for arg in output]
            outsample_y, y_loc, y_scale = self._inv_normalization(y_hat=outsample_y,
                                            temporal_cols=batch['temporal_cols'],
                                            y_idx=y_idx)
            B = output[0].size()[0]
            T = output[0].size()[1]
            H = output[0].size()[2]
            output = [arg.reshape(-1, *(arg.size()[2:])) for arg in output]
            outsample_y = outsample_y.reshape(B*T,H)
            outsample_mask = outsample_mask.reshape(B*T,H)
            y_loc = y_loc.repeat_interleave(repeats=T, dim=0).squeeze(-1)
            y_scale = y_scale.repeat_interleave(repeats=T, dim=0).squeeze(-1)
            distr_args = self.loss.scale_decouple(output=output, loc=y_loc, scale=y_scale)
            _, sample_mean, quants  = self.loss.sample(distr_args=distr_args)

            if str(type(self.valid_loss)) in\
                ["<class 'neuralforecast.losses.pytorch.sCRPS'>", "<class 'neuralforecast.losses.pytorch.MQLoss'>"]:
                output = quants
            elif str(type(self.valid_loss)) in ["<class 'neuralforecast.losses.pytorch.relMSE'>"]:
                output = torch.unsqueeze(sample_mean, dim=-1) # [N,H,1] -> [N,H]
            
        else:
            output = output[:, -val_windows:-1, :]

        # 验证损失评估
        if self.valid_loss.is_distribution_output:
            valid_loss = self.valid_loss(y=outsample_y, distr_args=distr_args, mask=outsample_mask)
        else:
            outsample_y, _, _ = self._inv_normalization(y_hat=outsample_y, temporal_cols=batch['temporal_cols'], y_idx=y_idx)
            output, _, _      = self._inv_normalization(y_hat=output, temporal_cols=batch['temporal_cols'], y_idx=y_idx)
            valid_loss = self.valid_loss(y=outsample_y, y_hat=output, mask=outsample_mask)

        if torch.isnan(valid_loss):
            raise Exception('Loss is NaN, training stopped.')

        self.log(
            'valid_loss',
            valid_loss.item(),
            batch_size=outsample_y.size(0),
            prog_bar=True,
            on_epoch=True,
        )
        self.validation_step_outputs.append(valid_loss)
        return valid_loss

    def predict_step(self, batch, batch_idx):
        # 创建并标准化窗口 [Ws, L+H, C]
        batch = self._normalization(batch, val_size=0, test_size=self.test_size)
        windows = self._create_windows(batch, step='predict')
        y_idx = batch['y_idx']

        # 解析窗口
        insample_y, insample_mask, _, _, \
               hist_exog, futr_exog, stat_exog = self._parse_windows(batch, windows)

        windows_batch = dict(insample_y=insample_y, # [B, 序列长度, 1]
                             insample_mask=insample_mask, # [B, 序列长度, 1]
                             futr_exog=futr_exog, # [B, F, seq_len, 1+H]
                             hist_exog=hist_exog, # [B, C, seq_len]
                             stat_exog=stat_exog) # [B, S]

        # 模型预测
        output = self(windows_batch) # 元组([B, seq_len, H], ...)
        if self.loss.is_distribution_output:
            _, y_loc, y_scale = self._inv_normalization(y_hat=output[0],
                                            temporal_cols=batch['temporal_cols'],
                                            y_idx=y_idx)
            B = output[0].size()[0]
            T = output[0].size()[1]
            H = output[0].size()[2]
            output = [arg.reshape(-1, *(arg.size()[2:])) for arg in output]
            y_loc = y_loc.repeat_interleave(repeats=T, dim=0).squeeze(-1)
            y_scale = y_scale.repeat_interleave(repeats=T, dim=0).squeeze(-1)
            distr_args = self.loss.scale_decouple(output=output, loc=y_loc, scale=y_scale)
            _, sample_mean, quants = self.loss.sample(distr_args=distr_args)
            y_hat = torch.concat((sample_mean, quants), axis=2)
            y_hat = y_hat.view(B, T, H, -1)

            if self.loss.return_params:
                distr_args = torch.stack(distr_args, dim=-1)
                distr_args = torch.reshape(distr_args, (B, T, H, -1))
                y_hat = torch.concat((y_hat, distr_args), axis=3)
        else:
            y_hat, _, _ = self._inv_normalization(y_hat=output,
                                            temporal_cols=batch['temporal_cols'],
                                            y_idx=y_idx)
        return y_hat

    def fit(self, dataset, val_size=0, test_size=0, random_seed=None, distributed_config=None):
        """ Fit.

        The `fit` method, optimizes the neural network's weights using the
        initialization parameters (`learning_rate`, `batch_size`, ...)
        and the `loss` function as defined during the initialization. 
        Within `fit` we use a PyTorch Lightning `Trainer` that
        inherits the initialization's `self.trainer_kwargs`, to customize
        its inputs, see [PL's trainer arguments](https://pytorch-lightning.readthedocs.io/en/stable/api/pytorch_lightning.trainer.trainer.Trainer.html?highlight=trainer).

        The method is designed to be compatible with SKLearn-like classes
        and in particular to be compatible with the StatsForecast library.

        By default the `model` is not saving training checkpoints to protect 
        disk memory, to get them change `enable_checkpointing=True` in `__init__`.        

        **Parameters:**<br>
        `dataset`: NeuralForecast's `TimeSeriesDataset`, see [documentation](https://nixtla.github.io/neuralforecast/tsdataset.html).<br>
        `val_size`: int, validation size for temporal cross-validation.<br>
        `test_size`: int, test size for temporal cross-validation.<br>
        `random_seed`: int=None, random_seed for pytorch initializer and numpy generators, overwrites model.__init__'s.<br>
        """
        return self._fit(
            dataset=dataset,
            batch_size=self.batch_size,
            valid_batch_size=self.valid_batch_size,
            val_size=val_size,
            test_size=test_size,
            random_seed=random_seed,
            distributed_config=distributed_config,
        )

    def predict(self, dataset, step_size=1,
                random_seed=None, **data_module_kwargs):
        """ Predict.

        Neural network prediction with PL's `Trainer` execution of `predict_step`.

        **Parameters:**<br>
        `dataset`: NeuralForecast's `TimeSeriesDataset`, see [documentation](https://nixtla.github.io/neuralforecast/tsdataset.html).<br>
        `step_size`: int=1, Step size between each window.<br>
        `random_seed`: int=None, random_seed for pytorch initializer and numpy generators, overwrites model.__init__'s.<br>
        `**data_module_kwargs`: PL's TimeSeriesDataModule args, see [documentation](https://pytorch-lightning.readthedocs.io/en/1.6.1/extensions/datamodules.html#using-a-datamodule).
        """
        self._check_exog(dataset)
        self._restart_seed(random_seed)
        data_module_kwargs = self._set_quantile_for_iqloss(**data_module_kwargs)

        if step_size > 1:
            raise Exception('Recurrent models do not support step_size > 1')

        # fcsts (window, batch, h)
        # Protect when case of multiple gpu. PL does not support return preds with multiple gpu.
        pred_trainer_kwargs = self.trainer_kwargs.copy()
        if (pred_trainer_kwargs.get('accelerator', None) == "gpu") and (torch.cuda.device_count() > 1):
            pred_trainer_kwargs['devices'] = [0]

        trainer = pl.Trainer(**pred_trainer_kwargs)

        datamodule = TimeSeriesDataModule(
            dataset=dataset,
            valid_batch_size=self.valid_batch_size,
            num_workers=self.num_workers_loader,
            **data_module_kwargs
        )
        fcsts = trainer.predict(self, datamodule=datamodule)
        if self.test_size > 0:
            # 移除预热窗口(从训练和验证中)
            # [N, T, H, 输出],避免对单变量输出兼容性进行最后一维索引
            fcsts = torch.vstack([fcst[:, -(1+self.test_size-self.h):,:] for fcst in fcsts])
            fcsts = fcsts.numpy().flatten()
            fcsts = fcsts.reshape(-1, len(self.loss.output_names))
        else:
            fcsts = torch.vstack([fcst[:,-1:,:] for fcst in fcsts]).numpy().flatten()
            fcsts = fcsts.reshape(-1, len(self.loss.output_names))
        return fcsts
show_doc(BaseRecurrent, title_level=3)
show_doc(BaseRecurrent.fit, title_level=3)
show_doc(BaseRecurrent.predict, title_level=3)
from neuralforecast.losses.pytorch import MAE
from neuralforecast.utils import AirPassengersDF
from neuralforecast.tsdataset import TimeSeriesDataset, TimeSeriesDataModule
# 为 _parse_windows 添加 h=0,1 的单元测试 
# 声明批次
AirPassengersDF['x'] = np.array(len(AirPassengersDF))
AirPassengersDF['x2'] = np.array(len(AirPassengersDF)) * 2
dataset, indices, dates, ds = TimeSeriesDataset.from_df(df=AirPassengersDF)
data = TimeSeriesDataModule(dataset=dataset, batch_size=1, drop_last=True)

train_loader =  data.train_dataloader()
batch = next(iter(train_loader))

# 测试 hist_exog_list 和 futr_exog_list 是否正确过滤了发送给缩放器的数据。
baserecurrent = BaseRecurrent(h=12,
                              input_size=117,
                              hist_exog_list=['x', 'x2'],
                              futr_exog_list=['x'],
                              loss=MAE(),
                              valid_loss=MAE(),
                              learning_rate=0.001,
                              max_steps=1,
                              val_check_steps=0,
                              batch_size=1,
                              valid_batch_size=1,
                              windows_batch_size=10,
                              inference_input_size=2,
                              start_padding_enabled=True)

windows = baserecurrent._create_windows(batch, step='train')

temporal_cols = windows['temporal_cols'].copy() # B,L+H,C
temporal_data_cols = baserecurrent._get_temporal_exogenous_cols(temporal_cols=temporal_cols)

test_eq(set(temporal_data_cols), set(['x', 'x2']))
test_eq(windows['temporal'].shape, torch.Size([1,len(['y', 'x', 'x2', 'available_mask']),117,12+1]))

Give us a ⭐ on Github