StemGNN

%load_ext autoreload
%autoreload 2

谱时序图神经网络 (StemGNN) 是一种基于图的多变量时间序列预测模型。StemGNN 通过结合图傅里叶变换 (GFT) 和离散傅里叶变换 (DFT),在谱域中共同学习时间依赖性和系列间相关性。

该方法在 SolarMETR-LAPEMS-BAY 等地理时间数据集上表现出了最先进的性能。

参考文献
-Defu Cao, Yujing Wang, Juanyong Duan, Ce Zhang, Xia Zhu, Congrui Huang, Yunhai Tong, Bixiong Xu, Jing Bai, Jie Tong, Qi Zhang (2020). “谱时序图神经网络用于多变量时间序列预测”.

图1. StemGNN.
from fastcore.test import test_eq
from nbdev.showdoc import show_doc
import torch
import torch.nn as nn
import torch.nn.functional as F

from neuralforecast.losses.pytorch import MAE
from neuralforecast.common._base_multivariate import BaseMultivariate
class GLU(nn.Module):
    """
    GLU
    """    
    def __init__(self, input_channel, output_channel):
        super(GLU, self).__init__()
        self.linear_left = nn.Linear(input_channel, output_channel)
        self.linear_right = nn.Linear(input_channel, output_channel)

    def forward(self, x):
        return torch.mul(self.linear_left(x), torch.sigmoid(self.linear_right(x)))
class 股票区块层(nn.Module):
    """
    股票区块层
    """       
    def __init__(self, time_step, unit, multi_layer, stack_cnt=0):
        super(股票区块层, self).__init__()
        self.time_step = time_step
        self.unit = unit
        self.stack_cnt = stack_cnt
        self.multi = multi_layer
        self.weight = nn.Parameter(
            torch.Tensor(1, 3 + 1, 1, self.time_step * self.multi,
                         self.multi * self.time_step))  # [K+1, 1, 输入通道数, 输出通道数]
        nn.init.xavier_normal_(self.weight)
        self.forecast = nn.Linear(self.time_step * self.multi, self.time_step * self.multi)
        self.forecast_result = nn.Linear(self.time_step * self.multi, self.time_step)
        if self.stack_cnt == 0:
            self.backcast = nn.Linear(self.time_step * self.multi, self.time_step)
        self.backcast_short_cut = nn.Linear(self.time_step, self.time_step)
        self.relu = nn.ReLU()
        self.GLUs = nn.ModuleList()
        self.output_channel = 4 * self.multi
        for i in range(3):
            if i == 0:
                self.GLUs.append(GLU(self.time_step * 4, self.time_step * self.output_channel))
                self.GLUs.append(GLU(self.time_step * 4, self.time_step * self.output_channel))
            elif i == 1:
                self.GLUs.append(GLU(self.time_step * self.output_channel, self.time_step * self.output_channel))
                self.GLUs.append(GLU(self.time_step * self.output_channel, self.time_step * self.output_channel))
            else:
                self.GLUs.append(GLU(self.time_step * self.output_channel, self.time_step * self.output_channel))
                self.GLUs.append(GLU(self.time_step * self.output_channel, self.time_step * self.output_channel))

    def spe_seq_cell(self, input):
        batch_size, k, input_channel, node_cnt, time_step = input.size()
        input = input.view(batch_size, -1, node_cnt, time_step)
        ffted = torch.view_as_real(torch.fft.fft(input, dim=1))
        real = ffted[..., 0].permute(0, 2, 1, 3).contiguous().reshape(batch_size, node_cnt, -1)
        img = ffted[..., 1].permute(0, 2, 1, 3).contiguous().reshape(batch_size, node_cnt, -1)
        for i in range(3):
            real = self.GLUs[i * 2](real)
            img = self.GLUs[2 * i + 1](img)
        real = real.reshape(batch_size, node_cnt, 4, -1).permute(0, 2, 1, 3).contiguous()
        img = img.reshape(batch_size, node_cnt, 4, -1).permute(0, 2, 1, 3).contiguous()
        time_step_as_inner = torch.cat([real.unsqueeze(-1), img.unsqueeze(-1)], dim=-1)
        iffted = torch.fft.irfft(torch.view_as_complex(time_step_as_inner), n=time_step_as_inner.shape[1], dim=1)
        return iffted

    def forward(self, x, mul_L):
        mul_L = mul_L.unsqueeze(1)
        x = x.unsqueeze(1)
        gfted = torch.matmul(mul_L, x)
        gconv_input = self.spe_seq_cell(gfted).unsqueeze(2)
        igfted = torch.matmul(gconv_input, self.weight)
        igfted = torch.sum(igfted, dim=1)
        forecast_source = torch.sigmoid(self.forecast(igfted).squeeze(1))
        forecast = self.forecast_result(forecast_source)
        if self.stack_cnt == 0:
            backcast_short = self.backcast_short_cut(x).squeeze(1)
            backcast_source = torch.sigmoid(self.backcast(igfted) - backcast_short)
        else:
            backcast_source = None
        return forecast, backcast_source
