神经网络模块

%load_ext autoreload
%autoreload 2
import math

import torch
import torch.nn as nn
import torch.nn.functional as F
from nbdev.showdoc import show_doc
ACTIVATIONS = ['ReLU','Softplus','Tanh','SELU','LeakyReLU','PReLU','Sigmoid']

1. MLP

多层感知器

class MLP(nn.Module):
    """Multi-Layer Perceptron Class

    **Parameters:**<br>
    `in_features`: int, dimension of input.<br>
    `out_features`: int, dimension of output.<br>
    `activation`: str, activation function to use.<br>
    `hidden_size`: int, dimension of hidden layers.<br>
    `num_layers`: int, number of hidden layers.<br>
    `dropout`: float, dropout rate.<br>
    """
    def __init__(self, in_features, out_features, activation, hidden_size, num_layers, dropout):
        super().__init__()
        assert activation in ACTIVATIONS, f'{activation} is not in {ACTIVATIONS}'
        
        self.activation = getattr(nn, activation)()

        # 多层感知器
        # 输入层
        layers = [nn.Linear(in_features=in_features, out_features=hidden_size),
                  self.activation,
                  nn.Dropout(dropout)]
        # 隐藏层
        for i in range(num_layers - 2):
            layers += [nn.Linear(in_features=hidden_size, out_features=hidden_size),
                       self.activation,
                       nn.Dropout(dropout)]
        # 输出层
        layers += [nn.Linear(in_features=hidden_size, out_features=out_features)]

        # 按层存储为 ModuleList
        self.layers = nn.Sequential(*layers)

    def forward(self, x):
        return self.layers(x)

2. 时序卷积

在深度学习的很长一段时间内,序列建模与递归网络同义,但几篇论文已经表明,简单的卷积架构可以通过展现更长的有效记忆来超越经典的递归网络,如LSTM。

参考文献
-van den Oord, A., Dieleman, S., Zen, H., Simonyan, K., Vinyals, O., Graves, A., Kalchbrenner, N., Senior, A. W., & Kavukcuoglu, K. (2016). Wavenet: A generative model for raw audio. Computing Research Repository, abs/1609.03499. URL: http://arxiv.org/abs/1609.03499. arXiv:1609.03499.
-Shaojie Bai, Zico Kolter, Vladlen Koltun. (2018). An Empirical Evaluation of Generic Convolutional and Recurrent Networks for Sequence Modeling. Computing Research Repository, abs/1803.01271. URL: https://arxiv.org/abs/1803.01271.

class Chomp1d(nn.Module):
    """ Chomp1d

    Receives `x` input of dim [N,C,T], and trims it so that only
    'time available' information is used. 
    Used by one dimensional causal convolutions `CausalConv1d`.

    **Parameters:**<br>
    `horizon`: int, length of outsample values to skip.
    """
    def __init__(self, horizon):
        super(Chomp1d, self).__init__()
        self.horizon = horizon

    def forward(self, x):
        return x[:, :, :-self.horizon].contiguous()


