%load_ext autoreload
%autoreload 2
PyTorch Losses
NeuralForecast contains a collection of PyTorch loss classes aimed at model optimization.
The most important train signal is the forecast error, which is the difference between the observed value \(y_{\tau}\) and the prediction \(\hat{y}_{\tau}\), at time \(\tau\):
\[e_{\tau} = y_{\tau}-\hat{y}_{\tau} \qquad \qquad \tau \in \{t+1,\dots,t+H \}\]
The train loss summarizes the forecast errors in different train optimization objectives.
All the losses are `torch.nn.modules`, which helps to automatically move them across CPU/GPU/TPU devices with PyTorch Lightning.
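As a quick orientation, the sketch below shows how one of these loss classes is plugged into a model. It is a minimal sketch that assumes the standard `NeuralForecast`/`NBEATS` constructor signatures and uses `generate_series` only to build a toy panel.
# Minimal usage sketch (assumes the usual NeuralForecast model API): a loss
# instance is passed through the model's `loss` argument and is optimized by
# PyTorch Lightning during fit.
from neuralforecast import NeuralForecast
from neuralforecast.models import NBEATS
from neuralforecast.losses.pytorch import MAE
from neuralforecast.utils import generate_series

Y_df = generate_series(n_series=2)  # toy panel with columns: unique_id, ds, y
model = NBEATS(h=12, input_size=24, loss=MAE(), max_steps=10)
nf = NeuralForecast(models=[model], freq='D')
nf.fit(df=Y_df)
forecasts = nf.predict()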
from typing import Optional, Union, Tuple
import math
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Distribution
from torch.distributions import (
Bernoulli,
Normal,
StudentT,
Poisson,
NegativeBinomial,
Beta,
AffineTransform,
TransformedDistribution,
)
from torch.distributions import constraints
from functools import partial
import matplotlib.pyplot as plt
from fastcore.test import test_eq
from nbdev.showdoc import show_doc
from neuralforecast.utils import generate_series
def _divide_no_nan(a: torch.Tensor, b: torch.Tensor) -> torch.Tensor:
    """
    Auxiliary function to handle divide by 0
    """
    div = a / b
    div[div != div] = 0.0
    div[div == float('inf')] = 0.0
    return div

def _weighted_mean(losses, weights):
    """
    Compute weighted mean of losses per datapoint.
    """
    return _divide_no_nan(torch.sum(losses * weights), torch.sum(weights))
class BasePointLoss(torch.nn.Module):
"""
Base class for point loss functions.
**Parameters:**<br>
`horizon_weight`: Tensor of size h, weight for each timestamp of the forecasting window. <br>
`outputsize_multiplier`: Multiplier for the output size. <br>
`output_names`: Names of the outputs. <br>
"""
    def __init__(self, horizon_weight, outputsize_multiplier, output_names):
        super(BasePointLoss, self).__init__()
        if horizon_weight is not None:
            horizon_weight = torch.Tensor(horizon_weight.flatten())
        self.horizon_weight = horizon_weight
        self.outputsize_multiplier = outputsize_multiplier
        self.output_names = output_names
        self.is_distribution_output = False

    def domain_map(self, y_hat: torch.Tensor):
        """
        Univariate loss operates in dimension [B,T,H]/[B,H]
        This changes the network's output from [B,H,1]->[B,H]
        """
        return y_hat.squeeze(-1)

    def _compute_weights(self, y, mask):
        """
        Compute final weights for each datapoint (based on all weights and all masks)
        Set horizon_weight to a ones[H] tensor if not set.
        If set, check that it has the same length as the horizon in x.
        """
        if mask is None:
            mask = torch.ones_like(y, device=y.device)

        if self.horizon_weight is None:
            self.horizon_weight = torch.ones(mask.shape[-1])
        else:
            assert mask.shape[-1] == len(self.horizon_weight), \
                'horizon_weight must have same length as Y'

        weights = self.horizon_weight.clone()
        weights = torch.ones_like(mask, device=mask.device) * weights.to(mask.device)
        return weights * mask
1. Scale-dependent Errors
These metrics are on the same scale as the data.
Mean Absolute Error (MAE)
class MAE(BasePointLoss):
"""Mean Absolute Error
Calculates Mean Absolute Error between
`y` and `y_hat`. MAE measures the relative prediction
accuracy of a forecasting method by calculating the
deviation of the prediction and the true
value at a given time and averages these deviations
over the length of the series.
$$ \mathrm{MAE}(\\mathbf{y}_{\\tau}, \\mathbf{\hat{y}}_{\\tau}) = \\frac{1}{H} \\sum^{t+H}_{\\tau=t+1} |y_{\\tau} - \hat{y}_{\\tau}| $$
**Parameters:**<br>
`horizon_weight`: Tensor of size h, weight for each timestamp of the forecasting window. <br>
"""
    def __init__(self, horizon_weight=None):
        super(MAE, self).__init__(horizon_weight=horizon_weight,
                                  outputsize_multiplier=1,
                                  output_names=[''])

    def __call__(self,
                 y: torch.Tensor,
                 y_hat: torch.Tensor,
                 mask: Union[torch.Tensor, None] = None):
        """
**Parameters:**<br>
`y`: tensor, Actual values.<br>
`y_hat`: tensor, Predicted values.<br>
`mask`: tensor, Specifies datapoints to consider in loss.<br>
**Returns:**<br>
`mae`: tensor (single value).
"""
        losses = torch.abs(y - y_hat)
        weights = self._compute_weights(y=y, mask=mask)
        return _weighted_mean(losses=losses, weights=weights)

show_doc(MAE, name='MAE.__init__', title_level=3)
show_doc(MAE.__call__, name='MAE.__call__', title_level=3)
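A quick sanity check of the class above on toy tensors (shapes `[batch, horizon]` are assumed): a constant error of one unit gives an MAE of exactly one.
mae = MAE()
y = torch.Tensor([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])  # [batch, horizon]
y_hat = y + 1.0                                        # constant 1-unit error
print(mae(y=y, y_hat=y_hat))                           # tensor(1.)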
Mean Squared Error (MSE)
class MSE(BasePointLoss):
""" Mean Squared Error
Calculates Mean Squared Error between
`y` and `y_hat`. MSE measures the relative prediction
accuracy of a forecasting method by calculating the
squared deviation of the prediction and the true
value at a given time, and averages these deviations
over the length of the series.
$$ \mathrm{MSE}(\\mathbf{y}_{\\tau}, \\mathbf{\hat{y}}_{\\tau}) = \\frac{1}{H} \\sum^{t+H}_{\\tau=t+1} (y_{\\tau} - \hat{y}_{\\tau})^{2} $$
**Parameters:**<br>
`horizon_weight`: Tensor of size h, weight for each timestamp of the forecasting window. <br>
"""
    def __init__(self, horizon_weight=None):
        super(MSE, self).__init__(horizon_weight=horizon_weight,
                                  outputsize_multiplier=1,
                                  output_names=[''])

    def __call__(self,
                 y: torch.Tensor,
                 y_hat: torch.Tensor,
                 mask: Union[torch.Tensor, None] = None):
        """
**Parameters:**<br>
`y`: tensor, Actual values.<br>
`y_hat`: tensor, Predicted values.<br>
`mask`: tensor, Specifies datapoints to consider in loss.<br>
**Returns:**<br>
`mse`: tensor (single value).
"""
        losses = (y - y_hat)**2
        weights = self._compute_weights(y=y, mask=mask)
        return _weighted_mean(losses=losses, weights=weights)

show_doc(MSE, name='MSE.__init__', title_level=3)
show_doc(MSE.__call__, name='MSE.__call__', title_level=3)
Root Mean Squared Error (RMSE)
class RMSE(BasePointLoss):
""" Root Mean Squared Error
Calculates Root Mean Squared Error between
`y` and `y_hat`. RMSE measures the relative prediction
accuracy of a forecasting method by calculating the squared deviation
of the prediction and the observed value at a given time and
averages these deviations over the length of the series.
Finally the RMSE will be in the same scale
as the original time series so its comparison with other
series is possible only if they share a common scale.
RMSE has a direct connection to the L2 norm.
$$ \mathrm{RMSE}(\\mathbf{y}_{\\tau}, \\mathbf{\hat{y}}_{\\tau}) = \\sqrt{\\frac{1}{H} \\sum^{t+H}_{\\tau=t+1} (y_{\\tau} - \hat{y}_{\\tau})^{2}} $$
**Parameters:**<br>
`horizon_weight`: Tensor of size h, weight for each timestamp of the forecasting window. <br>
"""
    def __init__(self, horizon_weight=None):
        super(RMSE, self).__init__(horizon_weight=horizon_weight,
                                   outputsize_multiplier=1,
                                   output_names=[''])

    def __call__(self,
                 y: torch.Tensor,
                 y_hat: torch.Tensor,
                 mask: Union[torch.Tensor, None] = None):
        """
**Parameters:**<br>
`y`: tensor, Actual values.<br>
`y_hat`: tensor, Predicted values.<br>
`mask`: tensor, Specifies datapoints to consider in loss.<br>
**Returns:**<br>
`rmse`: tensor (single value).
"""
        losses = (y - y_hat)**2
        weights = self._compute_weights(y=y, mask=mask)
        losses = _weighted_mean(losses=losses, weights=weights)
        return torch.sqrt(losses)

show_doc(RMSE, name='RMSE.__init__', title_level=3)
show_doc(RMSE.__call__, name='RMSE.__call__', title_level=3)
2. Percentage Errors
These metrics are unit-free and suitable for comparisons across series.
Mean Absolute Percentage Error (MAPE)
class MAPE(BasePointLoss):
""" Mean Absolute Percentage Error
Calculates Mean Absolute Percentage Error between
`y` and `y_hat`. MAPE measures the relative prediction
accuracy of a forecasting method by calculating the percentual deviation
of the prediction and the observed value at a given time and
averages these deviations over the length of the series.
The closer to zero an observed value is, the higher penalty MAPE loss
assigns to the corresponding error.
$$ \mathrm{MAPE}(\\mathbf{y}_{\\tau}, \\mathbf{\hat{y}}_{\\tau}) = \\frac{1}{H} \\sum^{t+H}_{\\tau=t+1} \\frac{|y_{\\tau}-\hat{y}_{\\tau}|}{|y_{\\tau}|} $$
**Parameters:**<br>
`horizon_weight`: Tensor of size h, weight for each timestamp of the forecasting window. <br>
**References:**<br>
[Makridakis S., "Accuracy measures: theoretical and practical concerns".](https://www.sciencedirect.com/science/article/pii/0169207093900793)
"""
    def __init__(self, horizon_weight=None):
        super(MAPE, self).__init__(horizon_weight=horizon_weight,
                                   outputsize_multiplier=1,
                                   output_names=[''])

    def __call__(self,
                 y: torch.Tensor,
                 y_hat: torch.Tensor,
                 mask: Union[torch.Tensor, None] = None):
        """
**Parameters:**<br>
`y`: tensor, Actual values.<br>
`y_hat`: tensor, Predicted values.<br>
`mask`: tensor, Specifies date stamps per serie to consider in loss.<br>
**Returns:**<br>
`mape`: tensor (single value).
"""
        scale = _divide_no_nan(torch.ones_like(y, device=y.device), torch.abs(y))
        losses = torch.abs(y - y_hat) * scale
        weights = self._compute_weights(y=y, mask=mask)
        mape = _weighted_mean(losses=losses, weights=weights)
        return mape

show_doc(MAPE, name='MAPE.__init__', title_level=3)
show_doc(MAPE.__call__, name='MAPE.__call__', title_level=3)
Symmetric Mean Absolute Percentage Error (sMAPE)
class SMAPE(BasePointLoss):
""" Symmetric Mean Absolute Percentage Error
Calculates Symmetric Mean Absolute Percentage Error between
`y` and `y_hat`. SMAPE measures the relative prediction
accuracy of a forecasting method by calculating the relative deviation
of the prediction and the observed value scaled by the sum of the
absolute values for the prediction and observed value at a
given time, then averages these deviations over the length
of the series. This allows the SMAPE to have bounds between
0% and 200%, which is desirable compared to normal MAPE that
may be undetermined when the target is zero.
$$ \mathrm{sMAPE}_{2}(\\mathbf{y}_{\\tau}, \\mathbf{\hat{y}}_{\\tau}) = \\frac{1}{H} \\sum^{t+H}_{\\tau=t+1} \\frac{|y_{\\tau}-\hat{y}_{\\tau}|}{|y_{\\tau}|+|\hat{y}_{\\tau}|} $$
**Parameters:**<br>
`horizon_weight`: Tensor of size h, weight for each timestamp of the forecasting window. <br>
**References:**<br>
[Makridakis S., "Accuracy measures: theoretical and practical concerns".](https://www.sciencedirect.com/science/article/pii/0169207093900793)
"""
    def __init__(self, horizon_weight=None):
        super(SMAPE, self).__init__(horizon_weight=horizon_weight,
                                    outputsize_multiplier=1,
                                    output_names=[''])

    def __call__(self,
                 y: torch.Tensor,
                 y_hat: torch.Tensor,
                 mask: Union[torch.Tensor, None] = None):
        """
**Parameters:**<br>
`y`: tensor, Actual values.<br>
`y_hat`: tensor, Predicted values.<br>
`mask`: tensor, Specifies date stamps per serie to consider in loss.<br>
**Returns:**<br>
`smape`: tensor (single value).
"""
        delta_y = torch.abs((y - y_hat))
        scale = torch.abs(y) + torch.abs(y_hat)
        losses = _divide_no_nan(delta_y, scale)
        weights = self._compute_weights(y=y, mask=mask)
        return 2 * _weighted_mean(losses=losses, weights=weights)

show_doc(SMAPE, name='SMAPE.__init__', title_level=3)
show_doc(SMAPE.__call__, name='SMAPE.__call__', title_level=3)
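The following toy comparison illustrates the scale sensitivity discussed above: the same absolute error produces a very large MAPE when the target is close to zero, while sMAPE stays bounded.
y = torch.Tensor([[0.01, 10.0]])
y_hat = torch.Tensor([[1.01, 11.0]])   # absolute error of 1.0 at both steps
print(MAPE()(y=y, y_hat=y_hat))        # ~50.05, dominated by the near-zero target
print(SMAPE()(y=y, y_hat=y_hat))       # ~1.03, bounded in [0, 2] by construction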
3. Scale-independent Errors
These metrics measure the relative improvement over a baseline.
Mean Absolute Scaled Error (MASE)
class MASE(BasePointLoss):
""" Mean Absolute Scaled Error
Calculates the Mean Absolute Scaled Error between
`y` and `y_hat`. MASE measures the relative prediction
accuracy of a forecasting method by comparing the mean absolute errors
of the prediction and the observed value against the mean
absolute errors of the seasonal naive model.
The MASE partially composed the Overall Weighted Average (OWA),
used in the M4 Competition.
$$ \mathrm{MASE}(\\mathbf{y}_{\\tau}, \\mathbf{\hat{y}}_{\\tau}, \\mathbf{\hat{y}}^{season}_{\\tau}) = \\frac{1}{H} \sum^{t+H}_{\\tau=t+1} \\frac{|y_{\\tau}-\hat{y}_{\\tau}|}{\mathrm{MAE}(\\mathbf{y}_{\\tau}, \\mathbf{\hat{y}}^{season}_{\\tau})} $$
**Parameters:**<br>
`seasonality`: int. Main frequency of the time series; Hourly 24, Daily 7, Weekly 52, Monthly 12, Quarterly 4, Yearly 1.
`horizon_weight`: Tensor of size h, weight for each timestamp of the forecasting window. <br>
**References:**<br>
[Rob J. Hyndman, & Koehler, A. B. "Another look at measures of forecast accuracy".](https://www.sciencedirect.com/science/article/pii/S0169207006000239)<br>
[Spyros Makridakis, Evangelos Spiliotis, Vassilios Assimakopoulos, "The M4 Competition: 100,000 time series and 61 forecasting methods".](https://www.sciencedirect.com/science/article/pii/S0169207019301128)
"""
    def __init__(self, seasonality: int, horizon_weight=None):
        super(MASE, self).__init__(horizon_weight=horizon_weight,
                                   outputsize_multiplier=1,
                                   output_names=[''])
        self.seasonality = seasonality

    def __call__(self,
                 y: torch.Tensor,
                 y_hat: torch.Tensor,
                 y_insample: torch.Tensor,
                 mask: Union[torch.Tensor, None] = None):
        """
**Parameters:**<br>
`y`: tensor (batch_size, output_size), Actual values.<br>
`y_hat`: tensor (batch_size, output_size)), Predicted values.<br>
`y_insample`: tensor (batch_size, input_size), Actual insample Seasonal Naive predictions.<br>
`mask`: tensor, Specifies date stamps per serie to consider in loss.<br>
**Returns:**<br>
`mase`: tensor (single value).
"""
        delta_y = torch.abs(y - y_hat)
        scale = torch.mean(torch.abs(y_insample[:, self.seasonality:] - \
                                     y_insample[:, :-self.seasonality]), axis=1)
        losses = _divide_no_nan(delta_y, scale[:, None])
        weights = self._compute_weights(y=y, mask=mask)
        return _weighted_mean(losses=losses, weights=weights)

show_doc(MASE, name='MASE.__init__', title_level=3)
show_doc(MASE.__call__, name='MASE.__call__', title_level=3)
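Unlike the previous losses, `MASE` also needs the insample window to build the seasonal-naive scale. A small sketch on made-up tensors, assuming a weekly seasonality (`seasonality=7`):
mase = MASE(seasonality=7)
y_insample = torch.arange(1.0, 29.0).reshape(1, 28)  # [batch, input_size]
y = torch.Tensor([[29.0, 30.0, 31.0]])               # [batch, horizon]
y_hat = torch.Tensor([[28.0, 31.0, 33.0]])
# the seasonal-naive MAE on the insample window is 7, so errors are scaled by 1/7
print(mase(y=y, y_hat=y_hat, y_insample=y_insample))  # ~0.19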
Relative Mean Squared Error (relMSE)
class relMSE(BasePointLoss):
"""Relative Mean Squared Error
Computes Relative Mean Squared Error (relMSE), as proposed by Hyndman & Koehler (2006)
as an alternative to percentage errors, to avoid measure instability.
$$ \mathrm{relMSE}(\\mathbf{y}, \\mathbf{\hat{y}}, \\mathbf{\hat{y}}^{naive1}) =
\\frac{\mathrm{MSE}(\\mathbf{y}, \\mathbf{\hat{y}})}{\mathrm{MSE}(\\mathbf{y}, \\mathbf{\hat{y}}^{naive1})} $$
**Parameters:**<br>
`y_train`: numpy array, Training values.<br>
`horizon_weight`: Tensor of size h, weight for each timestamp of the forecasting window. <br>
**References:**<br>
- [Hyndman, R. J and Koehler, A. B. (2006).
"Another look at measures of forecast accuracy",
International Journal of Forecasting, Volume 22, Issue 4.](https://www.sciencedirect.com/science/article/pii/S0169207006000239)<br>
- [Kin G. Olivares, O. Nganba Meetei, Ruijun Ma, Rohan Reddy, Mengfei Cao, Lee Dicker.
"Probabilistic Hierarchical Forecasting with Deep Poisson Mixtures.
Submitted to the International Journal Forecasting, Working paper available at arxiv.](https://arxiv.org/pdf/2110.13179.pdf)
"""
    def __init__(self, y_train, horizon_weight=None):
        super(relMSE, self).__init__(horizon_weight=horizon_weight,
                                     outputsize_multiplier=1,
                                     output_names=[''])
        self.y_train = y_train
        self.mse = MSE(horizon_weight=horizon_weight)

    def __call__(self,
                 y: torch.Tensor,
                 y_hat: torch.Tensor,
                 mask: Union[torch.Tensor, None] = None):
        """
**Parameters:**<br>
`y`: tensor (batch_size, output_size), Actual values.<br>
`y_hat`: tensor (batch_size, output_size)), Predicted values.<br>
`y_insample`: tensor (batch_size, input_size), Actual insample Seasonal Naive predictions.<br>
`mask`: tensor, Specifies date stamps per serie to consider in loss.<br>
**Returns:**<br>
`relMSE`: tensor (single value).
"""
        horizon = y.shape[-1]
        last_col = self.y_train[:, -1].unsqueeze(1)
        y_naive = last_col.repeat(1, horizon)

        norm = self.mse(y=y, y_hat=y_naive, mask=mask)  # already weighted
        norm = norm + 1e-5  # numerical stability
        loss = self.mse(y=y, y_hat=y_hat, mask=mask)  # already weighted
        loss = _divide_no_nan(loss, norm)
        return loss

show_doc(relMSE, name='relMSE.__init__', title_level=3)
show_doc(relMSE.__call__, name='relMSE.__call__', title_level=3)
4. Probabilistic Errors
These methods use statistical approaches to estimate an unknown probability distribution from the observed data.
Maximum likelihood estimation finds the parameter values that maximize the likelihood function, which measures the probability of obtaining the observed data given those parameter values. Maximum likelihood estimators have desirable theoretical properties and are efficient when their assumptions are satisfied.
Among non-parametric approaches, quantile regression measures asymmetric deviations, penalizing under- and over-estimation differently.
Quantile Loss
class QuantileLoss(BasePointLoss):
""" Quantile Loss
Computes the quantile loss between `y` and `y_hat`.
QL measures the deviation of a quantile forecast.
By weighting the absolute deviation in a non symmetric way, the
loss pays more attention to under or over estimation.
A common value for q is 0.5 for the deviation from the median (Pinball loss).
$$ \mathrm{QL}(\\mathbf{y}_{\\tau}, \\mathbf{\hat{y}}^{(q)}_{\\tau}) = \\frac{1}{H} \\sum^{t+H}_{\\tau=t+1} \Big( (1-q)\,( \hat{y}^{(q)}_{\\tau} - y_{\\tau} )_{+} + q\,( y_{\\tau} - \hat{y}^{(q)}_{\\tau} )_{+} \Big) $$
**Parameters:**<br>
`q`: float, between 0 and 1. The slope of the quantile loss, in the context of quantile regression, the q determines the conditional quantile level.<br>
`horizon_weight`: Tensor of size h, weight for each timestamp of the forecasting window. <br>
**References:**<br>
[Roger Koenker and Gilbert Bassett, Jr., "Regression Quantiles".](https://www.jstor.org/stable/1913643)
"""
    def __init__(self, q, horizon_weight=None):
        super(QuantileLoss, self).__init__(horizon_weight=horizon_weight,
                                           outputsize_multiplier=1,
                                           output_names=[f'_ql{q}'])
        self.q = q

    def __call__(self,
                 y: torch.Tensor,
                 y_hat: torch.Tensor,
                 mask: Union[torch.Tensor, None] = None):
        """
**Parameters:**<br>
`y`: tensor, Actual values.<br>
`y_hat`: tensor, Predicted values.<br>
`mask`: tensor, Specifies datapoints to consider in loss.<br>
**Returns:**<br>
`quantile_loss`: tensor (single value).
"""
        delta_y = y - y_hat
        losses = torch.max(torch.mul(self.q, delta_y), torch.mul((self.q - 1), delta_y))
        weights = self._compute_weights(y=y, mask=mask)
        return _weighted_mean(losses=losses, weights=weights)

show_doc(QuantileLoss, name='QuantileLoss.__init__', title_level=3)
show_doc(QuantileLoss.__call__, name='QuantileLoss.__call__', title_level=3)
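A small check of the asymmetry above: with `q=0.9`, under-prediction is penalized nine times more than an over-prediction of the same size.
ql = QuantileLoss(q=0.9)
y = torch.Tensor([[10.0]])
print(ql(y=y, y_hat=torch.Tensor([[9.0]])))   # under-prediction: 0.9 * 1 = 0.9
print(ql(y=y, y_hat=torch.Tensor([[11.0]])))  # over-prediction:  0.1 * 1 = 0.1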
Multi-Quantile Loss (MQLoss)
def level_to_outputs(level):
    qs = sum([[50 - l / 2, 50 + l / 2] for l in level], [])
    output_names = sum([[f'-lo-{l}', f'-hi-{l}'] for l in level], [])

    sort_idx = np.argsort(qs)
    quantiles = np.array(qs)[sort_idx]

    # Add default median
    quantiles = np.concatenate([np.array([50]), quantiles])
    quantiles = torch.Tensor(quantiles) / 100
    output_names = list(np.array(output_names)[sort_idx])
    output_names.insert(0, '-median')

    return quantiles, output_names

def quantiles_to_outputs(quantiles):
    output_names = []
    for q in quantiles:
        if q < .50:
            output_names.append(f'-lo-{np.round(100 - 200 * q, 2)}')
        elif q > .50:
            output_names.append(f'-hi-{np.round(100 - 200 * (1 - q), 2)}')
        else:
            output_names.append('-median')
    return quantiles, output_names
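For reference, this is what the helper above produces for the default levels; the quantiles are sorted and the median is always prepended.
quantiles, names = level_to_outputs([80, 90])
print(quantiles)  # tensor([0.5000, 0.0500, 0.1000, 0.9000, 0.9500])
print(names)      # ['-median', '-lo-90', '-lo-80', '-hi-80', '-hi-90']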
class MQLoss(BasePointLoss):
""" Multi-Quantile loss
Calculates the Multi-Quantile loss (MQL) between `y` and `y_hat`.
MQL calculates the average multi-quantile Loss for
a given set of quantiles, based on the absolute
difference between predicted quantiles and observed values.
$$ \mathrm{MQL}(\\mathbf{y}_{\\tau},[\\mathbf{\hat{y}}^{(q_{1})}_{\\tau}, ... ,\hat{y}^{(q_{n})}_{\\tau}]) = \\frac{1}{n} \\sum_{q_{i}} \mathrm{QL}(\\mathbf{y}_{\\tau}, \\mathbf{\hat{y}}^{(q_{i})}_{\\tau}) $$
The limit behavior of MQL allows to measure the accuracy
of a full predictive distribution $\mathbf{\hat{F}}_{\\tau}$ with
the continuous ranked probability score (CRPS). This can be achieved
through a numerical integration technique, that discretizes the quantiles
and treats the CRPS integral with a left Riemann approximation, averaging over
uniformly distanced quantiles.
$$ \mathrm{CRPS}(y_{\\tau}, \mathbf{\hat{F}}_{\\tau}) = \int^{1}_{0} \mathrm{QL}(y_{\\tau}, \hat{y}^{(q)}_{\\tau}) dq $$
**Parameters:**<br>
`level`: int list [0,100]. Probability levels for prediction intervals (Defaults median).
`quantiles`: float list [0., 1.]. Alternative to level, quantiles to estimate from y distribution.
`horizon_weight`: Tensor of size h, weight for each timestamp of the forecasting window. <br>
**References:**<br>
[Roger Koenker and Gilbert Bassett, Jr., "Regression Quantiles".](https://www.jstor.org/stable/1913643)<br>
[James E. Matheson and Robert L. Winkler, "Scoring Rules for Continuous Probability Distributions".](https://www.jstor.org/stable/2629907)
"""
    def __init__(self, level=[80, 90], quantiles=None, horizon_weight=None):
        qs, output_names = level_to_outputs(level)
        qs = torch.Tensor(qs)

        # Transform quantiles to homogeneous output names
        if quantiles is not None:
            _, output_names = quantiles_to_outputs(quantiles)
            qs = torch.Tensor(quantiles)

        super(MQLoss, self).__init__(horizon_weight=horizon_weight,
                                     outputsize_multiplier=len(qs),
                                     output_names=output_names)

        self.quantiles = torch.nn.Parameter(qs, requires_grad=False)

    def domain_map(self, y_hat: torch.Tensor):
        """
        Identity domain map [B,T,H,Q]/[B,H,Q]
        """
        return y_hat
def _compute_weights(self, y, mask):
"""
Compute final weights for each datapoint (based on all weights and all masks)
Set horizon_weight to a ones[H] tensor if not set.
If set, check that it has the same length as the horizon in x.
"""
        if mask is None:
            mask = torch.ones_like(y, device=y.device)
        else:
            mask = mask.unsqueeze(1)  # Add Q dimension.

        if self.horizon_weight is None:
            self.horizon_weight = torch.ones(mask.shape[-1])
        else:
            assert mask.shape[-1] == len(self.horizon_weight), \
                'horizon_weight must have same length as Y'

        weights = self.horizon_weight.clone()
        weights = torch.ones_like(mask, device=mask.device) * weights.to(mask.device)
        return weights * mask

    def __call__(self,
                 y: torch.Tensor,
                 y_hat: torch.Tensor,
                 mask: Union[torch.Tensor, None] = None):
        """
**Parameters:**<br>
`y`: tensor, Actual values.<br>
`y_hat`: tensor, Predicted values.<br>
`mask`: tensor, Specifies date stamps per serie to consider in loss.<br>
**Returns:**<br>
`mqloss`: tensor (single value).
"""
        error = y_hat - y.unsqueeze(-1)
        sq = torch.maximum(-error, torch.zeros_like(error))
        s1_q = torch.maximum(error, torch.zeros_like(error))
        losses = (1 / len(self.quantiles)) * (self.quantiles * sq + (1 - self.quantiles) * s1_q)

        if y_hat.ndim == 3:  # BaseWindows
            losses = losses.swapaxes(-2, -1)  # [B,H,Q] -> [B,Q,H] (needed for horizon weighting, H at the end)
        elif y_hat.ndim == 4:  # BaseRecurrent
            losses = losses.swapaxes(-2, -1)
            losses = losses.swapaxes(-2, -3)  # [B,seq_len,H,Q] -> [B,Q,seq_len,H] (needed for horizon weighting, H at the end)

        weights = self._compute_weights(y=losses, mask=mask)  # Use losses for the extra Q dim
        # NOTE: weights do not have a Q dimension.

        return _weighted_mean(losses=losses, weights=weights)

show_doc(MQLoss, name='MQLoss.__init__', title_level=3)
show_doc(MQLoss.__call__, name='MQLoss.__call__', title_level=3)
::: {#da37f2ef .cell}
# Unit tests to check that MQLoss' stored quantiles
# attribute is correctly instantiated
check = MQLoss(level=[80, 90])
test_eq(len(check.quantiles), 5)

check = MQLoss(quantiles=[0.0100, 0.1000, 0.5, 0.9000, 0.9900])
print(check.output_names)
print(check.quantiles)
test_eq(len(check.quantiles), 5)

check = MQLoss(quantiles=[0.0100, 0.1000, 0.9000, 0.9900])
test_eq(len(check.quantiles), 4)
:::
Implicit Quantile Loss (IQLoss)
class QuantileLayer(nn.Module):
r"""
Implicit Quantile Layer from the paper ``IQN for Distributional
Reinforcement Learning`` (https://arxiv.org/abs/1806.06923) by
Dabney et al. 2018.
Code from GluonTS: https://github.com/awslabs/gluonts/blob/61133ef6e2d88177b32ace4afc6843ab9a7bc8cd/src/gluonts/torch/distributions/implicit_quantile_network.py
"""
def __init__(self, num_output: int, cos_embedding_dim: int = 128):
super().__init__()
self.output_layer = nn.Sequential(
nn.Linear(cos_embedding_dim, cos_embedding_dim),
nn.PReLU(),
nn.Linear(cos_embedding_dim, num_output),
)
self.register_buffer("integers", torch.arange(0, cos_embedding_dim))
    def forward(self, tau: torch.Tensor) -> torch.Tensor:
        cos_emb_tau = torch.cos(tau * self.integers * torch.pi)
        return self.output_layer(cos_emb_tau)
class IQLoss(QuantileLoss):
"""Implicit Quantile Loss
Computes the quantile loss between `y` and `y_hat`, with the quantile `q` provided as an input to the network.
IQL measures the deviation of a quantile forecast.
By weighting the absolute deviation in a non symmetric way, the
loss pays more attention to under or over estimation.
$$ \mathrm{QL}(\\mathbf{y}_{\\tau}, \\mathbf{\hat{y}}^{(q)}_{\\tau}) = \\frac{1}{H} \\sum^{t+H}_{\\tau=t+1} \Big( (1-q)\,( \hat{y}^{(q)}_{\\tau} - y_{\\tau} )_{+} + q\,( y_{\\tau} - \hat{y}^{(q)}_{\\tau} )_{+} \Big) $$
**Parameters:**<br>
`quantile_sampling`: str, default='uniform', sampling distribution used to sample the quantiles during training. Choose from ['uniform', 'beta']. <br>
`horizon_weight`: Tensor of size h, weight for each timestamp of the forecasting window. <br>
**References:**<br>
[Gouttes, Adèle, Kashif Rasul, Mateusz Koren, Johannes Stephan, and Tofigh Naghibi, "Probabilistic Time Series Forecasting with Implicit Quantile Networks".](http://arxiv.org/abs/2107.03743)
"""
    def __init__(self, cos_embedding_dim=64, concentration0=1.0, concentration1=1.0, horizon_weight=None):
        self.update_quantile()
        super(IQLoss, self).__init__(
            q=self.q,
            horizon_weight=horizon_weight
        )
        self.cos_embedding_dim = cos_embedding_dim
        self.concentration0 = concentration0
        self.concentration1 = concentration1
        self.has_sampled = False
        self.has_predicted = False

        self.quantile_layer = QuantileLayer(
            num_output=1, cos_embedding_dim=self.cos_embedding_dim
        )
        self.output_layer = nn.Sequential(
            nn.Linear(1, 1), nn.PReLU()
        )

    def _sample_quantiles(self, sample_size, device):
        if not self.has_sampled:
            self._init_sampling_distribution(device)
        quantiles = self.sampling_distr.sample(sample_size)
        self.has_sampled = True
        self.has_predicted = False
        return quantiles

    def _init_sampling_distribution(self, device):
        concentration0 = torch.tensor([self.concentration0],
                                      device=device,
                                      dtype=torch.float32)
        concentration1 = torch.tensor([self.concentration1],
                                      device=device,
                                      dtype=torch.float32)
        self.sampling_distr = Beta(concentration0=concentration0,
                                   concentration1=concentration1)

    def update_quantile(self, q: float = 0.5):
        self.q = q
        self.output_names = [f"_ql{q}"]
        self.has_predicted = True
def domain_map(self, y_hat):
"""
Adds IQN network to output of network
Input shapes to this function:
base_windows: y_hat = [B, h, 1]
base_multivariate: y_hat = [B, h, n_series]
base_recurrent: y_hat = [B, seq_len, h, n_series]
"""
        if self.eval() and self.has_predicted:
            quantiles = torch.full(size=y_hat.shape,
                                   fill_value=self.q,
                                   device=y_hat.device,
                                   dtype=y_hat.dtype)
            quantiles = quantiles.unsqueeze(-1)
        else:
            quantiles = self._sample_quantiles(sample_size=y_hat.shape,
                                               device=y_hat.device)

        # Embed the quantiles and add them to y_hat
        emb_taus = self.quantile_layer(quantiles)
        emb_inputs = y_hat.unsqueeze(-1) * (1.0 + emb_taus)
        emb_outputs = self.output_layer(emb_inputs)

        # Domain map
        y_hat = emb_outputs.squeeze(-1).squeeze(-1)

        return y_hat

show_doc(IQLoss, name='IQLoss.__init__', title_level=3)
show_doc(IQLoss.__call__, name='IQLoss.__call__', title_level=3)
::: {#cee2992b .cell}
# Unit tests
# Check that the default quantile is set to 0.5 at initialization
check = IQLoss()
test_eq(check.q, 0.5)

# Check that the quantile is updated correctly - predict
check = IQLoss()
check.update_quantile(0.7)
test_eq(check.q, 0.7)
:::
Distribution Losses
def weighted_average(x: torch.Tensor,
                     weights: Optional[torch.Tensor] = None, dim=None) -> torch.Tensor:
    """
Computes the weighted average of a given tensor across a given dim, masking
values associated with weight zero,
meaning instead of `nan * 0 = nan` you will get `0 * 0 = 0`.
**Parameters:**<br>
`x`: Input tensor, of which the average must be computed.<br>
`weights`: Weights tensor, of the same shape as `x`.<br>
`dim`: The dim along which to average `x`.<br>
**Returns:**<br>
`Tensor`: The tensor with values averaged along the specified `dim`.<br>
"""
    if weights is not None:
        weighted_tensor = torch.where(
            weights != 0, x * weights, torch.zeros_like(x)
        )
        sum_weights = torch.clamp(
            weights.sum(dim=dim) if dim else weights.sum(), min=1.0
        )
        return (
            weighted_tensor.sum(dim=dim) if dim else weighted_tensor.sum()
        ) / sum_weights
    else:
        return x.mean(dim=dim)
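A toy illustration of the masking behaviour described in the docstring: entries with zero weight are zeroed out instead of propagating NaNs.
x = torch.Tensor([1.0, float('nan'), 3.0])
w = torch.Tensor([1.0, 0.0, 1.0])
print(weighted_average(x, weights=w))  # tensor(2.) -- the NaN entry is ignored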
def bernoulli_domain_map(input: torch.Tensor):
""" Bernoulli Domain Map
Maps input into distribution constraints, by construction input's
last dimension is of matching `distr_args` length.
**Parameters:**<br>
`input`: tensor, of dimensions [B,T,H,theta] or [B,H,theta].<br>
**Returns:**<br>
`(probs,)`: tuple with tensors of Poisson distribution arguments.<br>
"""
return (input.squeeze(-1),)
def bernoulli_scale_decouple(output, loc=None, scale=None):
""" Bernoulli Scale Decouple
Stabilizes model's output optimization, by learning residual
variance and residual location based on anchoring `loc`, `scale`.
Also adds Bernoulli domain protection to the distribution parameters.
"""
    probs = output[0]
    # if (loc is not None) and (scale is not None):
    #     rate = (rate * scale) + loc
    probs = F.sigmoid(probs)  # .clone()
    return (probs,)
def student_domain_map(input: torch.Tensor):
""" Student T Domain Map
Maps input into distribution constraints, by construction input's
last dimension is of matching `distr_args` length.
**Parameters:**<br>
`input`: tensor, of dimensions [B,T,H,theta] or [B,H,theta].<br>
`eps`: float, helps the initialization of scale for easier optimization.<br>
**Returns:**<br>
`(df, loc, scale)`: tuple with tensors of StudentT distribution arguments.<br>
"""
    df, loc, scale = torch.tensor_split(input, 3, dim=-1)
    return df.squeeze(-1), loc.squeeze(-1), scale.squeeze(-1)
def student_scale_decouple(output, loc=None, scale=None, eps: float=0.1):
""" Normal Scale Decouple
Stabilizes model's output optimization, by learning residual
variance and residual location based on anchoring `loc`, `scale`.
Also adds StudentT domain protection to the distribution parameters.
"""
    df, mean, tscale = output
    tscale = F.softplus(tscale)
    if (loc is not None) and (scale is not None):
        mean = (mean * scale) + loc
        tscale = (tscale + eps) * scale
    df = 2.0 + F.softplus(df)
    return (df, mean, tscale)
def normal_domain_map(input: torch.Tensor):
""" Normal Domain Map
Maps input into distribution constraints, by construction input's
last dimension is of matching `distr_args` length.
**Parameters:**<br>
`input`: tensor, of dimensions [B,T,H,theta] or [B,H,theta].<br>
`eps`: float, helps the initialization of scale for easier optimization.<br>
**Returns:**<br>
`(mean, std)`: tuple with tensors of Normal distribution arguments.<br>
"""
    mean, std = torch.tensor_split(input, 2, dim=-1)
    return mean.squeeze(-1), std.squeeze(-1)
def normal_scale_decouple(output, loc=None, scale=None, eps: float=0.2):
""" Normal Scale Decouple
Stabilizes model's output optimization, by learning residual
variance and residual location based on anchoring `loc`, `scale`.
Also adds Normal domain protection to the distribution parameters.
"""
    mean, std = output
    std = F.softplus(std)
    if (loc is not None) and (scale is not None):
        mean = (mean * scale) + loc
        std = (std + eps) * scale
    return (mean, std)
def poisson_domain_map(input: torch.Tensor):
""" Poisson Domain Map
Maps input into distribution constraints, by construction input's
last dimension is of matching `distr_args` length.
**Parameters:**<br>
`input`: tensor, of dimensions [B,T,H,theta] or [B,H,theta].<br>
**Returns:**<br>
`(rate,)`: tuple with tensors of Poisson distribution arguments.<br>
"""
return (input.squeeze(-1),)
def poisson_scale_decouple(output, loc=None, scale=None):
""" Poisson Scale Decouple
Stabilizes model's output optimization, by learning residual
variance and residual location based on anchoring `loc`, `scale`.
Also adds Poisson domain protection to the distribution parameters.
"""
    eps = 1e-10
    rate = output[0]
    if (loc is not None) and (scale is not None):
        rate = (rate * scale) + loc
    rate = F.softplus(rate) + eps
    return (rate,)
def nbinomial_domain_map(input: torch.Tensor):
""" Negative Binomial Domain Map
Maps input into distribution constraints, by construction input's
last dimension is of matching `distr_args` length.
**Parameters:**<br>
`input`: tensor, of dimensions [B,T,H,theta] or [B,H,theta].<br>
**Returns:**<br>
`(total_count, alpha)`: tuple with tensors of N.Binomial distribution arguments.<br>
"""
    mu, alpha = torch.tensor_split(input, 2, dim=-1)
    return mu.squeeze(-1), alpha.squeeze(-1)
def nbinomial_scale_decouple(output, loc=None, scale=None):
""" Negative Binomial Scale Decouple
Stabilizes model's output optimization, by learning total
count and logits based on anchoring `loc`, `scale`.
Also adds Negative Binomial domain protection to the distribution parameters.
"""
    mu, alpha = output
    mu = F.softplus(mu) + 1e-8
    alpha = F.softplus(alpha) + 1e-8  # alpha = 1 / total_count
    if (loc is not None) and (scale is not None):
        mu *= loc
        alpha /= (loc + 1.)

    # mu = total_count * (probs / (1 - probs))
    # => probs = mu / (total_count + mu)
    # => probs = mu / [total_count * (1 + mu * (1 / total_count))]
    total_count = 1.0 / alpha
    probs = (mu * alpha / (1.0 + mu * alpha)) + 1e-8
    return (total_count, probs)
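A quick sanity check of the `(total_count, probs)` parametrization derived above (ignoring the small epsilons): the Negative Binomial mean `total_count * probs / (1 - probs)` recovers `mu`.
mu = torch.Tensor([5.0])
alpha = torch.Tensor([0.5])
total_count = 1.0 / alpha
probs = mu * alpha / (1.0 + mu * alpha)
distr = NegativeBinomial(total_count=total_count, probs=probs)
print(distr.mean)  # tensor([5.])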
def est_lambda(mu, rho):
    return mu ** (2 - rho) / (2 - rho)

def est_alpha(rho):
    return (2 - rho) / (rho - 1)

def est_beta(mu, rho):
    return mu ** (1 - rho) / (rho - 1)
class Tweedie(Distribution):
""" Tweedie Distribution
The Tweedie distribution is a compound probability, special case of exponential
dispersion models EDMs defined by its mean-variance relationship.
The distribution is particularly useful to model sparse series, as the probability has
positive mass at zero but otherwise is continuous.
$Y \sim \mathrm{ED}(\\mu,\\sigma^{2}) \qquad
\mathbb{P}(y|\\mu ,\\sigma^{2})=h(\\sigma^{2},y) \\exp \\left({\\frac {\\theta y-A(\\theta )}{\\sigma^{2}}}\\right)$<br>
$\mu =A'(\\theta ) \qquad \mathrm{Var}(Y) = \\sigma^{2} \\mu^{\\rho}$
Cases of the variance relationship include Normal (`rho` = 0), Poisson (`rho` = 1),
Gamma (`rho` = 2), inverse Gaussian (`rho` = 3).
**Parameters:**<br>
`log_mu`: tensor, with log of means.<br>
`rho`: float, Tweedie variance power (1,2). Fixed across all observations.<br>
`sigma2`: tensor, Tweedie variance. Currently fixed in 1.<br>
**References:**<br>
- [Tweedie, M. C. K. (1984). An index which distinguishes between some important exponential families. Statistics: Applications and New Directions.
Proceedings of the Indian Statistical Institute Golden Jubilee International Conference (Eds. J. K. Ghosh and J. Roy), pp. 579-604. Calcutta: Indian Statistical Institute.]()<br>
- [Jorgensen, B. (1987). Exponential Dispersion Models. Journal of the Royal Statistical Society.
Series B (Methodological), 49(2), 127–162. http://www.jstor.org/stable/2345415](http://www.jstor.org/stable/2345415)<br>
"""
def __init__(self, log_mu, rho, validate_args=None):
# TODO: add sigma2 dispersion
# TODO add constraints
# arg_constraints = {'log_mu': constraints.real, 'rho': constraints.positive}
# support = constraints.real
self.log_mu = log_mu
self.rho = rho
assert rho>1 and rho<2, f'rho={rho} parameter needs to be between (1,2).'
        batch_shape = log_mu.size()
        super(Tweedie, self).__init__(batch_shape, validate_args=validate_args)

    @property
    def mean(self):
        return torch.exp(self.log_mu)

    @property
    def variance(self):
        return torch.ones_like(self.log_mu)  # TODO: needs to be assigned

    def sample(self, sample_shape=torch.Size()):
        shape = self._extended_shape(sample_shape)
        with torch.no_grad():
            mu = self.mean
            rho = self.rho * torch.ones_like(mu)
            sigma2 = 1  # TODO

            rate = est_lambda(mu, rho) / sigma2   # rate for the Poisson count
            alpha = est_alpha(rho)                # alpha for the Gamma distribution
            beta = est_beta(mu, rho) / sigma2     # beta for the Gamma distribution

            # Expand for sample
            rate = rate.expand(shape)
            alpha = alpha.expand(shape)
            beta = beta.expand(shape)

            N = torch.poisson(rate)
            gamma = torch.distributions.gamma.Gamma(N * alpha, beta)
            samples = gamma.sample()
            samples[N == 0] = 0

            return samples

    def log_prob(self, y_true):
        rho = self.rho
        y_pred = self.log_mu

        a = y_true * torch.exp((1 - rho) * y_pred) / (1 - rho)
        b = torch.exp((2 - rho) * y_pred) / (2 - rho)

        return a - b
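A small sketch of the class above on made-up parameters; `validate_args=False` is passed only because the argument constraints are still a TODO in the constructor.
tw = Tweedie(log_mu=torch.log(torch.Tensor([2.0, 5.0])), rho=1.5, validate_args=False)
y = torch.Tensor([0.0, 4.0])  # zero-inflated targets are allowed
print(tw.mean)                # tensor([2., 5.])
print(tw.log_prob(y))         # per-observation (unnormalized) log-likelihood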
def tweedie_domain_map(input: torch.Tensor):
""" Tweedie Domain Map
Maps input into distribution constraints, by construction input's
last dimension is of matching `distr_args` length.
**Parameters:**<br>
`input`: tensor, of dimensions [B,T,H,theta] or [B,H,theta].<br>
**Returns:**<br>
`(log_mu,)`: tuple with tensors of Tweedie distribution arguments.<br>
"""
# log_mu, probs = torch.tensor_split(input, 2, dim=-1)
return (input.squeeze(-1),)
def tweedie_scale_decouple(output, loc=None, scale=None):
""" Tweedie Scale Decouple
Stabilizes model's output optimization, by learning total
count and logits based on anchoring `loc`, `scale`.
Also adds Tweedie domain protection to the distribution parameters.
"""
    log_mu = output[0]
    if (loc is not None) and (scale is not None):
        log_mu += torch.log(loc)  # TODO: rho scaling
    return (log_mu,)
# 代码改编自:https://github.com/awslabs/gluonts/blob/61133ef6e2d88177b32ace4afc6843ab9a7bc8cd/src/gluonts/torch/distributions/isqf.py
class ISQF(TransformedDistribution):
"""
Distribution class for the Incremental (Spline) Quantile Function.
**Parameters:**<br>
`spline_knots`: Tensor parametrizing the x-positions of the spline knots. Shape: (*batch_shape, (num_qk-1), num_pieces)
`spline_heights`: Tensor parametrizing the y-positions of the spline knots. Shape: (*batch_shape, (num_qk-1), num_pieces)
`beta_l`: Tensor containing the non-negative learnable parameter of the left tail. Shape: (*batch_shape,)
`beta_r`: Tensor containing the non-negative learnable parameter of the right tail. Shape: (*batch_shape,)
`qk_y`: Tensor containing the increasing y-positions of the quantile knots. Shape: (*batch_shape, num_qk)
`qk_x`: Tensor containing the increasing x-positions of the quantile knots. Shape: (*batch_shape, num_qk)
`loc`: Tensor containing the location in case of a transformed random variable. Shape: (*batch_shape,)
`scale`: Tensor containing the scale in case of a transformed random variable. Shape: (*batch_shape,)
**References:**<br>
[Park, Youngsuk, Danielle Maddix, François-Xavier Aubet, Kelvin Kan, Jan Gasthaus, and Yuyang Wang (2022). "Learning Quantile Functions without Quantile Crossing for Distribution-free Time Series Forecasting".](https://proceedings.mlr.press/v151/park22a.html)
"""
def __init__(
self,
spline_knots: torch.Tensor,
spline_heights: torch.Tensor,
beta_l: torch.Tensor,
beta_r: torch.Tensor,
qk_y: torch.Tensor,
qk_x: torch.Tensor,
loc: torch.Tensor,
        scale: torch.Tensor,
        validate_args=None,
    ) -> None:
        base_distribution = BaseISQF(spline_knots=spline_knots,
                                     spline_heights=spline_heights,
                                     beta_l=beta_l,
                                     beta_r=beta_r,
                                     qk_y=qk_y,
                                     qk_x=qk_x,
                                     validate_args=validate_args)
        transforms = AffineTransform(loc=loc, scale=scale)
        super().__init__(
            base_distribution, transforms, validate_args=validate_args
        )

    def crps(self, y: torch.Tensor) -> torch.Tensor:
        z = y
        scale = 1.0
        t = self.transforms[0]
        z = t._inverse(z)
        scale *= t.scale
        p = self.base_dist.crps(z)
        return p * scale
class BaseISQF(Distribution):
"""
Base distribution class for the Incremental (Spline) Quantile Function.
**Parameters:**<br>
`spline_knots`: Tensor parametrizing the x-positions of the spline knots. Shape: (*batch_shape, (num_qk-1), num_pieces)
`spline_heights`: Tensor parametrizing the y-positions of the spline knots. Shape: (*batch_shape, (num_qk-1), num_pieces)
`beta_l`: Tensor containing the non-negative learnable parameter of the left tail. (*batch_shape,)
`beta_r`: Tensor containing the non-negative learnable parameter of the right tail. (*batch_shape,)
`qk_y`: Tensor containing the increasing y-positions of the quantile knots. Shape: (*batch_shape, num_qk)
`qk_x`: Tensor containing the increasing x-positions of the quantile knots. Shape: (*batch_shape, num_qk)
**References:**<br>
[Park, Youngsuk, Danielle Maddix, François-Xavier Aubet, Kelvin Kan, Jan Gasthaus, and Yuyang Wang (2022). "Learning Quantile Functions without Quantile Crossing for Distribution-free Time Series Forecasting".](https://proceedings.mlr.press/v151/park22a.html)
"""
def __init__(
self,
spline_knots: torch.Tensor,
spline_heights: torch.Tensor,
beta_l: torch.Tensor,
beta_r: torch.Tensor,
qk_y: torch.Tensor,
        qk_x: torch.Tensor,
        tol: float = 1e-4,
        validate_args: bool = False,
    ) -> None:
        self.num_qk, self.num_pieces = qk_y.shape[-1], spline_knots.shape[-1]
        self.spline_knots, self.spline_heights = spline_knots, spline_heights
        self.beta_l, self.beta_r = beta_l, beta_r
        self.qk_y_all = qk_y
        self.tol = tol

        super().__init__(
            batch_shape=self.batch_shape, validate_args=validate_args
        )

        # Get quantile knots (qk) parameters
        (
            self.qk_x,
            self.qk_x_plus,
            self.qk_x_l,
            self.qk_x_r,
        ) = BaseISQF.parameterize_qk(qk_x)
        (
            self.qk_y,
            self.qk_y_plus,
            self.qk_y_l,
            self.qk_y_r,
        ) = BaseISQF.parameterize_qk(qk_y)

        # Get spline knots (sk) parameters
        self.sk_y, self.delta_sk_y = BaseISQF.parameterize_spline(
            self.spline_heights,
            self.qk_y,
            self.qk_y_plus,
            self.tol,
        )
        self.sk_x, self.delta_sk_x = BaseISQF.parameterize_spline(
            self.spline_knots,
            self.qk_x,
            self.qk_x_plus,
            self.tol,
        )

        if self.num_pieces > 1:
            self.sk_x_plus = torch.cat(
                [self.sk_x[..., 1:], self.qk_x_plus.unsqueeze(dim=-1)], dim=-1
            )
        else:
            self.sk_x_plus = self.qk_x_plus.unsqueeze(dim=-1)

        # Get tails parameters
        self.tail_al, self.tail_bl = BaseISQF.parameterize_tail(
            self.beta_l, self.qk_x_l, self.qk_y_l
        )
        self.tail_ar, self.tail_br = BaseISQF.parameterize_tail(
            -self.beta_r, 1 - self.qk_x_r, self.qk_y_r
        )
@staticmethod
def parameterize_qk(
        quantile_knots: torch.Tensor,
    ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        r"""
Function to parameterize the x or y positions
of the num_qk quantile knots
Parameters
----------
quantile_knots
x or y positions of the quantile knots
shape: (*batch_shape, num_qk)
Returns
-------
qk
x or y positions of the quantile knots (qk),
with index=1, ..., num_qk-1,
shape: (*batch_shape, num_qk-1)
qk_plus
x or y positions of the quantile knots (qk),
with index=2, ..., num_qk,
shape: (*batch_shape, num_qk-1)
qk_l
x or y positions of the left-most quantile knot (qk),
shape: (*batch_shape)
qk_r
x or y positions of the right-most quantile knot (qk),
shape: (*batch_shape)
"""
        qk, qk_plus = quantile_knots[..., :-1], quantile_knots[..., 1:]
        qk_l, qk_r = quantile_knots[..., 0], quantile_knots[..., -1]

        return qk, qk_plus, qk_l, qk_r
@staticmethod
def parameterize_spline(
spline_knots: torch.Tensor,
qk: torch.Tensor,
        qk_plus: torch.Tensor,
        tol: float = 1e-4,
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        r"""
Function to parameterize the x or y positions of the spline knots
Parameters
----------
spline_knots
variable that parameterizes the spline knot positions
qk
x or y positions of the quantile knots (qk),
with index=1, ..., num_qk-1,
shape: (*batch_shape, num_qk-1)
qk_plus
x or y positions of the quantile knots (qk),
with index=2, ..., num_qk,
shape: (*batch_shape, num_qk-1)
num_pieces
number of spline knot pieces
tol
tolerance hyperparameter for numerical stability
Returns
-------
sk
x or y positions of the spline knots (sk),
shape: (*batch_shape, num_qk-1, num_pieces)
delta_sk
difference of x or y positions of the spline knots (sk),
shape: (*batch_shape, num_qk-1, num_pieces)
"""
        # The spacing between spline knots is parameterized
        # by a softmax function (values in [0,1] that sum to 1).
        # We add tol to prevent overflow when computing 1/spacing in the spline CRPS.
        # After adding tol, it is normalized by
        # (1 + num_pieces * tol) to keep the sum-to-1 property.
        num_pieces = spline_knots.shape[-1]

        delta_x = (F.softmax(spline_knots, dim=-1) + tol) / (
            1 + num_pieces * tol
        )

        zero_tensor = torch.zeros_like(
            delta_x[..., 0:1]
        )  # 0:1 keeps the dimension
        x = torch.cat(
            [zero_tensor, torch.cumsum(delta_x, dim=-1)[..., :-1]], dim=-1
        )

        qk, qk_plus = qk.unsqueeze(dim=-1), qk_plus.unsqueeze(dim=-1)
        sk = x * (qk_plus - qk) + qk
        delta_sk = delta_x * (qk_plus - qk)

        return sk, delta_sk
@staticmethod
def parameterize_tail(
        beta: torch.Tensor, qk_x: torch.Tensor, qk_y: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        r"""
Function to parameterize the tail parameters
Note that the exponential tails are given by
q(alpha)
= a_l log(alpha) + b_l if left tail
= a_r log(1-alpha) + b_r if right tail
where
a_l=1/beta_l, b_l=-a_l*log(qk_x_l)+q(qk_x_l)
a_r=1/beta_r, b_r=a_r*log(1-qk_x_r)+q(qk_x_r)
Parameters
----------
beta
parameterizes the left or right tail, shape: (*batch_shape,)
qk_x
left- or right-most x-positions of the quantile knots,
shape: (*batch_shape,)
qk_y
left- or right-most y-positions of the quantile knots,
shape: (*batch_shape,)
Returns
-------
tail_a
a_l or a_r as described above
tail_b
b_l or b_r as described above
"""
        tail_a = 1 / beta
        tail_b = -tail_a * torch.log(qk_x) + qk_y

        return tail_a, tail_b

    def quantile(self, alpha: torch.Tensor) -> torch.Tensor:
        return self.quantile_internal(alpha, dim=0)
def quantile_internal(
self, alpha: torch.Tensor, dim: Optional[int] = None
    ) -> torch.Tensor:
        r"""
Evaluates the quantile function at the quantile levels input_alpha
Parameters
----------
alpha
Tensor of shape = (*batch_shape,) if axis=None, or containing an
additional axis on the specified position, otherwise
dim
Index of the axis containing the different quantile levels which
are to be computed.
Read the description below for detailed information
Returns
-------
Tensor
Quantiles tensor, of the same shape as alpha
"""
        qk_x, qk_x_l, qk_x_plus = self.qk_x, self.qk_x_l, self.qk_x_plus

        # Parameter reshaping across quantile_internal, quantile_spline and quantile_tail:
        # tail parameters (tail_al, tail_ar, tail_bl, tail_br) have shape (*batch_shape,),
        # spline parameters (sk_x, sk_x_plus, sk_y, sk_y_plus) have shape
        # (*batch_shape, num_qk-1, num_pieces), and quantile-knot parameters
        # (qk_x, qk_x_plus, qk_y, qk_y_plus) have shape (*batch_shape, num_qk-1).
        #
        # dim=None - passed at inference when num_samples is None:
        # alpha has shape (*batch_shape,) and is expanded to (*batch_shape, 1, 1)
        # to perform the operations; the parameters keep the shapes above,
        # so no reshaping is needed.
        #
        # dim=0 - passed at inference when num_samples is not None:
        # alpha has shape (num_samples, *batch_shape) and is expanded to
        # (num_samples, *batch_shape, 1, 1); tail, spline and quantile-knot
        # parameters are all expanded at dim=0 so their shapes become
        # (num_samples, *batch_shape), (num_samples, *batch_shape, num_qk-1, num_pieces)
        # and (num_samples, *batch_shape, num_qk-1), respectively.
        #
        # dim=-2 - passed at training when we evaluate the quantiles at the
        # spline knots in order to compute alpha_tilde (only used by quantile_spline):
        # alpha has shape (*batch_shape, num_qk-1, num_pieces) and is expanded to
        # (*batch_shape, num_qk-1, num_pieces, 1); spline and quantile-knot
        # parameters are expanded at dim=-2 and dim=-1, respectively.

        if dim is not None:
            qk_x_l = qk_x_l.unsqueeze(dim=dim)
            qk_x = qk_x.unsqueeze(dim=dim)
            qk_x_plus = qk_x_plus.unsqueeze(dim=dim)

        quantile = torch.where(
            alpha < qk_x_l,
            self.quantile_tail(alpha, dim=dim, left_tail=True),
            self.quantile_tail(alpha, dim=dim, left_tail=False),
        )

        spline_val = self.quantile_spline(alpha, dim=dim)

        for spline_idx in range(self.num_qk - 1):
            is_in_between = torch.logical_and(
                qk_x[..., spline_idx] <= alpha,
                alpha < qk_x_plus[..., spline_idx],
            )

            quantile = torch.where(
                is_in_between,
                spline_val[..., spline_idx],
                quantile,
            )

        return quantile
    def quantile_spline(
        self,
        alpha: torch.Tensor,
        dim: Optional[int] = None,
    ) -> torch.Tensor:
        # Refer to the description in quantile_internal

        qk_y = self.qk_y
        sk_x, delta_sk_x, delta_sk_y = (
            self.sk_x,
            self.delta_sk_x,
            self.delta_sk_y,
        )

        if dim is not None:
            qk_y = qk_y.unsqueeze(dim=0 if dim == 0 else -1)
            sk_x = sk_x.unsqueeze(dim=dim)
            delta_sk_x = delta_sk_x.unsqueeze(dim=dim)
            delta_sk_y = delta_sk_y.unsqueeze(dim=dim)

        if dim is None or dim == 0:
            alpha = alpha.unsqueeze(dim=-1)

        alpha = alpha.unsqueeze(dim=-1)

        spline_val = (alpha - sk_x) / delta_sk_x
        spline_val = torch.maximum(
            torch.minimum(spline_val, torch.ones_like(spline_val)),
            torch.zeros_like(spline_val),
        )

        return qk_y + torch.sum(spline_val * delta_sk_y, dim=-1)
    def quantile_tail(
        self,
        alpha: torch.Tensor,
        dim: Optional[int] = None,
        left_tail: bool = True,
    ) -> torch.Tensor:
        # Refer to the description in quantile_internal

        if left_tail:
            tail_a, tail_b = self.tail_al, self.tail_bl
        else:
            tail_a, tail_b = self.tail_ar, self.tail_br
            alpha = 1 - alpha

        if dim is not None:
            tail_a, tail_b = tail_a.unsqueeze(dim=dim), tail_b.unsqueeze(
                dim=dim
            )

        return tail_a * torch.log(alpha) + tail_b
def cdf_spline(self, z: torch.Tensor) -> torch.Tensor:
r"""
For observations z and splines defined in [qk_x[k], qk_x[k+1]]
Computes the quantile level alpha_tilde such that
alpha_tilde
= q^{-1}(z) if z is in-between qk_x[k] and qk_x[k+1]
= qk_x[k] if z<qk_x[k]
= qk_x[k+1] if z>qk_x[k+1]
Parameters
----------
z
Observation, shape = (*batch_shape,)
Returns
-------
alpha_tilde
Corresponding quantile level, shape = (*batch_shape, num_qk-1)
"""
        qk_y, qk_y_plus = self.qk_y, self.qk_y_plus
        qk_x, qk_x_plus = self.qk_x, self.qk_x_plus
        sk_x, delta_sk_x, delta_sk_y = (
            self.sk_x,
            self.delta_sk_x,
            self.delta_sk_y,
        )

        z_expand = z.unsqueeze(dim=-1)

        if self.num_pieces > 1:
            qk_y_expand = qk_y.unsqueeze(dim=-1)
            z_expand_twice = z_expand.unsqueeze(dim=-1)

            knots_eval = self.quantile_spline(sk_x, dim=-2)

            # Compute \sum_{s=0}^{s_0-1} \Delta sk_y[s],
            # where \Delta sk_y[s] = (sk_y[s+1] - sk_y[s])
            mask_sum_s0 = torch.lt(knots_eval, z_expand_twice)
            mask_sum_s0_minus = torch.cat(
                [
                    mask_sum_s0[..., 1:],
                    torch.zeros_like(qk_y_expand, dtype=torch.bool),
                ],
                dim=-1,
            )
            sum_delta_sk_y = torch.sum(mask_sum_s0_minus * delta_sk_y, dim=-1)

            mask_s0_only = torch.logical_and(
                mask_sum_s0, torch.logical_not(mask_sum_s0_minus)
            )
            # Compute (sk_x[s_0+1] - sk_x[s_0]) / (sk_y[s_0+1] - sk_y[s_0])
            frac_s0 = torch.sum(
                (mask_s0_only * delta_sk_x) / delta_sk_y, dim=-1
            )

            # Compute sk_x_{s_0}
            sk_x_s0 = torch.sum(mask_s0_only * sk_x, dim=-1)

            # Compute alpha_tilde
            alpha_tilde = (
                sk_x_s0 + (z_expand - qk_y - sum_delta_sk_y) * frac_s0
            )

        else:
            # num_pieces=1, ISQF reduces to IQF
            alpha_tilde = qk_x + (z_expand - qk_y) / (qk_y_plus - qk_y) * (
                qk_x_plus - qk_x
            )

        alpha_tilde = torch.minimum(
            torch.maximum(alpha_tilde, qk_x), qk_x_plus
        )

        return alpha_tilde
def cdf_tail(
        self, z: torch.Tensor, left_tail: bool = True
    ) -> torch.Tensor:
        r"""
Computes the quantile level alpha_tilde such that
alpha_tilde
= q^{-1}(z) if z is in the tail region
= qk_x_l or qk_x_r if z is in the non-tail region
Parameters
----------
z
Observation, shape = (*batch_shape,)
left_tail
If True, compute alpha_tilde for the left tail
Otherwise, compute alpha_tilde for the right tail
Returns
-------
alpha_tilde
Corresponding quantile level, shape = (*batch_shape,)
"""
        if left_tail:
            tail_a, tail_b, qk_x = self.tail_al, self.tail_bl, self.qk_x_l
        else:
            tail_a, tail_b, qk_x = self.tail_ar, self.tail_br, 1 - self.qk_x_r

        log_alpha_tilde = torch.minimum((z - tail_b) / tail_a, torch.log(qk_x))
        alpha_tilde = torch.exp(log_alpha_tilde)
        return alpha_tilde if left_tail else 1 - alpha_tilde
def crps_tail(
        self, z: torch.Tensor, left_tail: bool = True
    ) -> torch.Tensor:
        r"""
Compute CRPS in analytical form for left/right tails
Parameters
----------
z
Observation to evaluate. shape = (*batch_shape,)
left_tail
If True, compute CRPS for the left tail
Otherwise, compute CRPS for the right tail
Returns
-------
Tensor
Tensor containing the CRPS, of the same shape as z
"""
        alpha_tilde = self.cdf_tail(z, left_tail=left_tail)

        if left_tail:
            tail_a, tail_b, qk_x, qk_y = (
                self.tail_al,
                self.tail_bl,
                self.qk_x_l,
                self.qk_y_l,
            )
            term1 = (z - tail_b) * (qk_x**2 - 2 * qk_x + 2 * alpha_tilde)
            term2 = qk_x**2 * tail_a * (-torch.log(qk_x) + 0.5)
            term2 = term2 + 2 * torch.where(
                z < qk_y,
                qk_x * tail_a * (torch.log(qk_x) - 1)
                + alpha_tilde * (-z + tail_b + tail_a),
                torch.zeros_like(qk_x),
            )
        else:
            tail_a, tail_b, qk_x, qk_y = (
                self.tail_ar,
                self.tail_br,
                self.qk_x_r,
                self.qk_y_r,
            )
            term1 = (z - tail_b) * (-1 - qk_x**2 + 2 * alpha_tilde)
            term2 = tail_a * (
                -0.5 * (qk_x + 1) ** 2
                + (qk_x**2 - 1) * torch.log(1 - qk_x)
                + 2 * alpha_tilde
            )
            term2 = term2 + 2 * torch.where(
                z > qk_y,
                (1 - alpha_tilde) * (z - tail_b),
                tail_a * (1 - qk_x) * torch.log(1 - qk_x),
            )

        return term1 + term2
    def crps_spline(self, z: torch.Tensor) -> torch.Tensor:
        """
        Compute CRPS in analytical form for the spline region.

        **Parameters**<br>
        `z`: Observation to evaluate.
        """
        qk_x, qk_x_plus, qk_y = self.qk_x, self.qk_x_plus, self.qk_y
        sk_x, sk_x_plus = self.sk_x, self.sk_x_plus
        delta_sk_x, delta_sk_y = self.delta_sk_x, self.delta_sk_y

        z_expand = z.unsqueeze(dim=-1)
        qk_x_plus_expand = qk_x_plus.unsqueeze(dim=-1)

        alpha_tilde = self.cdf_spline(z)
        alpha_tilde_expand = alpha_tilde.unsqueeze(dim=-1)

        r = torch.minimum(torch.maximum(alpha_tilde_expand, sk_x), sk_x_plus)

        coeff1 = (
            -2 / 3 * sk_x_plus**3
            + sk_x * sk_x_plus**2
            + sk_x_plus**2
            - (1 / 3) * sk_x**3
            - 2 * sk_x * sk_x_plus
            - r**2
            + 2 * sk_x * r
        )

        coeff2 = (
            -2 * torch.maximum(alpha_tilde_expand, sk_x_plus)
            + sk_x_plus**2
            + 2 * qk_x_plus_expand
            - qk_x_plus_expand**2
        )

        result = (
            (qk_x_plus**2 - qk_x**2) * (z_expand - qk_y)
            + 2 * (qk_x_plus - alpha_tilde) * (qk_y - z_expand)
            + torch.sum((delta_sk_y / delta_sk_x) * coeff1, dim=-1)
            + torch.sum(delta_sk_y * coeff2, dim=-1)
        )

        return torch.sum(result, dim=-1)
def loss(self, z: torch.Tensor) -> torch.Tensor:
return self.crps(z)
def log_prob(self, z: torch.Tensor) -> torch.Tensor:
return -self.crps(z)
    def crps(self, z: torch.Tensor) -> torch.Tensor:
        """
        Compute CRPS in analytical form.

        **Parameters**<br>
        `z`: Observation to evaluate.
        """
        crps_lt = self.crps_tail(z, left_tail=True)
        crps_rt = self.crps_tail(z, left_tail=False)

        return crps_lt + crps_rt + self.crps_spline(z)
    def cdf(self, z: torch.Tensor) -> torch.Tensor:
        """
        Computes the quantile level alpha_tilde such that
        q(alpha_tilde) = z

        **Parameters**<br>
        `z`: Tensor of shape (*batch_shape,)
        """
        qk_y, qk_y_l, qk_y_plus = self.qk_y, self.qk_y_l, self.qk_y_plus

        alpha_tilde = torch.where(
            z < qk_y_l,
            self.cdf_tail(z, left_tail=True),
            self.cdf_tail(z, left_tail=False),
        )

        spline_alpha_tilde = self.cdf_spline(z)

        for spline_idx in range(self.num_qk - 1):
            is_in_between = torch.logical_and(
                qk_y[..., spline_idx] <= z, z < qk_y_plus[..., spline_idx]
            )

            alpha_tilde = torch.where(
                is_in_between, spline_alpha_tilde[..., spline_idx], alpha_tilde
            )

        return alpha_tilde
    def rsample(self, sample_shape: torch.Size = torch.Size()) -> torch.Tensor:
        """
        Function used to draw random samples.

        **Parameters**<br>
        `num_samples`: number of samples
        """
        # If sample_shape=() then input_alpha should have the same shape
        # as beta_l, i.e. (*batch_shape,)
        # else it should be (*sample_shape, *batch_shape)
        target_shape = (
            self.beta_l.shape
            if sample_shape == torch.Size()
            else torch.Size(sample_shape) + self.beta_l.shape
        )

        alpha = torch.rand(
            target_shape,
            dtype=self.beta_l.dtype,
            device=self.beta_l.device,
            layout=self.beta_l.layout,
        )

        sample = self.quantile(alpha)

        if sample_shape == torch.Size():
            sample = sample.squeeze(dim=0)

        return sample

    @property
    def batch_shape(self) -> torch.Size:
        return self.beta_l.shape
def isqf_domain_map(input: torch.Tensor, tol: float=1e-4, quantiles: torch.Tensor = torch.tensor([0.1, 0.5, 0.9], dtype=torch.float32), num_pieces: int = 5):
""" ISQF Domain Map
Maps input into distribution constraints, by construction input's
last dimension is of matching `distr_args` length.
**Parameters:**<br>
`input`: tensor, of dimensions [B,T,H,theta] or [B,H,theta].<br>
`tol`: float, tolerance.<br>
`quantiles`: tensor, quantiles used for ISQF (i.e. x-positions for the knots). <br>
`num_pieces`: int, num_pieces used for each quantile spline. <br>
**Returns:**<br>
`(spline_knots, spline_heights, beta_l, beta_r, qk_y, qk_x)`: tuple with tensors of ISQF distribution arguments.<br>
"""
    # Add tol to prevent the y-distance between
    # two quantile knots from being too close,
    # because in that case the spline knots could be squeezed together
    # and cause overflow in the spline CRPS computation.
    num_qk = len(quantiles)
    start_index = 0
    spline_knots = input[..., start_index: start_index + (num_qk - 1) * num_pieces]
    start_index += (num_qk - 1) * num_pieces
    spline_heights = input[..., start_index: start_index + (num_qk - 1) * num_pieces]
    start_index += (num_qk - 1) * num_pieces
    beta_l = input[..., start_index: start_index + 1]
    start_index += 1
    beta_r = input[..., start_index: start_index + 1]
    start_index += 1
    quantile_knots = input[..., start_index: start_index + num_qk]

    qk_y = torch.cat(
        [
            quantile_knots[..., 0:1],
            torch.abs(quantile_knots[..., 1:]) + tol,
        ],
        dim=-1,
    )
    qk_y = torch.cumsum(qk_y, dim=-1)

    # Prevent overflow when computing 1/beta
    beta_l = torch.abs(beta_l.squeeze(-1)) + tol
    beta_r = torch.abs(beta_r.squeeze(-1)) + tol

    # Reshape the spline arguments
    batch_shape = spline_knots.shape[:-1]

    # Repeat qk_x from (num_qk,) to (*batch_shape, num_qk)
    qk_x_repeat = torch.sort(quantiles)\
                       .values\
                       .repeat(*batch_shape, 1)\
                       .to(input.device)

    # Knots and heights have shape (*batch_shape, (num_qk-1)*num_pieces);
    # reshape them to (*batch_shape, (num_qk-1), num_pieces)
    spline_knots_reshape = spline_knots.reshape(
        *batch_shape, (num_qk - 1), num_pieces
    )
    spline_heights_reshape = spline_heights.reshape(
        *batch_shape, (num_qk - 1), num_pieces
    )

    return spline_knots_reshape, spline_heights_reshape, beta_l, beta_r, qk_y, qk_x_repeat
def isqf_scale_decouple(output, loc=None, scale=None):
    """ ISQF Scale Decouple

    Stabilizes model's output optimization. The location and scale are passed
    directly to the (transformed) distribution constructor.
    """
    spline_knots, spline_heights, beta_l, beta_r, qk_y, qk_x_repeat = output
    if loc is None:
        loc = torch.zeros_like(beta_l)
    if scale is None:
        scale = torch.ones_like(beta_l)

    return (spline_knots, spline_heights, beta_l, beta_r, qk_y, qk_x_repeat, loc, scale)
class DistributionLoss(torch.nn.Module):
""" DistributionLoss
This PyTorch module wraps the `torch.distribution` classes allowing it to
interact with NeuralForecast models modularly. It shares the negative
log-likelihood as the optimization objective and a sample method to
generate empirically the quantiles defined by the `level` list.
Additionally, it implements a distribution transformation that factorizes the
scale-dependent likelihood parameters into a base scale and a multiplier
efficiently learnable within the network's non-linearities operating ranges.
Available distributions:<br>
- Poisson<br>
- Normal<br>
- StudentT<br>
- NegativeBinomial<br>
- Tweedie<br>
- Bernoulli (Temporal Classifiers)<br>
- ISQF (Incremental Spline Quantile Function)
**Parameters:**<br>
`distribution`: str, identifier of a torch.distributions.Distribution class.<br>
`level`: float list [0,100], confidence levels for prediction intervals.<br>
`quantiles`: float list [0,1], alternative to level list, target quantiles.<br>
`num_samples`: int=500, number of samples for the empirical quantiles.<br>
`return_params`: bool=False, whether or not to return the Distribution parameters.<br><br>
**References:**<br>
- [PyTorch Probability Distributions Package: StudentT.](https://pytorch.org/docs/stable/distributions.html#studentt)<br>
- [David Salinas, Valentin Flunkert, Jan Gasthaus, Tim Januschowski (2020).
"DeepAR: Probabilistic forecasting with autoregressive recurrent networks". International Journal of Forecasting.](https://www.sciencedirect.com/science/article/pii/S0169207019301888)<br>
- [Park, Youngsuk, Danielle Maddix, François-Xavier Aubet, Kelvin Kan, Jan Gasthaus, and Yuyang Wang (2022). "Learning Quantile Functions without Quantile Crossing for Distribution-free Time Series Forecasting".](https://proceedings.mlr.press/v151/park22a.html)
"""
    def __init__(self, distribution, level=[80, 90], quantiles=None,
                 num_samples=1000, return_params=False, **distribution_kwargs):
        super(DistributionLoss, self).__init__()

        qs, self.output_names = level_to_outputs(level)
        qs = torch.Tensor(qs)

        # Transform quantiles to homogeneous output names
        if quantiles is not None:
            quantiles = sorted(quantiles)
            _, self.output_names = quantiles_to_outputs(quantiles)
            qs = torch.Tensor(quantiles)
        self.quantiles = torch.nn.Parameter(qs, requires_grad=False)
        num_qk = len(self.quantiles)

        if "num_pieces" not in distribution_kwargs:
            num_pieces = 5
        else:
            num_pieces = distribution_kwargs.pop("num_pieces")

        available_distributions = dict(
            Bernoulli=Bernoulli,
            Normal=Normal,
            Poisson=Poisson,
            StudentT=StudentT,
            NegativeBinomial=NegativeBinomial,
            Tweedie=Tweedie,
            ISQF=ISQF)
        domain_maps = dict(Bernoulli=bernoulli_domain_map,
                           Normal=normal_domain_map,
                           Poisson=poisson_domain_map,
                           StudentT=student_domain_map,
                           NegativeBinomial=nbinomial_domain_map,
                           Tweedie=tweedie_domain_map,
                           ISQF=partial(isqf_domain_map,
                                        quantiles=qs,
                                        num_pieces=num_pieces))
        scale_decouples = dict(
            Bernoulli=bernoulli_scale_decouple,
            Normal=normal_scale_decouple,
            Poisson=poisson_scale_decouple,
            StudentT=student_scale_decouple,
            NegativeBinomial=nbinomial_scale_decouple,
            Tweedie=tweedie_scale_decouple,
            ISQF=isqf_scale_decouple)
        param_names = dict(Bernoulli=["-logits"],
                           Normal=["-loc", "-scale"],
                           Poisson=["-loc"],
                           StudentT=["-df", "-loc", "-scale"],
                           NegativeBinomial=["-total_count", "-logits"],
                           Tweedie=["-log_mu"],
                           ISQF=[f"-spline_knot_{i + 1}" for i in range((num_qk - 1) * num_pieces)] + \
                                [f"-spline_height_{i + 1}" for i in range((num_qk - 1) * num_pieces)] + \
                                ["-beta_l", "-beta_r"] + \
                                [f"-quantile_knot_{i + 1}" for i in range(num_qk)],
                           )
        assert (distribution in available_distributions.keys()), f'{distribution} not available'
        self.distribution = distribution
        self._base_distribution = available_distributions[distribution]
        self.domain_map = domain_maps[distribution]
        self.scale_decouple = scale_decouples[distribution]
        self.distribution_kwargs = distribution_kwargs
        self.num_samples = num_samples
        self.param_names = param_names[distribution]

        # If True, predict_step will return Distribution's parameters
        self.return_params = return_params
        if self.return_params:
            self.output_names = self.output_names + self.param_names

        # Add first output entry for the sample_mean
        self.output_names.insert(0, "")

        self.outputsize_multiplier = len(self.param_names)
        self.is_distribution_output = True
    def get_distribution(self, distr_args, **distribution_kwargs) -> Distribution:
        """
        Construct the associated Pytorch Distribution, given the collection of
        constructor arguments and, optionally, location and scale tensors.

        **Parameters**<br>
        `distr_args`: Constructor arguments for the underlying Distribution type.<br>

        **Returns**<br>
        `Distribution`: AffineTransformed distribution.<br>
        """
        # TransformedDistribution(distr, [AffineTransform(loc=loc, scale=scale)])
        distr = self._base_distribution(*distr_args, **distribution_kwargs)

        if self.distribution == 'Poisson':
            distr.support = constraints.nonnegative
        return distr
    def sample(self,
               distr_args: torch.Tensor,
               num_samples: Optional[int] = None):
        """
        Construct the empirical quantiles from the estimated Distribution,
        sampling from it `num_samples` independently.

        **Parameters**<br>
        `distr_args`: Constructor arguments for the underlying Distribution type.<br>
        `num_samples`: int, overwrites the default number of samples for the empirical quantiles.<br>

        **Returns**<br>
        `samples`: tensor, shape [B,H,`num_samples`].<br>
        `quantiles`: tensor, empirical quantiles defined by `levels`.<br>
        """
        if num_samples is None:
            num_samples = self.num_samples

        B, H = distr_args[0].shape[:2]
        Q = len(self.quantiles)

        # Instantiate the scale-decoupled distribution
        distr = self.get_distribution(distr_args=distr_args, **self.distribution_kwargs)
        samples = distr.sample(sample_shape=(num_samples,))
        samples = samples.permute(1, 2, 0)  # [samples, B, H] -> [B, H, samples]
        samples = samples.view(B * H, num_samples)
        sample_mean = torch.mean(samples, dim=-1)

        # Compute quantiles
        quantiles_device = self.quantiles.to(distr_args[0].device)
        quants = torch.quantile(input=samples,
                                q=quantiles_device, dim=1)
        quants = quants.permute((1, 0))  # [Q, B*H] -> [B*H, Q]

        # Final reshapes
        samples = samples.view(B, H, num_samples)
        sample_mean = sample_mean.view(B, H, 1)
        quants = quants.view(B, H, Q)

        return samples, sample_mean, quants
    def __call__(self,
                 y: torch.Tensor,
                 distr_args: torch.Tensor,
                 mask: Union[torch.Tensor, None] = None):
        """
        Computes the negative log-likelihood objective function.
        To estimate the following predictive distribution:

        $$\mathrm{P}(\mathbf{y}_{\\tau}\,|\,\\theta) \\quad \mathrm{and} \\quad -\log(\mathrm{P}(\mathbf{y}_{\\tau}\,|\,\\theta))$$

        where $\\theta$ represents the distribution's parameters. It additionally
        summarizes the objective signal using a weighted average with the `mask` tensor.

        **Parameters**<br>
        `y`: tensor, Actual values.<br>
        `distr_args`: Constructor arguments for the underlying Distribution type.<br>
        `mask`: tensor, Specifies date stamps per serie to consider in loss.<br>

        **Returns**<br>
        `loss`: scalar, weighted loss function against which backpropagation will be performed.<br>
        """
        # Instantiate the scale-decoupled distribution
        distr = self.get_distribution(distr_args=distr_args, **self.distribution_kwargs)
        loss_values = -distr.log_prob(y)
        loss_weights = mask
        return weighted_average(loss_values, weights=loss_weights)
show_doc(DistributionLoss, name='DistributionLoss.__init__', title_level=3)
show_doc(DistributionLoss.sample, name='DistributionLoss.sample', title_level=3)
show_doc(DistributionLoss.__call__, name='DistributionLoss.__call__', title_level=3)
::: {#14a7e381 .cell}
# Unit tests to check DistributionLoss' stored quantiles
# attribute is correctly instantiated
check = DistributionLoss(distribution='Normal', level=[80, 90])
test_eq(len(check.quantiles), 5)

check = DistributionLoss(distribution='Normal',
                         quantiles=[0.0100, 0.1000, 0.5, 0.9000, 0.9900])
print(check.output_names)
print(check.quantiles)
test_eq(len(check.quantiles), 5)

check = DistributionLoss(distribution='Normal',
                         quantiles=[0.0100, 0.1000, 0.9000, 0.9900])
test_eq(len(check.quantiles), 4)
:::
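As a quick end-to-end check, the sketch below (illustrative shapes and values, assuming the helper functions defined earlier in this notebook are in scope) builds a Normal `DistributionLoss`, evaluates the negative log-likelihood on synthetic `(loc, scale)` parameters, and draws empirical quantiles with `sample`.

# Minimal usage sketch (illustrative shapes): Normal DistributionLoss with
# batch B=2 and horizon H=3. distr_args for the Normal case is (loc, scale).
loss_fn = DistributionLoss(distribution='Normal', level=[80, 90])

means = torch.zeros(2, 3)   # [B,H] location parameters
stds = torch.ones(2, 3)     # [B,H] scale parameters
distr_args = (means, stds)

# Negative log-likelihood against synthetic observations
y = torch.randn(2, 3)
nll = loss_fn(y=y, distr_args=distr_args, mask=torch.ones_like(y))
print('NLL:', nll)

# Empirical quantiles via Monte Carlo sampling
samples, sample_mean, quants = loss_fn.sample(distr_args=distr_args, num_samples=200)
print('quants.shape (B,H,Q):', quants.shape)   # 5 quantiles from level=[80, 90]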
Poisson Mixture Mesh (PMM)
class PMM(torch.nn.Module):
""" Poisson Mixture Mesh
This Poisson Mixture statistical model assumes independence across groups of
data $\mathcal{G}=\{[g_{i}]\}$, and estimates relationships within the group.
$$ \mathrm{P}\\left(\mathbf{y}_{[b][t+1:t+H]}\\right) =
\prod_{ [g_{i}] \in \mathcal{G}} \mathrm{P} \\left(\mathbf{y}_{[g_{i}][\\tau]} \\right) =
\prod_{\\beta\in[g_{i}]}
\\left(\sum_{k=1}^{K} w_k \prod_{(\\beta,\\tau) \in [g_i][t+1:t+H]} \mathrm{Poisson}(y_{\\beta,\\tau}, \hat{\\lambda}_{\\beta,\\tau,k}) \\right)$$
**Parameters:**<br>
`n_components`: int=10, the number of mixture components.<br>
`level`: float list [0,100], confidence levels for prediction intervals.<br>
`quantiles`: float list [0,1], alternative to level list, target quantiles.<br>
`return_params`: bool=False, whether or not to return the Distribution parameters.<br>
`batch_correlation`: bool=False, whether or not to model batch correlations.<br>
`horizon_correlation`: bool=False, whether or not to model horizon correlations.<br>
**References:**<br>
[Kin G. Olivares, O. Nganba Meetei, Ruijun Ma, Rohan Reddy, Mengfei Cao, Lee Dicker.
Probabilistic Hierarchical Forecasting with Deep Poisson Mixtures. Submitted to the International
Journal of Forecasting, Working paper available at arxiv.](https://arxiv.org/pdf/2110.13179.pdf)
"""
    def __init__(self, n_components=10, level=[80, 90], quantiles=None,
                 num_samples=1000, return_params=False,
                 batch_correlation=False, horizon_correlation=False):
        super(PMM, self).__init__()
        # Transform level to MQLoss parameters
        qs, self.output_names = level_to_outputs(level)
        qs = torch.Tensor(qs)

        # Transform quantiles to homogeneous output names
        if quantiles is not None:
            _, self.output_names = quantiles_to_outputs(quantiles)
            qs = torch.Tensor(quantiles)
        self.quantiles = torch.nn.Parameter(qs, requires_grad=False)
        self.num_samples = num_samples
        self.batch_correlation = batch_correlation
        self.horizon_correlation = horizon_correlation

        # If True, predict_step will return Distribution's parameters
        self.return_params = return_params
        if self.return_params:
            self.param_names = [f"-lambda-{i}" for i in range(1, n_components + 1)]
            self.output_names = self.output_names + self.param_names

        # Add first output entry for the sample_mean
        self.output_names.insert(0, "")

        self.outputsize_multiplier = n_components
        self.is_distribution_output = True

    def domain_map(self, output: torch.Tensor):
        return (output,)  #, weights

    def scale_decouple(self,
                       output,
                       loc: Optional[torch.Tensor] = None,
                       scale: Optional[torch.Tensor] = None):
        """ Scale Decouple

        Stabilizes model's output optimization, by learning residual
        variance and residual location based on anchoring `loc`, `scale`.
        Also adds domain protection to the distribution parameters.
        """
        lambdas = output[0]
        if (loc is not None) and (scale is not None):
            loc = loc.view(lambdas.size(dim=0), 1, -1)
            scale = scale.view(lambdas.size(dim=0), 1, -1)
            lambdas = (lambdas * scale) + loc
        lambdas = F.softplus(lambdas)
        return (lambdas,)
def sample(self, distr_args, num_samples=None):
"""
Construct the empirical quantiles from the estimated Distribution,
sampling from it `num_samples` independently.
**Parameters**<br>
`distr_args`: Constructor arguments for the underlying Distribution type.<br>
`loc`: Optional tensor, of the same shape as the batch_shape + event_shape
of the resulting distribution.<br>
`scale`: Optional tensor, of the same shape as the batch_shape+event_shape
of the resulting distribution.<br>
`num_samples`: int=500, overwrites number of samples for the empirical quantiles.<br>
**Returns**<br>
`samples`: tensor, shape [B,H,`num_samples`].<br>
`quantiles`: tensor, empirical quantiles defined by `levels`.<br>
"""
        if num_samples is None:
            num_samples = self.num_samples

        lambdas = distr_args[0]
        B, H, K = lambdas.size()
        Q = len(self.quantiles)

        # Sample K ~ Multinomial(weights)
        # shared across B and H
        # weights = torch.repeat_interleave(input=weights, repeats=H, dim=2)
        weights = (1/K) * torch.ones_like(lambdas, device=lambdas.device)

        # Avoid loops, vectorize
        weights = weights.reshape(-1, K)
        lambdas = lambdas.flatten()

        # Vectorization trick to recover row indexes
        sample_idxs = torch.multinomial(input=weights,
                                        num_samples=num_samples,
                                        replacement=True)
        aux_col_idx = torch.unsqueeze(torch.arange(B * H, device=lambdas.device), -1) * K

        # To device
        sample_idxs = sample_idxs.to(lambdas.device)

        sample_idxs = sample_idxs + aux_col_idx
        sample_idxs = sample_idxs.flatten()

        sample_lambdas = lambdas[sample_idxs]

        # Sample y ~ Poisson(lambda) independently
        samples = torch.poisson(sample_lambdas).to(lambdas.device)
        samples = samples.view(B*H, num_samples)
        sample_mean = torch.mean(samples, dim=-1)

        # Compute quantiles
        quantiles_device = self.quantiles.to(lambdas.device)
        quants = torch.quantile(input=samples, q=quantiles_device, dim=1)
        quants = quants.permute((1,0))  # Q, B*H

        # Final reshapes
        samples = samples.view(B, H, num_samples)
        sample_mean = sample_mean.view(B, H, 1)
        quants = quants.view(B, H, Q)

        return samples, sample_mean, quants
    def neglog_likelihood(self,
                          y: torch.Tensor,
                          distr_args: Tuple[torch.Tensor],
                          mask: Union[torch.Tensor, None] = None):
        if mask is None:
            mask = (y > 0) * 1
        else:
            mask = mask * ((y > 0) * 1)

        eps = 1e-10
        lambdas = distr_args[0]
        B, H, K = lambdas.size()

        weights = (1/K) * torch.ones_like(lambdas, device=lambdas.device)

        y = y[:,:,None]
        mask = mask[:,:,None]

        y = y * mask  # Protect against negative entries

        # Single Poisson likelihood
        log_pi = y.xlogy(lambdas + eps) - lambdas - (y + 1).lgamma()

        if self.batch_correlation:
            log_pi = torch.sum(log_pi, dim=0, keepdim=True)

        if self.horizon_correlation:
            log_pi = torch.sum(log_pi, dim=1, keepdim=True)

        # Numerically stable mixture loglikelihood
        loglik = torch.logsumexp((torch.log(weights) + log_pi), dim=2, keepdim=True)
        loglik = loglik * mask

        mean = torch.sum(weights * lambdas, axis=-1, keepdims=True)
        reglrz = torch.mean(torch.square(y - mean) * mask)
        loss = -torch.mean(loglik) + 0.001 * reglrz
        return loss

    def __call__(self, y: torch.Tensor,
                 distr_args: Tuple[torch.Tensor],
                 mask: Union[torch.Tensor, None] = None):

        return self.neglog_likelihood(y=y, distr_args=distr_args, mask=mask)
show_doc(PMM, name='PMM.__init__', title_level=3)
show_doc(PMM.sample, name='PMM.sample', title_level=3)
show_doc(PMM.__call__, name='PMM.__call__', title_level=3)
::: {#e4a20e21 .cell}
# Unit tests to check PMM's stored quantiles
# attribute is correctly instantiated
check = PMM(n_components=2, level=[80, 90])
test_eq(len(check.quantiles), 5)

check = PMM(n_components=2,
            quantiles=[0.0100, 0.1000, 0.5, 0.9000, 0.9900])
print(check.output_names)
print(check.quantiles)
test_eq(len(check.quantiles), 5)

check = PMM(n_components=2,
            quantiles=[0.0100, 0.1000, 0.9000, 0.9900])
test_eq(len(check.quantiles), 4)
:::
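The mixture likelihood above is evaluated in log-space; the standalone sketch below (made-up rates and uniform weights) verifies the identity log Σ_k w_k P_k = logsumexp(log w_k + log P_k) that `neglog_likelihood` relies on for numerical stability.

# Standalone sketch (made-up numbers): the stable mixture log-likelihood used
# by PMM.neglog_likelihood. Both expressions agree, but the logsumexp form
# avoids underflow when the per-component likelihoods are tiny.
y = torch.tensor(7.0)
lambdas = torch.tensor([2.0, 10.0, 25.0])          # K=3 Poisson rates
weights = torch.full_like(lambdas, 1 / 3)          # uniform mixture weights

log_pk = y.xlogy(lambdas) - lambdas - (y + 1).lgamma()   # per-component Poisson log-likelihoods
stable = torch.logsumexp(torch.log(weights) + log_pk, dim=0)
naive = torch.log(torch.sum(weights * torch.exp(log_pk)))
print(stable, naive)   # identical values, computed differently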
# Create a single mixture and broadcast to N, H, K
weights = torch.ones((1,3))[None, :, :]
lambdas = torch.Tensor([[5,10,15], [10,20,30]])[None, :, :]

# Create repetitions for the batch dimension N
N = 2
weights = torch.repeat_interleave(input=weights, repeats=N, dim=0)
lambdas = torch.repeat_interleave(input=lambdas, repeats=N, dim=0)

print('weights.shape (N,H,K) \t', weights.shape)
print('lambdas.shape (N,H,K) \t', lambdas.shape)

distr = PMM(quantiles=[0.1, 0.40, 0.5, 0.60, 0.9])
distr_args = (lambdas,)
samples, sample_mean, quants = distr.sample(distr_args)

print('samples.shape (N,H,num_samples) ', samples.shape)
print('sample_mean.shape (N,H) ', sample_mean.shape)
print('quants.shape (N,H,Q) \t\t', quants.shape)
# Plot synthetic data
x_plot = range(quants.shape[1])   # H length
y_plot_hat = quants[0,:,:]        # Filter N,H,Q -> H,Q
samples_hat = samples[0,:,:]      # Filter N,H,num_samples -> H,num_samples

# Kernel density plot for a single prediction horizon \tau = t+1
fig, ax = plt.subplots(figsize=(3.7, 2.9))

ax.hist(samples_hat[0,:], alpha=0.5, label=r'Horizon $\tau+1$')
ax.hist(samples_hat[1,:], alpha=0.5, label=r'Horizon $\tau+2$')
ax.set(xlabel='Y values', ylabel='Probability')
plt.title('Single horizon Distributions')
plt.legend(bbox_to_anchor=(1, 1), loc='upper left', ncol=1)
plt.grid()
plt.show()
plt.close()
# Plot simulated trajectories
fig, ax = plt.subplots(figsize=(3.7, 2.9))
plt.plot(x_plot, y_plot_hat[:,2], color='black', label='median [q50]')
plt.fill_between(x_plot,
                 y1=y_plot_hat[:,1], y2=y_plot_hat[:,3],
                 facecolor='blue', alpha=0.4, label='[p40-p60]')
plt.fill_between(x_plot,
                 y1=y_plot_hat[:,0], y2=y_plot_hat[:,4],
                 facecolor='blue', alpha=0.2, label='[p10-p90]')
ax.set(xlabel='Horizon', ylabel='Y values')
plt.title('PMM Probabilistic Predictions')
plt.legend(bbox_to_anchor=(1, 1), loc='upper left', ncol=1)
plt.grid()
plt.show()
plt.close()
Gaussian Mixture Mesh (GMM)
class GMM(torch.nn.Module):
""" Gaussian Mixture Mesh
This Gaussian Mixture statistical model assumes independence across groups of
data $\mathcal{G}=\{[g_{i}]\}$, and estimates relationships within the group.
$$ \mathrm{P}\\left(\mathbf{y}_{[b][t+1:t+H]}\\right) =
\prod_{ [g_{i}] \in \mathcal{G}} \mathrm{P}\left(\mathbf{y}_{[g_{i}][\\tau]}\\right)=
\prod_{\\beta\in[g_{i}]}
\\left(\sum_{k=1}^{K} w_k \prod_{(\\beta,\\tau) \in [g_i][t+1:t+H]}
\mathrm{Gaussian}(y_{\\beta,\\tau}, \hat{\mu}_{\\beta,\\tau,k}, \sigma_{\\beta,\\tau,k})\\right)$$
**Parameters:**<br>
`n_components`: int=10, the number of mixture components.<br>
`level`: float list [0,100], confidence levels for prediction intervals.<br>
`quantiles`: float list [0,1], alternative to level list, target quantiles.<br>
`return_params`: bool=False, whether or not to return the Distribution parameters.<br>
`batch_correlation`: bool=False, whether or not to model batch correlations.<br>
`horizon_correlation`: bool=False, whether or not to model horizon correlations.<br><br>
**References:**<br>
[Kin G. Olivares, O. Nganba Meetei, Ruijun Ma, Rohan Reddy, Mengfei Cao, Lee Dicker.
Probabilistic Hierarchical Forecasting with Deep Poisson Mixtures. Submitted to the International
Journal of Forecasting, Working paper available at arxiv.](https://arxiv.org/pdf/2110.13179.pdf)
"""
    def __init__(self, n_components=1, level=[80, 90], quantiles=None,
                 num_samples=1000, return_params=False,
                 batch_correlation=False, horizon_correlation=False):
        super(GMM, self).__init__()
        # Transform level to MQLoss parameters
        qs, self.output_names = level_to_outputs(level)
        qs = torch.Tensor(qs)

        # Transform quantiles to homogeneous output names
        if quantiles is not None:
            _, self.output_names = quantiles_to_outputs(quantiles)
            qs = torch.Tensor(quantiles)
        self.quantiles = torch.nn.Parameter(qs, requires_grad=False)
        self.num_samples = num_samples
        self.batch_correlation = batch_correlation
        self.horizon_correlation = horizon_correlation

        # If True, predict_step will return Distribution's parameters
        self.return_params = return_params
        if self.return_params:
            mu_names = [f"-mu-{i}" for i in range(1, n_components + 1)]
            std_names = [f"-std-{i}" for i in range(1, n_components + 1)]
            mu_std_names = [i for j in zip(mu_names, std_names) for i in j]
            self.output_names = self.output_names + mu_std_names

        # Add first output entry for the sample_mean
        self.output_names.insert(0, "")

        self.outputsize_multiplier = 2 * n_components
        self.is_distribution_output = True

    def domain_map(self, output: torch.Tensor):
        means, stds = torch.tensor_split(output, 2, dim=-1)
        return (means, stds)

    def scale_decouple(self,
                       output,
                       loc: Optional[torch.Tensor] = None,
                       scale: Optional[torch.Tensor] = None,
                       eps: float = 0.2):
        """ Scale Decouple

        Stabilizes model's output optimization, by learning residual
        variance and residual location based on anchoring `loc`, `scale`.
        Also adds domain protection to the distribution parameters.
        """
        means, stds = output
        stds = F.softplus(stds)
        if (loc is not None) and (scale is not None):
            loc = loc.view(means.size(dim=0), 1, -1)
            scale = scale.view(means.size(dim=0), 1, -1)
            means = (means * scale) + loc
            stds = (stds + eps) * scale
        return (means, stds)
def sample(self, distr_args, num_samples=None):
"""
Construct the empirical quantiles from the estimated Distribution,
sampling from it `num_samples` independently.
**Parameters**<br>
`distr_args`: Constructor arguments for the underlying Distribution type.<br>
`loc`: Optional tensor, of the same shape as the batch_shape + event_shape
of the resulting distribution.<br>
`scale`: Optional tensor, of the same shape as the batch_shape+event_shape
of the resulting distribution.<br>
`num_samples`: int=500, number of samples for the empirical quantiles.<br>
**Returns**<br>
`samples`: tensor, shape [B,H,`num_samples`].<br>
`quantiles`: tensor, empirical quantiles defined by `levels`.<br>
"""
        if num_samples is None:
            num_samples = self.num_samples

        means, stds = distr_args
        B, H, K = means.size()
        Q = len(self.quantiles)
        assert means.shape == stds.shape

        # Sample K ~ Multinomial(weights)
        # shared across B and H
        # weights = torch.repeat_interleave(input=weights, repeats=H, dim=2)
        weights = (1/K) * torch.ones_like(means, device=means.device)

        # Avoid loops, vectorize
        weights = weights.reshape(-1, K)
        means = means.flatten()
        stds = stds.flatten()

        # Vectorization trick to recover row indexes
        sample_idxs = torch.multinomial(input=weights,
                                        num_samples=num_samples,
                                        replacement=True)
        aux_col_idx = torch.unsqueeze(torch.arange(B * H, device=means.device), -1) * K

        # To device
        sample_idxs = sample_idxs.to(means.device)

        sample_idxs = sample_idxs + aux_col_idx
        sample_idxs = sample_idxs.flatten()

        sample_means = means[sample_idxs]
        sample_stds = stds[sample_idxs]

        # Sample y ~ Normal(mu, std) independently
        samples = torch.normal(sample_means, sample_stds).to(means.device)
        samples = samples.view(B*H, num_samples)
        sample_mean = torch.mean(samples, dim=-1)

        # Compute quantiles
        quantiles_device = self.quantiles.to(means.device)
        quants = torch.quantile(input=samples, q=quantiles_device, dim=1)
        quants = quants.permute((1,0))  # Q, B*H

        # Final reshapes
        samples = samples.view(B, H, num_samples)
        sample_mean = sample_mean.view(B, H, 1)
        quants = quants.view(B, H, Q)

        return samples, sample_mean, quants
    def neglog_likelihood(self,
                          y: torch.Tensor,
                          distr_args: Tuple[torch.Tensor, torch.Tensor],
                          mask: Union[torch.Tensor, None] = None):
        if mask is None:
            mask = torch.ones_like(y)

        means, stds = distr_args
        B, H, K = means.size()

        weights = (1/K) * torch.ones_like(means, device=means.device)

        y = y[:,:,None]
        mask = mask[:,:,None]

        var = stds ** 2
        log_stds = torch.log(stds)
        log_pi = - ((y - means) ** 2 / (2 * var)) - log_stds \
                 - math.log(math.sqrt(2 * math.pi))

        if self.batch_correlation:
            log_pi = torch.sum(log_pi, dim=0, keepdim=True)

        if self.horizon_correlation:
            log_pi = torch.sum(log_pi, dim=1, keepdim=True)

        # Numerically stable mixture loglikelihood
        loglik = torch.logsumexp((torch.log(weights) + log_pi), dim=2, keepdim=True)
        loglik = loglik * mask

        loss = -torch.mean(loglik)
        return loss

    def __call__(self, y: torch.Tensor,
                 distr_args: Tuple[torch.Tensor, torch.Tensor],
                 mask: Union[torch.Tensor, None] = None):

        return self.neglog_likelihood(y=y, distr_args=distr_args, mask=mask)
show_doc(GMM, name='GMM.__init__', title_level=3)
show_doc(GMM.sample, name='GMM.sample', title_level=3)
show_doc(GMM.__call__, name='GMM.__call__', title_level=3)
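The sketch below (illustrative tensors) shows the anchoring performed by `scale_decouple`: raw network outputs of shape [B,H,K] are shifted and scaled by a per-series `loc` and `scale`, while standard deviations are kept positive through the softplus.

# Illustrative scale decoupling: raw outputs [B,H,K] anchored by a
# per-series loc/scale of shape [B,1].
gmm = GMM(n_components=2)

raw_means = torch.zeros(1, 3, 2)   # B=1, H=3, K=2
raw_stds = torch.zeros(1, 3, 2)    # softplus(0) ~ 0.693
loc = torch.Tensor([[100.]])       # series-level anchor
scale = torch.Tensor([[10.]])

means, stds = gmm.scale_decouple((raw_means, raw_stds), loc=loc, scale=scale)
print(means[0, 0], stds[0, 0])     # means ~ 100, stds ~ (0.693 + 0.2) * 10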
::: {#8ebe4250 .cell}
# Unit tests to check GMM's stored quantiles
# attribute is correctly instantiated
check = GMM(n_components=2, level=[80, 90])
test_eq(len(check.quantiles), 5)

check = GMM(n_components=2,
            quantiles=[0.0100, 0.1000, 0.5, 0.9000, 0.9900])
print(check.output_names)
print(check.quantiles)
test_eq(len(check.quantiles), 5)

check = GMM(n_components=2,
            quantiles=[0.0100, 0.1000, 0.9000, 0.9900])
test_eq(len(check.quantiles), 4)
:::
# Create a single mixture and broadcast to N, H, K
means = torch.Tensor([[5,10,15], [10,20,30]])[None, :, :]

# Create repetitions for the batch dimension N
N = 2
means = torch.repeat_interleave(input=means, repeats=N, dim=0)
weights = torch.ones_like(means)
stds = torch.ones_like(means)

print('weights.shape (N,H,K) \t', weights.shape)
print('means.shape (N,H,K) \t', means.shape)
print('stds.shape (N,H,K) \t', stds.shape)

distr = GMM(quantiles=[0.1, 0.40, 0.5, 0.60, 0.9])
distr_args = (means, stds)
samples, sample_mean, quants = distr.sample(distr_args)

print('samples.shape (N,H,num_samples) ', samples.shape)
print('sample_mean.shape (N,H) ', sample_mean.shape)
print('quants.shape (N,H,Q) \t\t', quants.shape)
# Plot synthetic data
x_plot = range(quants.shape[1])   # H length
y_plot_hat = quants[0,:,:]        # Filter N,H,Q -> H,Q
samples_hat = samples[0,:,:]      # Filter N,H,num_samples -> H,num_samples

# Kernel density plot for a single prediction horizon \tau = t+1
fig, ax = plt.subplots(figsize=(3.7, 2.9))

ax.hist(samples_hat[0,:], alpha=0.5, bins=50,
        label=r'Horizon $\tau+1$')
ax.hist(samples_hat[1,:], alpha=0.5, bins=50,
        label=r'Horizon $\tau+2$')
ax.set(xlabel='Y values', ylabel='Probability')
plt.title('Single horizon Distributions')
plt.legend(bbox_to_anchor=(1, 1), loc='upper left', ncol=1)
plt.grid()
plt.show()
plt.close()

# Plot simulated trajectories
fig, ax = plt.subplots(figsize=(3.7, 2.9))
plt.plot(x_plot, y_plot_hat[:,2], color='black', label='median [q50]')
plt.fill_between(x_plot,
                 y1=y_plot_hat[:,1], y2=y_plot_hat[:,3],
                 facecolor='blue', alpha=0.4, label='[p40-p60]')
plt.fill_between(x_plot,
                 y1=y_plot_hat[:,0], y2=y_plot_hat[:,4],
                 facecolor='blue', alpha=0.2, label='[p10-p90]')
ax.set(xlabel='Horizon', ylabel='Y values')
plt.title('GMM Probabilistic Predictions')
plt.legend(bbox_to_anchor=(1, 1), loc='upper left', ncol=1)
plt.grid()
plt.show()
plt.close()
Negative Binomial Mixture Mesh (NBMM)
class NBMM(torch.nn.Module):
""" Negative Binomial Mixture Mesh
This N. Binomial Mixture statistical model assumes independence across groups of
data $\mathcal{G}=\{[g_{i}]\}$, and estimates relationships within the group.
$$ \mathrm{P}\\left(\mathbf{y}_{[b][t+1:t+H]}\\right) =
\prod_{ [g_{i}] \in \mathcal{G}} \mathrm{P}\left(\mathbf{y}_{[g_{i}][\\tau]}\\right)=
\prod_{\\beta\in[g_{i}]}
\\left(\sum_{k=1}^{K} w_k \prod_{(\\beta,\\tau) \in [g_i][t+1:t+H]}
\mathrm{NBinomial}(y_{\\beta,\\tau}, \hat{r}_{\\beta,\\tau,k}, \hat{p}_{\\beta,\\tau,k})\\right)$$
**Parameters:**<br>
`n_components`: int=10, the number of mixture components.<br>
`level`: float list [0,100], confidence levels for prediction intervals.<br>
`quantiles`: float list [0,1], alternative to level list, target quantiles.<br>
`return_params`: bool=False, whether or not to return the Distribution parameters.<br><br>
**References:**<br>
[Kin G. Olivares, O. Nganba Meetei, Ruijun Ma, Rohan Reddy, Mengfei Cao, Lee Dicker.
Probabilistic Hierarchical Forecasting with Deep Poisson Mixtures. Submitted to the International
Journal of Forecasting, Working paper available at arxiv.](https://arxiv.org/pdf/2110.13179.pdf)
"""
    def __init__(self, n_components=1, level=[80, 90], quantiles=None,
                 num_samples=1000, return_params=False):
        super(NBMM, self).__init__()
        # Transform level to MQLoss parameters
        qs, self.output_names = level_to_outputs(level)
        qs = torch.Tensor(qs)

        # Transform quantiles to homogeneous output names
        if quantiles is not None:
            _, self.output_names = quantiles_to_outputs(quantiles)
            qs = torch.Tensor(quantiles)
        self.quantiles = torch.nn.Parameter(qs, requires_grad=False)
        self.num_samples = num_samples

        # If True, predict_step will return Distribution's parameters
        self.return_params = return_params
        if self.return_params:
            total_count_names = [f"-total_count-{i}" for i in range(1, n_components + 1)]
            probs_names = [f"-probs-{i}" for i in range(1, n_components + 1)]
            param_names = [i for j in zip(total_count_names, probs_names) for i in j]
            self.output_names = self.output_names + param_names

        # Add first output entry for the sample_mean
        self.output_names.insert(0, "")

        self.outputsize_multiplier = 2 * n_components
        self.is_distribution_output = True

    def domain_map(self, output: torch.Tensor):
        mu, alpha = torch.tensor_split(output, 2, dim=-1)
        return (mu, alpha)

    def scale_decouple(self,
                       output,
                       loc: Optional[torch.Tensor] = None,
                       scale: Optional[torch.Tensor] = None,
                       eps: float = 0.2):
        """ Scale Decouple

        Stabilizes model's output optimization, by learning residual
        variance and residual location based on anchoring `loc`, `scale`.
        Also adds domain protection to the distribution parameters.
        """
        # Efficient NBinomial parametrization
        mu, alpha = output
        mu = F.softplus(mu) + 1e-8
        alpha = F.softplus(alpha) + 1e-8  # alpha = 1/total_counts
        if (loc is not None) and (scale is not None):
            loc = loc.view(mu.size(dim=0), 1, -1)
            mu *= loc
            alpha /= (loc + 1.)

        # mu = total_count * (probs/(1-probs))
        # => probs = mu / (total_count + mu)
        # => probs = mu / [total_count * (1 + mu * (1/total_count))]
        total_count = 1.0 / alpha
        probs = (mu * alpha / (1.0 + mu * alpha)) + 1e-8
        return (total_count, probs)
def sample(self, distr_args, num_samples=None):
"""
Construct the empirical quantiles from the estimated Distribution,
sampling from it `num_samples` independently.
**Parameters**<br>
`distr_args`: Constructor arguments for the underlying Distribution type.<br>
`loc`: Optional tensor, of the same shape as the batch_shape + event_shape
of the resulting distribution.<br>
`scale`: Optional tensor, of the same shape as the batch_shape+event_shape
of the resulting distribution.<br>
`num_samples`: int=500, number of samples for the empirical quantiles.<br>
**Returns**<br>
`samples`: tensor, shape [B,H,`num_samples`].<br>
`quantiles`: tensor, empirical quantiles defined by `levels`.<br>
"""
        if num_samples is None:
            num_samples = self.num_samples

        total_count, probs = distr_args
        B, H, K = total_count.size()
        Q = len(self.quantiles)
        assert total_count.shape == probs.shape

        # Sample K ~ Multinomial(weights)
        # shared across B and H
        # weights = torch.repeat_interleave(input=weights, repeats=H, dim=2)
        weights = (1/K) * torch.ones_like(probs, device=probs.device)

        # Avoid loops, vectorize
        weights = weights.reshape(-1, K)
        total_count = total_count.flatten()
        probs = probs.flatten()

        # Vectorization trick to recover row indexes
        sample_idxs = torch.multinomial(input=weights,
                                        num_samples=num_samples,
                                        replacement=True)
        aux_col_idx = torch.unsqueeze(torch.arange(B * H, device=probs.device), -1) * K

        # To device
        sample_idxs = sample_idxs.to(probs.device)

        sample_idxs = sample_idxs + aux_col_idx
        sample_idxs = sample_idxs.flatten()

        sample_total_count = total_count[sample_idxs]
        sample_probs = probs[sample_idxs]

        # Sample y ~ NBinomial(total_count, probs) independently
        dist = NegativeBinomial(total_count=sample_total_count,
                                probs=sample_probs)
        samples = dist.sample(sample_shape=(1,)).to(probs.device)[0]
        samples = samples.view(B*H, num_samples)
        sample_mean = torch.mean(samples, dim=-1)

        # Compute quantiles
        quantiles_device = self.quantiles.to(probs.device)
        quants = torch.quantile(input=samples, q=quantiles_device, dim=1)
        quants = quants.permute((1,0))  # Q, B*H

        # Final reshapes
        samples = samples.view(B, H, num_samples)
        sample_mean = sample_mean.view(B, H, 1)
        quants = quants.view(B, H, Q)

        return samples, sample_mean, quants
    def neglog_likelihood(self,
                          y: torch.Tensor,
                          distr_args: Tuple[torch.Tensor, torch.Tensor],
                          mask: Union[torch.Tensor, None] = None):
        if mask is None:
            mask = torch.ones_like(y)

        total_count, probs = distr_args
        B, H, K = total_count.size()

        weights = (1/K) * torch.ones_like(probs, device=probs.device)

        y = y[:,:,None]
        mask = mask[:,:,None]

        log_unnormalized_prob = (total_count * torch.log(1.-probs) + y * torch.log(probs))
        log_normalization = (-torch.lgamma(total_count + y) + torch.lgamma(1. + y) +
                             torch.lgamma(total_count))
        log_normalization[total_count + y == 0.] = 0.
        log = log_unnormalized_prob - log_normalization

        # log = torch.sum(log, dim=0, keepdim=True)  # Joint within batch/group
        # log = torch.sum(log, dim=1, keepdim=True)  # Joint within horizon

        # Numerically stable mixture loglikelihood
        log_max = torch.amax(log, dim=2, keepdim=True)  # [B,H,1] (collapsed joints)
        lik = weights * torch.exp(log - log_max)        # Take out the max
        loglik = torch.log(torch.sum(lik, dim=2, keepdim=True)) + log_max  # Add the max back

        loglik = loglik * mask  # Apply mask

        loss = -torch.mean(loglik)
        return loss

    def __call__(self, y: torch.Tensor,
                 distr_args: Tuple[torch.Tensor, torch.Tensor],
                 mask: Union[torch.Tensor, None] = None):

        return self.neglog_likelihood(y=y, distr_args=distr_args, mask=mask)
show_doc(NBMM, name='NBMM.__init__', title_level=3)
show_doc(NBMM.sample, name='NBMM.sample', title_level=3)
show_doc(NBMM.__call__, name='NBMM.__call__', title_level=3)
# Create a single mixture and broadcast to N, H, K
counts = torch.Tensor([[10,20,30], [20,40,60]])[None, :, :]

# Create repetitions for the batch dimension N
N = 2
counts = torch.repeat_interleave(input=counts, repeats=N, dim=0)
weights = torch.ones_like(counts)
probs = torch.ones_like(counts) * 0.5

print('weights.shape (N,H,K) \t', weights.shape)
print('counts.shape (N,H,K) \t', counts.shape)
print('probs.shape (N,H,K) \t', probs.shape)

model = NBMM(quantiles=[0.1, 0.40, 0.5, 0.60, 0.9])
distr_args = (counts, probs)
samples, sample_mean, quants = model.sample(distr_args, num_samples=2000)

print('samples.shape (N,H,num_samples) ', samples.shape)
print('sample_mean.shape (N,H) ', sample_mean.shape)
print('quants.shape (N,H,Q) \t\t', quants.shape)
# Plot synthetic data
x_plot = range(quants.shape[1])   # H length
y_plot_hat = quants[0,:,:]        # Filter N,H,Q -> H,Q
samples_hat = samples[0,:,:]      # Filter N,H,num_samples -> H,num_samples

# Kernel density plot for a single prediction horizon \tau = t+1
fig, ax = plt.subplots(figsize=(3.7, 2.9))

ax.hist(samples_hat[0,:], alpha=0.5, bins=30,
        label=r'Horizon $\tau+1$')
ax.hist(samples_hat[1,:], alpha=0.5, bins=30,
        label=r'Horizon $\tau+2$')
ax.set(xlabel='Y values', ylabel='Probability')
plt.title('Single horizon Distributions')
plt.legend(bbox_to_anchor=(1, 1), loc='upper left', ncol=1)
plt.grid()
plt.show()
plt.close()

# Plot simulated trajectories
fig, ax = plt.subplots(figsize=(3.7, 2.9))
plt.plot(x_plot, y_plot_hat[:,2], color='black', label='median [q50]')
plt.fill_between(x_plot,
                 y1=y_plot_hat[:,1], y2=y_plot_hat[:,3],
                 facecolor='blue', alpha=0.4, label='[p40-p60]')
plt.fill_between(x_plot,
                 y1=y_plot_hat[:,0], y2=y_plot_hat[:,4],
                 facecolor='blue', alpha=0.2, label='[p10-p90]')
ax.set(xlabel='Horizon', ylabel='Y values')
plt.title('NBMM Probabilistic Predictions')
plt.legend(bbox_to_anchor=(1, 1), loc='upper left', ncol=1)
plt.grid()
plt.show()
plt.close()
5. Robustified Errors
This family of errors comes from robust statistics, which focuses on methods resistant to outliers and violated assumptions, providing reliable estimates and inference. Robust estimators reduce the influence of outliers and yield more stable results.
Huber Loss
class HuberLoss(BasePointLoss):
""" Huber Loss
The Huber loss, employed in robust regression, is a loss function that
exhibits reduced sensitivity to outliers in data when compared to the
squared error loss. This function is also referred to as SmoothL1.
The Huber loss function is quadratic for small errors and linear for large
errors, with equal values and slopes of the different sections at the two
points where $(y_{\\tau}-\hat{y}_{\\tau})^{2}$=$|y_{\\tau}-\hat{y}_{\\tau}|$.
$$ L_{\delta}(y_{\\tau},\; \hat{y}_{\\tau})
=\\begin{cases}{\\frac{1}{2}}(y_{\\tau}-\hat{y}_{\\tau})^{2}\;{\\text{for }}|y_{\\tau}-\hat{y}_{\\tau}|\leq \delta \\\
\\delta \ \cdot \left(|y_{\\tau}-\hat{y}_{\\tau}|-{\\frac {1}{2}}\delta \\right),\;{\\text{otherwise.}}\end{cases}$$
where $\\delta$ is a threshold parameter that determines the point at which the loss transitions from quadratic to linear,
and can be tuned to control the trade-off between robustness and accuracy in the predictions.
**Parameters:**<br>
`delta`: float=1.0, Specifies the threshold at which to change between delta-scaled L1 and L2 loss.
`horizon_weight`: Tensor of size h, weight for each timestamp of the forecasting window. <br>
**References:**<br>
[Huber Peter, J (1964). "Robust Estimation of a Location Parameter". Annals of Statistics](https://projecteuclid.org/journals/annals-of-mathematical-statistics/volume-35/issue-1/Robust-Estimation-of-a-Location-Parameter/10.1214/aoms/1177703732.full)
"""
    def __init__(self, delta: float = 1., horizon_weight=None):
        super(HuberLoss, self).__init__(horizon_weight=horizon_weight,
                                        outputsize_multiplier=1,
                                        output_names=[''])
        self.delta = delta

    def __call__(self,
                 y: torch.Tensor,
                 y_hat: torch.Tensor,
                 mask: Union[torch.Tensor, None] = None):
        """
        **Parameters:**<br>
        `y`: tensor, Actual values.<br>
        `y_hat`: tensor, Predicted values.<br>
        `mask`: tensor, Specifies date stamps per serie to consider in loss.<br>

        **Returns:**<br>
        `huber_loss`: tensor (single value).
        """
        losses = F.huber_loss(y, y_hat, reduction='none', delta=self.delta)
        weights = self._compute_weights(y=y, mask=mask)
        return _weighted_mean(losses=losses, weights=weights)
show_doc(HuberLoss, name='HuberLoss.__init__', title_level=3)
show_doc(HuberLoss.__call__, name='HuberLoss.__call__', title_level=3)
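To illustrate the reduced outlier sensitivity, the following sketch (synthetic values chosen for illustration) compares MAE and HuberLoss on a horizon that contains one large error.

# Synthetic example (illustrative values): three small errors and one large outlier.
# With delta=1 the outlier contributes delta*(|e| - delta/2) = 9.5 to the Huber sum
# instead of the e**2/2 = 50 a pure squared penalty would assign, while small
# errors keep their quadratic treatment.
y = torch.Tensor([[0., 0., 0., 0.]])
y_hat = torch.Tensor([[0.5, -0.5, 0.5, 10.0]])

print('MAE      :', MAE()(y=y, y_hat=y_hat))
print('HuberLoss:', HuberLoss(delta=1.0)(y=y, y_hat=y_hat))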
Tukey Loss
class TukeyLoss(torch.nn.Module):
""" Tukey Loss
The Tukey loss function, also known as Tukey's biweight function, is a
robust statistical loss function used in robust statistics. Tukey's loss exhibits
quadratic behavior near the origin, like the Huber loss; however, it is even more
robust to outliers as the loss for large residuals remains constant instead of
scaling linearly.
The parameter $c$ in Tukey's loss determines the ''saturation'' point
of the function: Higher values of $c$ enhance sensitivity, while lower values
increase resistance to outliers.
$$ L_{c}(y_{\\tau},\; \hat{y}_{\\tau})
=\\begin{cases}{
\\frac{c^{2}}{6}} \\left[1-(\\frac{y_{\\tau}-\hat{y}_{\\tau}}{c})^{2} \\right]^{3} \;\\text{for } |y_{\\tau}-\hat{y}_{\\tau}|\leq c \\\
\\frac{c^{2}}{6} \qquad \\text{otherwise.} \end{cases}$$
Please note that the Tukey loss function assumes the data to be stationary or
normalized beforehand. If the error values are excessively large, the algorithm
may need help to converge during optimization. It is advisable to employ small learning rates.
**Parameters:**<br>
`c`: float=4.685, Specifies the Tukey loss' threshold on which residuals are no longer considered.<br>
`normalize`: bool=True, Whether normalization is performed within Tukey loss' computation.<br>
**References:**<br>
[Beaton, A. E., and Tukey, J. W. (1974). "The Fitting of Power Series, Meaning Polynomials, Illustrated on Band-Spectroscopic Data."](https://www.jstor.org/stable/1267936)
"""
    def __init__(self, c: float = 4.685, normalize: bool = True):
        super(TukeyLoss, self).__init__()
        self.outputsize_multiplier = 1
        self.c = c
        self.normalize = normalize
        self.output_names = ['']
        self.is_distribution_output = False

    def domain_map(self, y_hat: torch.Tensor):
        """
        Univariate loss operates in dimension [B,T,H]/[B,H]
        This changes the network's output from [B,H,1] to [B,H]
        """
        return y_hat.squeeze(-1)

    def masked_mean(self, x, mask, dim):
        x_nan = x.masked_fill(mask < 1, float("nan"))
        x_mean = x_nan.nanmean(dim=dim, keepdim=True)
        x_mean = torch.nan_to_num(x_mean, nan=0.0)
        return x_mean

    def __call__(self, y: torch.Tensor, y_hat: torch.Tensor,
                 mask: Union[torch.Tensor, None] = None):
        """
        **Parameters:**<br>
        `y`: tensor, Actual values.<br>
        `y_hat`: tensor, Predicted values.<br>
        `mask`: tensor, Specifies date stamps per serie to consider in loss.<br>

        **Returns:**<br>
        `tukey_loss`: tensor (single value).
        """
        if mask is None:
            mask = torch.ones_like(y_hat)

        # We normalize the Tukey loss so that the c=4.685 Normal outlier bound applies
        if self.normalize:
            y_mean = self.masked_mean(x=y, mask=mask, dim=-1)
            y_std = torch.sqrt(self.masked_mean(x=(y - y_mean) ** 2, mask=mask, dim=-1)) + 1e-2
        else:
            y_std = 1.
        delta_y = torch.abs(y - y_hat) / y_std

        tukey_mask = torch.greater_equal(self.c * torch.ones_like(delta_y), delta_y)
        tukey_loss = tukey_mask * mask * (1 - (delta_y / self.c) ** 2) ** 3 + (1 - (tukey_mask * 1))
        tukey_loss = (self.c ** 2 / 6) * torch.mean(tukey_loss)
        return tukey_loss
show_doc(TukeyLoss, name='TukeyLoss.__init__', title_level=3)
show_doc(TukeyLoss.__call__, name='TukeyLoss.__call__', title_level=3)
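The saturation behaviour can be checked numerically: once a (normalized) residual exceeds $c$, its contribution is frozen at $c^{2}/6$, so making the outlier arbitrarily worse no longer changes the loss. A minimal sketch with `normalize=False` and illustrative values:

# With normalize=False the residuals enter Tukey's loss directly, so any
# residual beyond c=4.685 contributes the constant c**2 / 6.
tukey = TukeyLoss(c=4.685, normalize=False)

y = torch.Tensor([[0., 0., 0.]])
mild = torch.Tensor([[1., 1., 6.]])      # one residual already past c
severe = torch.Tensor([[1., 1., 60.]])   # same residual made 10x worse

print(tukey(y=y, y_hat=mild))
print(tukey(y=y, y_hat=severe))          # identical: the outlier is saturated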
Huberized Quantile Loss
class HuberQLoss(BasePointLoss):
""" Huberized Quantile Loss
The Huberized quantile loss is a modified version of the quantile loss function that
combines the advantages of the quantile loss and the Huber loss. It is commonly used
in regression tasks, especially when dealing with data that contains outliers or heavy tails.
The Huberized quantile loss between `y` and `y_hat` measures the Huber Loss in a non-symmetric way.
The loss pays more attention to under/over-estimation depending on the quantile parameter $q$,
and controls the trade-off between robustness and accuracy in the predictions with the parameter $\\delta$.
$$ \mathrm{HuberQL}(\\mathbf{y}_{\\tau}, \\mathbf{\hat{y}}^{(q)}_{\\tau}) =
(1-q)\, L_{\delta}(y_{\\tau},\; \hat{y}^{(q)}_{\\tau}) \mathbb{1}\{ \hat{y}^{(q)}_{\\tau} \geq y_{\\tau} \} +
q\, L_{\delta}(y_{\\tau},\; \hat{y}^{(q)}_{\\tau}) \mathbb{1}\{ \hat{y}^{(q)}_{\\tau} < y_{\\tau} \} $$
**Parameters:**<br>
`delta`: float=1.0, Specifies the threshold at which to change between delta-scaled L1 and L2 loss.<br>
`q`: float, between 0 and 1. The slope of the quantile loss, in the context of quantile regression, the q determines the conditional quantile level.<br>
`horizon_weight`: Tensor of size h, weight for each timestamp of the forecasting window. <br>
**References:**<br>
[Huber Peter, J (1964). "Robust Estimation of a Location Parameter". Annals of Statistics](https://projecteuclid.org/journals/annals-of-mathematical-statistics/volume-35/issue-1/Robust-Estimation-of-a-Location-Parameter/10.1214/aoms/1177703732.full)<br>
[Roger Koenker and Gilbert Bassett, Jr., "Regression Quantiles".](https://www.jstor.org/stable/1913643)
"""
    def __init__(self, q, delta: float = 1., horizon_weight=None):
        super(HuberQLoss, self).__init__(horizon_weight=horizon_weight,
                                         outputsize_multiplier=1,
                                         output_names=[f'_q{q}_d{delta}'])
        self.q = q
        self.delta = delta

    def __call__(self,
                 y: torch.Tensor,
                 y_hat: torch.Tensor,
                 mask: Union[torch.Tensor, None] = None):
        """
        **Parameters:**<br>
        `y`: tensor, Actual values.<br>
        `y_hat`: tensor, Predicted values.<br>
        `mask`: tensor, Specifies datapoints to consider in loss.<br>

        **Returns:**<br>
        `huber_qloss`: tensor (single value).
        """
        error = y_hat - y
        zero_error = torch.zeros_like(error)
        sq = torch.maximum(-error, zero_error)
        s1_q = torch.maximum(error, zero_error)
        losses = self.q * F.huber_loss(sq, zero_error,
                                       reduction='none', delta=self.delta) + \
                 (1 - self.q) * F.huber_loss(s1_q, zero_error,
                                             reduction='none', delta=self.delta)

        weights = self._compute_weights(y=y, mask=mask)
        return _weighted_mean(losses=losses, weights=weights)
show_doc(HuberQLoss, name='HuberQLoss.__init__', title_level=3)
show_doc(HuberQLoss.__call__, name='HuberQLoss.__call__', title_level=3)
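A small numeric sketch (illustrative values) of the asymmetry: with `q=0.9`, an under-forecast is penalized nine times more than an over-forecast of the same magnitude, since both residuals fall in the linear regime of the Huber function here.

# Asymmetry sketch (illustrative values): q=0.9 weighs under-forecasts
# (y_hat < y) by 0.9 and over-forecasts by 0.1.
hql = HuberQLoss(q=0.9, delta=1.0)

y = torch.Tensor([[5.0]])
print('under-forecast:', hql(y=y, y_hat=torch.Tensor([[2.0]])))   # 0.9 * 2.5
print('over-forecast :', hql(y=y, y_hat=torch.Tensor([[8.0]])))   # 0.1 * 2.5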
Huberized MQLoss
class HuberMQLoss(BasePointLoss):
""" Huberized Multi-Quantile loss
The Huberized Multi-Quantile loss (HuberMQL) is a modified version of the multi-quantile loss function
that combines the advantages of the quantile loss and the Huber loss. HuberMQL is commonly used in regression
tasks, especially when dealing with data that contains outliers or heavy tails. The loss function pays
more attention to under/over-estimation depending on the quantile list $[q_{1},q_{2},\dots]$ parameter.
It controls the trade-off between robustness and prediction accuracy with the parameter $\\delta$.
$$ \mathrm{HuberMQL}_{\delta}(\\mathbf{y}_{\\tau},[\\mathbf{\hat{y}}^{(q_{1})}_{\\tau}, ... ,\hat{y}^{(q_{n})}_{\\tau}]) =
\\frac{1}{n} \\sum_{q_{i}} \mathrm{HuberQL}_{\\delta}(\\mathbf{y}_{\\tau}, \\mathbf{\hat{y}}^{(q_{i})}_{\\tau}) $$
**Parameters:**<br>
`level`: int list [0,100]. Probability levels for prediction intervals (Defaults median).
`quantiles`: float list [0., 1.]. Alternative to level, quantiles to estimate from y distribution.
`delta`: float=1.0, Specifies the threshold at which to change between delta-scaled L1 and L2 loss.<br>
`horizon_weight`: Tensor of size h, weight for each timestamp of the forecasting window. <br>
**References:**<br>
[Huber Peter, J (1964). "Robust Estimation of a Location Parameter". Annals of Statistics](https://projecteuclid.org/journals/annals-of-mathematical-statistics/volume-35/issue-1/Robust-Estimation-of-a-Location-Parameter/10.1214/aoms/1177703732.full)<br>
[Roger Koenker and Gilbert Bassett, Jr., "Regression Quantiles".](https://www.jstor.org/stable/1913643)
"""
    def __init__(self, level=[80, 90], quantiles=None, delta: float = 1.0, horizon_weight=None):
        qs, output_names = level_to_outputs(level)
        qs = torch.Tensor(qs)

        # Transform quantiles to homogeneous output names
        if quantiles is not None:
            _, output_names = quantiles_to_outputs(quantiles)
            qs = torch.Tensor(quantiles)

        super(HuberMQLoss, self).__init__(horizon_weight=horizon_weight,
                                          outputsize_multiplier=len(qs),
                                          output_names=output_names)

        self.quantiles = torch.nn.Parameter(qs, requires_grad=False)
        self.delta = delta

    def domain_map(self, y_hat: torch.Tensor):
        """
        Identity domain map [B,T,H,Q]/[B,H,Q]
        """
        return y_hat

    def _compute_weights(self, y, mask):
        """
        Compute final weights for each datapoint (based on all weights and all masks)
        Set horizon_weight to a ones[H] tensor if not set.
        If set, check that it has the same length as the horizon in x.
        """
        if mask is None:
            mask = torch.ones_like(y, device=y.device)
        else:
            mask = mask.unsqueeze(1)  # Add the Q dimension.

        if self.horizon_weight is None:
            self.horizon_weight = torch.ones(mask.shape[-1])
        else:
            assert mask.shape[-1] == len(self.horizon_weight), \
                'horizon_weight must have same length as Y'

        weights = self.horizon_weight.clone()
        weights = torch.ones_like(mask, device=mask.device) * weights.to(mask.device)
        return weights * mask
    def __call__(self,
                 y: torch.Tensor,
                 y_hat: torch.Tensor,
                 mask: Union[torch.Tensor, None] = None):
        """
        **Parameters:**<br>
        `y`: tensor, Actual values.<br>
        `y_hat`: tensor, Predicted values.<br>
        `mask`: tensor, Specifies date stamps per serie to consider in loss.<br>

        **Returns:**<br>
        `hmqloss`: tensor (single value).
        """
        error = y_hat - y.unsqueeze(-1)
        zero_error = torch.zeros_like(error)
        sq = torch.maximum(-error, torch.zeros_like(error))
        s1_q = torch.maximum(error, torch.zeros_like(error))
        losses = F.huber_loss(self.quantiles * sq, zero_error,
                              reduction='none', delta=self.delta) + \
                 F.huber_loss((1 - self.quantiles) * s1_q, zero_error,
                              reduction='none', delta=self.delta)
        losses = (1/len(self.quantiles)) * losses

        if y_hat.ndim == 3:  # BaseWindows
            losses = losses.swapaxes(-2, -1)   # [B,H,Q] -> [B,Q,H] (for horizon weighting, H at the end)
        elif y_hat.ndim == 4:  # BaseRecurrent
            losses = losses.swapaxes(-2, -1)
            losses = losses.swapaxes(-2, -3)   # [B,seq_len,H,Q] -> [B,Q,seq_len,H] (for horizon weighting, H at the end)

        weights = self._compute_weights(y=losses, mask=mask)  # Use losses for the extra dimensions
        # NOTE: Weights do not have a Q dimension.
        return _weighted_mean(losses=losses, weights=weights)
show_doc(HuberMQLoss, name='HuberMQLoss.__init__', title_level=3)
show_doc(HuberMQLoss.__call__, name='HuberMQLoss.__call__', title_level=3)
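A minimal shape sketch (illustrative tensors): the network emits `y_hat` with a trailing quantile dimension `[B,H,Q]` matching the `quantiles` attribute, while `y` and `mask` remain `[B,H]`.

# Shape sketch for HuberMQLoss (illustrative values): B=2 series, H=3 steps,
# Q=3 quantiles. y_hat carries one prediction per quantile.
hmq = HuberMQLoss(quantiles=[0.25, 0.5, 0.75], delta=1.0)

y = torch.zeros(2, 3)
y_hat = torch.zeros(2, 3, len(hmq.quantiles))
mask = torch.ones(2, 3)

print('outputsize_multiplier:', hmq.outputsize_multiplier)   # 3, one output per quantile
print('loss:', hmq(y=y, y_hat=y_hat, mask=mask))              # 0 for a perfect forecast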
6. Others
Accuracy
class Accuracy(torch.nn.Module):
""" Accuracy
Computes the accuracy between categorical `y` and `y_hat`.
This evaluation metric is only meant for evaluation, as it
is not differentiable.
$$ \mathrm{Accuracy}(\\mathbf{y}_{\\tau}, \\mathbf{\hat{y}}_{\\tau}) = \\frac{1}{H} \\sum^{t+H}_{\\tau=t+1} \mathrm{1}\{\\mathbf{y}_{\\tau}==\\mathbf{\hat{y}}_{\\tau}\} $$
"""
    def __init__(self):
        super(Accuracy, self).__init__()
        self.is_distribution_output = False

    def domain_map(self, y_hat: torch.Tensor):
        """
        Univariate loss operates in dimension [B,T,H]/[B,H]
        This changes the network's output from [B,H,1] to [B,H]
        """
        return y_hat.squeeze(-1)

    def __call__(self, y: torch.Tensor, y_hat: torch.Tensor,
                 mask: Union[torch.Tensor, None] = None):
        """
        **Parameters:**<br>
        `y`: tensor, Actual values.<br>
        `y_hat`: tensor, Predicted values.<br>
        `mask`: tensor, Specifies date stamps per serie to consider in loss.<br>

        **Returns:**<br>
        `accuracy`: tensor (single value).
        """
        if mask is None:
            mask = torch.ones_like(y_hat)

        measure = (y.unsqueeze(-1) == y_hat) * mask.unsqueeze(-1)
        accuracy = torch.mean(measure)
        return accuracy
show_doc(Accuracy, name='Accuracy.__init__', title_level=3)
show_doc(Accuracy.__call__, name='Accuracy.__call__', title_level=3)
Scaled Continuous Ranked Probability Score (sCRPS)
class sCRPS(torch.nn.Module):
"""Scaled Continues Ranked Probability Score
Calculates a scaled variation of the CRPS, as proposed by Rangapuram (2021),
to measure the accuracy of predicted quantiles `y_hat` compared to the observation `y`.
This metric averages percentual weighted absolute deviations as
defined by the quantile losses.
$$ \mathrm{sCRPS}(\\mathbf{\hat{y}}^{(q)}_{\\tau}, \mathbf{y}_{\\tau}) = \\frac{2}{N} \sum_{i}
\int^{1}_{0}
\\frac{\mathrm{QL}(\\mathbf{\hat{y}}^{(q}_{\\tau} y_{i,\\tau})_{q}}{\sum_{i} | y_{i,\\tau} |} dq $$
where $\\mathbf{\hat{y}}^{(q}_{\\tau}$ is the estimated quantile, and $y_{i,\\tau}$
are the target variable realizations.
**Parameters:**<br>
`level`: int list [0,100]. Probability levels for prediction intervals (Defaults median).
`quantiles`: float list [0., 1.]. Alternative to level, quantiles to estimate from y distribution.
**References:**<br>
- [Gneiting, Tilmann. (2011). \"Quantiles as optimal point forecasts\".
International Journal of Forecasting.](https://www.sciencedirect.com/science/article/pii/S0169207010000063)<br>
- [Spyros Makridakis, Evangelos Spiliotis, Vassilios Assimakopoulos, Zhi Chen, Anil Gaba, Ilia Tsetlin, Robert L. Winkler. (2022).
\"The M5 uncertainty competition: Results, findings and conclusions\".
International Journal of Forecasting.](https://www.sciencedirect.com/science/article/pii/S0169207021001722)<br>
- [Syama Sundar Rangapuram, Lucien D Werner, Konstantinos Benidis, Pedro Mercado, Jan Gasthaus, Tim Januschowski. (2021).
\"End-to-End Learning of Coherent Probabilistic Forecasts for Hierarchical Time Series\".
Proceedings of the 38th International Conference on Machine Learning (ICML).](https://proceedings.mlr.press/v139/rangapuram21a.html)
"""
    def __init__(self, level=[80, 90], quantiles=None):
        super(sCRPS, self).__init__()
        self.mql = MQLoss(level=level, quantiles=quantiles)
        self.is_distribution_output = False

    def __call__(self, y: torch.Tensor, y_hat: torch.Tensor,
                 mask: Union[torch.Tensor, None] = None):
        """
        **Parameters:**<br>
        `y`: tensor, Actual values.<br>
        `y_hat`: tensor, Predicted values.<br>
        `mask`: tensor, Specifies date stamps per series to consider in loss.<br>

        **Returns:**<br>
        `scrps`: tensor (single value).
        """
        mql = self.mql(y=y, y_hat=y_hat, mask=mask)
        norm = torch.sum(torch.abs(y))
        unmean = torch.sum(mask)
        scrps = 2 * mql * unmean / (norm + 1e-5)
        return scrps
show_doc(sCRPS, name='sCRPS.__init__', title_level=3)
show_doc(sCRPS.__call__, name='sCRPS.__call__', title_level=3)
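A minimal sketch (illustrative tensors, relying on the `MQLoss` defined earlier in this notebook through `self.mql`): because sCRPS divides the multi-quantile loss by the scale of the target, jointly rescaling `y` and `y_hat` leaves the score essentially unchanged.

# Scale-invariance sketch (illustrative values): multiplying targets and
# quantile forecasts by the same constant leaves sCRPS (approximately) unchanged.
scrps = sCRPS(quantiles=[0.25, 0.5, 0.75])

y = torch.Tensor([[10., 12., 9.]])
y_hat = torch.stack([y - 1., y, y + 1.], dim=-1)   # [B,H,Q] quantile forecasts
mask = torch.ones_like(y)

print(scrps(y=y, y_hat=y_hat, mask=mask))
print(scrps(y=100 * y, y_hat=100 * y_hat, mask=mask))   # same value up to the 1e-5 stabilizer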
# Each 1 represents an error, out of 6 datapoints
y = torch.Tensor([[0,0,0],[0,0,0]])
y_hat = torch.Tensor([[0,0,1],[1,0,1]])

# Complete mask and horizon_weight
mask = torch.Tensor([[1,1,1],[1,1,1]])
horizon_weight = torch.Tensor([1,1,1])

mae = MAE(horizon_weight=horizon_weight)
loss = mae(y=y, y_hat=y_hat, mask=mask)
assert loss==(3/6), 'Should be 3/6'

# Incomplete mask and complete horizon_weight
mask = torch.Tensor([[1,1,1],[0,1,1]])  # only 1 error and 1 point are masked
horizon_weight = torch.Tensor([1,1,1])
mae = MAE(horizon_weight=horizon_weight)
loss = mae(y=y, y_hat=y_hat, mask=mask)
assert loss==(2/5), 'Should be 2/5'

# Complete mask and incomplete horizon_weight
mask = torch.Tensor([[1,1,1],[1,1,1]])
horizon_weight = torch.Tensor([1,1,0])  # 2 errors and 2 points are masked
mae = MAE(horizon_weight=horizon_weight)
loss = mae(y=y, y_hat=y_hat, mask=mask)
assert loss==(1/4), 'Should be 1/4'

# Incomplete mask and incomplete horizon_weight
mask = torch.Tensor([[0,1,1],[1,1,1]])
horizon_weight = torch.Tensor([1,1,0])  # 2 errors masked, 3 points discounted
mae = MAE(horizon_weight=horizon_weight)
loss = mae(y=y, y_hat=y_hat, mask=mask)
assert loss==(1/3), 'Should be 1/3'
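The same mask and `horizon_weight` mechanics apply to every `BasePointLoss` subclass; as a sketch, the analogous check for `HuberLoss` (with `delta=1`, each unit error falls in the quadratic regime and contributes 0.5 rather than 1):

# Sketch: HuberLoss inherits the same mask / horizon_weight handling as MAE.
y = torch.Tensor([[0,0,0],[0,0,0]])
y_hat = torch.Tensor([[0,0,1],[1,0,1]])

mask = torch.Tensor([[1,1,1],[1,1,1]])
horizon_weight = torch.Tensor([1,1,1])

huber = HuberLoss(delta=1.0, horizon_weight=horizon_weight)
loss = huber(y=y, y_hat=y_hat, mask=mask)
assert loss==(0.5*3/6), 'Should be 0.25'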
Give us a ⭐ on Github