class StemGNN(BaseMultivariate):
    """ StemGNN

    The Spectral Temporal Graph Neural Network (`StemGNN`) is a Graph-based multivariate
    time-series forecasting model. `StemGNN` jointly learns temporal dependencies and
    inter-series correlations in the spectral domain, by combining Graph Fourier Transform (GFT)
    and Discrete Fourier Transform (DFT). 

    **Parameters:**<br>
    `h`: int, Forecast horizon. <br>
    `input_size`: int, autorregresive inputs size, y=[1,2,3,4] input_size=2 -> y_[t-2:t]=[1,2].<br>
    `n_series`: int, number of time-series.<br>
    `stat_exog_list`: str list, static exogenous columns.<br>
    `hist_exog_list`: str list, historic exogenous columns.<br>
    `futr_exog_list`: str list, future exogenous columns.<br>
    `n_stacks`: int=2, number of stacks in the model.<br>
    `multi_layer`: int=5, multiplier for FC hidden size on StemGNN blocks.<br>
    `dropout_rate`: float=0.5, dropout rate.<br>
    `leaky_rate`: float=0.2, alpha for LeakyReLU layer on Latent Correlation layer.<br>
    `loss`: PyTorch module, instantiated train loss class from [losses collection](https://nixtla.github.io/neuralforecast/losses.pytorch.html).<br>
    `valid_loss`: PyTorch module=`loss`, instantiated valid loss class from [losses collection](https://nixtla.github.io/neuralforecast/losses.pytorch.html).<br>
    `max_steps`: int=1000, maximum number of training steps.<br>
    `learning_rate`: float=1e-3, Learning rate between (0, 1).<br>
    `num_lr_decays`: int=-1, Number of learning rate decays, evenly distributed across max_steps.<br>
    `early_stop_patience_steps`: int=-1, Number of validation iterations before early stopping.<br>
    `val_check_steps`: int=100, Number of training steps between every validation loss check.<br>
    `batch_size`: int, number of windows in each batch.<br>
    `step_size`: int=1, step size between each window of temporal data.<br>
    `scaler_type`: str='robust', type of scaler for temporal inputs normalization see [temporal scalers](https://nixtla.github.io/neuralforecast/common.scalers.html).<br>
    `random_seed`: int, random_seed for pytorch initializer and numpy generators.<br>
    `num_workers_loader`: int=os.cpu_count(), workers to be used by `TimeSeriesDataLoader`.<br>
    `drop_last_loader`: bool=False, if True `TimeSeriesDataLoader` drops last non-full batch.<br>
    `alias`: str, optional,  Custom name of the model.<br>
    `optimizer`: Subclass of 'torch.optim.Optimizer', optional, user specified optimizer instead of the default choice (Adam).<br>
    `optimizer_kwargs`: dict, optional, list of parameters used by the user specified `optimizer`.<br>
    `lr_scheduler`: Subclass of 'torch.optim.lr_scheduler.LRScheduler', optional, user specified lr_scheduler instead of the default choice (StepLR).<br>
    `lr_scheduler_kwargs`: dict, optional, list of parameters used by the user specified `lr_scheduler`.<br>
    `**trainer_kwargs`: int,  keyword trainer arguments inherited from [PyTorch Lighning's trainer](https://pytorch-lightning.readthedocs.io/en/stable/api/pytorch_lightning.trainer.trainer.Trainer.html?highlight=trainer).<br>    
    """
    # Class attributes
    SAMPLING_TYPE = 'multivariate'
    EXOGENOUS_FUTR = False
    EXOGENOUS_HIST = False
    EXOGENOUS_STAT = False    
    
    def __init__(self,
                 h,
                 input_size,
                 n_series,
                 futr_exog_list = None,
                 hist_exog_list = None,
                 stat_exog_list = None,
                 n_stacks = 2,
                 multi_layer: int = 5,
                 dropout_rate: float = 0.5,
                 leaky_rate: float = 0.2,
                 loss = MAE(),
                 valid_loss = None,
                 max_steps: int = 1000,
                 learning_rate: float = 1e-3,
                 num_lr_decays: int = 3,
                 early_stop_patience_steps: int =-1,
                 val_check_steps: int = 100,
                 batch_size: int = 32,
                 step_size: int = 1,
                 scaler_type: str = 'robust',
                 random_seed: int = 1,
                 num_workers_loader = 0,
                 drop_last_loader = False,
                 optimizer = None,
                 optimizer_kwargs = None,
                 lr_scheduler = None,
                 lr_scheduler_kwargs = None,
                 **trainer_kwargs):

        # Inherit BaseMultivariate class
        super(StemGNN, self).__init__(h=h,
                                      input_size=input_size,
                                      n_series=n_series,
                                      futr_exog_list=futr_exog_list,
                                      hist_exog_list=hist_exog_list,
                                      stat_exog_list=stat_exog_list,                                    
                                      loss=loss,
                                      valid_loss=valid_loss,
                                      max_steps=max_steps,
                                      learning_rate=learning_rate,
                                      num_lr_decays=num_lr_decays,
                                      early_stop_patience_steps=early_stop_patience_steps,
                                      val_check_steps=val_check_steps,
                                      batch_size=batch_size,
                                      step_size=step_size,
                                      scaler_type=scaler_type,
                                      num_workers_loader=num_workers_loader,
                                      drop_last_loader=drop_last_loader,
                                      random_seed=random_seed,
                                      optimizer=optimizer,
                                      optimizer_kwargs=optimizer_kwargs,
                                      lr_scheduler=lr_scheduler,
                                      lr_scheduler_kwargs=lr_scheduler_kwargs,
                                      **trainer_kwargs)
        # Quick fix for now, fix the model later.
        if n_stacks != 2:
            raise Exception("StemGNN currently only supports n_stacks=2.")

        self.unit = n_series
        self.stack_cnt = n_stacks
        self.alpha = leaky_rate
        self.time_step = input_size
        self.horizon = h
        self.h = h

        self.weight_key = nn.Parameter(torch.zeros(size=(self.unit, 1)))
        nn.init.xavier_uniform_(self.weight_key.data, gain=1.414)
        self.weight_query = nn.Parameter(torch.zeros(size=(self.unit, 1)))
        nn.init.xavier_uniform_(self.weight_query.data, gain=1.414)
        self.GRU = nn.GRU(self.time_step, self.unit)
        self.multi_layer = multi_layer
        self.stock_block = nn.ModuleList()
        self.stock_block.extend(
            [StockBlockLayer(self.time_step, self.unit, self.multi_layer, stack_cnt=i) for i in range(self.stack_cnt)])
        self.fc = nn.Sequential(
            nn.Linear(int(self.time_step), int(self.time_step)),
            nn.LeakyReLU(),
            nn.Linear(int(self.time_step), self.horizon * self.loss.outputsize_multiplier),
        )
        self.leakyrelu = nn.LeakyReLU(self.alpha)
        self.dropout = nn.Dropout(p=dropout_rate)

    def get_laplacian(self, graph, normalize):
            """
            return the laplacian of the graph.
            :param graph: the graph structure without self loop, [N, N].
            :param normalize: whether to used the normalized laplacian.
            :return: graph laplacian.
            """
            if normalize:
                D = torch.diag(torch.sum(graph, dim=-1) ** (-1 / 2))
                L = torch.eye(graph.size(0), device=graph.device, dtype=graph.dtype) - torch.mm(torch.mm(D, graph), D)
            else:
                D = torch.diag(torch.sum(graph, dim=-1))
                L = D - graph
            return L

    def cheb_polynomial(self, laplacian):
        """
        根据图拉普拉斯矩阵计算切比雪夫多项式。
        :param laplacian: 图拉普拉斯矩阵,形状为 [N, N]。
        :return: 多阶切比雪夫拉普拉斯矩阵,形状为 [K, N, N]。
        """
        N = laplacian.size(0)  # [N, N]
        laplacian = laplacian.unsqueeze(0)
        first_laplacian = torch.zeros([1, N, N], device=laplacian.device, dtype=torch.float)
        second_laplacian = laplacian
        third_laplacian = (2 * torch.matmul(laplacian, second_laplacian)) - first_laplacian
        forth_laplacian = 2 * torch.matmul(laplacian, third_laplacian) - second_laplacian
        multi_order_laplacian = torch.cat([first_laplacian, second_laplacian, third_laplacian, forth_laplacian], dim=0)
        return multi_order_laplacian

    def latent_correlation_layer(self, x):
        input, _ = self.GRU(x.permute(2, 0, 1).contiguous())
        input = input.permute(1, 0, 2).contiguous()
        attention = self.self_graph_attention(input)
        attention = torch.mean(attention, dim=0)
        degree = torch.sum(attention, dim=1)
        # laplacian is sym or not
        attention = 0.5 * (attention + attention.T)
        degree_l = torch.diag(degree)
        diagonal_degree_hat = torch.diag(1 / (torch.sqrt(degree) + 1e-7))
        laplacian = torch.matmul(diagonal_degree_hat,
                                    torch.matmul(degree_l - attention, diagonal_degree_hat))
        mul_L = self.cheb_polynomial(laplacian)
        return mul_L, attention

    def self_graph_attention(self, input):
        input = input.permute(0, 2, 1).contiguous()
        bat, N, fea = input.size()
        key = torch.matmul(input, self.weight_key)
        query = torch.matmul(input, self.weight_query)
        data = key.repeat(1, 1, N).view(bat, N * N, 1) + query.repeat(1, N, 1)
        data = data.squeeze(2)
        data = data.view(bat, N, -1)
        data = self.leakyrelu(data)
        attention = F.softmax(data, dim=2)
        attention = self.dropout(attention)
        return attention

    def graph_fft(self, input, eigenvectors):
        return torch.matmul(eigenvectors, input)

    def forward(self, windows_batch):
        # Parse batch
        x = windows_batch['insample_y']
        batch_size = x.shape[0]

        mul_L, attention = self.latent_correlation_layer(x)
        X = x.unsqueeze(1).permute(0, 1, 3, 2).contiguous()
        result = []
        for stack_i in range(self.stack_cnt):
            forecast, X = self.stock_block[stack_i](X, mul_L)
            result.append(forecast)
        forecast = result[0] + result[1]
        forecast = self.fc(forecast)

        forecast = forecast.permute(0, 2, 1).contiguous()
        forecast = forecast.reshape(batch_size, self.h, self.loss.outputsize_multiplier * self.n_series)
        forecast = self.loss.domain_map(forecast)

        # 在 n_series == 1 的情况下,domain_map 可能已经压缩了最后一个维度。
        # 请注意,在元组损失的情况下,此方法会失败,但Multivariate目前尚不支持元组损失。
        if forecast.ndim == 2:
            return forecast.unsqueeze(-1)
        else:
            return forecast
