%load_ext autoreload
%autoreload 2

TiDE
Time-series Dense Encoder (TiDE) is an MLP-based univariate time-series forecasting model. TiDE uses Multi-layer Perceptrons (MLPs) in an encoder-decoder architecture for long-term time-series forecasting. In addition, the model can handle exogenous inputs.
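Exogenous inputs are declared directly in the constructor through the `futr_exog_list`, `hist_exog_list` and `stat_exog_list` arguments. The snippet below is purely illustrative (the column names are hypothetical); a complete, runnable example is given in section 3.

from neuralforecast.models import TiDE

model = TiDE(h=12,                              # forecast horizon
             input_size=24,                     # autoregressive lags
             futr_exog_list=['calendar_feat'],  # hypothetical future exogenous column
             hist_exog_list=['sales_lag'],      # hypothetical historic exogenous column
             stat_exog_list=['series_id'])      # hypothetical static exogenous column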

from fastcore.test import test_eq
from nbdev.showdoc import show_doc

from typing import Optional
import torch
import torch.nn as nn
import torch.nn.functional as F
from neuralforecast.losses.pytorch import MAE
from neuralforecast.common._base_windows import BaseWindows

1. Auxiliary Functions
1.1 MLPResidual
MLP block with a residual connection.
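Once the class in the next cell has been executed, a quick shape check could look like this (a minimal sketch with arbitrary dimensions):

block = MLPResidual(input_dim=8, hidden_size=16, output_dim=4, dropout=0.1, layernorm=True)
out = block(torch.randn(32, 8))   # the block maps the last dimension: 8 -> 4
test_eq(out.shape, (32, 4))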
class MLPResidual(nn.Module):
    """
    MLPResidual
    """

    def __init__(self, input_dim, hidden_size, output_dim, dropout, layernorm):
        super().__init__()
        self.layernorm = layernorm
        if layernorm:
            self.norm = nn.LayerNorm(output_dim)

        self.drop = nn.Dropout(dropout)
        self.lin1 = nn.Linear(input_dim, hidden_size)
        self.lin2 = nn.Linear(hidden_size, output_dim)
        self.skip = nn.Linear(input_dim, output_dim)

    def forward(self, input):
        # MLP dense layers
        x = F.relu(self.lin1(input))
        x = self.lin2(x)
        x = self.drop(x)

        # Skip connection
        x_skip = self.skip(input)

        # Combine
        x = x + x_skip

        if self.layernorm:
            return self.norm(x)

        return x

2. Model
class TiDE(BaseWindows):
    """TiDE

    Time-series Dense Encoder (`TiDE`) is an MLP-based univariate time-series forecasting model. `TiDE` uses Multi-layer Perceptrons (MLPs) in an encoder-decoder model for long-term time-series forecasting.

    **Parameters:**<br>
    `h`: int, forecast horizon.<br>
    `input_size`: int, considered autoregressive inputs (lags), y=[1,2,3,4] input_size=2 -> lags=[1,2].<br>
    `hidden_size`: int=512, number of units for the dense MLPs.<br>
    `decoder_output_dim`: int=32, number of units for the output of the decoder.<br>
    `temporal_decoder_dim`: int=128, number of units for the hidden size of the temporal decoder.<br>
    `dropout`: float=0.3, dropout rate between (0, 1).<br>
    `layernorm`: bool=True, if True uses Layer Normalization on the MLP residual block outputs.<br>
    `num_encoder_layers`: int=1, number of encoder layers.<br>
    `num_decoder_layers`: int=1, number of decoder layers.<br>
    `temporal_width`: int=4, lower temporal projected dimension.<br>
    `futr_exog_list`: str list, future exogenous columns.<br>
    `hist_exog_list`: str list, historic exogenous columns.<br>
    `stat_exog_list`: str list, static exogenous columns.<br>
    `loss`: PyTorch module, instantiated train loss class from [losses collection](https://nixtla.github.io/neuralforecast/losses.pytorch.html).<br>
    `valid_loss`: PyTorch module=`loss`, instantiated valid loss class from [losses collection](https://nixtla.github.io/neuralforecast/losses.pytorch.html).<br>
    `max_steps`: int=1000, maximum number of training steps.<br>
    `learning_rate`: float=1e-3, Learning rate between (0, 1).<br>
    `num_lr_decays`: int=-1, Number of learning rate decays, evenly distributed across max_steps.<br>
    `early_stop_patience_steps`: int=-1, Number of validation iterations before early stopping.<br>
    `val_check_steps`: int=100, Number of training steps between every validation loss check.<br>
    `batch_size`: int=32, number of different series in each batch.<br>
    `step_size`: int=1, step size between each window of temporal data.<br>
    `scaler_type`: str='identity', type of scaler for temporal inputs normalization see [temporal scalers](https://nixtla.github.io/neuralforecast/common.scalers.html).<br>
    `random_seed`: int=1, random_seed for pytorch initializer and numpy generators.<br>
    `num_workers_loader`: int=os.cpu_count(), workers to be used by `TimeSeriesDataLoader`.<br>
    `drop_last_loader`: bool=False, if True `TimeSeriesDataLoader` drops last non-full batch.<br>
    `alias`: str, optional, Custom name of the model.<br>
    `optimizer`: Subclass of 'torch.optim.Optimizer', optional, user specified optimizer instead of the default choice (Adam).<br>
    `optimizer_kwargs`: dict, optional, list of parameters used by the user specified `optimizer`.<br>
    `lr_scheduler`: Subclass of 'torch.optim.lr_scheduler.LRScheduler', optional, user specified lr_scheduler instead of the default choice (StepLR).<br>
    `lr_scheduler_kwargs`: dict, optional, list of parameters used by the user specified `lr_scheduler`.<br>
    `**trainer_kwargs`: int, keyword trainer arguments inherited from [PyTorch Lightning's trainer](https://pytorch-lightning.readthedocs.io/en/stable/api/pytorch_lightning.trainer.trainer.Trainer.html?highlight=trainer).<br>

    **References:**<br>
    - [Das, Abhimanyu, Weihao Kong, Andrew Leach, Shaan Mathur, Rajat Sen, and Rose Yu (2024). "Long-term Forecasting with TiDE: Time-series Dense Encoder."](http://arxiv.org/abs/2304.08424)
"""
# Class attributes
SAMPLING_TYPE = 'windows'
EXOGENOUS_FUTR = True
EXOGENOUS_HIST = True
EXOGENOUS_STAT = True
    def __init__(self,
                 h,
                 input_size,
                 hidden_size=512,
                 decoder_output_dim=32,
                 temporal_decoder_dim=128,
                 dropout=0.3,
                 layernorm=True,
                 num_encoder_layers=1,
                 num_decoder_layers=1,
                 temporal_width=4,
                 futr_exog_list=None,
                 hist_exog_list=None,
                 stat_exog_list=None,
                 exclude_insample_y=False,
                 loss=MAE(),
                 valid_loss=None,
                 max_steps: int = 1000,
                 learning_rate: float = 1e-3,
                 num_lr_decays: int = -1,
                 early_stop_patience_steps: int = -1,
                 val_check_steps: int = 100,
                 batch_size: int = 32,
                 valid_batch_size: Optional[int] = None,
                 windows_batch_size=1024,
                 inference_windows_batch_size=1024,
                 start_padding_enabled=False,
                 step_size: int = 1,
                 scaler_type: str = 'identity',
                 random_seed: int = 1,
                 num_workers_loader: int = 0,
                 drop_last_loader: bool = False,
                 optimizer=None,
                 optimizer_kwargs=None,
                 lr_scheduler=None,
                 lr_scheduler_kwargs=None,
                 **trainer_kwargs):
        # Inherit BaseWindows class
        super(TiDE, self).__init__(
            h=h,
            input_size=input_size,
            futr_exog_list=futr_exog_list,
            hist_exog_list=hist_exog_list,
            stat_exog_list=stat_exog_list,
            exclude_insample_y=exclude_insample_y,
            loss=loss,
            valid_loss=valid_loss,
            max_steps=max_steps,
            learning_rate=learning_rate,
            num_lr_decays=num_lr_decays,
            early_stop_patience_steps=early_stop_patience_steps,
            val_check_steps=val_check_steps,
            batch_size=batch_size,
            valid_batch_size=valid_batch_size,
            windows_batch_size=windows_batch_size,
            inference_windows_batch_size=inference_windows_batch_size,
            start_padding_enabled=start_padding_enabled,
            step_size=step_size,
            scaler_type=scaler_type,
            random_seed=random_seed,
            num_workers_loader=num_workers_loader,
            drop_last_loader=drop_last_loader,
            optimizer=optimizer,
            optimizer_kwargs=optimizer_kwargs,
            lr_scheduler=lr_scheduler,
            lr_scheduler_kwargs=lr_scheduler_kwargs,
            **trainer_kwargs
        )
        self.h = h

        # Projections of temporal exogenous features onto a lower dimension
        if self.hist_exog_size > 0:
            self.hist_exog_projection = MLPResidual(input_dim=self.hist_exog_size,
                                                    hidden_size=hidden_size,
                                                    output_dim=temporal_width,
                                                    dropout=dropout,
                                                    layernorm=layernorm)

        if self.futr_exog_size > 0:
            self.futr_exog_projection = MLPResidual(input_dim=self.futr_exog_size,
                                                    hidden_size=hidden_size,
                                                    output_dim=temporal_width,
                                                    dropout=dropout,
                                                    layernorm=layernorm)
        # Encoder
        # Flattened encoder input: insample window, projected historic and future
        # exogenous features, and static exogenous features
        dense_encoder_input_size = input_size + \
                                   input_size * (self.hist_exog_size > 0) * temporal_width + \
                                   (input_size + h) * (self.futr_exog_size > 0) * temporal_width + \
                                   (self.stat_exog_size > 0) * self.stat_exog_size

        dense_encoder_layers = [MLPResidual(input_dim=dense_encoder_input_size if i == 0 else hidden_size,
                                            hidden_size=hidden_size,
                                            output_dim=hidden_size,
                                            dropout=dropout,
                                            layernorm=layernorm) for i in range(num_encoder_layers)]
        self.dense_encoder = nn.Sequential(*dense_encoder_layers)

        # Decoder
        decoder_output_size = decoder_output_dim * h
        dense_decoder_layers = [MLPResidual(input_dim=hidden_size,
                                            hidden_size=hidden_size,
                                            output_dim=decoder_output_size if i == num_decoder_layers - 1 else hidden_size,
                                            dropout=dropout,
                                            layernorm=layernorm) for i in range(num_decoder_layers)]
        self.dense_decoder = nn.Sequential(*dense_decoder_layers)

        # Temporal decoder with loss dependent dimensions
        self.temporal_decoder = MLPResidual(input_dim=decoder_output_dim + (self.futr_exog_size > 0) * temporal_width,
                                            hidden_size=temporal_decoder_dim,
                                            output_dim=self.loss.outputsize_multiplier,
                                            dropout=dropout,
                                            layernorm=layernorm)

        # Global skip connection
        self.global_skip = nn.Linear(in_features=input_size,
                                     out_features=h * self.loss.outputsize_multiplier)
    def forward(self, windows_batch):
        # Parse windows_batch
        x = windows_batch['insample_y'].unsqueeze(-1)      # [B, L, 1]
        hist_exog = windows_batch['hist_exog']             # [B, L, X]
        futr_exog = windows_batch['futr_exog']             # [B, L + h, F]
        stat_exog = windows_batch['stat_exog']             # [B, S]

        batch_size, seq_len = x.shape[:2]                  # B = batch size, L = sequence length

        # Flatten insample_y
        x = x.reshape(batch_size, -1)                      # [B, L, 1] -> [B, L]

        # Global skip connection
        x_skip = self.global_skip(x)                       # [B, L] -> [B, h * n_outputs]
        x_skip = x_skip.reshape(batch_size, self.h, -1)    # [B, h * n_outputs] -> [B, h, n_outputs]

        # Concatenate x with flattened historical exogenous
        if self.hist_exog_size > 0:
            x_hist_exog = self.hist_exog_projection(hist_exog)      # [B, L, X] -> [B, L, temporal_width]
            x_hist_exog = x_hist_exog.reshape(batch_size, -1)       # [B, L, temporal_width] -> [B, L * temporal_width]
            x = torch.cat((x, x_hist_exog), dim=1)                  # [B, L] + [B, L * temporal_width] -> [B, L * (1 + temporal_width)]

        # Concatenate x with flattened future exogenous
        if self.futr_exog_size > 0:
            x_futr_exog = self.futr_exog_projection(futr_exog)      # [B, L + h, F] -> [B, L + h, temporal_width]
            x_futr_exog_flat = x_futr_exog.reshape(batch_size, -1)  # [B, L + h, temporal_width] -> [B, (L + h) * temporal_width]
            x = torch.cat((x, x_futr_exog_flat), dim=1)             # [B, L * (1 + temporal_width)] + [B, (L + h) * temporal_width] -> [B, L * (1 + 2 * temporal_width) + h * temporal_width]

        # Concatenate x with static exogenous
        if self.stat_exog_size > 0:
            x = torch.cat((x, stat_exog), dim=1)                    # [B, L * (1 + 2 * temporal_width) + h * temporal_width] + [B, S] -> [B, L * (1 + 2 * temporal_width) + h * temporal_width + S]

        # Dense encoder
        x = self.dense_encoder(x)                                   # [B, L * (1 + 2 * temporal_width) + h * temporal_width + S] -> [B, hidden_size]

        # Dense decoder
        x = self.dense_decoder(x)                                   # [B, hidden_size] -> [B, decoder_output_dim * h]
        x = x.reshape(batch_size, self.h, -1)                       # [B, decoder_output_dim * h] -> [B, h, decoder_output_dim]

        # Stack the horizon part of futr_exog on the decoder output
        if self.futr_exog_size > 0:
            x_futr_exog_h = x_futr_exog[:, seq_len:]                # [B, L + h, temporal_width] -> [B, h, temporal_width]
            x = torch.cat((x, x_futr_exog_h), dim=2)                # [B, h, decoder_output_dim] + [B, h, temporal_width] -> [B, h, temporal_width + decoder_output_dim]

        # Temporal decoder
        x = self.temporal_decoder(x)                                # [B, h, temporal_width + decoder_output_dim] -> [B, h, n_outputs]

        # Map to output domain
        forecast = self.loss.domain_map(x + x_skip)

        return forecast
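As a quick sanity check of the architecture, the forward pass can be exercised on a dummy batch. This is a minimal sketch that assumes the `windows_batch` layout used above, with `insample_y` of shape [B, L], the default `MAE` loss, and unused exogenous entries set to `None`:

model = TiDE(h=12, input_size=24)
windows_batch = dict(insample_y=torch.randn(16, 24),   # [B, L]
                     hist_exog=None,
                     futr_exog=None,
                     stat_exog=None)
y_hat = model(windows_batch)
test_eq(y_hat.shape, (16, 12))                          # [B, h] for a point loss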
show_doc(TiDE)

show_doc(TiDE.fit, name='TiDE.fit')

show_doc(TiDE.predict, name='TiDE.predict')

3. Usage Example
import pandas as pd
import matplotlib.pyplot as plt
from neuralforecast import NeuralForecast
from neuralforecast.models import TiDE
from neuralforecast.losses.pytorch import GMM, DistributionLoss
from neuralforecast.utils import AirPassengersPanel, AirPassengersStatic
Y_train_df = AirPassengersPanel[AirPassengersPanel.ds<AirPassengersPanel['ds'].values[-12]] # 132 train
Y_test_df = AirPassengersPanel[AirPassengersPanel.ds>=AirPassengersPanel['ds'].values[-12]].reset_index(drop=True) # 12 test
fcst = NeuralForecast(
    models=[
        TiDE(h=12,
             input_size=24,
             loss=GMM(n_components=7, return_params=True, level=[80,90]),
             max_steps=500,
             scaler_type='standard',
             futr_exog_list=['y_[lag12]'],
             hist_exog_list=None,
             stat_exog_list=['airline1'],
             ),
    ],
    freq='M'
)
fcst.fit(df=Y_train_df, static_df=AirPassengersStatic)
forecasts = fcst.predict(futr_df=Y_test_df)
# Plot quantile predictions
Y_hat_df = forecasts.reset_index(drop=False).drop(columns=['unique_id','ds'])
plot_df = pd.concat([Y_test_df, Y_hat_df], axis=1)
plot_df = pd.concat([Y_train_df, plot_df])

plot_df = plot_df[plot_df.unique_id=='Airline1'].drop('unique_id', axis=1)
plt.plot(plot_df['ds'], plot_df['y'], c='black', label='True')
plt.plot(plot_df['ds'], plot_df['TiDE-median'], c='blue', label='median')
plt.fill_between(x=plot_df['ds'][-12:],
                 y1=plot_df['TiDE-lo-90'][-12:].values,
                 y2=plot_df['TiDE-hi-90'][-12:].values,
                 alpha=0.4, label='level 90')
plt.legend()
plt.grid()
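The same pipeline also accepts other probabilistic losses. For instance, the `DistributionLoss` imported above could be swapped in as follows (a sketch with the same data setup; not run here):

fcst_normal = NeuralForecast(
    models=[
        TiDE(h=12,
             input_size=24,
             loss=DistributionLoss(distribution='Normal', level=[80, 90]),
             max_steps=500,
             scaler_type='standard',
             futr_exog_list=['y_[lag12]'],
             stat_exog_list=['airline1'],
             ),
    ],
    freq='M'
)
fcst_normal.fit(df=Y_train_df, static_df=AirPassengersStatic)
forecasts_normal = fcst_normal.predict(futr_df=Y_test_df)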