class CausalConv1d(nn.Module):
    """ Causal Convolution 1d

    Receives `x` input of dim [N,C_in,T], and computes a causal convolution
    in the time dimension. Skipping the H steps of the forecast horizon, through
    its dilation.
    Consider a batch of one element, the dilated convolution operation on the
    $t$ time step is defined:

    $\mathrm{Conv1D}(\mathbf{x},\mathbf{w})(t) = (\mathbf{x}_{[*d]} \mathbf{w})(t) = \sum^{K}_{k=1} w_{k} \mathbf{x}_{t-dk}$

    where $d$ is the dilation factor, $K$ is the kernel size, $t-dk$ is the index of
    the considered past observation. The dilation effectively applies a filter with skip
    connections. If $d=1$ one recovers a normal convolution.

    **Parameters:**<br>
    `in_channels`: int, dimension of `x` input's initial channels.<br> 
    `out_channels`: int, dimension of `x` outputs's channels.<br> 
    `activation`: str, identifying activations from PyTorch activations.
        select from 'ReLU','Softplus','Tanh','SELU', 'LeakyReLU','PReLU','Sigmoid'.<br>
    `padding`: int, number of zero padding used to the left.<br>
    `kernel_size`: int, convolution's kernel size.<br>
    `dilation`: int, dilation skip connections.<br>
    
    **Returns:**<br>
    `x`: tensor, torch tensor of dim [N,C_out,T] activation(conv1d(inputs, kernel) + bias). <br>
    """
    def __init__(self, in_channels, out_channels, kernel_size,
                 padding, dilation, activation, stride:int=1):
        super(CausalConv1d, self).__init__()
        assert activation in ACTIVATIONS, f'{activation} is not in {ACTIVATIONS}'
        
        self.conv       = nn.Conv1d(in_channels=in_channels, out_channels=out_channels, 
                                    kernel_size=kernel_size, stride=stride, padding=padding,
                                    dilation=dilation)
        
        self.chomp      = Chomp1d(padding)
        self.activation = getattr(nn, activation)()
        self.causalconv = nn.Sequential(self.conv, self.chomp, self.activation)
    
    def forward(self, x):
        return self.causalconv(x)
show_doc(CausalConv1d, title_level=3)
class TemporalConvolutionEncoder(nn.Module):
    """ Temporal Convolution Encoder

    Receives `x` input of dim [N,T,C_in], permutes it to  [N,C_in,T]
    applies a deep stack of exponentially dilated causal convolutions.
    The exponentially increasing dilations of the convolutions allow for 
    the creation of weighted averages of exponentially large long-term memory.

    **Parameters:**<br>
    `in_channels`: int, dimension of `x` input's initial channels.<br> 
    `out_channels`: int, dimension of `x` outputs's channels.<br>
    `kernel_size`: int, size of the convolving kernel.<br>
    `dilations`: int list, controls the temporal spacing between the kernel points.<br>
    `activation`: str, identifying activations from PyTorch activations.
        select from 'ReLU','Softplus','Tanh','SELU', 'LeakyReLU','PReLU','Sigmoid'.<br>

    **Returns:**<br>
    `x`: tensor, torch tensor of dim [N,T,C_out].<br>
    """
    # 待办事项:添加膨胀参数并将层声明更改为for循环
    def __init__(self, in_channels, out_channels, 
                 kernel_size, dilations,
                 activation:str='ReLU'):
        super(TemporalConvolutionEncoder, self).__init__()
        layers = []
        for dilation in dilations:
            layers.append(CausalConv1d(in_channels=in_channels, out_channels=out_channels, 
                                        kernel_size=kernel_size, padding=(kernel_size-1)*dilation, 
                                        activation=activation, dilation=dilation))
            in_channels = out_channels
        self.tcn = nn.Sequential(*layers)

    def forward(self, x):
        # [N,T,C_in] -> [N,C_in,T] -> [N,T,C_out]
        x = x.permute(0, 2, 1).contiguous()
        x = self.tcn(x)
        x = x.permute(0, 2, 1).contiguous()
        return x
show_doc(TemporalConvolutionEncoder, title_level=3)

3. 变换器

参考文献
- Haoyi Zhou, Shanghang Zhang, Jieqi Peng, Shuai Zhang, Jianxin Li, Hui Xiong, Wancai Zhang. “Informer: 超越高效Transformer用于长序列时间序列预测”
- Haixu Wu, Jiehui Xu, Jianmin Wang, Mingsheng Long.

class TransEncoderLayer(nn.Module):
    def __init__(self, attention, hidden_size, conv_hidden_size=None, dropout=0.1, activation="relu"):
        super(TransEncoderLayer, self).__init__()
        conv_hidden_size = conv_hidden_size or 4 * hidden_size
        self.attention = attention
        self.conv1 = nn.Conv1d(in_channels=hidden_size, out_channels=conv_hidden_size, kernel_size=1)
        self.conv2 = nn.Conv1d(in_channels=conv_hidden_size, out_channels=hidden_size, kernel_size=1)
        self.norm1 = nn.LayerNorm(hidden_size)
        self.norm2 = nn.LayerNorm(hidden_size)
        self.dropout = nn.Dropout(dropout)
        self.activation = F.relu if activation == "relu" else F.gelu

    def forward(self, x, attn_mask=None):
        new_x, attn = self.attention(
            x, x, x,
            attn_mask=attn_mask
        )
        
        x = x + self.dropout(new_x)

        y = x = self.norm1(x)
        y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))
        y = self.dropout(self.conv2(y).transpose(-1, 1))

        return self.norm2(x + y), attn