show_doc(StemGNN)
show_doc(StemGNN.fit, name='StemGNN.fit')
show_doc(StemGNN.predict, name='StemGNN.predict')
import logging
import warnings

from neuralforecast import NeuralForecast
from neuralforecast.utils import AirPassengersPanel, AirPassengersStatic
from neuralforecast.losses.pytorch import MAE, MSE, RMSE, MAPE, SMAPE, MASE, relMSE, QuantileLoss, MQLoss, DistributionLoss,PMM, GMM, NBMM, HuberLoss, TukeyLoss, HuberQLoss, HuberMQLoss
# 测试损失
logging.getLogger("pytorch_lightning").setLevel(logging.ERROR)
warnings.filterwarnings("ignore")

Y_train_df = AirPassengersPanel[AirPassengersPanel.ds<AirPassengersPanel['ds'].values[-12]].reset_index(drop=True) # 132次列车
Y_test_df = AirPassengersPanel[AirPassengersPanel.ds>=AirPassengersPanel['ds'].values[-12]].reset_index(drop=True) # 12项测试

AirPassengersStatic_single = AirPassengersStatic[AirPassengersStatic["unique_id"] == 'Airline1']
Y_train_df_single = Y_train_df[Y_train_df["unique_id"] == 'Airline1']
Y_test_df_single = Y_test_df[Y_test_df["unique_id"] == 'Airline1']

