%load_ext autoreload
%autoreload 2
Neural Network Modules
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from nbdev.showdoc import show_doc
ACTIVATIONS = ['ReLU','Softplus','Tanh','SELU','LeakyReLU','PReLU','Sigmoid']
1. MLP
Multi-Layer Perceptron
class MLP(nn.Module):
"""Multi-Layer Perceptron Class
**Parameters:**<br>
`in_features`: int, dimension of input.<br>
`out_features`: int, dimension of output.<br>
`activation`: str, activation function to use.<br>
`hidden_size`: int, dimension of hidden layers.<br>
`num_layers`: int, number of hidden layers.<br>
`dropout`: float, dropout rate.<br>
"""
def __init__(self, in_features, out_features, activation, hidden_size, num_layers, dropout):
super().__init__()
assert activation in ACTIVATIONS, f'{activation} is not in {ACTIVATIONS}'
self.activation = getattr(nn, activation)()
        # MultiLayer Perceptron
        # Input layer
layers = [nn.Linear(in_features=in_features, out_features=hidden_size),
self.activation,
nn.Dropout(dropout)]
        # Hidden layers
for i in range(num_layers - 2):
layers += [nn.Linear(in_features=hidden_size, out_features=hidden_size),
self.activation,
nn.Dropout(dropout)]
        # Output layer
layers += [nn.Linear(in_features=hidden_size, out_features=out_features)]
        # Stack the layers into an nn.Sequential
self.layers = nn.Sequential(*layers)
def forward(self, x):
        return self.layers(x)
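A minimal usage sketch of `MLP`; the batch size, dimensions and hyperparameters below are arbitrary, chosen only for illustration:
import torch
# Hypothetical sizes: map a 24-step input window to a 12-step output.
mlp = MLP(in_features=24, out_features=12, activation='ReLU',
          hidden_size=64, num_layers=3, dropout=0.1)
x = torch.randn(32, 24)   # [batch, in_features]
print(mlp(x).shape)       # torch.Size([32, 12])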
2. Temporal Convolution
For a long time in deep learning, sequence modeling was synonymous with recurrent networks, yet several papers have shown that simple convolutional architectures can outperform canonical recurrent networks, such as the LSTM, by exhibiting longer effective memory.
References
- van den Oord, A., Dieleman, S., Zen, H., Simonyan, K., Vinyals, O., Graves, A., Kalchbrenner, N., Senior, A. W., & Kavukcuoglu, K. (2016). WaveNet: A generative model for raw audio. Computing Research Repository, abs/1609.03499. URL: http://arxiv.org/abs/1609.03499.
- Bai, S., Kolter, J. Z., & Koltun, V. (2018). An empirical evaluation of generic convolutional and recurrent networks for sequence modeling. Computing Research Repository, abs/1803.01271. URL: https://arxiv.org/abs/1803.01271.
class Chomp1d(nn.Module):
""" Chomp1d
Receives `x` input of dim [N,C,T], and trims it so that only
'time available' information is used.
Used by one dimensional causal convolutions `CausalConv1d`.
**Parameters:**<br>
`horizon`: int, length of outsample values to skip.
"""
def __init__(self, horizon):
super(Chomp1d, self).__init__()
self.horizon = horizon
def forward(self, x):
return x[:, :, :-self.horizon].contiguous()
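A quick shape check for `Chomp1d`, illustrating that the last `horizon` steps are dropped (tensor sizes are arbitrary):
import torch
chomp = Chomp1d(horizon=2)
x = torch.arange(10.).reshape(1, 1, 10)   # [N, C, T] = [1, 1, 10]
print(chomp(x).shape)                     # torch.Size([1, 1, 8]), last 2 steps trimmed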
class CausalConv1d(nn.Module):
""" Causal Convolution 1d
Receives `x` input of dim [N,C_in,T], and computes a causal convolution
    in the time dimension, skipping the H steps of the forecast horizon through
its dilation.
Consider a batch of one element, the dilated convolution operation on the
$t$ time step is defined:
$\mathrm{Conv1D}(\mathbf{x},\mathbf{w})(t) = (\mathbf{x}_{[*d]} \mathbf{w})(t) = \sum^{K}_{k=1} w_{k} \mathbf{x}_{t-dk}$
where $d$ is the dilation factor, $K$ is the kernel size, $t-dk$ is the index of
the considered past observation. The dilation effectively applies a filter with skip
connections. If $d=1$ one recovers a normal convolution.
**Parameters:**<br>
`in_channels`: int, dimension of `x` input's initial channels.<br>
    `out_channels`: int, dimension of `x` output's channels.<br>
`activation`: str, identifying activations from PyTorch activations.
select from 'ReLU','Softplus','Tanh','SELU', 'LeakyReLU','PReLU','Sigmoid'.<br>
`padding`: int, number of zero padding used to the left.<br>
`kernel_size`: int, convolution's kernel size.<br>
`dilation`: int, dilation skip connections.<br>
**Returns:**<br>
    `x`: tensor, torch tensor of dim [N,C_out,T], i.e. activation(conv1d(inputs, kernel) + bias).<br>
"""
def __init__(self, in_channels, out_channels, kernel_size,
padding, dilation, activation, stride:int=1):
super(CausalConv1d, self).__init__()
assert activation in ACTIVATIONS, f'{activation} is not in {ACTIVATIONS}'
self.conv = nn.Conv1d(in_channels=in_channels, out_channels=out_channels,
kernel_size=kernel_size, stride=stride, padding=padding,
dilation=dilation)
self.chomp = Chomp1d(padding)
self.activation = getattr(nn, activation)()
self.causalconv = nn.Sequential(self.conv, self.chomp, self.activation)
def forward(self, x):
        return self.causalconv(x)
show_doc(CausalConv1d, title_level=3)
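A short sketch, with illustrative hyperparameters, showing that `CausalConv1d` preserves the time dimension when `padding=(kernel_size-1)*dilation`: the convolution pads both ends and `Chomp1d` trims the excess on the right:
import torch
kernel_size, dilation = 3, 2
conv = CausalConv1d(in_channels=1, out_channels=8, kernel_size=kernel_size,
                    padding=(kernel_size - 1) * dilation,
                    dilation=dilation, activation='ReLU')
x = torch.randn(16, 1, 48)   # [N, C_in, T]
print(conv(x).shape)         # torch.Size([16, 8, 48]), T is preserved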
""" Temporal Convolution Encoder
Receives `x` input of dim [N,T,C_in], permutes it to [N,C_in,T]
applies a deep stack of exponentially dilated causal convolutions.
The exponentially increasing dilations of the convolutions allow for
the creation of weighted averages of exponentially large long-term memory.
**Parameters:**<br>
`in_channels`: int, dimension of `x` input's initial channels.<br>
    `out_channels`: int, dimension of `x` output's channels.<br>
`kernel_size`: int, size of the convolving kernel.<br>
`dilations`: int list, controls the temporal spacing between the kernel points.<br>
`activation`: str, identifying activations from PyTorch activations.
select from 'ReLU','Softplus','Tanh','SELU', 'LeakyReLU','PReLU','Sigmoid'.<br>
**Returns:**<br>
`x`: tensor, torch tensor of dim [N,T,C_out].<br>
"""
    # TODO: add dilations parameter and change layer declaration to for loop
def __init__(self, in_channels, out_channels,
kernel_size, dilations,
activation:str='ReLU'):
super(TemporalConvolutionEncoder, self).__init__()
layers = []
for dilation in dilations:
layers.append(CausalConv1d(in_channels=in_channels, out_channels=out_channels,
kernel_size=kernel_size, padding=(kernel_size-1)*dilation,
activation=activation, dilation=dilation))
in_channels = out_channels
self.tcn = nn.Sequential(*layers)
def forward(self, x):
# [N,T,C_in] -> [N,C_in,T] -> [N,T,C_out]
x = x.permute(0, 2, 1).contiguous()
x = self.tcn(x)
x = x.permute(0, 2, 1).contiguous()
        return x
show_doc(TemporalConvolutionEncoder, title_level=3)
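A usage sketch for `TemporalConvolutionEncoder` with exponentially increasing dilations (hyperparameters are illustrative only):
import torch
tcn = TemporalConvolutionEncoder(in_channels=1, out_channels=16,
                                 kernel_size=2, dilations=[1, 2, 4, 8])
x = torch.randn(8, 96, 1)    # [N, T, C_in]
print(tcn(x).shape)          # torch.Size([8, 96, 16]), i.e. [N, T, C_out]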
3. Transformer
References
- Haoyi Zhou, Shanghang Zhang, Jieqi Peng, Shuai Zhang, Jianxin Li, Hui Xiong, Wancai Zhang. "Informer: Beyond Efficient Transformer for Long Sequence Time-Series Forecasting"
- Haixu Wu, Jiehui Xu, Jianmin Wang, Mingsheng Long. "Autoformer: Decomposition Transformers with Auto-Correlation for Long-Term Series Forecasting"
class TransEncoderLayer(nn.Module):
def __init__(self, attention, hidden_size, conv_hidden_size=None, dropout=0.1, activation="relu"):
super(TransEncoderLayer, self).__init__()
conv_hidden_size = conv_hidden_size or 4 * hidden_size
self.attention = attention
self.conv1 = nn.Conv1d(in_channels=hidden_size, out_channels=conv_hidden_size, kernel_size=1)
self.conv2 = nn.Conv1d(in_channels=conv_hidden_size, out_channels=hidden_size, kernel_size=1)
self.norm1 = nn.LayerNorm(hidden_size)
self.norm2 = nn.LayerNorm(hidden_size)
self.dropout = nn.Dropout(dropout)
self.activation = F.relu if activation == "relu" else F.gelu
def forward(self, x, attn_mask=None):
new_x, attn = self.attention(
x, x, x,
attn_mask=attn_mask
)
x = x + self.dropout(new_x)
y = x = self.norm1(x)
y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))
y = self.dropout(self.conv2(y).transpose(-1, 1))
return self.norm2(x + y), attn
class TransEncoder(nn.Module):
def __init__(self, attn_layers, conv_layers=None, norm_layer=None):
super(TransEncoder, self).__init__()
self.attn_layers = nn.ModuleList(attn_layers)
self.conv_layers = nn.ModuleList(conv_layers) if conv_layers is not None else None
self.norm = norm_layer
def forward(self, x, attn_mask=None):
        # x: [B, L, D]
attns = []
if self.conv_layers is not None:
for attn_layer, conv_layer in zip(self.attn_layers, self.conv_layers):
x, attn = attn_layer(x, attn_mask=attn_mask)
x = conv_layer(x)
attns.append(attn)
x, attn = self.attn_layers[-1](x)
attns.append(attn)
else:
for attn_layer in self.attn_layers:
x, attn = attn_layer(x, attn_mask=attn_mask)
attns.append(attn)
if self.norm is not None:
x = self.norm(x)
        return x, attns
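A minimal shape-check sketch for `TransEncoderLayer`/`TransEncoder`. `_ToyAttention` is a hypothetical stand-in written only for this example (single-head scaled dot-product, mask ignored); in practice the attention is an `AttentionLayer` instance, defined further below.
import torch
import torch.nn as nn

class _ToyAttention(nn.Module):
    # Hypothetical stand-in: plain scaled dot-product self-attention, mask ignored.
    def forward(self, queries, keys, values, attn_mask=None):
        scores = torch.softmax(queries @ keys.transpose(-1, -2) / keys.shape[-1] ** 0.5, dim=-1)
        return scores @ values, scores

hidden_size = 32
encoder = TransEncoder(
    attn_layers=[TransEncoderLayer(_ToyAttention(), hidden_size=hidden_size) for _ in range(2)],
    norm_layer=nn.LayerNorm(hidden_size))
x = torch.randn(4, 24, hidden_size)   # [B, L, D]
out, attns = encoder(x)
print(out.shape)                      # torch.Size([4, 24, 32])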
class TransDecoderLayer(nn.Module):
    def __init__(self, self_attention, cross_attention, hidden_size, conv_hidden_size=None,
dropout=0.1, activation="relu"):
super(TransDecoderLayer, self).__init__()
conv_hidden_size = conv_hidden_size or 4 * hidden_size
self.self_attention = self_attention
self.cross_attention = cross_attention
self.conv1 = nn.Conv1d(in_channels=hidden_size, out_channels=conv_hidden_size, kernel_size=1)
self.conv2 = nn.Conv1d(in_channels=conv_hidden_size, out_channels=hidden_size, kernel_size=1)
self.norm1 = nn.LayerNorm(hidden_size)
self.norm2 = nn.LayerNorm(hidden_size)
self.norm3 = nn.LayerNorm(hidden_size)
self.dropout = nn.Dropout(dropout)
self.activation = F.relu if activation == "relu" else F.gelu
def forward(self, x, cross, x_mask=None, cross_mask=None):
x = x + self.dropout(self.self_attention(
x, x, x,
attn_mask=x_mask
)[0])
x = self.norm1(x)
x = x + self.dropout(self.cross_attention(
x, cross, cross,
attn_mask=cross_mask
)[0])
y = x = self.norm2(x)
y = self.dropout(self.activation(self.conv1(y.transpose(-1, 1))))
y = self.dropout(self.conv2(y).transpose(-1, 1))
return self.norm3(x + y)
class TransDecoder(nn.Module):
def __init__(self, layers, norm_layer=None, projection=None):
super(TransDecoder, self).__init__()
self.layers = nn.ModuleList(layers)
self.norm = norm_layer
self.projection = projection
def forward(self, x, cross, x_mask=None, cross_mask=None):
for layer in self.layers:
x = layer(x, cross, x_mask=x_mask, cross_mask=cross_mask)
if self.norm is not None:
x = self.norm(x)
if self.projection is not None:
x = self.projection(x)
        return x
class AttentionLayer(nn.Module):
def __init__(self, attention, hidden_size, n_head, d_keys=None,
d_values=None):
super(AttentionLayer, self).__init__()
d_keys = d_keys or (hidden_size // n_head)
d_values = d_values or (hidden_size // n_head)
self.inner_attention = attention
self.query_projection = nn.Linear(hidden_size, d_keys * n_head)
self.key_projection = nn.Linear(hidden_size, d_keys * n_head)
self.value_projection = nn.Linear(hidden_size, d_values * n_head)
self.out_projection = nn.Linear(d_values * n_head, hidden_size)
self.n_head = n_head
def forward(self, queries, keys, values, attn_mask):
B, L, _ = queries.shape
_, S, _ = keys.shape
H = self.n_head
queries = self.query_projection(queries).view(B, L, H, -1)
keys = self.key_projection(keys).view(B, S, H, -1)
values = self.value_projection(values).view(B, S, H, -1)
out, attn = self.inner_attention(
queries,
keys,
values,
attn_mask
)
out = out.view(B, L, -1)
        return self.out_projection(out), attn
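A sketch of `AttentionLayer` wrapping a minimal multi-head scaled dot-product attention. `_DotProductAttention` is a hypothetical inner attention written only for illustration; it receives queries [B, L, H, E], keys [B, S, H, E], values [B, S, H, D] and ignores the mask.
import torch
import torch.nn as nn

class _DotProductAttention(nn.Module):
    # Hypothetical inner attention for illustration; ignores attn_mask.
    def forward(self, queries, keys, values, attn_mask=None):
        scale = queries.shape[-1] ** -0.5
        attn = torch.softmax(torch.einsum('blhe,bshe->bhls', queries, keys) * scale, dim=-1)
        out = torch.einsum('bhls,bshd->blhd', attn, values)
        return out.contiguous(), attn

layer = AttentionLayer(_DotProductAttention(), hidden_size=32, n_head=4)
x = torch.randn(4, 24, 32)          # [B, L, hidden_size]
out, attn = layer(x, x, x, attn_mask=None)
print(out.shape, attn.shape)        # torch.Size([4, 24, 32]) torch.Size([4, 4, 24, 24])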
class PositionalEmbedding(nn.Module):
    def __init__(self, hidden_size, max_len=5000):
super(PositionalEmbedding, self).__init__()
        # Compute the positional encodings once in log space.
pe = torch.zeros(max_len, hidden_size).float()
        pe.requires_grad = False
position = torch.arange(0, max_len).float().unsqueeze(1)
div_term = (torch.arange(0, hidden_size, 2).float() * -(math.log(10000.0) / hidden_size)).exp()
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0)
self.register_buffer('pe', pe)
def forward(self, x):
return self.pe[:, :x.size(1)]
class TokenEmbedding(nn.Module):
def __init__(self, c_in, hidden_size):
super(TokenEmbedding, self).__init__()
padding = 1 if torch.__version__ >= '1.5.0' else 2
self.tokenConv = nn.Conv1d(in_channels=c_in, out_channels=hidden_size,
kernel_size=3, padding=padding, padding_mode='circular', bias=False)
for m in self.modules():
if isinstance(m, nn.Conv1d):
nn.init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='leaky_relu')
def forward(self, x):
x = self.tokenConv(x.permute(0, 2, 1)).transpose(1, 2)
return x
class TimeFeatureEmbedding(nn.Module):
def __init__(self, input_size, hidden_size):
super(TimeFeatureEmbedding, self).__init__()
self.embed = nn.Linear(input_size, hidden_size, bias=False)
def forward(self, x):
return self.embed(x)
class FixedEmbedding(nn.Module):
def __init__(self, c_in, d_model):
super(FixedEmbedding, self).__init__()
w = torch.zeros(c_in, d_model, dtype=torch.float32, requires_grad=False)
position = torch.arange(0, c_in, dtype=torch.float32).unsqueeze(1)
div_term = (torch.arange(0, d_model, 2).float()
* -(math.log(10000.0) / d_model)).exp()
w[:, 0::2] = torch.sin(position * div_term)
w[:, 1::2] = torch.cos(position * div_term)
self.emb = nn.Embedding(c_in, d_model)
self.emb.weight = nn.Parameter(w, requires_grad=False)
def forward(self, x):
return self.emb(x).detach()
class TemporalEmbedding(nn.Module):
def __init__(self, d_model, embed_type='fixed', freq='h'):
super(TemporalEmbedding, self).__init__()
minute_size = 4
hour_size = 24
weekday_size = 7
day_size = 32
month_size = 13
Embed = FixedEmbedding if embed_type == 'fixed' else nn.Embedding
if freq == 't':
self.minute_embed = Embed(minute_size, d_model)
self.hour_embed = Embed(hour_size, d_model)
self.weekday_embed = Embed(weekday_size, d_model)
self.day_embed = Embed(day_size, d_model)
self.month_embed = Embed(month_size, d_model)
def forward(self, x):
x = x.long()
minute_x = self.minute_embed(x[:, :, 4]) if hasattr(
self, 'minute_embed') else 0.
hour_x = self.hour_embed(x[:, :, 3])
weekday_x = self.weekday_embed(x[:, :, 2])
day_x = self.day_embed(x[:, :, 1])
month_x = self.month_embed(x[:, :, 0])
return hour_x + weekday_x + day_x + month_x + minute_x
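`TemporalEmbedding` expects the calendar features column-wise in a fixed order: month (0), day (1), weekday (2), hour (3) and, only for minute frequency `freq='t'`, minute (4). A small sketch with arbitrary timestamps:
import torch
temb = TemporalEmbedding(d_model=16, embed_type='fixed', freq='h')
# Columns: [month, day, weekday, hour]; values must stay below the sizes defined above.
x_mark = torch.tensor([[[1, 15, 3, 12],
                        [1, 15, 3, 13]]])   # [B, T, 4]
print(temb(x_mark).shape)                   # torch.Size([1, 2, 16])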
class DataEmbedding(nn.Module):
def __init__(self, c_in, exog_input_size, hidden_size, pos_embedding=True, dropout=0.1):
super(DataEmbedding, self).__init__()
self.value_embedding = TokenEmbedding(c_in=c_in, hidden_size=hidden_size)
if pos_embedding:
self.position_embedding = PositionalEmbedding(hidden_size=hidden_size)
else:
self.position_embedding = None
if exog_input_size > 0:
self.temporal_embedding = TimeFeatureEmbedding(input_size=exog_input_size,
hidden_size=hidden_size)
else:
self.temporal_embedding = None
self.dropout = nn.Dropout(p=dropout)
def forward(self, x, x_mark=None):
        # Convolution
x = self.value_embedding(x)
        # Add positional (relative to the window) embedding with sines and cosines.
if self.position_embedding is not None:
x = x + self.position_embedding(x)
        # Add temporal (absolute position in the time series) embedding with a linear layer.
if self.temporal_embedding is not None:
x = x + self.temporal_embedding(x_mark)
return self.dropout(x)
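A usage sketch of `DataEmbedding`, combining the value, positional and time-feature embeddings (all sizes below are arbitrary):
import torch
emb = DataEmbedding(c_in=1, exog_input_size=4, hidden_size=32,
                    pos_embedding=True, dropout=0.1)
x = torch.randn(8, 48, 1)        # series values,      [B, T, c_in]
x_mark = torch.randn(8, 48, 4)   # exogenous features, [B, T, exog_input_size]
print(emb(x, x_mark).shape)      # torch.Size([8, 48, 32])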
class MovingAvg(nn.Module):
"""
移动平均块以突出时间序列的趋势
"""
def __init__(self, kernel_size, stride):
super(MovingAvg, self).__init__()
self.kernel_size = kernel_size
self.avg = nn.AvgPool1d(kernel_size=kernel_size, stride=stride, padding=0)
def forward(self, x):
        # Padding on both ends of the time series
front = x[:, 0:1, :].repeat(1, (self.kernel_size - 1) // 2, 1)
end = x[:, -1:, :].repeat(1, (self.kernel_size - 1) // 2, 1)
x = torch.cat([front, x, end], dim=1)
x = self.avg(x.permute(0, 2, 1))
x = x.permute(0, 2, 1)
return x
class SeriesDecomp(nn.Module):
"""
序列分解模块
"""
def __init__(self, kernel_size):
super(SeriesDecomp, self).__init__()
self.MovingAvg = MovingAvg(kernel_size, stride=1)
def forward(self, x):
moving_mean = self.MovingAvg(x)
res = x - moving_mean
return res, moving_mean
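A usage sketch of `SeriesDecomp`; an odd `kernel_size` keeps the time dimension unchanged (sizes are arbitrary):
import torch
decomp = SeriesDecomp(kernel_size=25)
x = torch.randn(4, 96, 7)            # [B, T, C]
seasonal, trend = decomp(x)          # residual and moving-average components
print(seasonal.shape, trend.shape)   # both torch.Size([4, 96, 7])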
class RevIN(nn.Module):
""" RevIN(可逆实例归一化)
"""
def __init__(self, num_features: int, eps=1e-5, affine=False, subtract_last=False, non_norm=False):
"""
:param num_features: the number of features or channels
:param eps: a value added for numerical stability
:param affine: if True, RevIN has learnable affine parameters
        :param subtract_last: if True, the subtraction is based on the last value
            instead of the mean in normalization
:param non_norm: if True, no normalization performed.
"""
super(RevIN, self).__init__()
self.num_features = num_features
self.eps = eps
self.affine = affine
self.subtract_last = subtract_last
self.non_norm = non_norm
if self.affine:
self._init_params()
def forward(self, x, mode: str):
if mode == 'norm':
self._get_statistics(x)
x = self._normalize(x)
elif mode == 'denorm':
x = self._denormalize(x)
else:
raise NotImplementedError
return x
def _init_params(self):
        # Initialize RevIN params: (C,)
self.affine_weight = nn.Parameter(torch.ones(self.num_features))
self.affine_bias = nn.Parameter(torch.zeros(self.num_features))
def _get_statistics(self, x):
dim2reduce = tuple(range(1, x.ndim - 1))
if self.subtract_last:
self.last = x[:, -1, :].unsqueeze(1)
else:
self.mean = torch.mean(x, dim=dim2reduce, keepdim=True).detach()
self.stdev = torch.sqrt(torch.var(x, dim=dim2reduce, keepdim=True, unbiased=False) + self.eps).detach()
def _normalize(self, x):
if self.non_norm:
return x
if self.subtract_last:
x = x - self.last
else:
x = x - self.mean
x = x / self.stdev
if self.affine:
x = x * self.affine_weight
x = x + self.affine_bias
return x
def _denormalize(self, x):
if self.non_norm:
return x
if self.affine:
x = x - self.affine_bias
x = x / (self.affine_weight + self.eps * self.eps)
x = x * self.stdev
if self.subtract_last:
x = x + self.last
else:
x = x + self.mean
        return x
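A usage sketch of `RevIN`, checking that 'denorm' inverts 'norm' under the default settings:
import torch
revin = RevIN(num_features=7)
x = torch.randn(4, 96, 7)                    # [B, T, C]
x_norm = revin(x, 'norm')                    # store per-series mean/std, then normalize
x_back = revin(x_norm, 'denorm')             # invert using the stored statistics
print(torch.allclose(x, x_back, atol=1e-4))  # True (up to floating-point error)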