class TransEncoder(nn.Module):
    def __init__(self, attn_layers, conv_layers=None, norm_layer=None):
        super(TransEncoder, self).__init__()
        self.attn_layers = nn.ModuleList(attn_layers)
        self.conv_layers = nn.ModuleList(conv_layers) if conv_layers is not None else None
        self.norm = norm_layer

    def forward(self, x, attn_mask=None):
        # x [早餐, 午餐, 晚餐]
        attns = []
        if self.conv_layers is not None:
            for attn_layer, conv_layer in zip(self.attn_layers, self.conv_layers):
                x, attn = attn_layer(x, attn_mask=attn_mask)
                x = conv_layer(x)
                attns.append(attn)
            x, attn = self.attn_layers[-1](x)
            attns.append(attn)
        else:
            for attn_layer in self.attn_layers:
                x, attn = attn_layer(x, attn_mask=attn_mask)
                attns.append(attn)

        if self.norm is not None:
            x = self.norm(x)

        return x, attns
class TransDecoderLayer(nn.Module):
    def __init__(self, self_attention, cross_attention, hidden_size, conv_hidden_size=None,
                 dropout=0.1, activation="relu"):
        super(TransDecoderLayer, self).__init__()
        conv_hidden_size = conv_hidden_size or 4 * hidden_size
        self.self_attention = self_attention
        self.cross_attention = cross_attention
        self.conv1 = nn.Conv1d(in_channels=hidden_size, out_channels=conv_hidden_size, kernel_size=1)
        self.conv2 = nn.Conv1d(in_channels=conv_hidden_size, out_channels=hidden_size, kernel_size=1)
        self.norm1 = nn.LayerNorm(hidden_size)
        self.norm2 = nn.LayerNorm(hidden_size)
        self.norm3 = nn.LayerNorm(hidden_size)
        self.dropout = nn.Dropout(dropout)
        self.activation = F.relu if activation == "relu" else F.gelu

    def forward(self, x, cross, x_mask=None, cross_mask=None):
        x = x + self.dropout(self.self_attention(
            x, x, x,
            attn_mask=x_mask
        )[0])
        x = self.norm1(x)

        x = x + self.dropout(self.cross_attention(
            x, cross, cross,
            attn_mask=cross_mask
        )[0])

        y = x = self.norm2(x)
        y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))
        y = self.dropout(self.conv2(y).transpose(-1, 1))

        return self.norm3(x + y)


class TransDecoder(nn.Module):
    def __init__(self, layers, norm_layer=None, projection=None):
        super(TransDecoder, self).__init__()
        self.layers = nn.ModuleList(layers)
        self.norm = norm_layer
        self.projection = projection

    def forward(self, x, cross, x_mask=None, cross_mask=None):
        for layer in self.layers:
            x = layer(x, cross, x_mask=x_mask, cross_mask=cross_mask)

        if self.norm is not None:
            x = self.norm(x)

        if self.projection is not None:
            x = self.projection(x)
        return x