losses = [MAE(), MSE(), RMSE(), MAPE(), SMAPE(), MASE(seasonality=12), relMSE(y_train=Y_train_df), QuantileLoss(q=0.5), MQLoss(), DistributionLoss(distribution='Bernoulli'), DistributionLoss(distribution='Normal'), DistributionLoss(distribution='Poisson'), DistributionLoss(distribution='StudentT'), DistributionLoss(distribution='NegativeBinomial'), DistributionLoss(distribution='Tweedie'), PMM(), GMM(), NBMM(), HuberLoss(), TukeyLoss(), HuberQLoss(q=0.5), HuberMQLoss()]
valid_losses = [MAE(), MSE(), RMSE(), MAPE(), SMAPE(), MASE(seasonality=12), relMSE(y_train=Y_train_df), QuantileLoss(q=0.5), MQLoss(), DistributionLoss(distribution='Bernoulli'), DistributionLoss(distribution='Normal'), DistributionLoss(distribution='Poisson'), DistributionLoss(distribution='StudentT'), DistributionLoss(distribution='NegativeBinomial'), DistributionLoss(distribution='Tweedie'), PMM(), GMM(), NBMM(), HuberLoss(), TukeyLoss(), HuberQLoss(q=0.5), HuberMQLoss()]

for loss, valid_loss in zip(losses, valid_losses):
    try:
        model = StemGNN(h=12,
                        input_size=24,
                        n_series=2,
                        scaler_type='robust',
                        max_steps=2,
                        early_stop_patience_steps=-1,
                        val_check_steps=10,
                        learning_rate=1e-3,
                        loss=loss,
                        valid_loss=valid_loss,
                        batch_size=32
                        )

        fcst = NeuralForecast(models=[model], freq='M')
        fcst.fit(df=Y_train_df, static_df=AirPassengersStatic, val_size=12)
        forecasts = fcst.predict(futr_df=Y_test_df)
    except Exception as e:
        assert str(e) == f"{loss} is not supported in a Multivariate model."


