%load_ext autoreload
%autoreload 2
Base Recurrent
The BaseRecurrent class contains the standard methods shared across recurrent neural networks; these models can process variable-length input sequences through their internal memory states. Models that build on this class include LSTM, GRU, and RNN, along with more sophisticated architectures such as MQCNN.
The standard methods include TemporalNorm preprocessing, optimization utilities such as parameter initialization, training_step, validation_step, and the shared fit and predict methods. These shared methods make all neuralforecast.models compatible with the core.NeuralForecast wrapper class.
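As a minimal, hedged illustration of that compatibility (a usage sketch, not part of this module; it assumes the public NeuralForecast and LSTM APIs as documented), a recurrent subclass can be trained and queried through the wrapper:
from neuralforecast import NeuralForecast
from neuralforecast.models import LSTM
from neuralforecast.losses.pytorch import MAE
from neuralforecast.utils import AirPassengersDF
# Usage sketch: LSTM inherits BaseRecurrent, so fit/predict below dispatch to the shared methods defined here.
nf = NeuralForecast(models=[LSTM(h=12, input_size=24, loss=MAE(), max_steps=10)], freq='M')
nf.fit(df=AirPassengersDF)
forecasts = nf.predict()  # one 12-step-ahead forecast per series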
from fastcore.test import test_eq
from nbdev.showdoc import show_doc
import numpy as np
import torch
import torch.nn as nn
import pytorch_lightning as pl
import neuralforecast.losses.pytorch as losses
from neuralforecast.common._base_model import BaseModel
from neuralforecast.common._scalers import TemporalNorm
from neuralforecast.tsdataset import TimeSeriesDataModule
from neuralforecast.utils import get_indexer_raise_missing
class BaseRecurrent(BaseModel):
""" Base Recurrent
Base class for all recurrent-based models. The forecasts are produced sequentially between
windows.
This class implements the basic functionality for all windows-based models, including:
- PyTorch Lightning's methods training_step, validation_step, predict_step. <br>
- fit and predict methods used by NeuralForecast.core class. <br>
- sampling and wrangling methods to sequential windows. <br>
"""
def __init__(self,
h,
input_size,
inference_input_size,
loss,
valid_loss,
learning_rate,
max_steps,
val_check_steps,
batch_size,
valid_batch_size,
scaler_type='robust',
num_lr_decays=0,
early_stop_patience_steps=-1,
futr_exog_list=None,
hist_exog_list=None,
stat_exog_list=None,
num_workers_loader=0,
drop_last_loader=False,
random_seed=1,
alias=None,
optimizer=None,
optimizer_kwargs=None,
lr_scheduler=None,
lr_scheduler_kwargs=None,
**trainer_kwargs):
super().__init__(
random_seed=random_seed,
loss=loss,
valid_loss=valid_loss,
optimizer=optimizer,
optimizer_kwargs=optimizer_kwargs,
lr_scheduler=lr_scheduler,
lr_scheduler_kwargs=lr_scheduler_kwargs,
futr_exog_list=futr_exog_list,
hist_exog_list=hist_exog_list,
stat_exog_list=stat_exog_list,
max_steps=max_steps,
early_stop_patience_steps=early_stop_patience_steps,
**trainer_kwargs,
)
# Padder to complete train windows,
# example y=[1,2,3,4,5] h=3 -> last y_output = [5,0,0]
self.h = h
self.input_size = input_size
self.inference_input_size = inference_input_size
self.padder = nn.ConstantPad1d(padding=(0, self.h), value=0)
unsupported_distributions = ['Bernoulli', 'ISQF']
if isinstance(self.loss, losses.DistributionLoss) and\
self.loss.distribution in unsupported_distributions:
raise Exception(f'Distribution {self.loss.distribution} not available for Recurrent-based models. Please choose another distribution.')
# Valid batch size
self.batch_size = batch_size
if valid_batch_size is None:
self.valid_batch_size = batch_size
else:
self.valid_batch_size = valid_batch_size
# Optimization
self.learning_rate = learning_rate
self.max_steps = max_steps
self.num_lr_decays = num_lr_decays
self.lr_decay_steps = max(max_steps // self.num_lr_decays, 1) if self.num_lr_decays > 0 else 10e7
self.early_stop_patience_steps = early_stop_patience_steps
self.val_check_steps = val_check_steps
# Scaler
self.scaler = TemporalNorm(
scaler_type=scaler_type,
dim=-1, # The time dimension is -1.
num_features=1+len(self.hist_exog_list)+len(self.futr_exog_list)
)
# Fit arguments
self.val_size = 0
self.test_size = 0
# DataModule arguments
self.num_workers_loader = num_workers_loader
self.drop_last_loader = drop_last_loader
# Used by the on_validation_epoch_end hook
self.validation_step_outputs = []
self.alias = alias
def _normalization(self, batch, val_size=0, test_size=0):
temporal = batch['temporal'] # [B, C, T]
temporal_cols = batch['temporal_cols'].copy()
y_idx = batch['y_idx']
# Separate data and mask
temporal_data_cols = self._get_temporal_exogenous_cols(temporal_cols=temporal_cols)
temporal_idxs = get_indexer_raise_missing(temporal_cols, temporal_data_cols)
temporal_idxs = np.append(y_idx, temporal_idxs)
temporal_data = temporal[:, temporal_idxs, :]
temporal_mask = temporal[:, temporal_cols.get_loc('available_mask'), :].clone()
# Remove validation and test sets to prevent data leakage
if val_size + test_size > 0:
cutoff = val_size + test_size
temporal_mask[:, -cutoff:] = 0
# Normalize. self.scaler stores the shift and scale needed for the inverse transform.
temporal_mask = temporal_mask.unsqueeze(1) # Add channel dimension for scaler.transform.
temporal_data = self.scaler.transform(x=temporal_data, mask=temporal_mask)
# Replace values in the windows dictionary
temporal[:, temporal_idxs, :] = temporal_data
batch['temporal'] = temporal
return batch
def _inv_normalization(self, y_hat, temporal_cols, y_idx):
# Receives window predictions [B, seq_len, H, output]
# Broadcast outputs and invert the normalization
# Get 'y' scale and shift, and add W dimension
y_loc = self.scaler.x_shift[:, [y_idx], 0].flatten() #[B,C,T] -> [B]
y_scale = self.scaler.x_scale[:, [y_idx], 0].flatten() #[B,C,T] -> [B]
# Expand scale and shift to y_hat dimensions
y_loc = y_loc.view(*y_loc.shape, *(1,)*(y_hat.ndim-1))#.expand(y_hat)
y_scale = y_scale.view(*y_scale.shape, *(1,)*(y_hat.ndim-1))#.expand(y_hat)
y_hat = self.scaler.inverse_transform(z=y_hat, x_scale=y_scale, x_shift=y_loc)
return y_hat, y_loc, y_scale
def _create_windows(self, batch, step):
temporal = batch['temporal']
temporal_cols = batch['temporal_cols']
if step == 'train':
if self.val_size + self.test_size > 0:
cutoff = -self.val_size - self.test_size
temporal = temporal[:, :, :cutoff]
temporal = self.padder(temporal)
# Truncate batch to shorter time series
av_condition = torch.nonzero(torch.min(temporal[:, temporal_cols.get_loc('available_mask')], axis=0).values)
min_time_stamp = int(av_condition.min())
available_ts = temporal.shape[-1] - min_time_stamp
if available_ts < 1 + self.h:
raise Exception(
'Time series too short for given input and output size. \n'
f'Available timestamps: {available_ts}'
)
temporal = temporal[:, :, min_time_stamp:]
if step == 'val':
if self.test_size > 0:
temporal = temporal[:, :, :-self.test_size]
temporal = self.padder(temporal)
if step == 'predict':
if (self.test_size == 0) and (len(self.futr_exog_list)==0):
temporal = self.padder(temporal)
# Test size covers all data; pad one timestep with zeros on the left.
if temporal.shape[-1] == self.test_size:
padder_left = nn.ConstantPad1d(padding=(1, 0), value=0)
temporal = padder_left(temporal)
# Parse batch
window_size = 1 + self.h # 1 for the current t, h for the future
windows = temporal.unfold(dimension=-1,
size=window_size,
step=1)
# Truncated backpropagation/inference (shortens the sequence the RNN unrolls over)
n_windows = windows.shape[2]
input_size = -1
if (step == 'train') and (self.input_size>0):
input_size = self.input_size
if (input_size > 0) and (n_windows > input_size):
max_sampleable_time = n_windows-self.input_size+1
start = np.random.choice(max_sampleable_time)
windows = windows[:, :, start:(start+input_size), :]
if (step == 'val') and (self.inference_input_size>0):
cutoff = self.inference_input_size + self.val_size
windows = windows[:, :, -cutoff:, :]
if (step == 'predict') and (self.inference_input_size>0):
cutoff = self.inference_input_size + self.test_size
windows = windows[:, :, -cutoff:, :]
# [B, C, input_size, 1+H]
windows_batch = dict(temporal=windows,
temporal_cols=temporal_cols,
static=batch.get('static', None),
static_cols=batch.get('static_cols', None))
return windows_batch
def _parse_windows(self, batch, windows):
# [B, C, seq_len, 1+H]
# Filter insample lags from the outsample horizon
mask_idx = batch['temporal_cols'].get_loc('available_mask')
y_idx = batch['y_idx']
insample_y = windows['temporal'][:, y_idx, :, :-self.h]
insample_mask = windows['temporal'][:, mask_idx, :, :-self.h]
outsample_y = windows['temporal'][:, y_idx, :, -self.h:].contiguous()
outsample_mask = windows['temporal'][:, mask_idx, :, -self.h:].contiguous()
# Filter historical exogenous variables
if len(self.hist_exog_list):
hist_exog_idx = get_indexer_raise_missing(windows['temporal_cols'], self.hist_exog_list)
hist_exog = windows['temporal'][:, hist_exog_idx, :, :-self.h]
else:
hist_exog = None
# Filter future exogenous variables
if len(self.futr_exog_list):
futr_exog_idx = get_indexer_raise_missing(windows['temporal_cols'], self.futr_exog_list)
futr_exog = windows['temporal'][:, futr_exog_idx, :, :]
else:
futr_exog = None
# Filter static variables
if len(self.stat_exog_list):
static_idx = get_indexer_raise_missing(windows['static_cols'], self.stat_exog_list)
stat_exog = windows['static'][:, static_idx]
else:
stat_exog = None
return insample_y, insample_mask, outsample_y, outsample_mask, \
hist_exog, futr_exog, stat_exog
def training_step(self, batch, batch_idx):
# Create and normalize windows [Ws, L+H, C]
batch = self._normalization(batch, val_size=self.val_size, test_size=self.test_size)
windows = self._create_windows(batch, step='train')
# Parse windows
insample_y, insample_mask, outsample_y, outsample_mask, \
hist_exog, futr_exog, stat_exog = self._parse_windows(batch, windows)
windows_batch = dict(insample_y=insample_y, # [B, seq_len, 1]
insample_mask=insample_mask, # [B, seq_len, 1]
futr_exog=futr_exog, # [B, F, seq_len, 1+H]
hist_exog=hist_exog, # [B, C, seq_len]
stat_exog=stat_exog) # [B, S]
# Model predictions
output = self(windows_batch) # tuple([B, seq_len, H, output])
if self.loss.is_distribution_output:
outsample_y, y_loc, y_scale = self._inv_normalization(y_hat=outsample_y,
temporal_cols=batch['temporal_cols'],
y_idx=batch['y_idx'])
B = output[0].size()[0]
T = output[0].size()[1]
H = output[0].size()[2]
output = [arg.view(-1, *(arg.size()[2:])) for arg in output]
outsample_y = outsample_y.view(B*T,H)
outsample_mask = outsample_mask.view(B*T,H)
y_loc = y_loc.repeat_interleave(repeats=T, dim=0).squeeze(-1)
y_scale = y_scale.repeat_interleave(repeats=T, dim=0).squeeze(-1)
distr_args = self.loss.scale_decouple(output=output, loc=y_loc, scale=y_scale)
loss = self.loss(y=outsample_y, distr_args=distr_args, mask=outsample_mask)
else:
loss = self.loss(y=outsample_y, y_hat=output, mask=outsample_mask)
if torch.isnan(loss):
print('Model Parameters', self.hparams)
print('insample_y', torch.isnan(insample_y).sum())
print('outsample_y', torch.isnan(outsample_y).sum())
print('output', torch.isnan(output).sum())
raise Exception('Loss is NaN, training stopped.')
self.log(
'train_loss',
loss.item(),
batch_size=outsample_y.size(0),
prog_bar=True,
on_epoch=True,
)
self.train_trajectories.append((self.global_step, loss.item()))
return loss
def validation_step(self, batch, batch_idx):
if self.val_size == 0:
return np.nan
# Create and normalize windows [Ws, L+H, C]
batch = self._normalization(batch, val_size=self.val_size, test_size=self.test_size)
windows = self._create_windows(batch, step='val')
y_idx = batch['y_idx']
# Parse windows
insample_y, insample_mask, outsample_y, outsample_mask, \
hist_exog, futr_exog, stat_exog = self._parse_windows(batch, windows)
windows_batch = dict(insample_y=insample_y, # [B, seq_len, 1]
insample_mask=insample_mask, # [B, seq_len, 1]
futr_exog=futr_exog, # [B, F, seq_len, 1+H]
hist_exog=hist_exog, # [B, C, seq_len]
stat_exog=stat_exog) # [B, S]
# Remove train y_hat (+1 and -1 for the last window, which is padded with zeros)
# tuple([B, seq_len, H, output]) -> tuple([B, validation_size, H, output])
val_windows = (self.val_size) + 1
outsample_y = outsample_y[:, -val_windows:-1, :]
outsample_mask = outsample_mask[:, -val_windows:-1, :]
# Model predictions
output = self(windows_batch) # tuple([B, seq_len, H, output])
if self.loss.is_distribution_output:
output = [arg[:, -val_windows:-1] for arg in output]
outsample_y, y_loc, y_scale = self._inv_normalization(y_hat=outsample_y,
temporal_cols=batch['temporal_cols'],
y_idx=y_idx)
B = output[0].size()[0]
T = output[0].size()[1]
H = output[0].size()[2]
output = [arg.reshape(-1, *(arg.size()[2:])) for arg in output]
outsample_y = outsample_y.reshape(B*T,H)
outsample_mask = outsample_mask.reshape(B*T,H)
y_loc = y_loc.repeat_interleave(repeats=T, dim=0).squeeze(-1)
y_scale = y_scale.repeat_interleave(repeats=T, dim=0).squeeze(-1)
distr_args = self.loss.scale_decouple(output=output, loc=y_loc, scale=y_scale)
_, sample_mean, quants = self.loss.sample(distr_args=distr_args)
if str(type(self.valid_loss)) in\
["<class 'neuralforecast.losses.pytorch.sCRPS'>", "<class 'neuralforecast.losses.pytorch.MQLoss'>"]:
output = quants
elif str(type(self.valid_loss)) in ["<class 'neuralforecast.losses.pytorch.relMSE'>"]:
output = torch.unsqueeze(sample_mean, dim=-1) # [N,H] -> [N,H,1]
else:
output = output[:, -val_windows:-1, :]
# Validation loss evaluation
if self.valid_loss.is_distribution_output:
valid_loss = self.valid_loss(y=outsample_y, distr_args=distr_args, mask=outsample_mask)
else:
outsample_y, _, _ = self._inv_normalization(y_hat=outsample_y, temporal_cols=batch['temporal_cols'], y_idx=y_idx)
output, _, _ = self._inv_normalization(y_hat=output, temporal_cols=batch['temporal_cols'], y_idx=y_idx)
valid_loss = self.valid_loss(y=outsample_y, y_hat=output, mask=outsample_mask)
if torch.isnan(valid_loss):
raise Exception('Loss is NaN, training stopped.')
self.log(
'valid_loss',
valid_loss.item(),
batch_size=outsample_y.size(0),
prog_bar=True,
on_epoch=True,
)
self.validation_step_outputs.append(valid_loss)
return valid_loss
def predict_step(self, batch, batch_idx):
# Create and normalize windows [Ws, L+H, C]
batch = self._normalization(batch, val_size=0, test_size=self.test_size)
windows = self._create_windows(batch, step='predict')
y_idx = batch['y_idx']
# Parse windows
insample_y, insample_mask, _, _, \
hist_exog, futr_exog, stat_exog = self._parse_windows(batch, windows)
windows_batch = dict(insample_y=insample_y, # [B, seq_len, 1]
insample_mask=insample_mask, # [B, seq_len, 1]
futr_exog=futr_exog, # [B, F, seq_len, 1+H]
hist_exog=hist_exog, # [B, C, seq_len]
stat_exog=stat_exog) # [B, S]
# Model predictions
output = self(windows_batch) # tuple([B, seq_len, H], ...)
if self.loss.is_distribution_output:
_, y_loc, y_scale = self._inv_normalization(y_hat=output[0],
temporal_cols=batch['temporal_cols'],
y_idx=y_idx)
B = output[0].size()[0]
T = output[0].size()[1]
H = output[0].size()[2]
output = [arg.reshape(-1, *(arg.size()[2:])) for arg in output]
y_loc = y_loc.repeat_interleave(repeats=T, dim=0).squeeze(-1)
y_scale = y_scale.repeat_interleave(repeats=T, dim=0).squeeze(-1)
distr_args = self.loss.scale_decouple(output=output, loc=y_loc, scale=y_scale)
_, sample_mean, quants = self.loss.sample(distr_args=distr_args)
y_hat = torch.concat((sample_mean, quants), axis=2)
y_hat = y_hat.view(B, T, H, -1)
if self.loss.return_params:
distr_args = torch.stack(distr_args, dim=-1)
distr_args = torch.reshape(distr_args, (B, T, H, -1))
y_hat = torch.concat((y_hat, distr_args), axis=3)
else:
y_hat, _, _ = self._inv_normalization(y_hat=output,
temporal_cols=batch['temporal_cols'],
y_idx=y_idx)
return y_hat
def fit(self, dataset, val_size=0, test_size=0, random_seed=None, distributed_config=None):
""" Fit.
The `fit` method optimizes the neural network's weights using the
initialization parameters (`learning_rate`, `batch_size`, ...)
and the `loss` function as defined during the initialization.
Within `fit` we use a PyTorch Lightning `Trainer` that
inherits the initialization's `self.trainer_kwargs`, to customize
its inputs, see [PL's trainer arguments](https://pytorch-lightning.readthedocs.io/en/stable/api/pytorch_lightning.trainer.trainer.Trainer.html?highlight=trainer).
The method is designed to be compatible with SKLearn-like classes
and in particular to be compatible with the StatsForecast library.
By default the `model` does not save training checkpoints, to protect
disk memory; to enable them, set `enable_checkpointing=True` in `__init__`.
**Parameters:**<br>
`dataset`: NeuralForecast's `TimeSeriesDataset`, see [documentation](https://nixtla.github.io/neuralforecast/tsdataset.html).<br>
`val_size`: int, validation size for temporal cross-validation.<br>
`test_size`: int, test size for temporal cross-validation.<br>
`random_seed`: int=None, random_seed for pytorch initializer and numpy generators, overwrites model.__init__'s.<br>
"""
return self._fit(
dataset=dataset,
batch_size=self.batch_size,
valid_batch_size=self.valid_batch_size,
val_size=val_size,
test_size=test_size,
random_seed=random_seed,
distributed_config=distributed_config,
)
def predict(self, dataset, step_size=1,
random_seed=None, **data_module_kwargs):
""" Predict.
Neural network prediction with PL's `Trainer` execution of `predict_step`.
**Parameters:**<br>
`dataset`: NeuralForecast's `TimeSeriesDataset`, see [documentation](https://nixtla.github.io/neuralforecast/tsdataset.html).<br>
`step_size`: int=1, Step size between each window.<br>
`random_seed`: int=None, random_seed for pytorch initializer and numpy generators, overwrites model.__init__'s.<br>
`**data_module_kwargs`: PL's TimeSeriesDataModule args, see [documentation](https://pytorch-lightning.readthedocs.io/en/1.6.1/extensions/datamodules.html#using-a-datamodule).
"""
self._check_exog(dataset)
self._restart_seed(random_seed)
data_module_kwargs = self._set_quantile_for_iqloss(**data_module_kwargs)
if step_size > 1:
raise Exception('Recurrent models do not support step_size > 1')
# fcsts (window, batch, h)
# Protect against the multi-GPU case; PL does not support returning predictions with multiple GPUs.
pred_trainer_kwargs = self.trainer_kwargs.copy()
if (pred_trainer_kwargs.get('accelerator', None) == "gpu") and (torch.cuda.device_count() > 1):
pred_trainer_kwargs['devices'] = [0]
trainer = pl.Trainer(**pred_trainer_kwargs)
datamodule = TimeSeriesDataModule(
dataset=dataset,
valid_batch_size=self.valid_batch_size,
num_workers=self.num_workers_loader,
**data_module_kwargs
)
fcsts = trainer.predict(self, datamodule=datamodule)
if self.test_size > 0:
# Remove warmup windows (from train and validation)
# [N, T, H, output]; avoid indexing the last dimension for univariate-output compatibility
fcsts = torch.vstack([fcst[:, -(1+self.test_size-self.h):,:] for fcst in fcsts])
fcsts = fcsts.numpy().flatten()
fcsts = fcsts.reshape(-1, len(self.loss.output_names))
else:
fcsts = torch.vstack([fcst[:,-1:,:] for fcst in fcsts]).numpy().flatten()
fcsts = fcsts.reshape(-1, len(self.loss.output_names))
return fcsts
show_doc(BaseRecurrent, title_level=3)
show_doc(BaseRecurrent.fit, title_level=3)
show_doc(BaseRecurrent.predict, title_level=3)
from neuralforecast.losses.pytorch import MAE
from neuralforecast.utils import AirPassengersDF
from neuralforecast.tsdataset import TimeSeriesDataset, TimeSeriesDataModule
# Add unit test for _parse_windows with h=0,1
# Declare batch
AirPassengersDF['x'] = np.array(len(AirPassengersDF))
AirPassengersDF['x2'] = np.array(len(AirPassengersDF)) * 2
dataset, indices, dates, ds = TimeSeriesDataset.from_df(df=AirPassengersDF)
data = TimeSeriesDataModule(dataset=dataset, batch_size=1, drop_last=True)
train_loader = data.train_dataloader()
batch = next(iter(train_loader))
# Test that hist_exog_list and futr_exog_list correctly filter the data sent to the scaler.
baserecurrent = BaseRecurrent(h=12,
input_size=117,
hist_exog_list=['x', 'x2'],
futr_exog_list=['x'],
loss=MAE(),
valid_loss=MAE(),
learning_rate=0.001,
max_steps=1,
val_check_steps=0,
batch_size=1,
valid_batch_size=1,
windows_batch_size=10,
inference_input_size=2,
start_padding_enabled=True)
windows = baserecurrent._create_windows(batch, step='train')
temporal_cols = windows['temporal_cols'].copy() # B,L+H,C
temporal_data_cols = baserecurrent._get_temporal_exogenous_cols(temporal_cols=temporal_cols)
test_eq(set(temporal_data_cols), set(['x', 'x2']))
test_eq(windows['temporal'].shape, torch.Size([1,len(['y', 'x', 'x2', 'available_mask']),117,12+1]))
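To make the sequential windowing concrete, the following is a minimal, hedged sketch (toy tensors only; not part of the library or its test suite) of the ConstantPad1d + unfold mechanism that `_create_windows` and `_parse_windows` rely on:
import torch
import torch.nn as nn
# Toy series from the padder example in __init__: y = [1, 2, 3, 4, 5], h = 3.
y = torch.tensor([1., 2., 3., 4., 5.]).view(1, 1, -1)   # [B=1, C=1, T=5]
h = 3
padder = nn.ConstantPad1d(padding=(0, h), value=0)
padded = padder(y)                                       # [1, 1, 8]: three zeros appended on the right
windows = padded.unfold(dimension=-1, size=1 + h, step=1)
print(windows.shape)       # torch.Size([1, 1, 5, 4]) -> [B, C, n_windows, 1+h]
print(windows[0, 0, -1])   # tensor([5., 0., 0., 0.]): last window, its h targets are the padded zeros
insample_y = windows[..., :-h]    # [1, 1, 5, 1], analogous to the insample slice in _parse_windows
outsample_y = windows[..., -h:]   # [1, 1, 5, 3], analogous to the outsample slice in _parse_windows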