class AttentionLayer(nn.Module):
    def __init__(self, attention, hidden_size, n_head, d_keys=None,
                 d_values=None):
        super(AttentionLayer, self).__init__()

        d_keys = d_keys or (hidden_size // n_head)
        d_values = d_values or (hidden_size // n_head)

        self.inner_attention = attention
        self.query_projection = nn.Linear(hidden_size, d_keys * n_head)
        self.key_projection = nn.Linear(hidden_size, d_keys * n_head)
        self.value_projection = nn.Linear(hidden_size, d_values * n_head)
        self.out_projection = nn.Linear(d_values * n_head, hidden_size)
        self.n_head = n_head

    def forward(self, queries, keys, values, attn_mask):
        B, L, _ = queries.shape
        _, S, _ = keys.shape
        H = self.n_head

        queries = self.query_projection(queries).view(B, L, H, -1)
        keys = self.key_projection(keys).view(B, S, H, -1)
        values = self.value_projection(values).view(B, S, H, -1)

        out, attn = self.inner_attention(
            queries,
            keys,
            values,
            attn_mask
        )
        out = out.view(B, L, -1)

        return self.out_projection(out), attn
class PositionalEmbedding(nn.Module):
    def __init__(self, hidden_size, max_len=5000):
        super(PositionalEmbedding, self).__init__()
        # 在日志空间中一次性计算位置编码。
        pe = torch.zeros(max_len, hidden_size).float()
        pe.require_grad = False

        position = torch.arange(0, max_len).float().unsqueeze(1)
        div_term = (torch.arange(0, hidden_size, 2).float() * -(math.log(10000.0) / hidden_size)).exp()

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return self.pe[:, :x.size(1)]

class TokenEmbedding(nn.Module):
    def __init__(self, c_in, hidden_size):
        super(TokenEmbedding, self).__init__()
        padding = 1 if torch.__version__ >= '1.5.0' else 2
        self.tokenConv = nn.Conv1d(in_channels=c_in, out_channels=hidden_size,
                                   kernel_size=3, padding=padding, padding_mode='circular', bias=False)
        for m in self.modules():
            if isinstance(m, nn.Conv1d):
                nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='leaky_relu')

    def forward(self, x):
        x = self.tokenConv(x.permute(0, 2, 1)).transpose(1, 2)
        return x

class TimeFeatureEmbedding(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(TimeFeatureEmbedding, self).__init__()
        self.embed = nn.Linear(input_size, hidden_size, bias=False)

    def forward(self, x):
        return self.embed(x)
    
class FixedEmbedding(nn.Module):
    def __init__(self, c_in, d_model):
        super(FixedEmbedding, self).__init__()

        w = torch.zeros(c_in, d_model, dtype=torch.float32, requires_grad=False)
        position = torch.arange(0, c_in, dtype=torch.float32).unsqueeze(1)
        div_term = (torch.arange(0, d_model, 2).float()
                    * -(math.log(10000.0) / d_model)).exp()

        w[:, 0::2] = torch.sin(position * div_term)
        w[:, 1::2] = torch.cos(position * div_term)

        self.emb = nn.Embedding(c_in, d_model)
        self.emb.weight = nn.Parameter(w, requires_grad=False)

    def forward(self, x):
        return self.emb(x).detach()
    
class TemporalEmbedding(nn.Module):
    def __init__(self, d_model, embed_type='fixed', freq='h'):
        super(TemporalEmbedding, self).__init__()

        minute_size = 4
        hour_size = 24
        weekday_size = 7
        day_size = 32
        month_size = 13

        Embed = FixedEmbedding if embed_type == 'fixed' else nn.Embedding
        if freq == 't':
            self.minute_embed = Embed(minute_size, d_model)
        self.hour_embed = Embed(hour_size, d_model)
        self.weekday_embed = Embed(weekday_size, d_model)
        self.day_embed = Embed(day_size, d_model)
        self.month_embed = Embed(month_size, d_model)

    def forward(self, x):
        x = x.long()
        minute_x = self.minute_embed(x[:, :, 4]) if hasattr(
            self, 'minute_embed') else 0.
        hour_x = self.hour_embed(x[:, :, 3])
        weekday_x = self.weekday_embed(x[:, :, 2])
        day_x = self.day_embed(x[:, :, 1])
        month_x = self.month_embed(x[:, :, 0])

        return hour_x + weekday_x + day_x + month_x + minute_x

class DataEmbedding(nn.Module):
    def __init__(self, c_in, exog_input_size, hidden_size, pos_embedding=True, dropout=0.1):
        super(DataEmbedding, self).__init__()

        self.value_embedding = TokenEmbedding(c_in=c_in, hidden_size=hidden_size)

        if pos_embedding:
            self.position_embedding = PositionalEmbedding(hidden_size=hidden_size)
        else:
            self.position_embedding = None

        if exog_input_size > 0:
            self.temporal_embedding = TimeFeatureEmbedding(input_size=exog_input_size,
                                                        hidden_size=hidden_size)
        else:
            self.temporal_embedding = None

        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, x_mark=None):

        # 卷积
        x = self.value_embedding(x)

        # 添加位置(相对于窗口的相对位置)嵌入,使用正弦和余弦函数。
        if self.position_embedding is not None:
            x = x + self.position_embedding(x)

        # 添加时间(时间序列中的绝对时间)嵌入,使用线性层
        if self.temporal_embedding is not None:
            x = x + self.temporal_embedding(x_mark)            

        return self.dropout(x)

class MovingAvg(nn.Module):
    """
    移动平均块以突出时间序列的趋势
    """
    def __init__(self, kernel_size, stride):
        super(MovingAvg, self).__init__()
        self.kernel_size = kernel_size
        self.avg = nn.AvgPool1d(kernel_size=kernel_size, stride=stride, padding=0)

    def forward(self, x):
        # 时间序列两端的填充
        front = x[:, 0:1, :].repeat(1, (self.kernel_size - 1) // 2, 1)
        end = x[:, -1:, :].repeat(1, (self.kernel_size - 1) // 2, 1)
        x = torch.cat([front, x, end], dim=1)
        x = self.avg(x.permute(0, 2, 1))
        x = x.permute(0, 2, 1)
        return x
    
class SeriesDecomp(nn.Module):
    """
    序列分解模块
    """
    def __init__(self, kernel_size):
        super(SeriesDecomp, self).__init__()
        self.MovingAvg = MovingAvg(kernel_size, stride=1)

    def forward(self, x):
        moving_mean = self.MovingAvg(x)
        res = x - moving_mean
        return res, moving_mean

class RevIN(nn.Module):
    """ RevIN(可逆实例归一化)
    """
    def __init__(self, num_features: int, eps=1e-5, affine=False, subtract_last=False, non_norm=False):
        """
        :param num_features: the number of features or channels
        :param eps: a value added for numerical stability
        :param affine: if True, RevIN has learnable affine parameters
        :param substract_last: if True, the substraction is based on the last value 
                               instead of the mean in normalization
        :param non_norm: if True, no normalization performed.
        """
        super(RevIN, self).__init__()
        self.num_features = num_features
        self.eps = eps
        self.affine = affine
        self.subtract_last = subtract_last
        self.non_norm = non_norm
        if self.affine:
            self._init_params()

    def forward(self, x, mode: str):
        if mode == 'norm':
            self._get_statistics(x)
            x = self._normalize(x)
        elif mode == 'denorm':
            x = self._denormalize(x)
        else:
            raise NotImplementedError
        return x

    def _init_params(self):
        # 初始化RevIN参数:(C,)
        self.affine_weight = nn.Parameter(torch.ones(self.num_features))
        self.affine_bias = nn.Parameter(torch.zeros(self.num_features))

    def _get_statistics(self, x):
        dim2reduce = tuple(range(1, x.ndim - 1))
        if self.subtract_last:
            self.last = x[:, -1, :].unsqueeze(1)
        else:
            self.mean = torch.mean(x, dim=dim2reduce, keepdim=True).detach()
        self.stdev = torch.sqrt(torch.var(x, dim=dim2reduce, keepdim=True, unbiased=False) + self.eps).detach()

    def _normalize(self, x):
        if self.non_norm:
            return x
        if self.subtract_last:
            x = x - self.last
        else:
            x = x - self.mean
        x = x / self.stdev
        if self.affine:
            x = x * self.affine_weight
            x = x + self.affine_bias
        return x

    def _denormalize(self, x):
        if self.non_norm:
            return x
        if self.affine:
            x = x - self.affine_bias
            x = x / (self.affine_weight + self.eps * self.eps)
        x = x * self.stdev
        if self.subtract_last:
            x = x + self.last
        else:
            x = x + self.mean
        return x

Give us a ⭐ on Github