# 测试 n_系列 = 1
model = StemGNN(h=12,
                input_size=24,
                n_series=1,
                scaler_type='robust',
                max_steps=2,
                early_stop_patience_steps=-1,
                val_check_steps=10,
                learning_rate=1e-3,
                loss=MAE(),
                valid_loss=MAE(),
                batch_size=32
                )
fcst = NeuralForecast(models=[model], freq='M')
fcst.fit(df=Y_train_df_single, static_df=AirPassengersStatic_single, val_size=12)
forecasts = fcst.predict(futr_df=Y_test_df_single)        

使用示例

训练模型并使用 predict 方法预测未来值。

import pandas as pd
import matplotlib.pyplot as plt

from neuralforecast import NeuralForecast
from neuralforecast.models import StemGNN
from neuralforecast.utils import AirPassengersPanel, AirPassengersStatic
from neuralforecast.losses.pytorch import MAE

Y_train_df = AirPassengersPanel[AirPassengersPanel.ds<AirPassengersPanel['ds'].values[-12]].reset_index(drop=True) # 132次列车
Y_test_df = AirPassengersPanel[AirPassengersPanel.ds>=AirPassengersPanel['ds'].values[-12]].reset_index(drop=True) # 12项测试

model = StemGNN(h=12,
                input_size=24,
                n_series=2,
                scaler_type='robust',
                max_steps=100,
                early_stop_patience_steps=-1,
                val_check_steps=10,
                learning_rate=1e-3,
                loss=MAE(),
                valid_loss=None,
                batch_size=32
                )

fcst = NeuralForecast(models=[model], freq='M')
fcst.fit(df=Y_train_df, static_df=AirPassengersStatic, val_size=12)
forecasts = fcst.predict(futr_df=Y_test_df)

# 情节预测
fig, ax = plt.subplots(1, 1, figsize = (20, 7))
Y_hat_df = forecasts.reset_index(drop=False).drop(columns=['unique_id','ds'])
plot_df = pd.concat([Y_test_df, Y_hat_df], axis=1)
plot_df = pd.concat([Y_train_df, plot_df])

plot_df = plot_df[plot_df.unique_id=='Airline1'].drop('unique_id', axis=1)
plt.plot(plot_df['ds'], plot_df['y'], c='black', label='True')
plt.plot(plot_df['ds'], plot_df['StemGNN'], c='blue', label='Forecast')
ax.set_title('AirPassengers Forecast', fontsize=22)
ax.set_ylabel('Monthly Passengers', fontsize=20)
ax.set_xlabel('Year', fontsize=20)
ax.legend(prop={'size': 15})
ax.grid()

使用 cross_validation 预测多个历史值。

fcst = NeuralForecast(models=[model], freq='M')
forecasts = fcst.cross_validation(df=AirPassengersPanel, static_df=AirPassengersStatic, n_windows=2, step_size=12)

# 情节预测
fig, ax = plt.subplots(1, 1, figsize = (20, 7))
Y_hat_df = forecasts.loc['Airline1']
Y_df = AirPassengersPanel[AirPassengersPanel['unique_id']=='Airline1']

plt.plot(Y_df['ds'], Y_df['y'], c='black', label='True')
plt.plot(Y_hat_df['ds'], Y_hat_df['StemGNN'], c='blue', label='Forecast')
ax.set_title('AirPassengers Forecast', fontsize=22)
ax.set_ylabel('Monthly Passengers', fontsize=20)
ax.set_xlabel('Year', fontsize=20)
ax.legend(prop={'size': 15})
ax.grid()

Give us a ⭐ on Github