%load_ext autoreload
%autoreload 2
Models
Models currently supported by StatsForecast
StatsForecast offers a wide variety of models, grouped into the following categories (a short instantiation sketch follows this list):
Automatic Forecasting: automatic forecasting tools search for the best parameters and select the best possible model for a collection of time series. These tools are useful for large sets of univariate time series. Includes automatic versions of: Arima, ETS, Theta, CES.
Exponential Smoothing: uses a weighted average of all past observations, with weights that decrease exponentially over time. Suitable for data with a clear trend and/or seasonality. For data with no clear trend or seasonality, use the SimpleExponential family. Examples: SES, Holt-Winters, SSO.
Benchmark models: classical models used to establish baselines. Examples: Mean, Naive, Random Walk.
Intermittent or Sparse models: suited for time series with very few non-zero observations. Examples: CROSTON, ADIDA, IMAPA.
Multiple Seasonalities: suited for signals with more than one clear seasonality. Useful for low-frequency data such as electricity and logs. Examples: MSTL and TBATS.
Theta Models: fit two theta lines to a deseasonalized time series, obtained with different techniques, and combine them to produce the final forecast. Examples: Theta, DynamicTheta.
GARCH Models: suited for modeling time series whose volatility is not constant over time. Commonly used in finance to model stock prices, exchange rates, interest rates, and other financial instruments. The ARCH model is a particular case of GARCH.
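As a quick orientation, the minimal sketch below instantiates one model from several of the categories above. The parameter values are illustrative only, and it assumes these classes are exported by statsforecast.models, as in the usage examples later in this notebook.
from statsforecast.models import (
    AutoARIMA,        # automatic forecasting
    HoltWinters,      # exponential smoothing
    Naive,            # benchmark / baseline
    CrostonClassic,   # intermittent demand
    MSTL,             # multiple seasonalities
    AutoTheta,        # theta family
    GARCH,            # volatility modeling
)
# One illustrative instance per category (hourly data assumed for MSTL).
models = [
    AutoARIMA(season_length=12),
    HoltWinters(season_length=12),
    Naive(),
    CrostonClassic(),
    MSTL(season_length=[24, 24 * 7]),
    AutoTheta(season_length=12),
    GARCH(p=1, q=1),
]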
import warnings
from math import trunc
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union
import numpy as np
from numba import njit
from scipy.optimize import minimize
from scipy.special import inv_boxcox
from statsforecast.arima import (
Arima,
auto_arima_f,
fitted_arima,
forecast_arima,
forward_arima,
is_constant,
)
from statsforecast.ces import (
auto_ces, forecast_ces,
forward_ces
)
from statsforecast.ets import (
_PHI_LOWER,
_PHI_UPPER,
ets_f, forecast_ets,
forward_ets,
)
from statsforecast.mfles import MFLES as _MFLES
from statsforecast.mstl import mstl
from statsforecast.theta import (
auto_theta, forecast_theta,
forward_theta
)
from statsforecast.garch import (
garch_model, garch_forecast
)
from statsforecast.tbats import tbats_selection, tbats_forecast, _compute_sigmah
from statsforecast.utils import (
_calculate_sigma,
_calculate_intervals,
_ensure_float,
_naive,
_old_kw_to_pos,
_quantiles,
_repeat_val,
_repeat_val_seas,
_seasonal_naive,
CACHE,
ConformalIntervals,
NOGIL,
)
from datetime import date, timedelta
import matplotlib.pyplot as plt
import pandas as pd
from fastcore.test import test_eq, test_close, test_fail
from nbdev.showdoc import add_docs, show_doc
from statsforecast.garch import generate_garch_data
from statsforecast.utils import AirPassengers as ap
def _plot_insample_pi(fcst):
fig, ax = plt.subplots(1, 1, figsize = (20,7))
sdate = date(1949,1,1) # start date
edate = date(1961,1,1) # end date
dates = pd.date_range(sdate,edate-timedelta(days=1),freq='m')
df = pd.DataFrame({'dates': dates,
'actual': ap,
'fitted': fcst['fitted'],
'fitted_lo_80': fcst['fitted-lo-80'],
'fitted_lo_95': fcst['fitted-lo-95'],
'fitted_hi_80': fcst['fitted-hi-80'],
'fitted_hi_95': fcst['fitted-hi-95']})
plt.plot(df.dates, df.actual, color='firebrick', label='Actual value', linewidth=3)
plt.plot(df.dates, df.fitted, color='navy', label='Fitted values', linewidth=3)
plt.plot(df.dates, df.fitted_lo_80, color='darkorange', label='fitted-lo-80', linewidth=3)
plt.plot(df.dates, df.fitted_lo_95, color='deepskyblue', label='fitted-lo-95', linewidth=3)
plt.plot(df.dates, df.fitted_hi_80, color='darkorange', label='fitted-hi-80', linewidth=3)
plt.plot(df.dates, df.fitted_hi_95, color='deepskyblue', label='fitted-hi-95', linewidth=3)
plt.fill_between(df.dates, df.fitted_lo_95, df.fitted_hi_95, color = 'deepskyblue', alpha = 0.2)
plt.fill_between(df.dates, df.fitted_lo_80, df.fitted_hi_80, color = 'darkorange', alpha = 0.3)
plt.legend()
def _plot_fcst(fcst):
fig, ax = plt.subplots(1, 1, figsize = (20,7))
plt.plot(np.arange(0, len(ap)), ap)
plt.plot(np.arange(len(ap), len(ap) + 13), fcst['mean'], label='mean')
plt.plot(np.arange(len(ap), len(ap) + 13), fcst['lo-95'], color = 'r', label='lo-95')
plt.plot(np.arange(len(ap), len(ap) + 13), fcst['hi-95'], color = 'r', label='hi-95')
plt.plot(np.arange(len(ap), len(ap) + 13), fcst['lo-80'], color = 'g', label='lo-80')
plt.plot(np.arange(len(ap), len(ap) + 13), fcst['hi-80'], color = 'g', label='hi-80')
plt.legend()
def _add_fitted_pi(res, se, level):
level = sorted(level)
level = np.asarray(level)
quantiles = _quantiles(level=level)
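# _quantiles is expected to return the two-sided normal z-scores for each requested level,
# so the lo/hi bands below are symmetric Gaussian intervals around the fitted values.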
lo = res['fitted'].reshape(-1, 1) - quantiles * se.reshape(-1, 1)
hi = res['fitted'].reshape(-1, 1) + quantiles * se.reshape(-1, 1)
lo = lo[:, ::-1]
lo = {f'fitted-lo-{l}': lo[:, i] for i, l in enumerate(reversed(level))}
hi = {f'fitted-hi-{l}': hi[:, i] for i, l in enumerate(level)}
res = {**res, **lo, **hi}
return res
def _add_conformal_distribution_intervals(
fcst: Dict,
cs: np.ndarray,
level: List[Union[int, float]],
) -> Dict:
r"""
Adds conformal intervals to the `fcst` dict based on conformal scores `cs`.
`level` should be already sorted. This strategy creates forecast paths
based on errors and calculates quantiles using those paths.
"""
alphas = [100 - lv for lv in level]
cuts = [alpha / 200 for alpha in reversed(alphas)]
cuts.extend(1 - alpha / 200 for alpha in alphas)
mean = fcst['mean'].reshape(1, -1)
scores = np.vstack([mean - cs, mean + cs])
quantiles = np.quantile(
scores,
cuts,
axis=0,
)
quantiles = quantiles.reshape(len(cuts), -1)
lo_cols = [f"lo-{lv}" for lv in reversed(level)]
hi_cols = [f"hi-{lv}" for lv in level]
out_cols = lo_cols + hi_cols
for i, col in enumerate(out_cols):
fcst[col] = quantiles[i]
return fcst
def _get_conformal_method(method: str):
available_methods = {
"conformal_distribution": _add_conformal_distribution_intervals,
#"conformal_error": _add_conformal_error_intervals,
}
if method not in available_methods.keys():
raise ValueError(
f"prediction intervals method {method} not supported "
f'please choose one of {", ".join(available_methods.keys())}'
)
return available_methods[method]
class _TS:
uses_exog = False
def new(self):
b = type(self).__new__(type(self))
b.__dict__.update(self.__dict__)
return b
def __repr__(self):
return self.alias
def _conformity_scores(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None,
) -> np.ndarray:
y = _ensure_float(y)
n_windows = self.prediction_intervals.n_windows  # type: ignore[attr-defined]
h = self.prediction_intervals.h  # type: ignore[attr-defined]
n_samples = y.size
# use as many windows as possible for short series
# (minus 1 to leave observations for the training set)
n_windows = min(n_windows, (n_samples - 1) // h)
if n_windows < 2:
raise ValueError(
f"Prediction intervals settings require at least {2 * h + 1:,} samples, serie has {n_samples:,}."
)
test_size = n_windows * h
cs = np.empty((n_windows, h), dtype=y.dtype)
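# Each window trains on y[:train_end] and records the absolute error of a fresh
# h-step forecast against the next h observations (a rolling-origin evaluation
# over the tail of the series).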
for i_window in range(n_windows):
train_end = n_samples - test_size + i_window * h
y_train = y[:train_end]
y_test = y[train_end : train_end + h]
if X is not None:
X_train = X[:train_end]
X_test = X[train_end : train_end + h]
else:
X_train = None
X_test = None
fcst_window = self.forecast(h=h, y=y_train, X=X_train, X_future=X_test)  # type: ignore[attr-defined]
cs[i_window] = np.abs(fcst_window["mean"] - y_test)
return cs
@property
def _conformal_method(self):
return _get_conformal_method(self.prediction_intervals.method)
def _store_cs(self, y, X):
if self.prediction_intervals is not None:
self._cs = self._conformity_scores(y, X)
def _add_conformal_intervals(self, fcst, y, X, level):
if self.prediction_intervals is not None and level is not None:
cs = self._conformity_scores(y, X) if y is not None else self._cs
res = self._conformal_method(fcst=fcst, cs=cs, level=level)
return res
return fcst
def _add_predict_conformal_intervals(self, fcst, level):
return self._add_conformal_intervals(fcst=fcst, y=None, X=None, level=level)
# Test conformity scores
class ZeroModel(_TS):
def __init__(self, prediction_intervals: Optional[ConformalIntervals] = None):
self.prediction_intervals = prediction_intervals
self.alias = 'SumAhead'
def forecast(self, y, h, X=None, X_future=None, fitted=False, level=None):
res = {'mean': np.zeros(h)}
if self.prediction_intervals is not None and level is not None:
cs = self._conformity_scores(y, X)
res = self._conformal_method(fcst=res, cs=cs, level=level)
return res
def fit(self, y, X):
return self
def predict(self, h, X=None, level=None):
res = {'mean': np.zeros(h)}
return res
conf_intervals = ConformalIntervals(h=12, n_windows=10)
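# ZeroModel always predicts zeros, so its conformity scores are simply the
# absolute values of the held-out observations: the last h * n_windows points
# of the series, reshaped into (n_windows, h).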
expected_cs = np.full((conf_intervals.n_windows, conf_intervals.h), np.nan)
cs_info = ap[-conf_intervals.h*conf_intervals.n_windows:]
for i in range(conf_intervals.n_windows):
expected_cs[i] = cs_info[i * conf_intervals.h:(i+1) * conf_intervals.h]
current_cs = ZeroModel(conf_intervals)._conformity_scores(ap)
test_eq(expected_cs, current_cs)
zero_model = ZeroModel(conf_intervals)
fcst_conformal = zero_model.forecast(ap, h=12, level=[80, 90])
test_eq(list(fcst_conformal.keys()), ['mean', 'lo-90', 'lo-80', 'hi-80', 'hi-90'])
Automatic Forecasting
AutoARIMA
class AutoARIMA(_TS):
r"""AutoARIMA model.
Automatically selects the best ARIMA (AutoRegressive Integrated Moving Average)
model using an information criterion. Default is Akaike Information Criterion (AICc).
Notes
-----
This implementation is a mirror of Hyndman's [forecast::auto.arima](https://github.com/robjhyndman/forecast).
References
----------
[Rob J. Hyndman, Yeasmin Khandakar (2008). "Automatic Time Series Forecasting: The forecast package for R"](https://www.jstatsoft.org/article/view/v027i03).
Parameters
----------
d : Optional[int]
Order of first-differencing.
D : Optional[int]
Order of seasonal-differencing.
max_p : int
Max autoregressive order p.
max_q : int
Max moving averages q.
max_P : int
Max seasonal autoregressive order P.
max_Q : int
Max seasonal moving averages Q.
max_order : int
Max p+q+P+Q value if not stepwise selection.
max_d : int
Max non-seasonal differences.
max_D : int
Max seasonal differences.
start_p : int
Starting value of p in stepwise procedure.
start_q : int
Starting value of q in stepwise procedure.
start_P : int
Starting value of P in stepwise procedure.
start_Q : int
Starting value of Q in stepwise procedure.
stationary : bool
If True, restricts search to stationary models.
seasonal : bool
If False, restricts search to non-seasonal models.
ic : str
Information criterion to be used in model selection.
stepwise : bool
If True, will do stepwise selection (faster).
nmodels : int
Number of models considered in stepwise search.
trace : bool
If True, the searched ARIMA models are reported.
approximation : Optional[bool]
If True, estimation uses conditional sums-of-squares and the selected model is refit with MLE.
method : Optional[str]
Fitting method: maximum likelihood or sums-of-squares.
truncate : Optional[int]
Number of most recent observations used in model selection.
test : str
Unit root test to use. See `ndiffs` for details.
test_kwargs : Optional[str]
Unit root test additional arguments.
seasonal_test : str
Selection method for seasonal differences.
seasonal_test_kwargs : Optional[dict]
Seasonal unit root test arguments.
allowdrift : bool (default True)
If True, models with drift terms are considered.
allowmean : bool (default True)
If True, models with a non-zero mean are considered.
blambda : Optional[float]
Box-Cox transformation parameter.
biasadj : bool
Use adjusted back-transformed mean for Box-Cox transformations.
season_length : int
Number of observations per unit of time. Ex: 24 Hourly data.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
uses_exog = True
def __init__(
self,
d: Optional[int] = None,
D: Optional[int] = None,
max_p: int = 5,
max_q: int = 5,
max_P: int = 2,
max_Q: int = 2,
max_order: int = 5,
max_d: int = 2,
max_D: int = 1,
start_p: int = 2,
start_q: int = 2,
start_P: int = 1,
start_Q: int = 1,
stationary: bool = False,
seasonal: bool = True,
ic: str = 'aicc',
stepwise: bool = True,
nmodels: int = 94,
trace: bool = False,
approximation: Optional[bool] = False,
method: Optional[str] = None,
truncate: Optional[bool] = None,
test: str = 'kpss',
test_kwargs: Optional[str] = None,
seasonal_test: str = 'seas',
seasonal_test_kwargs: Optional[Dict] = None,
allowdrift: bool = True,
allowmean: bool = True,
blambda: Optional[float] = None,
biasadj: bool = False,
season_length: int = 1,
alias: str = 'AutoARIMA',
prediction_intervals: Optional[ConformalIntervals] = None,
):
self.d=d
self.D=D
self.max_p=max_p
self.max_q=max_q
self.max_P=max_P
self.max_Q=max_Q
self.max_order=max_order
self.max_d=max_d
self.max_D=max_D
self.start_p=start_p
self.start_q=start_q
self.start_P=start_P
self.start_Q=start_Q
self.stationary=stationary
self.seasonal=seasonal
self.ic=ic
self.stepwise=stepwise
self.nmodels=nmodels
self.trace=trace
self.approximation=approximation
self.method=method
self.truncate=truncate
self.test=test
self.test_kwargs=test_kwargs
self.seasonal_test=seasonal_test
self.seasonal_test_kwargs=seasonal_test_kwargs
self.allowdrift=allowdrift
self.allowmean=allowmean
self.blambda=blambda
self.biasadj=biasadj
self.season_length=season_length
self.alias = alias
self.prediction_intervals = prediction_intervals
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None,
):
r"""Fit the AutoARIMA model.
Fit an AutoARIMA to a time series (numpy array) `y`
and optionally exogenous variables (numpy array) `X`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self :
AutoARIMA fitted model.
"""
y = _ensure_float(y)
with np.errstate(invalid='ignore'):
self.model_ = auto_arima_f(
x=y,
d=self.d,
D=self.D,
max_p=self.max_p,
max_q=self.max_q,
max_P=self.max_P,
max_Q=self.max_Q,
max_order=self.max_order,
max_d=self.max_d,
max_D=self.max_D,
start_p=self.start_p,
start_q=self.start_q,
start_P=self.start_P,
start_Q=self.start_Q,
stationary=self.stationary,
seasonal=self.seasonal,
ic=self.ic,
stepwise=self.stepwise,
nmodels=self.nmodels,
trace=self.trace,
approximation=self.approximation,
method=self.method,
truncate=self.truncate,
xreg=X,
test=self.test,
test_kwargs=self.test_kwargs,
seasonal_test=self.seasonal_test,
seasonal_test_kwargs=self.seasonal_test_kwargs,
allowdrift=self.allowdrift,
allowmean=self.allowmean,
blambda=self.blambda,
biasadj=self.biasadj,
period=self.season_length
)
self._store_cs(y=y, X=X)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
):
r"""Predict with fitted AutoArima.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
fcst = forecast_arima(self.model_, h=h, xreg=X, level=level)
mean = fcst['mean']
res = {'mean': mean}
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_predict_conformal_intervals(res, level)
else:
res = {
'mean': mean,
**{f'lo-{l}': fcst['lower'][f'{l}%'] for l in reversed(level)},
**{f'hi-{l}': fcst['upper'][f'{l}%'] for l in level},
}
return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted AutoArima insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
mean = fitted_arima(self.model_)
res = {'fitted': mean}
if level is not None:
se = np.sqrt(self.model_['sigma2'])
res = _add_fitted_pi(res=res, se=se, level=level)
return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Memory Efficient AutoARIMA predictions.
This method avoids the memory burden of storing model objects.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
with np.errstate(invalid='ignore'):
mod = auto_arima_f(
x=y,
d=self.d,
D=self.D,
max_p=self.max_p,
max_q=self.max_q,
max_P=self.max_P,
max_Q=self.max_Q,
max_order=self.max_order,
max_d=self.max_d,
max_D=self.max_D,
start_p=self.start_p,
start_q=self.start_q,
start_P=self.start_P,
start_Q=self.start_Q,
stationary=self.stationary,
seasonal=self.seasonal,
ic=self.ic,
stepwise=self.stepwise,
nmodels=self.nmodels,
trace=self.trace,
approximation=self.approximation,
method=self.method,
truncate=self.truncate,
xreg=X,
test=self.test,
test_kwargs=self.test_kwargs,
seasonal_test=self.seasonal_test,
seasonal_test_kwargs=self.seasonal_test_kwargs,
allowdrift=self.allowdrift,
allowmean=self.allowmean,
blambda=self.blambda,
biasadj=self.biasadj,
period=self.season_length
)
fcst = forecast_arima(mod, h, xreg=X_future, level=level)
res = {'mean': fcst['mean']}
if fitted:
res['fitted'] = fitted_arima(mod)
if level is not None:
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
res = {
**res,
**{f'lo-{l}': fcst['lower'][f'{l}%'] for l in reversed(level)},
**{f'hi-{l}': fcst['upper'][f'{l}%'] for l in level},
}
if fitted:
# add prediction intervals for fitted values
se = np.sqrt(mod['sigma2'])
res = _add_fitted_pi(res=res, se=se, level=level)
return res
def forward(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Apply fitted ARIMA model to a new time series.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
if not hasattr(self, 'model_'):
raise Exception('You have to use the `fit` method first')
y = _ensure_float(y)
with np.errstate(invalid='ignore'):
mod = forward_arima(self.model_, y=y, xreg=X, method=self.method)
fcst = forecast_arima(mod, h, xreg=X_future, level=level)
res = {'mean': fcst['mean']}
if fitted:
res['fitted'] = fitted_arima(mod)
if level is not None:
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
res = {
**res,
**{f'lo-{l}': fcst['lower'][f'{l}%'] for l in reversed(level)},
**{f'hi-{l}': fcst['upper'][f'{l}%'] for l in level},
}
if fitted:
# add prediction intervals for fitted values
se = np.sqrt(mod['sigma2'])
res = _add_fitted_pi(res=res, se=se, level=level)
return res
def test_class(
cls_, x, h, skip_insample=False, level=None, test_forward=False, X=None, X_future=None
):
cls_ = cls_.fit(x, X=X)
fcst_cls = cls_.predict(h=h, X=X_future)
test_eq(len(fcst_cls['mean']), h)
# test that fit + predict equals forecast
test_eq(
cls_.forecast(y=x, h=h, X=X, X_future=X_future)['mean'],
fcst_cls['mean']
)
if not skip_insample:
test_eq(len(cls_.predict_in_sample()['fitted']), len(x))
assert isinstance(cls_.predict_in_sample()['fitted'], np.ndarray)
np.testing.assert_array_equal(
cls_.forecast(y=x, h=h, X=X, X_future=X_future, fitted=True)['fitted'],
cls_.predict_in_sample()['fitted'],
)
if test_forward:
np.testing.assert_array_equal(
cls_.forward(y=x, h=h, X=X, X_future=X_future, fitted=True)['fitted'],
cls_.predict_in_sample()['fitted'],
)
if test_forward:
try:
pd.testing.assert_frame_equal(
pd.DataFrame(cls_.predict(h=h, X=X_future)),
pd.DataFrame(cls_.forward(y=x, X=X, X_future=X_future, h=h)),
)
except AssertionError:
raise Exception('predict and forward methods are not equal')
if level is not None:
fcst_cls = pd.DataFrame(cls_.predict(h=h, X=X_future, level=level))
fcst_forecast = pd.DataFrame(cls_.forecast(y=x, h=h, X=X, X_future=X_future, level=level))
try:
pd.testing.assert_frame_equal(fcst_cls, fcst_forecast)
except AssertionError:
raise Exception('predict and forecast methods are not equal with levels')
if test_forward:
try:
pd.testing.assert_frame_equal(
pd.DataFrame(cls_.predict(h=h, X=X_future, level=level)),
pd.DataFrame(cls_.forward(y=x, h=h, X=X, X_future=X_future, level=level))
)
except AssertionError:
raise Exception('predict and forward methods are not equal with levels')
if not skip_insample:
fcst_cls = pd.DataFrame(cls_.predict_in_sample(level=level))
fcst_forecast = cls_.forecast(y=x, h=h, X=X, X_future=X_future, level=level, fitted=True)
fcst_forecast = pd.DataFrame({key: val for key, val in fcst_forecast.items() if 'fitted' in key})
try:
pd.testing.assert_frame_equal(fcst_cls, fcst_forecast)
except AssertionError:
raise Exception(
'predict and forecast methods are not equal with '
'levels for fitted values '
)
if test_forward:
fcst_forward = cls_.forecast(y=x, h=h, X=X, X_future=X_future, level=level, fitted=True)
fcst_forward = pd.DataFrame({key: val for key, val in fcst_forward.items() if 'fitted' in key})
try:
pd.testing.assert_frame_equal(fcst_cls, fcst_forward)
except AssertionError:
raise Exception(
'predict and forward methods are not equal with '
'levels for fitted values '
)
def _test_fitted_sparse(model_factory):
y1 = np.array([2, 5, 0, 1, 3, 0, 1, 1, 0], dtype=np.float64)
y2 = np.array([0, 0, 1, 0, 0, 7, 1, 0, 1], dtype=np.float64)
y3 = np.array([0, 0, 1, 0, 0, 7, 1, 0, 0], dtype=np.float64)
y4 = np.zeros(9, dtype=np.float64)
for y in [y1, y2, y3, y4]:
expected_fitted = np.hstack(
[
model_factory().forecast(y=y[:i + 1], h=1)['mean']
for i in range(y.size - 1)]
)
np.testing.assert_allclose(
model_factory().forecast(y=y, h=1, fitted=True)['fitted'],
np.append(np.nan, expected_fitted),
atol=1e-6,
)
arima = AutoARIMA(season_length=12)
test_class(arima, x=ap, h=12, level=[90, 80], test_forward=True)
fcst_arima = arima.forecast(ap, 13, None, None, (80,95), True)
_plot_insample_pi(fcst_arima)
# Test conformal prediction
arima_c = AutoARIMA(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(arima_c, x=ap, h=13, level=[90, 80], test_forward=True)
fcst_arima_c = arima_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
fcst_arima_c["mean"],
fcst_arima["mean"],
)
_plot_fcst(fcst_arima_c)
# Test alias argument
test_eq(
repr(AutoARIMA()),
'AutoARIMA'
)
test_eq(
repr(AutoARIMA(alias='AutoARIMA_seasonality')),
'AutoARIMA_seasonality'
)
_plot_fcst(fcst_arima)
show_doc(AutoARIMA, title_level=3)
show_doc(AutoARIMA.fit, title_level=3)
show_doc(AutoARIMA.predict, title_level=3)
show_doc(AutoARIMA.predict_in_sample, title_level=3)
show_doc(AutoARIMA.forecast, title_level=3)
show_doc(AutoARIMA.forward, title_level=3)
from statsforecast.models import AutoARIMA
from statsforecast.utils import AirPassengers as ap
# AutoARIMA's usage example
arima = AutoARIMA(season_length=4)
arima = arima.fit(y=ap)
y_hat_dict = arima.predict(h=4, level=[80])
y_hat_dict
AutoETS
class AutoETS(_TS):
r"""Automatic Exponential Smoothing model.
Automatically selects the best ETS (Error, Trend, Seasonality)
model using an information criterion. Default is Akaike Information Criterion (AICc), while particular models are estimated using maximum likelihood.
The state-space equations can be determined based on their $M$ multiplicative, $A$ additive,
$Z$ optimized or $N$ omitted components. The `model` string parameter defines the ETS equations:
E in [$M, A, Z$], T in [$N, A, M, Z$], and S in [$N, A, M, Z$].
For example when model='ANN' (additive error, no trend, and no seasonality), ETS will
explore only a simple exponential smoothing.
If the component is selected as 'Z', it operates as a placeholder to ask the AutoETS model
to figure out the best parameter.
Notes
-----
This implementation is a mirror of Hyndman's [forecast::ets](https://github.com/robjhyndman/forecast).
References
----------
[Rob J. Hyndman, Yeasmin Khandakar (2008). "Automatic Time Series Forecasting: The forecast package for R"](https://www.jstatsoft.org/article/view/v027i03).
[Hyndman, Rob, et al (2008). "Forecasting with exponential smoothing: the state space approach"](https://robjhyndman.com/expsmooth/).
Parameters
----------
model : str
Controlling state-space-equations.
season_length : int
Number of observations per unit of time. Ex: 24 Hourly data.
damped : bool
A parameter that 'dampens' the trend.
phi : float, optional (default=None)
Smoothing parameter for trend damping. Only used when `damped=True`.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals],
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
def __init__(
self,
season_length: int = 1,
model: str = 'ZZZ',
damped: Optional[bool] = None,
phi: Optional[float] = None,
alias: str = 'AutoETS',
prediction_intervals: Optional[ConformalIntervals] = None,
):
self.season_length = season_length
self.model = model
self.damped = damped
if phi is not None:
if not isinstance(phi, float):
raise ValueError('phi must be `None` or float.')
if not _PHI_LOWER <= phi <= _PHI_UPPER:
raise ValueError(f'Valid range for phi is [{_PHI_LOWER}, {_PHI_UPPER}]')
self.phi = phi
self.alias = alias
self.prediction_intervals = prediction_intervals
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None,
):
r"""Fit the Exponential Smoothing model.
Fit an Exponential Smoothing model to a time series (numpy array) `y`
and optionally exogenous variables (numpy array) `X`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self :
Exponential Smoothing fitted model.
"""
y = _ensure_float(y)
self.model_ = ets_f(y, m=self.season_length, model=self.model, damped=self.damped, phi=self.phi)
self.model_['actual_residuals'] = y - self.model_['fitted']
self._store_cs(y=y, X=X)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None
):
r"""Predict with fitted Exponential Smoothing.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
fcst = forecast_ets(self.model_, h=h, level=level)
res = {'mean': fcst['mean']}
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_predict_conformal_intervals(res, level)
else:
res = {
**res,
**{f"lo-{l}": fcst[f"lo-{l}"] for l in reversed(level)},
**{f"hi-{l}": fcst[f"hi-{l}"] for l in level},
}
return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted Exponential Smoothing insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
res = {'fitted': self.model_['fitted']}
if level is not None:
residuals = self.model_['actual_residuals']
se = _calculate_sigma(residuals, len(residuals) - self.model_['n_params'])
res = _add_fitted_pi(res=res, se=se, level=level)
return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Memory Efficient Exponential Smoothing predictions.
This method avoids the memory burden of storing model objects.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
mod = ets_f(y, m=self.season_length, model=self.model, damped=self.damped, phi=self.phi)
fcst = forecast_ets(mod, h=h, level=level)
keys = ['mean']
if fitted:
keys.append('fitted')
res = {key: fcst[key] for key in keys}
if level is not None:
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
res = {
**res,
**{f"lo-{l}": fcst[f"lo-{l}"] for l in reversed(level)},
**{f"hi-{l}": fcst[f"hi-{l}"] for l in level},
}
if fitted:
# add prediction intervals for fitted values
se = _calculate_sigma(y - mod['fitted'], len(y) - mod['n_params'])
res = _add_fitted_pi(res=res, se=se, level=level)
return res
def forward(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Apply fitted Exponential Smoothing model to a new time series.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
if not hasattr(self, 'model_'):
raise Exception('You have to use the `fit` method first')
y = _ensure_float(y)
mod = forward_ets(self.model_, y=y)
fcst = forecast_ets(mod, h=h, level=level)
keys = ['mean']
if fitted:
keys.append('fitted')
res = {key: fcst[key] for key in keys}
if level is not None:
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
res = {
**res,
**{f"lo-{l}": fcst[f"lo-{l}"] for l in reversed(level)},
**{f"hi-{l}": fcst[f"hi-{l}"] for l in level},
}
if fitted:
# add prediction intervals for fitted values
se = _calculate_sigma(y - mod['fitted'], len(y) - mod['n_params'])
res = _add_fitted_pi(res=res, se=se, level=level)
return res
autoets = AutoETS(season_length=12)
test_class(autoets, x=ap, h=12, level=[90, 80], test_forward=True)
fcst_ets = autoets.forecast(ap, 13, None, None, (80,95), True)
_plot_insample_pi(fcst_ets)
_plot_fcst(fcst_ets)
# Test alias argument
test_eq(
repr(AutoETS()),
'AutoETS'
)
test_eq(
repr(AutoETS(alias='AutoETS_custom')),
'AutoETS_custom'
)
autoets = AutoETS(season_length=12, model='AAA')
test_class(autoets, x=ap, h=12, level=[90, 80])
fcst_ets = autoets.forecast(ap, 13, None, None, (80,95), True)
_plot_insample_pi(fcst_ets)
# Test conformal prediction
autoets_c = AutoETS(season_length=12, model='AAA', prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(autoets_c, x=ap, h=13, level=[90, 80], test_forward=True)
fcst_ets_c = autoets_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(fcst_ets_c['mean'],
fcst_ets['mean'])
_plot_fcst(fcst_ets_c)
# Test that forecast and fit + predict produce the same results
models = ['ANNN', 'AANN', 'ANAN', 'AAAN', 'AAND', 'AAAD', # class 1
'MNNN', 'MANN', 'MAND', 'MNAN', 'MAAN', 'MAAD', # class 2
'MNMN', 'MAMN', 'MAMD'] # class 3
for k in range(0,len(models)):
mod = models[k][0:3]
damped_val = models[k][-1]
if damped_val == 'N':
damped = False
else:
damped = True
ets = AutoETS(season_length=12, model=mod, damped=damped)
test_class(ets, x=ap, h=13, level=[90, 80], test_forward=True)
show_doc(AutoETS, title_level=3)
show_doc(AutoETS.fit, title_level=3)
show_doc(AutoETS.predict, title_level=3)
show_doc(AutoETS.predict_in_sample, title_level=3)
show_doc(AutoETS.forecast, title_level=3)
show_doc(AutoETS.forward, title_level=3)
from statsforecast.models import AutoETS
from statsforecast.utils import AirPassengers as ap
# AutoETS' usage example
# Multiplicative trend, optimal error and seasonality
autoets = AutoETS(model='ZMZ', season_length=4)
autoets = autoets.fit(y=ap)
y_hat_dict = autoets.predict(h=4)
y_hat_dict
class ETS(AutoETS):
@classmethod
def _warn(cls):
warnings.warn(
'`ETS` will be deprecated in future versions of `StatsForecast`. Please use `AutoETS` instead.',
category=FutureWarning,
stacklevel=2
)
def __init__(self, season_length: int = 1, model: str = 'ZZZ',
damped: Optional[bool] = None,
phi: Optional[float] = None,
alias: str = 'ETS',
prediction_intervals: Optional[ConformalIntervals] = None):
ETS._warn()
super().__init__(
season_length=season_length,
model=model,
damped=damped,
phi=phi,
alias=alias,
prediction_intervals=prediction_intervals,
)
ets = ETS(model='ZMZ', season_length=4)
ets = ETS(model='ZMZ', season_length=4)
autoets = AutoETS(model='ZMZ',
season_length=4)
test_eq(ets.forecast(y=ap, h=12)['mean'],
autoets.forecast(y=ap, h=12)['mean'])
# Test alias argument
test_eq(
repr(ETS()),
'ETS'
)
test_eq(
repr(ETS(alias='ETS_custom')),
'ETS_custom'
)
Complex Exponential Smoothing (AutoCES)
class AutoCES(_TS):
r"""Complex Exponential Smoothing model.
Automatically selects the best Complex Exponential Smoothing
model using an information criterion. Default is Akaike Information Criterion (AICc), while particular
models are estimated using maximum likelihood.
The state-space equations can be determined based on their $S$ simple, $P$ partial,
$Z$ optimized or $N$ omitted components. The `model` string parameter defines the
kind of CES model: $N$ for simple CES (without seasonality), $S$ for simple seasonality (lagged CES),
$P$ for partial seasonality (without complex part), $F$ for full seasonality (lagged CES
with real and complex seasonal parts).
If the component is selected as 'Z', it operates as a placeholder to ask the AutoCES model
to figure out the best parameter.
References
----------
[Svetunkov, Ivan & Kourentzes, Nikolaos. (2015). "Complex Exponential Smoothing". 10.13140/RG.2.1.3757.2562. ](https://onlinelibrary.wiley.com/doi/full/10.1002/nav.22074).
Parameters
----------
model : str
Controlling state-space-equations.
season_length : int
Number of observations per unit of time. Ex: 24 Hourly data.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
def __init__(
self,
season_length: int = 1,
model: str = 'Z',
alias: str = 'CES',
prediction_intervals: Optional[ConformalIntervals] = None,
):
self.season_length = season_length
self.model = model
self.alias = alias
self.prediction_intervals = prediction_intervals
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None,
):
r"""Fit the Complex Exponential Smoothing model.
Fit the Complex Exponential Smoothing model to a time series (numpy array) `y`
and optionally exogenous variables (numpy array) `X`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self :
Complex Exponential Smoothing fitted model.
"""
y = _ensure_float(y)
if is_constant(y):
model = Naive(alias=self.alias, prediction_intervals=self.prediction_intervals)
model.fit(y=y, X=X)
return model
self.model_ = auto_ces(y, m=self.season_length, model=self.model)
self.model_['actual_residuals'] = y - self.model_['fitted']
self._store_cs(y=y, X=X)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None
):
r"""Predict with fitted Exponential Smoothing.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level: List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
fcst = forecast_ces(self.model_, h=h, level=level)
res = {"mean": fcst["mean"]}
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_predict_conformal_intervals(res, level)
else:
res = {
**res,
**{f"lo-{l}": fcst[f"lo-{l}"] for l in reversed(level)},
**{f"hi-{l}": fcst[f"hi-{l}"] for l in level},
}
return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted Exponential Smoothing insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
res = {'fitted': self.model_['fitted']}
if level is not None:
residuals = self.model_['actual_residuals']
se = _calculate_sigma(residuals, self.model_['n'])
res = _add_fitted_pi(res=res, se=se, level=level)
return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Memory Efficient Complex Exponential Smoothing predictions.
This method avoids the memory burden of storing model objects.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
if is_constant(y):
model = Naive(alias=self.alias, prediction_intervals=self.prediction_intervals)
return model.forecast(y=y, h=h, X=X, X_future=X_future, level=level, fitted=fitted)
mod = auto_ces(y, m=self.season_length, model=self.model)
fcst = forecast_ces(mod, h, level=level)
keys = ['mean']
if fitted:
keys.append('fitted')
res = {key: fcst[key] for key in keys}
if level is not None:
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
res = {
**res,
**{f'lo-{l}': fcst[f'lo-{l}'] for l in reversed(level)},
**{f'hi-{l}': fcst[f'hi-{l}'] for l in level},
}
if fitted:
# add prediction intervals for fitted values
se = _calculate_sigma(y - mod['fitted'], len(y))
res = _add_fitted_pi(res=res, se=se, level=level)
return res
def forward(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Apply fitted Complex Exponential Smoothing to a new time series.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
if not hasattr(self, 'model_'):
raise Exception('You have to use the `fit` method first')
y = _ensure_float(y)
mod = forward_ces(self.model_, y=y)
fcst = forecast_ces(mod, h, level=level)
keys = ['mean']
if fitted:
keys.append('fitted')
res = {key: fcst[key] for key in keys}
if level is not None:
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
res = {
**res,
**{f'lo-{l}': fcst[f'lo-{l}'] for l in reversed(level)},
**{f'hi-{l}': fcst[f'hi-{l}'] for l in level},
}
if fitted:
# add prediction intervals for fitted values
se = _calculate_sigma(y - mod['fitted'], len(y))
res = _add_fitted_pi(res=res, se=se, level=level)
return res
autoces = AutoCES(season_length=12)
fcst_ces = autoces.forecast(ap, 13, None, None, (80,95), True)
_plot_insample_pi(fcst_ces)
# Test conformal prediction
autoces_c = AutoCES(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(autoces_c, x=ap, h=13, level=[90, 80], test_forward=True)
fcst_ces_c = autoces_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
fcst_ces["mean"],
fcst_ces_c["mean"]
)
_plot_fcst(fcst_ces)
_plot_fcst(fcst_ces_c)
fit = autoces.fit(ap)
fcst = fit.predict(13, None, (80,95))
_plot_fcst(fit.predict(13, None, (80,95)))
values = ['mean', 'lo-80', 'lo-95', 'hi-80', 'hi-95']
for k in range(0, len(values)):
np.testing.assert_equal(
fcst_ces[values[k]],
fcst[values[k]]
)
pi_insample = fit.predict_in_sample((80,95))
_plot_insample_pi(pi_insample)
values = ['fitted', 'fitted-lo-80', 'fitted-lo-95', 'fitted-hi-80', 'fitted-hi-95']
for k in range(0, len(values)):
np.testing.assert_equal(
fcst_ces[values[k]],
pi_insample[values[k]]
)
ces = AutoCES(season_length=12)
test_class(ces, x=ap, h=12, test_forward=True, level=[90, 80])
# Test alias argument
test_eq(
repr(AutoCES()),
'CES'
)
test_eq(
repr(AutoCES(alias='AutoCES_custom')),
'AutoCES_custom'
)
show_doc(AutoCES, title_level=3)
show_doc(AutoCES.fit, title_level=3)
show_doc(AutoCES.predict, title_level=3)
show_doc(AutoCES.predict_in_sample, title_level=3)
show_doc(AutoCES.forecast, title_level=3)
show_doc(AutoCES.forward, title_level=3)
from statsforecast.models import AutoCES
from statsforecast.utils import AirPassengers as ap
# CES' usage example
# Multiplicative trend, optimal error and seasonality
ces = AutoCES(model='Z',
season_length=4)
ces = ces.fit(y=ap)
y_hat_dict = ces.predict(h=4)
y_hat_dict
AutoTheta
class AutoTheta(_TS):
r"""AutoTheta model.
Automatically selects the best Theta (Standard Theta Model ('STM'),
Optimized Theta Model ('OTM'), Dynamic Standard Theta Model ('DSTM'),
Dynamic Optimized Theta Model ('DOTM')) model using mse.
References
----------
[Jose A. Fiorucci, Tiago R. Pellegrini, Francisco Louzada, Fotios Petropoulos, Anne B. Koehler (2016). "Models for optimising the theta method and their relationship to state space models". International Journal of Forecasting](https://www.sciencedirect.com/science/article/pii/S0169207016300243)
Parameters
----------
season_length : int
Number of observations per unit of time. Ex: 24 Hourly data.
decomposition_type : str
Seasonal decomposition type, 'multiplicative' (default) or 'additive'.
model : str
Controlling Theta Model. By default it searches for the best model.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
def __init__(
self,
season_length: int = 1,
decomposition_type: str = 'multiplicative',
model: Optional[str] = None,
alias: str = 'AutoTheta',
prediction_intervals: Optional[ConformalIntervals] = None,
):
self.season_length = season_length
self.decomposition_type = decomposition_type
self.model = model
self.alias = alias
self.prediction_intervals = prediction_intervals
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None,
):
r"""Fit the AutoTheta model.
Fit an AutoTheta model to a time series (numpy array) `y`
and optionally exogenous variables (numpy array) `X`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self :
AutoTheta fitted model.
"""
y = _ensure_float(y)
self.model_ = auto_theta(
y=y,
m=self.season_length,
model=self.model,
decomposition_type=self.decomposition_type,
)
self.model_['fitted'] = y - self.model_['residuals']
self._store_cs(y, X)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
):
r"""Predict with fitted AutoTheta.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
fcst = forecast_theta(self.model_, h=h, level=level)
if self.prediction_intervals is not None and level is not None:
fcst = self._add_predict_conformal_intervals(fcst, level)
return fcst
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted AutoTheta insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
res = {'fitted': self.model_['fitted']}
if level is not None:
se = np.std(self.model_['residuals'][3:], ddof=1)
res = _add_fitted_pi(res=res, se=se, level=level)
return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Memory Efficient AutoTheta predictions.
This method avoids the memory burden of storing model objects.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
mod = auto_theta(
y=y,
m=self.season_length,
model=self.model,
decomposition_type=self.decomposition_type
)
res = forecast_theta(mod, h, level=level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
if fitted:
res['fitted'] = y - mod['residuals']
if level is not None and fitted:
# add prediction intervals for fitted values
se = np.std(mod['residuals'][3:], ddof=1)
res = _add_fitted_pi(res=res, se=se, level=level)
return res
def forward(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Apply fitted AutoTheta to a new time series.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
if not hasattr(self, 'model_'):
raise Exception('You have to use the `fit` method first')
y = _ensure_float(y)
mod = forward_theta(self.model_, y=y)
res = forecast_theta(mod, h, level=level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
if fitted:
res['fitted'] = y - mod['residuals']
if level is not None and fitted:
# add prediction intervals for fitted values
se = np.std(mod['residuals'][3:], ddof=1)
res = _add_fitted_pi(res=res, se=se, level=level)
return res
theta = AutoTheta(season_length=12)
test_class(theta, x=ap, h=12, level=[80, 90], test_forward=True)
fcst_theta = theta.forecast(ap, 13, None, None, (80,95), True)
_plot_insample_pi(fcst_theta)
# Test conformal prediction
theta_c = AutoTheta(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(theta_c, x=ap, h=13, level=[90, 80], test_forward=True)
fcst_theta_c = theta_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
fcst_theta_c['mean'],
fcst_theta['mean'],
)
_plot_fcst(fcst_theta_c)
zero_theta = theta.forward(np.zeros(10), h=12, level=[80, 90], fitted=True)
# Test alias argument
test_eq(
repr(AutoTheta()),
'AutoTheta'
)
test_eq(
repr(AutoTheta(alias='AutoTheta_custom')),
'AutoTheta_custom'
)
show_doc(AutoTheta, title_level=3)
show_doc(AutoTheta.fit, title_level=3)
show_doc(AutoTheta.predict, title_level=3)
show_doc(AutoTheta.predict_in_sample, title_level=3)
show_doc(AutoTheta.forecast, title_level=3)
show_doc(AutoTheta.forward, title_level=3)
from statsforecast.models import AutoTheta
from statsforecast.utils import AirPassengers as ap
# AutoTheta's usage example
theta = AutoTheta(season_length=4)
theta = theta.fit(y=ap)
y_hat_dict = theta.predict(h=4)
y_hat_dict
ARIMA Family
AutoRegressive Integrated Moving Average (ARIMA)
class ARIMA(_TS):
r"""ARIMA model.
AutoRegressive Integrated Moving Average model.
References
----------
[Rob J. Hyndman, Yeasmin Khandakar (2008). "Automatic Time Series Forecasting: The forecast package for R"](https://www.jstatsoft.org/article/view/v027i03).
Parameters
----------
order : tuple (default=(0, 0, 0))
A specification of the non-seasonal part of the ARIMA model: the three components (p, d, q) are the AR order, the degree of differencing, and the MA order.
season_length : int (default=1)
Number of observations per unit of time. Ex: 24 Hourly data.
seasonal_order : tuple (default=(0, 0, 0))
A specification of the seasonal part of the ARIMA model.
(P, D, Q) for the AR order, the degree of differencing, the MA order.
include_mean : bool (default=True)
Should the ARIMA model include a mean term?
The default is True for undifferenced series, False for differenced ones (where a mean would not affect the fit nor predictions).
include_drift : bool (default=False)
Should the ARIMA model include a linear drift term?
(i.e., a linear regression with ARIMA errors is fitted.)
include_constant : bool, optional (default=None)
If True, then include_mean is set to True for undifferenced series and include_drift is set to True for differenced series.
Note that if there is more than one difference taken, no constant is included regardless of the value of this argument.
This is deliberate as otherwise quadratic and higher order polynomial trends would be induced.
blambda : float, optional (default=None)
Box-Cox transformation parameter.
biasadj : bool (default=False)
Use adjusted back-transformed mean for Box-Cox transformations.
method : str (default='CSS-ML')
Fitting method: maximum likelihood or minimize conditional sum-of-squares.
The default (unless there are missing values) is to use conditional-sum-of-squares to find starting values, then maximum likelihood.
fixed : dict, optional (default=None)
Dictionary containing fixed coefficients for the arima model. Example: `{'ar1': 0.5, 'ma2': 0.75}`.
For autoregressive terms use the `ar{i}` keys. For its seasonal version use `sar{i}`.
For moving average terms use the `ma{i}` keys. For its seasonal version use `sma{i}`.
For intercept and drift use the `intercept` and `drift` keys.
For exogenous variables use the `ex_{i}` keys.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
uses_exog = True
def __init__(
self,
order: Tuple[int, int, int] = (0, 0, 0),
season_length: int = 1,
seasonal_order: Tuple[int, int, int] = (0, 0, 0),
include_mean: bool = True,
include_drift: bool = False,
include_constant: Optional[bool] = None,
blambda: Optional[float] = None,
biasadj: bool = False,
method: str = 'CSS-ML',
fixed: Optional[dict] = None,
alias: str = 'ARIMA',
prediction_intervals: Optional[ConformalIntervals] = None,
):
self.order=order
self.season_length=season_length
self.seasonal_order=seasonal_order
self.include_mean=include_mean
self.include_drift=include_drift
self.include_constant=include_constant
self.blambda=blambda
self.biasadj=biasadj
self.method=method
self.fixed=fixed
self.alias=alias
self.prediction_intervals=prediction_intervals
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None,
):
r"""
Fit the model to a time series (numpy array) `y`
and optionally exogenous variables (numpy array) `X`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self :
Fitted model.
"""
y = _ensure_float(y)
with np.errstate(invalid='ignore'):
self.model_ = Arima(
x=y,
order=self.order,
seasonal={'order': self.seasonal_order,
'period': self.season_length},
xreg=X,
include_mean=self.include_mean,
include_constant=self.include_constant,
include_drift=self.include_drift,
blambda=self.blambda,
biasadj=self.biasadj,
method=self.method,
fixed=self.fixed
)
self._store_cs(y=y, X=X)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
):
r"""Predict with fitted model.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
fcst = forecast_arima(self.model_, h=h, xreg=X, level=level)
mean = fcst['mean']
res = {'mean': mean}
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_predict_conformal_intervals(res, level)
else:
res = {
"mean": mean,
**{f"lo-{l}": fcst["lower"][f"{l}%"] for l in reversed(level)},
**{f"hi-{l}": fcst["upper"][f"{l}%"] for l in level},
}
return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
mean = fitted_arima(self.model_)
res = {'fitted': mean}
if level is not None:
se = np.sqrt(self.model_['sigma2'])
res = _add_fitted_pi(res=res, se=se, level=level)
return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Memory efficient predictions.
This method avoids the memory burden of storing model objects.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
with np.errstate(invalid='ignore'):
mod = Arima(
x=y,
order=self.order,
seasonal={'order': self.seasonal_order,
'period': self.season_length},
xreg=X,
include_mean=self.include_mean,
include_constant=self.include_constant,
include_drift=self.include_drift,
blambda=self.blambda,
biasadj=self.biasadj,
method=self.method,
fixed=self.fixed
)
fcst = forecast_arima(mod, h, xreg=X_future, level=level)
res = {'mean': fcst['mean']}
if fitted:
res['fitted'] = fitted_arima(mod)
if level is not None:
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
res = {
**res,
**{f'lo-{l}': fcst['lower'][f'{l}%'] for l in reversed(level)},
**{f'hi-{l}': fcst['upper'][f'{l}%'] for l in level},
}
if fitted:
# add prediction intervals for fitted values
se = np.sqrt(mod['sigma2'])
res = _add_fitted_pi(res=res, se=se, level=level)
return res
def forward(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Apply fitted model to a new time series.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
if not hasattr(self, 'model_'):
raise Exception('You have to use the `fit` method first')
y = _ensure_float(y)
with np.errstate(invalid='ignore'):
mod = forward_arima(self.model_, y=y, xreg=X, method=self.method)
fcst = forecast_arima(mod, h, xreg=X_future, level=level)
res = {'mean': fcst['mean']}
if fitted:
res['fitted'] = fitted_arima(mod)
if level is not None:
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
res = {
**res,
**{f'lo-{l}': fcst['lower'][f'{l}%'] for l in reversed(level)},
**{f'hi-{l}': fcst['upper'][f'{l}%'] for l in level},
}
if fitted:
# add prediction intervals for the fitted values
se = np.sqrt(mod['sigma2'])
res = _add_fitted_pi(res=res, se=se, level=level)
return res
simple_arima = ARIMA(order=(1, 0, 0), season_length=12)
test_class(simple_arima, x=ap, h=12, level=[90, 80], test_forward=True)
fcst_simple_arima = simple_arima.forecast(ap, 13, None, None, (80,95), True)
_plot_insample_pi(fcst_simple_arima)
# Test conformal prediction: conformal intervals change only the intervals, not the point forecast
simple_arima_c = ARIMA(order=(1, 0, 0), season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(simple_arima_c, x=ap, h=13, level=[90, 80], test_forward=True)
fcst_simple_arima_c = simple_arima_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
fcst_simple_arima_c['mean'],
fcst_simple_arima['mean'],
)
_plot_fcst(fcst_simple_arima_c)
simple_arima = ARIMA(order=(2, 0, 0), season_length=12, fixed={'ar1': 0.5, 'ar2': 0.5})
test_class(simple_arima, x=ap, h=12, level=[90, 80], test_forward=True)
fcst_simple_arima = simple_arima.forecast(ap, 4, None, None, (80,95), True)
_plot_insample_pi(fcst_simple_arima)
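# Sanity check of the fixed-coefficient AR(2) above (not part of the library): ar1 = ar2 = 0.5
# sum to one, so the mean term cancels and the forecasts follow y_hat[t+1] = 0.5*y[t] + 0.5*y[t-1],
# seeded with the last two observations (390 and 432).
prev2, prev1 = ap[-2], ap[-1]
for _ in range(4):
    prev2, prev1 = prev1, 0.5 * prev1 + 0.5 * prev2
    print(prev1)  # 411.0, 421.5, 416.25, 418.875 -- matching the assertion below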
test_eq(
fcst_simple_arima['mean'],
np.array([411., 421.5, 416.25, 418.875])
)
# Test the alias argument
test_eq(
repr(ARIMA()),
'ARIMA'
)
test_eq(
repr(ARIMA(alias='ARIMA_seasonality')),
'ARIMA_seasonality'
)
show_doc(ARIMA, title_level=3)
show_doc(ARIMA.fit, title_level=3)
show_doc(ARIMA.predict, title_level=3)
show_doc(ARIMA.predict_in_sample, title_level=3)
show_doc(ARIMA.forecast, title_level=3)
show_doc(ARIMA.forward, title_level=3)
from statsforecast.models import ARIMA
from statsforecast.utils import AirPassengers as ap
# ARIMA usage example
arima = ARIMA(order=(1, 0, 0), season_length=12)
arima = arima.fit(y=ap)
y_hat_dict = arima.predict(h=4, level=[80])
y_hat_dict
Autoregressive
class AutoRegressive(ARIMA):
r"""Simple Autoregressive model.
Parameters
----------
lags : int or list
Number of lags to include in the model.
If an int is passed then all lags up to `lags` are considered.
If a list, only the elements of the list are considered as lags.
include_mean : bool (default=True)
Should the AutoRegressive model include a mean term?
The default is True for undifferenced series, False for differenced ones (where a mean would not affect the fit nor predictions).
include_drift : bool (default=False)
Should the AutoRegressive model include a linear drift term?
(i.e., a linear regression with AutoRegressive errors is fitted.)
blambda : float, optional (default=None)
Box-Cox transformation parameter.
biasadj : bool (default=False)
Use adjusted back-transformed mean Box-Cox.
method : str (default='CSS-ML')
Fitting method: maximum likelihood or minimize conditional sum-of-squares.
The default (unless there are missing values) is to use conditional-sum-of-squares to find starting values, then maximum likelihood.
fixed : dict, optional (default=None)
Dictionary containing fixed coefficients for the AutoRegressive model. Example: `{'ar1': 0.5, 'ar5': 0.75}`.
For autoregressive terms use the `ar{i}` keys.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
uses_exog = True
def __init__(
self,
lags: Union[int, List[int]],
include_mean: bool = True,
include_drift: bool = False,
blambda: Optional[float] = None,
biasadj: bool = False,
method: str = 'CSS-ML',
fixed: Optional[dict] = None,
alias: str = 'AutoRegressive',
prediction_intervals: Optional[ConformalIntervals] = None,
):
if isinstance(lags, int):
order = (lags, 0, 0)
elif isinstance(lags, list):
order = (max(lags), 0, 0)
fixed_lags = {f'ar{i+1}': np.nan if (i+1) in lags else 0 for i in range(max(lags))}
if fixed is not None:
fixed_lags.update(fixed)
fixed = fixed_lags
else:
raise ValueError('Please provide an int or a list specifying the lags to use.')
super().__init__(
order=order,
include_mean=include_mean,
include_drift=include_drift,
blambda=blambda,
biasadj=biasadj,
method=method,
alias=alias,
fixed=fixed,
prediction_intervals=prediction_intervals,
)
ar = AutoRegressive(lags=[12], fixed={'ar12': 0.9999999})
test_class(ar, x=ap, h=12, level=[90, 80], test_forward=True)
fcst_ar = ar.forecast(ap, 13, None, None, (80,95), True)
# We should recover the seasonal naive forecast: with the only AR coefficient (on lag 12) essentially equal to 1, the model predicts y[t+h] ~ y[t+h-12]
test_close(
fcst_ar['mean'][:-1],
ap[-12:],
eps=1e-4
)
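# For reference, a list of lags is mapped internally to a full AR(max(lags)) specification in
# which every coefficient not in the list is fixed to 0 (an illustration of the constructor
# logic above, not library code):
lags = [1, 12]
order = (max(lags), 0, 0)
fixed = {f'ar{i + 1}': np.nan if (i + 1) in lags else 0 for i in range(max(lags))}
print(order, fixed)  # (12, 0, 0) with only ar1 and ar12 left free (np.nan)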
_plot_insample_pi(fcst_ar)
# Test conformal prediction
ar_c = AutoRegressive(lags=[12], fixed={'ar12': 0.9999999}, prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(ar_c, x=ap, h=13, level=[90, 80], test_forward=True)
fcst_ar_c = ar_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
fcst_ar_c['mean'],
fcst_ar['mean'],
)
_plot_fcst(fcst_ar_c)
# Test the alias argument
test_eq(
repr(AutoRegressive(lags=[12])),
'AutoRegressive'
)
test_eq(
repr(AutoRegressive(lags=[12], alias='AutoRegressive_lag12')),
'AutoRegressive_lag12'
)
show_doc(AutoRegressive, title_level=3)
show_doc(AutoRegressive.fit, title_level=3, name='AutoRegressive.fit')
show_doc(AutoRegressive.predict, title_level=3, name='AutoRegressive.predict')
show_doc(AutoRegressive.predict_in_sample, title_level=3, name='AutoRegressive.predict_in_sample')
show_doc(AutoRegressive.forecast, title_level=3, name='AutoRegressive.forecast')
show_doc(AutoRegressive.forward, title_level=3, name='AutoRegressive.forward')
from statsforecast.models import AutoRegressive
from statsforecast.utils import AirPassengers as ap
# AutoRegressive usage example
ar = AutoRegressive(lags=[12])
ar = ar.fit(y=ap)
y_hat_dict = ar.predict(h=4, level=[80])
y_hat_dict
Exponential smoothing
Simple exponential smoothing
@njit(nogil=NOGIL, cache=CACHE)
def _ses_fcst_mse(x: np.ndarray, alpha: float) -> Tuple[float, float, np.ndarray]:
r"""对一个序列执行简单指数平滑。
该函数返回一步预测值以及拟合的均方误差。
"""
smoothed = x[0]
n = x.size
mse = 0.
fitted = np.full(n, np.nan, dtype=x.dtype)
for i in range(1, n):
smoothed = (alpha * x[i - 1] + (1 - alpha) * smoothed).item()
error = x[i] - smoothed
mse += error * error
fitted[i] = smoothed
mse /= n
forecast = alpha * x[-1] + (1 - alpha) * smoothed
return forecast, mse, fitted
def _ses_mse(alpha: float, x: np.ndarray) -> float:
r"""计算简单指数平滑拟合的均方误差。"""
_, mse, _ = _ses_fcst_mse(x, alpha)
return mse
def _ses_forecast(x: np.ndarray, alpha: float) -> Tuple[float, np.ndarray]:
r"""一步超前预测,采用简单指数平滑法。"""
forecast, _, fitted = _ses_fcst_mse(x, alpha)
return forecast, fitted
def _demand(x: np.ndarray) -> np.ndarray:
r"""提取向量中的正元素。"""
return x[x > 0]
def _intervals(x: np.ndarray) -> np.ndarray:
r"""计算向量中非零元素之间的间隔。"""
nonzero_idxs = np.where(x != 0)[0]
return np.diff(nonzero_idxs + 1, prepend=0).astype(x.dtype)
def _probability(x: np.ndarray) -> np.ndarray:
r"""计算元素为非零的概率。"""
return (x != 0).astype(x.dtype)
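# Quick toy example of the helpers above (used later by the intermittent-demand models),
# provided for illustration only:
x = np.array([0.0, 2.0, 0.0, 0.0, 5.0, 3.0])
print(_demand(x))       # [2. 5. 3.]          positive values only
print(_intervals(x))    # [2. 3. 1.]          gaps between consecutive nonzero entries
print(_probability(x))  # [0. 1. 0. 0. 1. 1.] nonzero indicator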
def _optimized_ses_forecast(
x: np.ndarray,
bounds: Sequence[Tuple[float, float]] = [(0.1, 0.3)]
) -> Tuple[float, np.ndarray]:
r"""搜索最优的alpha值并计算一步超前简单指数平滑预测。"""
alpha = minimize(
fun=_ses_mse,
x0=(0,),
args=(x,),
bounds=bounds,
method='L-BFGS-B'
).x[0]
forecast, fitted = _ses_forecast(x, alpha)
return forecast, fitted
def _chunk_sums(array: np.ndarray, chunk_size: int) -> np.ndarray:
r"""将数组分割成多个块,并返回每个块的总和。
不完整的块将被丢弃。"""
n_chunks = array.size // chunk_size
n_elems = n_chunks * chunk_size
return array[:n_elems].reshape(n_chunks, chunk_size).sum(axis=1)
def _ses(
y: np.ndarray, # time series
h: int, # forecast horizon
fitted: bool, # return fitted values
alpha: float, # smoothing parameter
) -> Dict[str, np.ndarray]:
fcst, _, fitted_vals = _ses_fcst_mse(y, alpha)
fcst = {'mean': _repeat_val(val=fcst, h=h)}
if fitted:
fcst['fitted'] = fitted_vals
return fcst
class SimpleExponentialSmoothing(_TS):
r"""SimpleExponentialSmoothing model.
Uses a weighted average of all past observations where the weights decrease exponentially into the past.
Suitable for data with no clear trend or seasonality.
Assuming there are $t$ observations, the one-step forecast is given by: $\hat{y}_{t+1} = \alpha y_t + (1-\alpha) \hat{y}_{t-1}$
The rate $0 \leq \alpha \leq 1$ at which the weights decrease is called the smoothing parameter. When $\alpha = 1$, SES is equal to the naive method.
References
----------
[Charles C Holt (1957). “Forecasting seasonals and trends by exponentially weighted moving averages”](https://doi.org/10.1016/j.ijforecast).
Parameters
----------
alpha : float
Smoothing parameter.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
def __init__(
self,
alpha: float,
alias: str = 'SES',
prediction_intervals: Optional[ConformalIntervals] = None,
):
self.alpha = alpha
self.alias = alias
self.prediction_intervals = prediction_intervals
self.only_conformal_intervals = True
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None,
):
r"""Fit the SimpleExponentialSmoothing model.
Fit a SimpleExponentialSmoothing to a time series (numpy array) `y`
and optionally exogenous variables (numpy array) `X`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self :
SimpleExponentialSmoothing fitted model.
"""
y = _ensure_float(y)
mod = _ses(y=y, alpha=self.alpha, h=1, fitted=True)
self.model_ = dict(mod)
self._store_cs(y=y, X=X)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
):
r"""Predict with fitted SimpleExponentialSmoothing.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
mean = _repeat_val(val=self.model_['mean'][0], h=h)
res = {'mean': mean}
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_predict_conformal_intervals(res, level)
else:
raise Exception("You must pass `prediction_intervals` to " "compute them.")
return res
def predict_in_sample(self):
r"""Access fitted SimpleExponentialSmoothing insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions.
"""
res = {'fitted': self.model_['fitted']}
return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Memory Efficient SimpleExponentialSmoothing predictions.
This method avoids the memory burden of object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
res = _ses(y=y, h=h, fitted=fitted, alpha=self.alpha)
res = dict(res)
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
raise Exception("You must pass `prediction_intervals` to " "compute them.")
return res
ses = SimpleExponentialSmoothing(alpha=0.1)
test_class(ses, x=ap, h=12)
# more tests
ses = ses.fit(ap)
fcst_ses = ses.predict(12)
test_close(fcst_ses['mean'], np.repeat(460.3028, 12), eps=1e-4)
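# The 460.3028 above can be reproduced with a plain-Python version of the recursion in
# _ses_fcst_mse (a minimal cross-check, not part of the library):
alpha = 0.1
smoothed = ap[0]
for y_t in ap[1:]:
    smoothed = alpha * y_t + (1 - alpha) * smoothed
print(smoothed)  # ~460.3028, the one-step-ahead forecast repeated above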
# To recover these residuals from R you must pass initial="simple"
# to the `ses` function
np.testing.assert_allclose(
ses.predict_in_sample()['fitted'][[0, 1, -1]],
np.array([np.nan, 118 - 6., 432 + 31.447525])
)
# Test conformal prediction
ses_c = SimpleExponentialSmoothing(alpha=0.1, prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(ses_c, x=ap, h=13, level=[90, 80], test_forward=False, skip_insample=True)
fcst_ses_c = ses_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
fcst_ses_c['mean'][:12],
fcst_ses['mean']
)
_plot_fcst(fcst_ses_c)
# Test the alias argument
test_eq(
repr(SimpleExponentialSmoothing(alpha=0.1)),
'SES'
)
test_eq(
repr(SimpleExponentialSmoothing(alpha=0.1, alias='SES_custom')),
'SES_custom'
)
show_doc(SimpleExponentialSmoothing, title_level=3)
show_doc(SimpleExponentialSmoothing.forecast, title_level=3)
show_doc(SimpleExponentialSmoothing.fit, title_level=3)
show_doc(SimpleExponentialSmoothing.predict, title_level=3)
show_doc(SimpleExponentialSmoothing.predict_in_sample, title_level=3)
from statsforecast.models import SimpleExponentialSmoothing
from statsforecast.utils import AirPassengers as ap
# SimpleExponentialSmoothing usage example
ses = SimpleExponentialSmoothing(alpha=0.5)
ses = ses.fit(y=ap)
y_hat_dict = ses.predict(h=4)
y_hat_dict
Simple smoothing (optimized)
def _ses_optimized(
y: np.ndarray, # time series
h: int, # forecast horizon
fitted: bool, # return fitted values
):
fcst_, fitted_vals = _optimized_ses_forecast(y, [(0.01, 0.99)])
mean = _repeat_val(val=fcst_, h=h)
fcst = {'mean': mean}
if fitted:
fcst['fitted'] = fitted_vals
return fcst
class SimpleExponentialSmoothingOptimized(_TS):
r"""SimpleExponentialSmoothing model.
Uses a weighted average of all past observations where the weights decrease exponentially into the past.
Suitable for data with no clear trend or seasonality.
Assuming there are $t$ observations, the one-step forecast is given by: $\hat{y}_{t+1} = \alpha y_t + (1-\alpha) \hat{y}_{t-1}$
The smoothing parameter $\alpha^*$ is optimized by square error minimization.
References
----------
[Charles C Holt (1957). “Forecasting seasonals and trends by exponentially weighted moving averages”](https://doi.org/10.1016/j.ijforecast).
Parameters
----------
alias: str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
This is required for generating future prediction intervals.
"""
def __init__(
self,
alias: str = "SESOpt",
prediction_intervals: Optional[ConformalIntervals] = None,
):
self.alias = alias
self.prediction_intervals = prediction_intervals
self.only_conformal_intervals = True
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None,
):
r"""Fit the SimpleExponentialSmoothingOptimized model.
Fit a SimpleExponentialSmoothingOptimized to a time series (numpy array) `y`
and optionally exogenous variables (numpy array) `X`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self :
SimpleExponentialSmoothingOptimized fitted model.
"""
y = _ensure_float(y)
mod = _ses_optimized(y=y, h=1, fitted=True)
self.model_ = dict(mod)
self._store_cs(y=y, X=X)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
):
r"""Predict with fitted SimpleExponentialSmoothingOptimized.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
mean = _repeat_val(val=self.model_['mean'][0], h=h)
res = {'mean': mean}
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_predict_conformal_intervals(res, level)
else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
def predict_in_sample(self):
r"""Access fitted SimpleExponentialSmoothingOptimized insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions.
"""
res = {'fitted': self.model_['fitted']}
return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Memory Efficient SimpleExponentialSmoothingOptimized predictions.
This method avoids the memory burden of object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
res = _ses_optimized(y=y, h=h, fitted=fitted)
res = dict(res)
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
ses_op = SimpleExponentialSmoothingOptimized()
test_class(ses_op, x=ap, h=12)
ses_op = ses_op.fit(ap)
fcst_ses_op = ses_op.predict(12)
# Test conformal prediction
ses_op_c = SimpleExponentialSmoothingOptimized(prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(ses_op_c, x=ap, h=13, level=[90, 80], skip_insample=True)
fcst_ses_op_c = ses_op_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
fcst_ses_op_c['mean'][:12],
fcst_ses_op['mean']
)
_plot_fcst(fcst_ses_op_c)
# Test the alias argument
test_eq(
repr(SimpleExponentialSmoothingOptimized()),
'SESOpt'
)
test_eq(
repr(SimpleExponentialSmoothingOptimized(alias='SESOpt_custom')),
'SESOpt_custom'
)
show_doc(SimpleExponentialSmoothingOptimized, title_level=3)
show_doc(SimpleExponentialSmoothingOptimized.fit, title_level=3)
show_doc(SimpleExponentialSmoothingOptimized.predict, title_level=3)
show_doc(SimpleExponentialSmoothingOptimized.predict_in_sample, title_level=3)
show_doc(SimpleExponentialSmoothingOptimized.forecast, title_level=3)
from statsforecast.models import SimpleExponentialSmoothingOptimized
from statsforecast.utils import AirPassengers as ap
# SimpleExponentialSmoothingOptimized usage example
seso = SimpleExponentialSmoothingOptimized()
seso = seso.fit(y=ap)
y_hat_dict = seso.predict(h=4)
y_hat_dict
Seasonal smoothing
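The seasonal smoothers in this section treat each season as its own sub-series: a series of length n with season length m is split into the m slices y[i + n % m :: m], each smoothed independently; the offset n % m drops the first few observations so that every slice ends inside the last complete season. A toy illustration of the split (not part of the library code):
y = np.arange(10, dtype=float)  # toy series, n = 10
season_length = 4
n = y.size
for i in range(season_length):
    init_idx = i + n % season_length
    print(i, y[init_idx::season_length])
# 0 [2. 6.]
# 1 [3. 7.]
# 2 [4. 8.]
# 3 [5. 9.]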
def _seasonal_exponential_smoothing(
y: np.ndarray, # time series
h: int, # forecast horizon
fitted: bool, # return fitted values
season_length: int, # season length
alpha: float, # smoothing parameter
) -> Dict[str, np.ndarray]:
n = y.size
if n < season_length:
return {'mean': np.full(h, np.nan, dtype=y.dtype)}
season_vals = np.empty(season_length, dtype=y.dtype)
fitted_vals = np.full_like(y, np.nan)
for i in range(season_length):
init_idx = (i + n % season_length)
season_vals[i], fitted_vals[init_idx::season_length] = _ses_forecast(y[init_idx::season_length], alpha)
out = _repeat_val_seas(season_vals=season_vals, h=h)
fcst = {'mean': out}
if fitted:
fcst['fitted'] = fitted_vals
return fcst
class SeasonalExponentialSmoothing(_TS):
r"""SeasonalExponentialSmoothing model.
Uses a weighted average of all past observations where the weights decrease exponentially into the past.
Suitable for seasonal data with no clear trend.
Assuming there are $t$ observations and season $s$, the one-step forecast is given by:
$\hat{y}_{t+1,s} = \alpha y_t + (1-\alpha) \hat{y}_{t-1,s}$
Notes
-----
This method is an extremely simplified version of Holt-Winters' method, where the trend and level are set to zero.
And a single seasonal smoothing parameter $\alpha$ is shared across seasons.
References
----------
[Charles. C. Holt (1957). "Forecasting seasonals and trends by exponentially weighted moving averages", ONR Research Memorandum, Carnegie Institute of Technology 52.](https://www.sciencedirect.com/science/article/abs/pii/S0169207003001134).
[Peter R. Winters (1960). "Forecasting sales by exponentially weighted moving averages". Management Science](https://pubsonline.informs.org/doi/abs/10.1287/mnsc.6.3.324).
Parameters
----------
alpha : float
Smoothing parameter.
season_length : int
Number of observations per unit of time. Ex: 24 Hourly data.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
This is required for generating future prediction intervals.
"""
def __init__(
self,
season_length: int,
alpha: float,
alias: str = 'SeasonalES',
prediction_intervals: Optional[ConformalIntervals] = None,
):
self.season_length = season_length
self.alpha = alpha
self.alias = alias
self.prediction_intervals = prediction_intervals
self.only_conformal_intervals = True
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None,
):
r"""Fit the SeasonalExponentialSmoothing model.
Fit a SeasonalExponentialSmoothing to a time series (numpy array) `y`
and optionally exogenous variables (numpy array) `X`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self :
SeasonalExponentialSmoothing fitted model.
"""
y = _ensure_float(y)
mod = _seasonal_exponential_smoothing(
y=y,
season_length=self.season_length,
alpha=self.alpha,
fitted=True,
h=self.season_length,
)
self.model_ = dict(mod)
self._store_cs(y=y, X=X)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
):
r"""Predict with fitted SeasonalExponentialSmoothing.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
mean = _repeat_val_seas(self.model_['mean'], h=h)
res = {'mean': mean}
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_predict_conformal_intervals(res, level)
else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
def predict_in_sample(self):
r"""Access fitted SeasonalExponentialSmoothing insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions.
"""
res = {'fitted': self.model_['fitted']}
return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Memory Efficient SeasonalExponentialSmoothing predictions.
This method avoids the memory burden of object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
res = _seasonal_exponential_smoothing(
y=y, h=h, fitted=fitted, alpha=self.alpha, season_length=self.season_length
)
res = dict(res)
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
seas_es = SeasonalExponentialSmoothing(season_length=12, alpha=1.)
test_class(seas_es, x=ap, h=12)
test_eq(seas_es.predict_in_sample()['fitted'][-3:], np.array([461 - 54., 390 - 28., 432 - 27.]))
seas_es = seas_es.fit(ap)
fcst_seas_es = seas_es.predict(12)
# Test conformal prediction
seas_es_c = SeasonalExponentialSmoothing(season_length=12, alpha=1., prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(seas_es_c, x=ap, h=13, level=[90, 80], test_forward=False, skip_insample=True)
fcst_seas_es_c = seas_es_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
fcst_seas_es_c['mean'][:12],
fcst_seas_es['mean']
)
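# Note: with alpha = 1 the seasonal update puts all the weight on the most recent observation
# of each season (y_hat[t+1, s] = 1 * y[t, s] + 0 * y_hat[t-1, s]), so the model collapses to
# the seasonal naive forecast; the following tests verify exactly that.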
_plot_fcst(fcst_seas_es_c)
# Test that we can recover the expected seasonality
test_eq(
seas_es.forecast(ap[4:], h=12)['mean'],
seas_es.forecast(ap, h=12)['mean']
)
# Test that the forecast is close to the seasonal naive forecast
for i in range(1, 13):
test_close(
ap[i:][-12:],
seas_es.forecast(ap[i:], h=12)['mean'],
)
plt.plot(np.concatenate([ap[6:], seas_es.forecast(ap[6:], h=12)['mean']]))
# Test the alias argument
test_eq(
repr(SeasonalExponentialSmoothing(season_length=12, alpha=1.)),
'SeasonalES'
)
test_eq(
repr(SeasonalExponentialSmoothing(season_length=12, alpha=1., alias='SeasonalES_custom')),
'SeasonalES_custom'
)
show_doc(SeasonalExponentialSmoothing, title_level=3)
show_doc(SeasonalExponentialSmoothing.fit, title_level=3)
show_doc(SeasonalExponentialSmoothing.predict, title_level=3)
show_doc(SeasonalExponentialSmoothing.predict_in_sample, title_level=3)
show_doc(SeasonalExponentialSmoothing.forecast, title_level=3)
from statsforecast.models import SeasonalExponentialSmoothing
from statsforecast.utils import AirPassengers as ap
# SeasonalExponentialSmoothing usage example
model = SeasonalExponentialSmoothing(alpha=0.5, season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
Seasonal smoothing (optimized)
def _seasonal_ses_optimized(
y: np.ndarray, # time series
h: int, # forecast horizon
fitted: bool, # return fitted values
season_length: int, # season length
):
n = y.size
if n < season_length:
return {'mean': np.full(h, np.nan, dtype=y.dtype)}
season_vals = np.empty(season_length, dtype=y.dtype)
fitted_vals = np.full_like(y, np.nan)
for i in range(season_length):
init_idx = (i + n % season_length)
season_vals[i], fitted_vals[init_idx::season_length] = _optimized_ses_forecast(y[init_idx::season_length], [(0.01, 0.99)])
out = _repeat_val_seas(season_vals=season_vals, h=h)
fcst = {'mean': out}
if fitted:
fcst['fitted'] = fitted_vals
return fcst
class SeasonalExponentialSmoothingOptimized(_TS):
def __init__(
self,
season_length: int,
alias: str = 'SeasESOpt',
prediction_intervals: Optional[ConformalIntervals] = None,
):
r"""SeasonalExponentialSmoothingOptimized model.
Uses a weighted average of all past observations where the weights decrease exponentially into the past.
Suitable for seasonal data with no clear trend.
Assuming there are $t$ observations and season $s$, the one-step forecast is given by:
$\hat{y}_{t+1,s} = \alpha y_t + (1-\alpha) \hat{y}_{t-1,s}$
The smoothing parameter $\alpha^*$ is optimized by square error minimization.
Notes
-----
This method is an extremely simplified version of Holt-Winters' method, where the trend and level are set to zero.
And a single seasonal smoothing parameter $\alpha$ is shared across seasons.
References
----------
[Charles. C. Holt (1957). "Forecasting seasonals and trends by exponentially weighted moving averages", ONR Research Memorandum, Carnegie Institute of Technology 52.](https://www.sciencedirect.com/science/article/abs/pii/S0169207003001134).
[Peter R. Winters (1960). "Forecasting sales by exponentially weighted moving averages". Management Science](https://pubsonline.informs.org/doi/abs/10.1287/mnsc.6.3.324).
Parameters
----------
season_length : int
Number of observations per unit of time. Ex: 24 Hourly data.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
This is required for generating future prediction intervals.
"""
self.season_length = season_length
self.alias = alias
self.prediction_intervals = prediction_intervals
self.only_conformal_intervals = True
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None,
):
r"""Fit the SeasonalExponentialSmoothingOptimized model.
Fit a SeasonalExponentialSmoothingOptimized to a time series (numpy array) `y`
and optionally exogenous variables (numpy array) `X`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self :
SeasonalExponentialSmoothingOptimized fitted model.
"""
y = _ensure_float(y)
mod = _seasonal_ses_optimized(
y=y,
season_length=self.season_length,
fitted=True,
h=self.season_length,
)
self.model_ = dict(mod)
self._store_cs(y=y, X=X)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
):
r"""Predict with fitted SeasonalExponentialSmoothingOptimized.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
mean = _repeat_val_seas(self.model_['mean'], h=h)
res = {'mean': mean}
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_predict_conformal_intervals(res, level)
else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
def predict_in_sample(self):
r"""Access fitted SeasonalExponentialSmoothingOptimized insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions.
"""
res = {'fitted': self.model_['fitted']}
return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Memory Efficient SeasonalExponentialSmoothingOptimized predictions.
This method avoids the memory burden of object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
res = _seasonal_ses_optimized(
y=y, h=h, fitted=fitted,
season_length=self.season_length
)
res = dict(res)
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
seas_es_opt = SeasonalExponentialSmoothingOptimized(season_length=12)
test_class(seas_es_opt, x=ap, h=12)
fcst_seas_es_opt = seas_es_opt.forecast(ap, h=12)
# Test conformal prediction
seas_es_opt_c = SeasonalExponentialSmoothingOptimized(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(seas_es_opt_c, x=ap, h=13, level=[90, 80], test_forward=False, skip_insample=True)
fcst_seas_es_opt_c = seas_es_opt_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
fcst_seas_es_opt_c['mean'][:12],
fcst_seas_es_opt['mean']
)
_plot_fcst(fcst_seas_es_opt_c)
for i in range(1, 13):
test_close(
ap[i:][-12:],
seas_es_opt.forecast(ap[i:], h=12)['mean'],
eps=0.8
)
# Test the alias argument
test_eq(
repr(SeasonalExponentialSmoothingOptimized(season_length=12)),
'SeasESOpt'
)
test_eq(
repr(SeasonalExponentialSmoothingOptimized(season_length=12, alias='SeasESOpt_custom')),
'SeasESOpt_custom'
)
show_doc(SeasonalExponentialSmoothingOptimized, title_level=3)
show_doc(SeasonalExponentialSmoothingOptimized.forecast, title_level=3)
show_doc(SeasonalExponentialSmoothingOptimized.fit, title_level=3)
show_doc(SeasonalExponentialSmoothingOptimized.predict, title_level=3)
show_doc(SeasonalExponentialSmoothingOptimized.predict_in_sample, title_level=3)
from statsforecast.models import SeasonalExponentialSmoothingOptimized
from statsforecast.utils import AirPassengers as ap
# SeasonalExponentialSmoothingOptimized usage example
model = SeasonalExponentialSmoothingOptimized(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
Holt's method
class Holt(AutoETS):
r""" Holt's method.
Also known as double exponential smoothing, Holt's method is an extension of exponential smoothing for series with a trend.
This implementation returns the corresponding `ETS` model with additive (A) or multiplicative (M) errors (so either 'AAN' or 'MAN').
References
----------
[Rob J. Hyndman and George Athanasopoulos (2018). "Forecasting principles and practice, Methods with trend"](https://otexts.com/fpp3/holt.html).
Parameters
----------
season_length : int
Number of observations per unit of time. Ex: 12 Monthly data.
error_type : str
The type of error of the ETS model. Can be additive (A) or multiplicative (M).
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
def __init__(
self,
season_length: int = 1,
error_type: str = 'A',
alias: str = 'Holt',
prediction_intervals: Optional[ConformalIntervals] = None,
):
self.season_length = season_length
self.error_type = error_type
self.alias = alias
self.prediction_intervals = prediction_intervals
model = error_type + 'AN'
super().__init__(
season_length, model, alias=alias, prediction_intervals=prediction_intervals
)
holt = Holt(season_length=12, error_type='A')
fcast_holt = holt.forecast(ap,12)
ets = AutoETS(season_length=12, model='AAN')
fcast_ets = ets.forecast(ap,12)
np.testing.assert_equal(
fcast_holt,
fcast_ets
)
holt = Holt(season_length=12, error_type='A')
holt.fit(ap)
fcast_holt = holt.predict(12)
ets = AutoETS(season_length=12, model='AAN')
fcast_ets = ets.forecast(ap,12)
np.testing.assert_equal(
fcast_holt,
fcast_ets
)
holt_c = Holt(season_length=12, error_type='A', prediction_intervals=ConformalIntervals(h=12, n_windows=2))
fcast_holt_c = holt_c.forecast(ap, 12, level=[80, 90])
ets_c = AutoETS(season_length=12, model='AAN', prediction_intervals=ConformalIntervals(h=12, n_windows=2))
fcast_ets_c = ets_c.forecast(ap, 12, level=[80, 90])
np.testing.assert_equal(
fcast_holt_c,
fcast_ets_c,
)
# Test the alias argument
test_eq(
repr(Holt()),
'Holt'
)
test_eq(
repr(Holt(alias='Holt_custom')),
'Holt_custom'
)
show_doc(Holt, title_level=3)
show_doc(Holt.forecast, name='Holt.forecast', title_level=3)
show_doc(Holt.fit, name='Holt.fit', title_level=3)
show_doc(Holt.predict, name='Holt.predict', title_level=3)
show_doc(Holt.predict_in_sample, name='Holt.predict_in_sample', title_level=3)
show_doc(Holt.forward, name='Holt.forward', title_level=3)
from statsforecast.models import Holt
from statsforecast.utils import AirPassengers as ap
# Holt's usage example
model = Holt(season_length=12, error_type='A')
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
Holt-Winters' method
class HoltWinters(AutoETS):
r""" Holt-Winters' method.
Also known as triple exponential smoothing, Holt-Winters' method is an extension of exponential smoothing for series that contain both trend and seasonality.
This implementation returns the corresponding `ETS` model with additive (A) or multiplicative (M) errors (so either 'AAA' or 'MAM').
References
----------
[Rob J. Hyndman and George Athanasopoulos (2018). "Forecasting principles and practice, Methods with seasonality"](https://otexts.com/fpp3/holt-winters.html).
Parameters
----------
season_length : int
Number of observations per unit of time. Ex: 12 Monthly data.
error_type : str
The type of error of the ETS model. Can be additive (A) or multiplicative (M).
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
def __init__(
self,
season_length: int = 1, # season length
error_type: str = 'A', # error type
alias: str = 'HoltWinters',
prediction_intervals: Optional[ConformalIntervals] = None,
):
self.season_length = season_length
self.error_type = error_type
self.alias = alias
model = error_type + 'A' + error_type
super().__init__(
season_length, model, alias=alias, prediction_intervals=prediction_intervals
)
hw = HoltWinters(season_length=12, error_type='A')
fcast_hw = hw.forecast(ap,12)
ets = AutoETS(season_length=12, model='AAA')
fcast_ets = ets.forecast(ap,12)
np.testing.assert_equal(
fcast_hw,
fcast_ets
)
hw_c = HoltWinters(season_length=12, error_type='A', prediction_intervals=ConformalIntervals(h=12, n_windows=2))
fcast_hw_c = hw_c.forecast(ap, 12, level=[80, 90])
ets_c = AutoETS(season_length=12, model='AAA', prediction_intervals=ConformalIntervals(h=12, n_windows=2))
fcast_ets_c = ets_c.forecast(ap, 12, level=[80, 90])
np.testing.assert_equal(
fcast_hw_c,
fcast_ets_c,
)
hw = HoltWinters(season_length=12, error_type='A')
hw.fit(ap)
fcast_hw = hw.predict(12)
ets = AutoETS(season_length=12, model='AAA')
fcast_ets = ets.forecast(ap,12)
np.testing.assert_equal(
fcast_hw,
fcast_ets
)
# Test the alias argument
test_eq(
repr(HoltWinters()),
'HoltWinters'
)
test_eq(
repr(HoltWinters(alias='HoltWinters_custom')),
'HoltWinters_custom'
)
show_doc(HoltWinters, title_level=3)
show_doc(HoltWinters.forecast, name='HoltWinters.forecast', title_level=3)
show_doc(HoltWinters.fit, name='HoltWinters.fit', title_level=3)
show_doc(HoltWinters.predict, name='HoltWinters.predict', title_level=3)
show_doc(HoltWinters.predict_in_sample, name='HoltWinters.predict_in_sample', title_level=3)
show_doc(HoltWinters.forward, name='HoltWinters.forward', title_level=3)
from statsforecast.models import HoltWinters
from statsforecast.utils import AirPassengers as ap
# Holt-Winters' usage example
model = HoltWinters(season_length=12, error_type='A')
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
Baseline models
Historic average
def _historic_average(
y: np.ndarray, # time series
h: int, # forecast horizon
fitted: bool, # return fitted values
) -> Dict[str, np.ndarray]:
fcst = {'mean': _repeat_val(val=y.mean(), h=h)}
if fitted:
fitted_vals = _repeat_val(val=y.mean(), h=len(y))
fcst['fitted'] = fitted_vals
return fcst
class HistoricAverage(_TS):
def __init__(self, alias: str = 'HistoricAverage', prediction_intervals: Optional[ConformalIntervals] = None,):
r"""HistoricAverage model.
Also known as mean method. Uses a simple average of all past observations.
Assuming there are $t$ observations, the one-step forecast is given by:
$$\hat{y}_{t+1} = \frac{1}{t} \sum_{j=1}^t y_j$$
References
----------
[Rob J. Hyndman and George Athanasopoulos (2018). "Forecasting principles and practice, Simple Methods"](https://otexts.com/fpp3/simple-methods.html).
Parameters
----------
alias: str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
self.alias = alias
self.prediction_intervals = prediction_intervals
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None,
):
r"""Fit the HistoricAverage model.
Fit a HistoricAverage to a time series (numpy array) `y`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self
HistoricAverage fitted model.
r"""
y = _ensure_float(y)
mod = _historic_average(y, h=1, fitted=True)
mod = dict(mod)
residuals = y - mod['fitted']
mod['sigma'] = _calculate_sigma(residuals, len(residuals) - 1)
mod['n'] = len(y)
self.model_ = mod
self._store_cs(y=y, X=X)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
):
r"""Predict with fitted HistoricAverage.
Parameters
----------
h : int
Forecast horizon.
X : numpy.array
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
mean = _repeat_val(val=self.model_['mean'][0], h=h)
res = {'mean': mean}
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_predict_conformal_intervals(res, level)
else:
sigma = self.model_['sigma']
sigmah = sigma * np.sqrt(1 + (1 / self.model_['n']))
pred_int = _calculate_intervals(res, level, h, sigmah)
res = {**res, **pred_int}
return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted HistoricAverage insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions.
"""
res = {'fitted': self.model_['fitted']}
if level is not None:
sigma = self.model_['sigma']
sigmah = sigma * np.sqrt(1 + (1 / self.model_['n']))
res = _add_fitted_pi(res, se=sigmah, level=level)
return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Memory Efficient HistoricAverage predictions.
This method avoids the memory burden of object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : list[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
out = _historic_average(y=y, h=h, fitted=fitted or (level is not None))
res = {'mean': out['mean']}
if fitted:
res['fitted'] = out['fitted']
if level is not None:
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
residuals = y - out["fitted"]
sigma = _calculate_sigma(residuals, len(residuals) - 1)
sigmah = sigma * np.sqrt(1 + (1 / len(y)))
pred_int = _calculate_intervals(out, level, h, sigmah)
res = {**res, **pred_int}
if fitted:
residuals = y - out["fitted"]
sigma = _calculate_sigma(residuals, len(residuals) - 1)
sigmah = sigma * np.sqrt(1 + (1 / len(y)))
res = _add_fitted_pi(res=res, se=sigmah, level=level)
return res
ha = HistoricAverage()
test_class(ha, x=ap, h=12, level=[80, 90])
# more tests
ha.fit(ap)
fcst_ha = ha.predict(12)
test_close(fcst_ha['mean'], np.repeat(ap.mean(), 12), eps=1e-5)
np.testing.assert_almost_equal(
ha.predict_in_sample()['fitted'][:4],
#np.array([np.nan, 112., 115., 120.6666667]),
np.array([280.2986,280.2986,280.2986,280.2986]),
decimal=4
)
ha = HistoricAverage()
fcst_ha = ha.forecast(ap,12,None,None,(80,95), True)
np.testing.assert_almost_equal(
fcst_ha['lo-80'],
np.repeat(126.0227,12),
decimal=4
)
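# Rough cross-check of the 80% lower bound above, assuming _calculate_sigma uses the
# (n - 1)-degrees-of-freedom estimate and normal quantiles (so sigmah = sigma * sqrt(1 + 1/n)):
from scipy.stats import norm
sigma = ap.std(ddof=1)
sigmah = sigma * np.sqrt(1 + 1 / ap.size)
print(ap.mean() - norm.ppf(0.9) * sigmah)  # ~126.02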
_plot_insample_pi(fcst_ha)
# Test conformal prediction
ha_c = HistoricAverage(prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(ha_c, x=ap, h=13, level=[90, 80], test_forward=False)
fcst_ha_c = ha_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
fcst_ha_c['mean'][:12],
fcst_ha['mean'],
)
_plot_fcst(fcst_ha_c)
# Test the alias argument
test_eq(
repr(HistoricAverage()),
'HistoricAverage'
)
test_eq(
repr(HistoricAverage(alias='HistoricAverage_custom')),
'HistoricAverage_custom'
)
show_doc(HistoricAverage, title_level=3)
show_doc(HistoricAverage.forecast, title_level=3)
show_doc(HistoricAverage.fit, title_level=3)
show_doc(HistoricAverage.predict, title_level=3)
show_doc(HistoricAverage.predict_in_sample, title_level=3)
from statsforecast.models import HistoricAverage
from statsforecast.utils import AirPassengers as ap
# HistoricAverage usage example
model = HistoricAverage()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
Naive
class Naive(_TS):
def __init__(self, alias: str = 'Naive', prediction_intervals: Optional[ConformalIntervals] = None):
r"""Naive model.
All forecasts have the value of the last observation:
$\hat{y}_{t+1} = y_t$ for all $t$
References
----------
[Rob J. Hyndman and George Athanasopoulos (2018). "forecasting principles and practice, Simple Methods"](https://otexts.com/fpp3/simple-methods.html).
Parameters
----------
alias: str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
self.alias = alias
self.prediction_intervals = prediction_intervals
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None,
):
r"""Fit the Naive model.
Fit a Naive to a time series (numpy array) `y`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self:
Naive fitted model.
"""
y = _ensure_float(y)
mod = _naive(y, h=1, fitted=True)
mod = dict(mod)
residuals = y - mod['fitted']
sigma = _calculate_sigma(residuals, len(residuals) - 1)
mod['sigma'] = sigma
self.model_ = mod
self._store_cs(y=y, X=X)
return self
def predict(
self,
h: int, # forecast horizon
X: Optional[np.ndarray] = None, # exogenous regressors
level: Optional[List[int]] = None # confidence levels
):
r"""Predict with fitted Naive.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
mean = _repeat_val(self.model_['mean'][0], h=h)
res = {'mean': mean}
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_predict_conformal_intervals(res, level)
else:
steps = np.arange(1,h+1)
sigma = self.model_['sigma']
sigmah = sigma * np.sqrt(steps)
pred_int = _calculate_intervals(res, level, h, sigmah)
res = {**res, **pred_int}
return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted Naive insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions.
"""
res = {'fitted': self.model_['fitted']}
if level is not None:
res = _add_fitted_pi(res=res, se=self.model_['sigma'], level=level)
return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Memory Efficient Naive predictions.
This method avoids the memory burden of object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n,).
h: int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
out = _naive(y=y, h=h, fitted=fitted or (level is not None))
res = {'mean': out['mean']}
if fitted:
res['fitted'] = out['fitted']
if level is not None:
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
steps = np.arange(1, h + 1)
residuals = y - out["fitted"]
sigma = _calculate_sigma(residuals, len(residuals) - 1)
sigmah = sigma * np.sqrt(steps)
pred_int = _calculate_intervals(out, level, h, sigmah)
res = {**res, **pred_int}
if fitted:
residuals = y - out["fitted"]
sigma = _calculate_sigma(residuals, len(residuals) - 1)
res = _add_fitted_pi(res=res, se=sigma, level=level)
return res
def forward(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r""" Apply fitted model to an new/updated series.
Parameters
----------
y : numpy.array
Clean time series of shape (n,).
h: int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
res = self.forecast(y=y, h=h, X=X, X_future=X_future, level=level, fitted=fitted)
return res
# Test prediction intervals - forecast
naive = Naive()
naive.forecast(ap, 12)
naive.forecast(ap, 12, None, None, (80,95), True)
# Test prediction intervals - fit and predict
naive.fit(ap)
naive.predict(12)
naive.predict(12, None, (80,95))
naive = Naive()
test_class(naive, x=ap, h=12, level=[90, 80])
naive.fit(ap)
fcst_naive = naive.predict(12)
test_close(fcst_naive['mean'], np.repeat(ap[-1], 12), eps=1e-5)
naive = Naive()
fcst_naive = naive.forecast(ap,12,None,None,(80,95), True)
np.testing.assert_almost_equal(
fcst_naive['lo-80'],
np.array([388.7984, 370.9037, 357.1726, 345.5967, 335.3982, 326.1781, 317.6992, 309.8073, 302.3951, 295.3845, 288.7164, 282.3452]),
decimal=4
) # These are only almost equal because Hyndman's forecasts are rounded to four decimal places.
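# The widening of these bounds follows from the h-step standard error used by Naive:
# sigma_h = sigma * sqrt(h), so the two-step interval is sqrt(2) times wider than the
# one-step one (compare 432 - 370.9037 with sqrt(2) * (432 - 388.7984) above).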
_plot_insample_pi(fcst_naive)
# Unit test that forward := forecast
naive = Naive()
fcst_naive = naive.forward(ap,12,None,None,(80,95), True)
np.testing.assert_almost_equal(
fcst_naive['lo-80'],
np.array([388.7984, 370.9037, 357.1726, 345.5967, 335.3982, 326.1781, 317.6992, 309.8073, 302.3951, 295.3845, 288.7164, 282.3452]),
decimal=4
) # These are only almost equal because Hyndman's forecasts are rounded to four decimal places.
# Test conformal prediction
naive_c = Naive(prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(naive_c, x=ap, h=13, level=[90, 80], test_forward=False)
fcst_naive_c = naive_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
fcst_naive_c['mean'][:12],
fcst_naive['mean']
)
_plot_fcst(fcst_naive_c)
# Test the alias argument
test_eq(
repr(Naive()),
'Naive'
)
test_eq(
repr(Naive(alias='Naive_custom')),
'Naive_custom'
)
show_doc(Naive, title_level=3)
show_doc(Naive.forecast, title_level=3)
show_doc(Naive.fit, title_level=3)
show_doc(Naive.predict, title_level=3)
show_doc(Naive.predict_in_sample, title_level=3)
from statsforecast.models import Naive
from statsforecast.utils import AirPassengers as ap
# Naive usage example
model = Naive()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
Random walk with drift
def _random_walk_with_drift(
y: np.ndarray, # time series
h: int, # forecast horizon
fitted: bool, # return fitted values
) -> Dict[str, np.ndarray]:
slope = (y[-1] - y[0]) / (y.size - 1)
mean = slope * (1 + np.arange(h, dtype=y.dtype)) + y[-1]
fcst = {'mean': mean,
'slope': np.array([slope], dtype=y.dtype),
'last_y': np.array([y[-1]], dtype=y.dtype)}
if fitted:
fitted_vals = np.full_like(y, np.nan)
fitted_vals[1:] = slope + y[:-1]
fcst['fitted'] = fitted_vals
return fcst
class RandomWalkWithDrift(_TS):
def __init__(self, alias: str = 'RWD', prediction_intervals: Optional[ConformalIntervals] = None):
r"""RandomWalkWithDrift model.
A variation of the naive method that allows the forecasts to change over time.
The amount of change, called drift, is the average change seen in the historical data.
$$\hat{y}_{t+1} = y_t+\frac{1}{t-1}\sum_{j=2}^t (y_j-y_{j-1}) = y_t+ \frac{y_t-y_1}{t-1}$$
From the previous equation, we can see that this is equivalent to extrapolating a line between
the first and the last observation.
References
----------
[Rob J. Hyndman and George Athanasopoulos (2018). "forecasting principles and practice, Simple Methods"](https://otexts.com/fpp3/simple-methods.html).
Parameters
----------
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
self.alias = alias
self.prediction_intervals = prediction_intervals
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None,
):
r"""Fit the RandomWalkWithDrift model.
Fit a RandomWalkWithDrift to a time series (numpy array) `y`.
Parameters
----------
y: numpy.array
Clean time series of shape (t, ).
Returns
-------
self :
RandomWalkWithDrift fitted model.
r"""
y = _ensure_float(y)
mod = _random_walk_with_drift(y, h=1, fitted=True)
mod = dict(mod)
residuals = y - mod['fitted']
sigma = _calculate_sigma(residuals, len(residuals) - 1)
mod['sigma'] = sigma
mod['n'] = len(y)
self.model_ = mod
self._store_cs(y=y, X=X)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None
):
r"""Predict with fitted RandomWalkWithDrift.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
hrange = np.arange(h, dtype=self.model_['last_y'].dtype)
mean = self.model_['slope'] * (1 + hrange) + self.model_['last_y']
res = {'mean': mean}
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_predict_conformal_intervals(res, level)
else:
steps = np.arange(1, h + 1)
sigma = self.model_['sigma']
sigmah = sigma * np.sqrt(steps * (1 + steps / (self.model_['n'] - 1)))
pred_int = _calculate_intervals(res, level, h, sigmah)
res = {**res, **pred_int}
return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted RandomWalkWithDrift insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
res = {'fitted': self.model_['fitted']}
if level is not None:
res = _add_fitted_pi(res=res, se=self.model_['sigma'], level=level)
return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Memory Efficient RandomWalkWithDrift predictions.
This method avoids the memory burden of object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n,).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts: dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
out = _random_walk_with_drift(y=y, h=h, fitted=fitted or (level is not None))
res = {'mean': out['mean']}
if fitted:
res['fitted'] = out['fitted']
if level is not None:
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
steps = np.arange(1, h + 1)
residuals = y - out["fitted"]
sigma = _calculate_sigma(residuals, len(residuals) - 1)
sigmah = sigma * np.sqrt(steps * (1 + steps / (len(y) - 1)))
pred_int = _calculate_intervals(out, level, h, sigmah)
res = {**res, **pred_int}
if fitted:
residuals = y - out["fitted"]
sigma = _calculate_sigma(residuals, len(residuals) - 1)
res = _add_fitted_pi(res=res, se=sigma, level=level)
return res
# Test prediction intervals - forecast
rwd = RandomWalkWithDrift()
rwd.forecast(ap, 12)
rwd.forecast(ap, 12, None, None, (80,95), True)
# Test prediction intervals - fit and predict
rwd = RandomWalkWithDrift()
rwd.fit(ap)
rwd.predict(12)
rwd.predict(12, None, (80,95))
rwd = RandomWalkWithDrift()
test_class(rwd, x=ap, h=12, level=[90, 80])
rwd = rwd.fit(ap)
fcst_rwd = rwd.predict(12)
test_close(fcst_rwd['mean'][:2], np.array([434.2378, 436.4755]), eps=1e-4)
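# The point forecasts above follow directly from the drift definition: the drift is the
# average first difference, (y_T - y_1) / (T - 1) = (432 - 112) / 143 ~ 2.2378, so the
# first two forecasts are 432 + 2.2378 ~ 434.2378 and 432 + 2 * 2.2378 ~ 436.4755.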
np.testing.assert_almost_equal(
rwd.predict_in_sample()['fitted'][:3],
np.array([np.nan, 118 - 3.7622378, 132 - 11.7622378]),
decimal=6
)
# Test conformal prediction
rwd_c = RandomWalkWithDrift(prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(rwd_c, x=ap, h=13, level=[90, 80], test_forward=False)
fcst_rwd_c = rwd_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
fcst_rwd_c['mean'][:12],
fcst_rwd['mean']
)
_plot_fcst(fcst_rwd_c)
rwd = RandomWalkWithDrift()
fcst_rwd = rwd.forecast(ap,12,None,None,(80,95), True)
np.testing.assert_almost_equal(
fcst_rwd['lo-80'],
np.array([390.9799, 375.0862, 363.2664, 353.5325, 345.1178, 337.6304, 330.8384, 324.5916, 318.7857, 313.3453, 308.2136, 303.3469]),
decimal=1
)
_plot_insample_pi(fcst_rwd)
# Test the alias argument
test_eq(
repr(RandomWalkWithDrift()),
'RWD'
)
test_eq(
repr(RandomWalkWithDrift(alias='RWD_custom')),
'RWD_custom'
)
show_doc(RandomWalkWithDrift, title_level=3)
show_doc(RandomWalkWithDrift.forecast, title_level=3)
show_doc(RandomWalkWithDrift.fit, title_level=3)
show_doc(RandomWalkWithDrift.predict, title_level=3)
show_doc(RandomWalkWithDrift.predict_in_sample, title_level=3)
from statsforecast.models import RandomWalkWithDrift
from statsforecast.utils import AirPassengers as ap
# RandomWalkWithDrift usage example
model = RandomWalkWithDrift()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
Seasonal Naive
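The seasonal naive model implemented below simply repeats the last observed season. A minimal sketch of that idea, assuming monthly data; `seasonal_naive_sketch` is an illustrative helper, not part of the library:
import numpy as np

def seasonal_naive_sketch(y: np.ndarray, season_length: int, h: int) -> np.ndarray:
    # repeat the last observed season over the forecast horizon
    last_season = y[-season_length:]
    reps = int(np.ceil(h / season_length))
    return np.tile(last_season, reps)[:h]

# e.g. seasonal_naive_sketch(ap, season_length=12, h=12) repeats the last 12 observed months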
class SeasonalNaive(_TS):
def __init__(self, season_length: int, alias: str = 'SeasonalNaive', prediction_intervals: Optional[ConformalIntervals] = None):
r"""Seasonal naive model.
A method similar to the naive, but uses the last known observation of the same period (e.g. the same month of the previous year) in order to capture seasonal variations.
References
----------
[Rob J. Hyndman and George Athanasopoulos (2018). "forecasting principles and practice, Simple Methods"](https://otexts.com/fpp3/simple-methods.html#seasonal-na%C3%AFve-method).
Parameters
----------
season_length : int
Number of observations per unit of time. Ex: 24 Hourly data.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
self.season_length = season_length
self.alias = alias
self.prediction_intervals = prediction_intervals
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None,
):
r"""Fit the SeasonalNaive model.
Fit a SeasonalNaive to a time series (numpy array) `y`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X: array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self :
SeasonalNaive fitted model.
r"""
y = _ensure_float(y)
mod = _seasonal_naive(
y=y,
season_length=self.season_length,
h=self.season_length,
fitted=True,
)
mod = dict(mod)
residuals = y - mod['fitted']
mod['sigma'] = _calculate_sigma(residuals,
len(y) - self.season_length)
self.model_ = mod
self._store_cs(y=y, X=X)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
):
r"""Predict with fitted Naive.
Parameters
----------
h : int
Forecast horizon.
X: array-like
Optional exogenous of shape (h, n_x).
level: List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
mean = _repeat_val_seas(season_vals=self.model_['mean'], h=h)
res = {'mean': mean}
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_predict_conformal_intervals(res, level)
else:
k = np.floor((h - 1) / self.season_length)
sigma = self.model_['sigma']
sigmah = sigma * np.sqrt(k + 1)
pred_int = _calculate_intervals(res, level, h, sigmah)
res = {**res, **pred_int}
return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted SeasonalNaive insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
r"""
res = {'fitted': self.model_['fitted']}
if level is not None:
level = sorted(level)
res = _add_fitted_pi(res=res, se=self.model_['sigma'], level=level)
return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Memory Efficient SeasonalNaive predictions.
This method avoids memory burden due to object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
out = _seasonal_naive(
y=y, h=h, fitted=fitted or (level is not None),
season_length=self.season_length
)
res = {'mean': out['mean']}
if fitted:
res['fitted'] = out['fitted']
if level is not None:
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
k = np.floor((h - 1) / self.season_length)
residuals = y - out["fitted"]
sigma = _calculate_sigma(residuals, len(y) - self.season_length)
sigmah = sigma * np.sqrt(k + 1)
pred_int = _calculate_intervals(out, level, h, sigmah)
res = {**res, **pred_int}
if fitted:
k = np.floor((h - 1) / self.season_length)
residuals = y - out["fitted"]
sigma = _calculate_sigma(residuals, len(y) - self.season_length)
res = _add_fitted_pi(res=res, se=sigma, level=level)
return res
# Test prediction intervals - forecast
seas_naive = SeasonalNaive(season_length=12)
seas_naive.forecast(ap, 12)
seas_naive.forecast(ap, 12, None, None, (80,95), True)
# Test prediction intervals - fit and predict
seas_naive = SeasonalNaive(season_length=12)
seas_naive.fit(ap)
seas_naive.predict(12)
seas_naive.predict(12, None, (80,95))
seas_naive = SeasonalNaive(season_length=12)
test_class(seas_naive, x=ap, h=12, level=[90, 80])
seas_naive = seas_naive.fit(ap)
fcst_seas_naive = seas_naive.predict(12)
test_eq(seas_naive.predict_in_sample()['fitted'][-3:], np.array([461 - 54., 390 - 28., 432 - 27.]))
seas_naive = SeasonalNaive(season_length=12)
fcst_seas_naive = seas_naive.forecast(ap, 12, None, None, (80,95), True)
np.testing.assert_almost_equal(
fcst_seas_naive['lo-80'],
np.array([370.4595, 344.4595, 372.4595, 414.4595, 425.4595, 488.4595,
575.4595, 559.4595, 461.4595, 414.4595, 343.4595, 385.4595]),
decimal=4
)
_plot_insample_pi(fcst_seas_naive)
# Test conformal prediction
seas_naive_c = SeasonalNaive(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(seas_naive_c, x=ap, h=13, level=[90, 80], test_forward=False)
fcst_seas_naive_c = seas_naive_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
fcst_seas_naive_c['mean'][:12],
fcst_seas_naive['mean']
)
_plot_fcst(fcst_seas_naive_c)
# Test the alias argument
test_eq(
repr(SeasonalNaive(12)),
'SeasonalNaive'
)
test_eq(
repr(SeasonalNaive(12, alias='SeasonalNaive_custom')),
'SeasonalNaive_custom'
)
show_doc(SeasonalNaive, title_level=3)
show_doc(SeasonalNaive.forecast, title_level=3)
show_doc(SeasonalNaive.fit, title_level=3)
show_doc(SeasonalNaive.predict, title_level=3)
show_doc(SeasonalNaive.predict_in_sample, title_level=3)
from statsforecast.models import SeasonalNaive
from statsforecast.utils import AirPassengers as ap
# SeasonalNaive usage example
model = SeasonalNaive(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
Window Average
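The window average implemented below forecasts every future step with the mean of the last `window_size` observations. A minimal sketch (the helper name is illustrative only):
import numpy as np

def window_average_sketch(y: np.ndarray, window_size: int, h: int) -> np.ndarray:
    # the mean of the last `window_size` points, repeated over the horizon
    return np.full(h, y[-window_size:].mean())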
def _window_average(
y: np.ndarray, # time series
h: int, # forecast horizon
fitted: bool, # fitted values
window_size: int, # window size
) -> Dict[str, np.ndarray]:
if fitted:
raise NotImplementedError('return fitted')
if y.size < window_size:
return {'mean': np.full(h, np.nan, dtype=y.dtype)}
wavg = y[-window_size:].mean()
mean = _repeat_val(val=wavg, h=h)
return {'mean': mean}
class WindowAverage(_TS):
def __init__(
self,
window_size: int,
alias: str = 'WindowAverage',
prediction_intervals: Optional[ConformalIntervals] = None,
):
r"""WindowAverage model.
Uses the average of the last $k$ observations, with $k$ the length of the window.
Wider windows will capture global trends, while narrow windows will reveal local trends.
The length of the window selected should take into account the importance of past
observations and how fast the series changes.
References
----------
[Rob J. Hyndman and George Athanasopoulos (2018). "forecasting principles and practice, Simple Methods"](https://otexts.com/fpp3/simple-methods.html).
Parameters
----------
window_size : int
Size of truncated series on which average is estimated.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
This is required for generating future prediction intervals.
r"""
self.window_size = window_size
self.alias = alias
self.prediction_intervals = prediction_intervals
self.only_conformal_intervals = True
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None,
):
r"""Fit the WindowAverage model.
Fit a WindowAverage to a time series (numpy array) `y`
and optionally exogenous variables (numpy array) `X`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self :
WindowAverage fitted model.
"""
y = _ensure_float(y)
mod = _window_average(y=y, h=1, window_size=self.window_size, fitted=False)
self.model_ = dict(mod)
self._store_cs(y=y, X=X)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
):
r"""Predict with fitted WindowAverage.
Parameters
----------
h : int
Forecast horizon.
X : numpy.array
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
mean = _repeat_val(self.model_['mean'][0], h=h)
res = {'mean': mean}
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_predict_conformal_intervals(res, level)
else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
def predict_in_sample(self):
r"""Access fitted WindowAverage insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
raise NotImplementedError
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Memory Efficient WindowAverage predictions.
This method avoids memory burden due to object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
res = _window_average(y=y, h=h, fitted=fitted, window_size=self.window_size)
res = dict(res)
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
w_avg = WindowAverage(window_size=24)
test_class(w_avg, x=ap, h=12, skip_insample=True)
w_avg = w_avg.fit(ap)
fcst_w_avg = w_avg.predict(12)
test_close(fcst_w_avg['mean'], np.repeat(ap[-24:].mean(), 12))
# Test conformal prediction
w_avg_c = WindowAverage(window_size=24, prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(w_avg_c, x=ap, h=13, level=[90, 80], test_forward=False, skip_insample=True)
fcst_w_avg_c = w_avg_c.forecast(ap, 13, None, None, (80,95), False)
test_eq(
fcst_w_avg_c['mean'][:12],
fcst_w_avg['mean']
)
_plot_fcst(fcst_w_avg_c)
# Test the alias argument
test_eq(
repr(WindowAverage(1)),
'WindowAverage'
)
test_eq(
repr(WindowAverage(1, alias='WindowAverage_custom')),
'WindowAverage_custom'
)
show_doc(WindowAverage, title_level=3)
show_doc(WindowAverage.forecast, title_level=3)
show_doc(WindowAverage.fit, title_level=3)
show_doc(WindowAverage.predict, title_level=3)
from statsforecast.models import WindowAverage
from statsforecast.utils import AirPassengers as ap
# WindowAverage usage example
model = WindowAverage(window_size=12*4)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
Seasonal Window Average
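The seasonal window average implemented below averages the last `window_size` observations of each position in the seasonal cycle. A minimal sketch of that computation (illustrative helper, not the library API):
import numpy as np

def seasonal_window_average_sketch(y: np.ndarray, season_length: int, window_size: int, h: int) -> np.ndarray:
    # average the last `window_size` values observed at each seasonal position
    tail = y[-season_length * window_size:]
    season_means = tail.reshape(window_size, season_length).mean(axis=0)
    reps = int(np.ceil(h / season_length))
    return np.tile(season_means, reps)[:h]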
def _seasonal_window_average(
y: np.ndarray,
h: int,
fitted: bool,
season_length: int,
window_size: int,
) -> Dict[str, np.ndarray]:
if fitted:
raise NotImplementedError('return fitted')
min_samples = season_length * window_size
if y.size < min_samples:
return {'mean': np.full(h, np.nan, dtype=y.dtype)}
season_avgs = y[-min_samples:].reshape(window_size, season_length).mean(axis=0)
out = _repeat_val_seas(season_vals=season_avgs, h=h)
return {'mean': out}
class SeasonalWindowAverage(_TS):
def __init__(
self,
season_length: int,
window_size: int,
alias: str = 'SeasWA',
prediction_intervals: Optional[ConformalIntervals] = None
):
r"""SeasonalWindowAverage model.
An average of the last $k$ observations of the same period, with $k$ the length of the window.
References
----------
[Rob J. Hyndman and George Athanasopoulos (2018). "forecasting principles and practice, Simple Methods"](https://otexts.com/fpp3/simple-methods.html).
Parameters
----------
window_size : int
Size of truncated series on which average is estimated.
season_length : int
Number of observations per cycle.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
This is required for generating future prediction intervals.
r"""
self.season_length = season_length
self.window_size = window_size
self.alias = alias
self.prediction_intervals = prediction_intervals
self.only_conformal_intervals = True
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None,
):
r"""Fit the SeasonalWindowAverage model.
Fit a SeasonalWindowAverage to a time series (numpy array) `y`
and optionally exogenous variables (numpy array) `X`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self :
SeasonalWindowAverage fitted model.
"""
y = _ensure_float(y)
mod = _seasonal_window_average(
y=y,
h=self.season_length,
fitted=False,
season_length=self.season_length,
window_size=self.window_size,
)
self.model_ = dict(mod)
self._store_cs(y=y, X=X)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
):
r"""Predict with fitted SeasonalWindowAverage.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
mean = _repeat_val_seas(season_vals=self.model_['mean'], h=h)
res = {'mean': mean}
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_predict_conformal_intervals(res, level)
else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
def predict_in_sample(self):
r"""Access fitted SeasonalWindowAverage insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
raise NotImplementedError
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Memory Efficient SeasonalWindowAverage predictions.
This method avoids memory burden due to object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n,).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
res = _seasonal_window_average(
y=y, h=h, fitted=fitted,
season_length=self.season_length,
window_size=self.window_size
)
res = dict(res)
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
seas_w_avg = SeasonalWindowAverage(season_length=12, window_size=1)
test_class(seas_w_avg, x=ap, h=12, skip_insample=True)
seas_w_avg = seas_w_avg.fit(ap)
fcst_seas_w_avg = seas_w_avg.predict(12)
# Test conformal prediction
seas_w_avg_c = SeasonalWindowAverage(season_length=12, window_size=1, prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(seas_w_avg_c, x=ap, h=13, level=[90, 80], test_forward=False, skip_insample=True)
fcst_seas_w_avg_c = seas_w_avg_c.forecast(ap, 13, None, None, (80,95), False)
test_eq(
fcst_seas_w_avg_c['mean'][:12],
fcst_seas_w_avg['mean']
)
fcst_seas_w_avg_c['mean'][:12]
_plot_fcst(fcst_seas_w_avg_c)
# Test the alias argument
test_eq(
repr(SeasonalWindowAverage(12, 1)),
'SeasWA'
)
test_eq(
repr(SeasonalWindowAverage(12, 1, alias='SeasWA_custom')),
'SeasWA_custom'
)
show_doc(SeasonalWindowAverage, title_level=3)
show_doc(SeasonalWindowAverage.forecast, title_level=3)
show_doc(SeasonalWindowAverage.fit, title_level=3)
show_doc(SeasonalWindowAverage.predict, title_level=3)
from statsforecast.models import SeasonalWindowAverage
from statsforecast.utils import AirPassengers as ap
# SeasonalWindowAverage usage example
model = SeasonalWindowAverage(season_length=12, window_size=4)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
Sparse or Intermittent
ADIDA
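ADIDA, implemented below, aggregates the series into buckets whose size equals the mean inter-demand interval, forecasts the bucket totals with SES, and disaggregates with equal weights. A rough sketch of that pipeline, with a plain fixed-alpha SES standing in for the library's optimized routine (helper name and alpha are illustrative):
import numpy as np

def adida_sketch(y: np.ndarray, h: int, alpha: float = 0.1) -> np.ndarray:
    nonzero_idx = np.flatnonzero(y != 0)
    if not nonzero_idx.size:
        return np.zeros(h)
    # mean interval between non-zero observations sets the aggregation level
    intervals = np.diff(np.append(-1, nonzero_idx))
    agg = max(1, int(round(intervals.mean())))
    # sum the series in chunks of size `agg`, dropping the leftover head
    sums = y[len(y) % agg:].reshape(-1, agg).sum(axis=1)
    # simple exponential smoothing of the aggregated totals
    level = sums[0]
    for s in sums[1:]:
        level = alpha * s + (1 - alpha) * level
    # disaggregate back to the original frequency with equal weights
    return np.full(h, level / agg)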
def _chunk_forecast(y, aggregation_level):
lost_remainder_data = len(y) % aggregation_level
y_cut = y[lost_remainder_data:]
aggregation_sums = _chunk_sums(y_cut, aggregation_level)
sums_forecast, _ = _optimized_ses_forecast(aggregation_sums)
return sums_forecast
@njit(nogil=NOGIL, cache=CACHE)
def _expand_fitted_demand(fitted: np.ndarray, y: np.ndarray) -> np.ndarray:
out = np.empty_like(y)
out[0] = np.nan
fitted_idx = 0
for i in range(1, y.size):
if y[i - 1] > 0:
fitted_idx += 1
out[i] = fitted[fitted_idx]
elif fitted_idx > 0:
# if this entry is zero, the model didn't change
out[i] = out[i - 1]
else:
# if we haven't seen any demand, use naive
out[i] = y[i - 1]
return out
@njit(nogil=NOGIL, cache=CACHE)
def _expand_fitted_intervals(fitted: np.ndarray, y: np.ndarray) -> np.ndarray:
out = np.empty_like(y)
out[0] = np.nan
fitted_idx = 0
for i in range(1, y.size):
if y[i - 1] != 0:
fitted_idx += 1
if fitted[fitted_idx] == 0:
# to avoid division by zero
out[i] = 1
else:
out[i] = fitted[fitted_idx]
elif fitted_idx > 0:
# if this entry is zero, the model didn't change
out[i] = out[i - 1]
else:
# if we haven't seen any intervals, use 1 to avoid division by zero
out[i] = 1
return out
def _adida(
y: np.ndarray, # time series
h: int, # forecast horizon
fitted: bool, # fitted values
):
if (y == 0).all():
res = {'mean': np.zeros(h, dtype=y.dtype)}
if fitted:
res['fitted'] = np.zeros_like(y)
res['fitted'][0] = np.nan
return res
y = _ensure_float(y)
y_intervals = _intervals(y)
mean_interval = y_intervals.mean()
aggregation_level = round(mean_interval)
sums_forecast = _chunk_forecast(y, aggregation_level)
forecast = sums_forecast / aggregation_level
res = {'mean': _repeat_val(val=forecast, h=h)}
if fitted:
warnings.warn("Computing fitted values for ADIDA is very expensive")
fitted_aggregation_levels = np.round(
y_intervals.cumsum() / np.arange(1, y_intervals.size + 1)
)
fitted_aggregation_levels = _expand_fitted_intervals(
np.append(np.nan, fitted_aggregation_levels), y
)[1:].astype(np.int32)
sums_fitted = np.empty(y.size - 1, dtype=y.dtype)
for i, agg_lvl in enumerate(fitted_aggregation_levels):
sums_fitted[i] = _chunk_forecast(y[:i+1], agg_lvl)
res['fitted'] = np.append(np.nan, sums_fitted / fitted_aggregation_levels)
return res
class ADIDA(_TS):
def __init__(self, alias: str = 'ADIDA', prediction_intervals: Optional[ConformalIntervals] = None):
r"""ADIDA model.
Aggregate-Disaggregate Intermittent Demand Approach: uses temporal aggregation to reduce the
number of zero observations. Once the data has been aggregated, it uses the optimized SES to
generate the forecasts at the new level. It then breaks down the forecast to the original
level using equal weights.
ADIDA specializes in sparse or intermittent series, i.e. series with very few non-zero observations.
They are notoriously hard to forecast, so different methods have been developed
specifically for them.
References
----------
[Nikolopoulos, K., Syntetos, A. A., Boylan, J. E., Petropoulos, F., & Assimakopoulos, V. (2011). An aggregate–disaggregate intermittent demand approach (ADIDA) to forecasting: an empirical proposition and analysis. Journal of the Operational Research Society, 62(3), 544-554.](https://researchportal.bath.ac.uk/en/publications/an-aggregate-disaggregate-intermittent-demand-approach-adida-to-f).
Parameters
----------
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
r"""
self.alias = alias
self.prediction_intervals = prediction_intervals
self.only_conformal_intervals = True
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None,
):
r"""Fit the ADIDA model.
Fit an ADIDA to a time series (numpy array) `y`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
Returns
-------
self :
ADIDA fitted model.
"""
y = _ensure_float(y)
self.model_ = _adida(y=y, h=1, fitted=False)
self._y = y
self._store_cs(y=y, X=X)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
):
r"""Predict with fitted ADIDA.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
mean = _repeat_val(val=self.model_['mean'][0], h=h)
res = {'mean': mean}
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_predict_conformal_intervals(res, level)
else:
raise Exception(
"You have to instantiate the class with `prediction_intervals`"
"to calculate them"
)
return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted ADIDA insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
fitted = _adida(y=self._y, h=1, fitted=True)['fitted']
res = {'fitted': fitted}
if level is not None:
sigma = _calculate_sigma(self._y - fitted, self._y.size)
res = _add_fitted_pi(res=res, se=sigma, level=level)
return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Memory Efficient ADIDA predictions.
This method avoids memory burden due to object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n,).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
res = _adida(y=y, h=h, fitted=fitted)
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
raise Exception(
"You have to instantiate the class with `prediction_intervals`"
"to calculate them"
)
if fitted:
sigma = _calculate_sigma(y - res['fitted'], y.size)
res = _add_fitted_pi(res=res, se=sigma, level=level)
return res
deg_ts = np.zeros(10)
adida = ADIDA()
test_class(adida, x=ap, h=12, skip_insample=False)
test_class(adida, x=deg_ts, h=12, skip_insample=False)
fcst_adida = adida.forecast(ap, 12)
_test_fitted_sparse(ADIDA)
# Test conformal prediction
adida_c = ADIDA(prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(adida_c, x=ap, h=13, level=[90, 80], skip_insample=False)
fcst_adida_c = adida_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
fcst_adida_c['mean'][:12],
fcst_adida['mean'],
)
_plot_insample_pi(fcst_adida_c)
_plot_fcst(fcst_adida_c)
# Test the alias argument
test_eq(
repr(ADIDA()),
'ADIDA'
)
test_eq(
repr(ADIDA(alias='ADIDA_custom')),
'ADIDA_custom'
)
show_doc(ADIDA, title_level=3)
show_doc(ADIDA.forecast, title_level=3)
show_doc(ADIDA.fit, title_level=3)
show_doc(ADIDA.predict, title_level=3)
from statsforecast.models import ADIDA
from statsforecast.utils import AirPassengers as ap
# ADIDA usage example
model = ADIDA()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
Croston Classic
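Croston's method, implemented below, splits a sparse series into the sizes of its non-zero demands and the intervals between them, smooths each with SES, and forecasts with their ratio. A minimal worked sketch with the fixed 0.1 smoothing parameter of the classic variant (illustrative helper, not the library API):
import numpy as np

def croston_sketch(y: np.ndarray, h: int, alpha: float = 0.1) -> np.ndarray:
    nonzero_idx = np.flatnonzero(y != 0)
    if not nonzero_idx.size:
        return np.zeros(h)
    demand = y[nonzero_idx]                          # non-zero demand sizes z_t
    intervals = np.diff(np.append(-1, nonzero_idx))  # inter-demand intervals p_t
    # simple exponential smoothing of each component
    z_hat, p_hat = demand[0], intervals[0]
    for z, p in zip(demand[1:], intervals[1:]):
        z_hat = alpha * z + (1 - alpha) * z_hat
        p_hat = alpha * p + (1 - alpha) * p_hat
    return np.full(h, z_hat / p_hat)                 # forecast = smoothed size / smoothed interval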
def _croston_classic(
y: np.ndarray, # time series
h: int, # forecast horizon
fitted: bool, # fitted values
):
y = _ensure_float(y)
# demand
yd = _demand(y)
if not yd.size:  # no demand
return _naive(y=y, h=h, fitted=fitted)
ydp, ydf = _ses_forecast(yd, 0.1)
# intervals
yi = _intervals(y)
yip, yif = _ses_forecast(yi, 0.1)
if yip != 0.0:
mean = ydp / yip
else:
mean = ydp
out = {'mean': _repeat_val(val=mean, h=h)}
if fitted:
ydf = _expand_fitted_demand(np.append(ydf, ydp), y)
yif = _expand_fitted_intervals(np.append(yif, yip), y)
out['fitted'] = ydf / yif
return out
class CrostonClassic(_TS):
def __init__(self, alias: str = 'CrostonClassic', prediction_intervals: Optional[ConformalIntervals] = None):
r"""CrostonClassic model.
A method to forecast time series that exhibit intermittent demand.
It decomposes the original time series into a non-zero demand size $z_t$ and
inter-demand intervals $p_t$. Then the forecast is given by:
$$\hat{y}_t = \frac{\hat{z}_t}{\hat{p}_t}$$
where $\hat{z}_t$ and $\hat{p}_t$ are forecasted using SES. The smoothing parameter
of both components is set equal to 0.1
References
----------
[Croston, J. D. (1972). Forecasting and stock control for intermittent demands. Journal of the Operational Research Society, 23(3), 289-303.](https://link.springer.com/article/10.1057/jors.1972.50)
Parameters
----------
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
self.alias = alias
self.prediction_intervals = prediction_intervals
self.only_conformal_intervals = True
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None,
):
r"""Fit the CrostonClassic model.
Fit a CrostonClassic to a time series (numpy array) `y`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
Returns
-------
self :
CrostonClassic fitted model.
"""
y = _ensure_float(y)
self.model_ = _croston_classic(y=y, h=1, fitted=True)
self.model_['sigma'] = _calculate_sigma(y - self.model_['fitted'], y.size)
self._store_cs(y=y, X=X)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
):
r"""Predict with fitted CrostonClassic.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
mean = _repeat_val(val=self.model_['mean'][0], h=h)
res = {'mean': mean}
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_predict_conformal_intervals(res, level)
else:
raise Exception(
"You have to instantiate the class with `prediction_intervals` to calculate them"
)
return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted CrostonClassic insample predictions.
Parameters
----------
level: List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
res = {'fitted': self.model_['fitted']}
if level is not None:
res = _add_fitted_pi(res=res, se=self.model_['sigma'], level=level)
return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Memory Efficient CrostonClassic predictions.
This method avoids memory burden due to object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
res = _croston_classic(y=y, h=h, fitted=fitted)
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=dict(res), y=y, X=X, level=level)
else:
raise Exception(
"You have to instantiate the class with `prediction_intervals` to calculate them"
)
if fitted:
sigma = _calculate_sigma(y - res['fitted'], y.size)
res = _add_fitted_pi(res=res, se=sigma, level=level)
return res
croston = CrostonClassic(prediction_intervals=ConformalIntervals(2, 1))
test_class(croston, x=ap, h=12, skip_insample=False, level=[80])
test_class(croston, x=deg_ts, h=12, skip_insample=False, level=[80])
fcst_croston = croston.forecast(ap, 12)
_test_fitted_sparse(CrostonClassic)
# Test conformal prediction
croston_c = CrostonClassic(prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(croston_c, x=ap, h=13, level=[90.0, 80.0], skip_insample=False)
fcst_croston_c = croston_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
fcst_croston_c['mean'][:12],
fcst_croston['mean'],
)
_plot_insample_pi(fcst_croston_c)
_plot_fcst(fcst_croston_c)
# Test the alias argument
test_eq(
repr(CrostonClassic()),
'CrostonClassic'
)
test_eq(
repr(CrostonClassic(alias='CrostonClassic_custom')),
'CrostonClassic_custom'
)
show_doc(CrostonClassic, title_level=3)
show_doc(CrostonClassic.forecast, title_level=3)
show_doc(CrostonClassic.fit, title_level=3)
show_doc(CrostonClassic.predict, title_level=3)
from statsforecast.models import CrostonClassic
from statsforecast.utils import AirPassengers as ap
# CrostonClassic usage example
model = CrostonClassic()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
Croston Optimized
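The optimized variant below selects the SES smoothing parameter of each component instead of fixing it at 0.1. A rough sketch of that idea, scoring a small grid of alphas by one-step-ahead in-sample squared error (the library uses a numerical optimizer, so this is only an approximation):
import numpy as np

def best_alpha_ses(x: np.ndarray, grid=np.linspace(0.1, 0.3, 21)) -> float:
    # pick the alpha whose one-step-ahead SES errors are smallest in-sample
    best_alpha, best_sse = grid[0], np.inf
    for alpha in grid:
        level, sse = x[0], 0.0
        for obs in x[1:]:
            sse += (obs - level) ** 2
            level = alpha * obs + (1 - alpha) * level
        if sse < best_sse:
            best_alpha, best_sse = alpha, sse
    return best_alpha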
def _croston_optimized(
y: np.ndarray, # time series
h: int, # forecast horizon
fitted: bool, # fitted values
):
y = _ensure_float(y)
# demand
yd = _demand(y)
if not yd.size:
return _naive(y=y, h=h, fitted=fitted)
ydp, _ = _optimized_ses_forecast(yd)
# intervals
yi = _intervals(y)
yip, _ = _optimized_ses_forecast(yi)
if yip != 0.0:
mean = ydp / yip
else:
mean = ydp
out = {'mean': _repeat_val(val=mean, h=h)}
if fitted:
warnings.warn("Computing fitted values for CrostonOptimized is very expensive")
ydf = np.empty(yd.size + 1, dtype=y.dtype)
ydf[0] = np.nan
for i in range(yd.size):
ydf[i + 1] = _optimized_ses_forecast(yd[:i + 1])[0]
yif = np.empty(yi.size + 1, dtype=y.dtype)
yif[0] = np.nan
for i in range(yi.size):
yiff = _optimized_ses_forecast(yi[:i + 1])[0]
if yiff == 0:
yiff = 1.0
yif[i + 1] = yiff
ydf = _expand_fitted_demand(ydf, y)
yif = _expand_fitted_intervals(yif, y)
out['fitted'] = ydf / yif
return out
class CrostonOptimized(_TS):
def __init__(self, alias: str = 'CrostonOptimized', prediction_intervals: Optional[ConformalIntervals] = None,):
r"""CrostonOptimized model.
A method to forecast time series that exhibit intermittent demand.
It decomposes the original time series into a non-zero demand size $z_t$ and
inter-demand intervals $p_t$. Then the forecast is given by:
$$\hat{y}_t = \frac{\hat{z}_t}{\hat{p}_t}$$
A variation of the classic Croston's method where the smoothing parameter is optimally
selected from the range $[0.1,0.3]$. Both the non-zero demand $z_t$ and the inter-demand
intervals $p_t$ are smoothed separately, so their smoothing parameters can be different.
References
----------
[Croston, J. D. (1972). Forecasting and stock control for intermittent demands. Journal of the Operational Research Society, 23(3), 289-303.](https://link.springer.com/article/10.1057/jors.1972.50).
Parameters
----------
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
This is required for generating future prediction intervals.
r"""
self.alias = alias
self.prediction_intervals = prediction_intervals
self.only_conformal_intervals = True
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None,
):
r"""Fit the CrostonOptimized model.
Fit a CrostonOptimized to a time series (numpy array) `y`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
Returns
-------
self :
CrostonOptimized fitted model.
"""
y = _ensure_float(y)
self.model_ = _croston_optimized(y=y, h=1, fitted=False)
self._y = y
self._store_cs(y=y, X=X)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
):
r"""Predict with fitted CrostonOptimized.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
mean = _repeat_val(val=self.model_['mean'][0], h=h)
res = {'mean': mean}
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_predict_conformal_intervals(res, level)
else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted CrostonOptimized insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
fitted = _croston_optimized(y=self._y, h=1, fitted=True)['fitted']
res = {'fitted': fitted}
if level is not None:
sigma = _calculate_sigma(self._y - fitted, self._y.size)
res = _add_fitted_pi(res=res, se=sigma, level=level)
return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Memory Efficient CrostonOptimized predictions.
This method avoids memory burden due to object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
res = _croston_optimized(y=y, h=h, fitted=fitted)
res = dict(res)
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
raise Exception("You must pass `prediction_intervals` to compute them.")
if fitted:
sigma = _calculate_sigma(y - res['fitted'], y.size)
res = _add_fitted_pi(res=res, se=sigma, level=level)
return res
croston_op = CrostonOptimized(prediction_intervals=ConformalIntervals(2, 1))
test_class(croston_op, x=ap, h=12, skip_insample=False, level=[80])
test_class(croston_op, x=deg_ts, h=12, skip_insample=False, level=[80])
fcst_croston_op = croston_op.forecast(ap, 12)
_test_fitted_sparse(CrostonOptimized)
# Test conformal prediction
croston_op_c = CrostonOptimized(prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(croston_op_c, x=ap, h=13, level=[90, 80], skip_insample=False)
fcst_croston_op_c = croston_op_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
fcst_croston_op_c['mean'][:12],
fcst_croston_op['mean'],
)
_plot_insample_pi(fcst_croston_op_c)
_plot_fcst(fcst_croston_op_c)
# Test the alias argument
test_eq(
repr(CrostonOptimized()),
'CrostonOptimized'
)
test_eq(
repr(CrostonOptimized(alias='CrostonOptimized_custom')),
'CrostonOptimized_custom'
)
show_doc(CrostonOptimized, title_level=3)
show_doc(CrostonOptimized.forecast, title_level=3)
show_doc(CrostonOptimized.fit, title_level=3)
show_doc(CrostonOptimized.predict, title_level=3)
from statsforecast.models import CrostonOptimized
from statsforecast.utils import AirPassengers as ap
# CrostonOptimized usage example
model = CrostonOptimized()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
CrostonSBA
def _croston_sba(
y: np.ndarray, # time series
h: int, # forecast horizon
fitted: bool, # fitted values
) -> Dict[str, np.ndarray]:
out = _croston_classic(y=y, h=h, fitted=fitted)
out['mean'] *= 0.95
if fitted:
out['fitted'] *= 0.95
return out
class CrostonSBA(_TS):
def __init__(self, alias: str = 'CrostonSBA', prediction_intervals: Optional[ConformalIntervals] = None,):
r"""CrostonSBA model.
A method to forecast time series that exhibit intermittent demand.
It decomposes the original time series into a non-zero demand size $z_t$ and
inter-demand intervals $p_t$. Then the forecast is given by:
$$\hat{y}_t = \frac{\hat{z}_t}{\hat{p}_t}$$
A variation of the classic Croston's method that uses a debiasing factor, so that the
forecast is given by:
$$\hat{y}_t = 0.95 \frac{\hat{z}_t}{\hat{p}_t}$$
References
----------
[Croston, J. D. (1972). Forecasting and stock control for intermittent demands. Journal of the Operational Research Society, 23(3), 289-303.](https://link.springer.com/article/10.1057/jors.1972.50).
Parameters
----------
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
r"""
self.alias = alias
self.prediction_intervals = prediction_intervals
self.only_conformal_intervals = True
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None,
):
r"""Fit the CrostonSBA model.
Fit a CrostonSBA to a time series (numpy array) `y`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
Returns
-------
self :
CrostonSBA fitted model.
"""
y = _ensure_float(y)
self.model_ = _croston_sba(y=y, h=1, fitted=True)
self.model_['sigma'] = _calculate_sigma(y - self.model_['fitted'], y.size)
self._store_cs(y=y, X=X)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
):
r"""Predict with fitted CrostonSBA.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
mean = _repeat_val(val=self.model_['mean'][0], h=h)
res = {'mean': mean}
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_predict_conformal_intervals(res, level)
else:
raise Exception(
"You have to instantiate the class with `prediction_intervals` to calculate them"
)
return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted CrostonSBA insample predictions.
Parameters
----------
level: List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
res = {'fitted': self.model_['fitted']}
if level is not None:
res = _add_fitted_pi(res=res, se=self.model_['sigma'], level=level)
return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Memory Efficient CrostonSBA predictions.
This method avoids memory burden due to object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
res = _croston_sba(y=y, h=h, fitted=fitted)
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=dict(res), y=y, X=X, level=level)
else:
raise Exception(
"You have to instantiate the class with `prediction_intervals`"
"to calculate them"
)
if fitted:
sigma = _calculate_sigma(y - res['fitted'], y.size)
res = _add_fitted_pi(res=res, se=sigma, level=level)
return res
croston_sba = CrostonSBA(prediction_intervals=ConformalIntervals(2, 1))
test_class(croston_sba, x=ap, h=12, skip_insample=False, level=[80])
test_class(croston_sba, x=deg_ts, h=12, skip_insample=False, level=[80])
fcst_croston_sba = croston_sba.forecast(ap, 12)
_test_fitted_sparse(CrostonSBA)
# Test conformal prediction
croston_sba_c = CrostonSBA(prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(croston_sba_c, x=ap, h=13, level=[90, 80], skip_insample=False)
fcst_croston_sba_c = croston_sba_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
fcst_croston_sba_c['mean'][:12],
fcst_croston_sba['mean'],
)
_plot_insample_pi(fcst_croston_sba_c)
_plot_fcst(fcst_croston_sba_c)
# Test the alias argument
test_eq(
repr(CrostonSBA()),
'CrostonSBA'
)
test_eq(
repr(CrostonSBA(alias='CrostonSBA_custom')),
'CrostonSBA_custom'
)
show_doc(CrostonSBA, title_level=3)
show_doc(CrostonSBA.forecast, title_level=3)
show_doc(CrostonSBA.fit, title_level=3)
show_doc(CrostonSBA.predict, title_level=3)
from statsforecast.models import CrostonSBA
from statsforecast.utils import AirPassengers as ap
# CrostonSBA usage example
model = CrostonSBA()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
IMAPA
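IMAPA, implemented below, repeats the ADIDA-style aggregation at every level from 1 up to the mean inter-demand interval and averages the resulting per-period forecasts. A rough sketch, again with a plain fixed-alpha SES standing in for the optimized routine (illustrative only):
import numpy as np

def imapa_sketch(y: np.ndarray, h: int, alpha: float = 0.1) -> np.ndarray:
    nonzero_idx = np.flatnonzero(y != 0)
    if not nonzero_idx.size:
        return np.zeros(h)
    intervals = np.diff(np.append(-1, nonzero_idx))
    max_agg = max(1, int(round(intervals.mean())))
    per_period = []
    for agg in range(1, max_agg + 1):
        # aggregate, smooth, and bring the level back to the original frequency
        sums = y[len(y) % agg:].reshape(-1, agg).sum(axis=1)
        level = sums[0]
        for s in sums[1:]:
            level = alpha * s + (1 - alpha) * level
        per_period.append(level / agg)
    # combine the per-level forecasts with a simple average
    return np.full(h, np.mean(per_period))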
def _imapa(
y: np.ndarray, # time series
h: int, # forecast horizon
fitted: bool, # fitted values
):
if (y == 0).all():
res = {'mean': np.zeros(h, dtype=y.dtype)}
if fitted:
res['fitted'] = np.zeros_like(y)
res['fitted'][0] = np.nan
return res
y = _ensure_float(y)
y_intervals = _intervals(y)
mean_interval = y_intervals.mean().item()
max_aggregation_level = round(mean_interval)
forecasts = np.empty(max_aggregation_level, dtype=y.dtype)
for aggregation_level in range(1, max_aggregation_level + 1):
lost_remainder_data = len(y) % aggregation_level
y_cut = y[lost_remainder_data:]
aggregation_sums = _chunk_sums(y_cut, aggregation_level)
forecast, _ = _optimized_ses_forecast(aggregation_sums)
forecasts[aggregation_level - 1] = forecast / aggregation_level
forecast = forecasts.mean()
res = {'mean': _repeat_val(val=forecast, h=h)}
if fitted:
warnings.warn("Computing fitted values for IMAPA is very expensive.")
fitted_vals = np.empty_like(y)
fitted_vals[0] = np.nan
for i in range(y.size - 1):
fitted_vals[i + 1] = _imapa(y[:i+1], h=1, fitted=False)['mean'].item()
res['fitted'] = fitted_vals
return res
class IMAPA(_TS):
def __init__(self, alias: str = 'IMAPA', prediction_intervals: Optional[ConformalIntervals] = None,):
r"""IMAPA model.
Intermittent Multiple Aggregation Prediction Algorithm: Similar to ADIDA, but instead of
using a single aggregation level, it considers multiple in order to capture different
dynamics of the data. Uses the optimized SES to generate the forecasts at the new levels
and then combines them using a simple average.
References
----------
[Syntetos, A. A., & Boylan, J. E. (2021). Intermittent demand forecasting: Context, methods and applications. John Wiley & Sons.](https://www.ifors.org/intermittent-demand-forecasting-context-methods-and-applications/).
Parameters
----------
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
self.alias = alias
self.prediction_intervals = prediction_intervals
self.only_conformal_intervals = True
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None,
):
r"""Fit the IMAPA model.
Fit an IMAPA to a time series (numpy array) `y`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
Returns
-------
self :
IMAPA fitted model.
"""
y = _ensure_float(y)
self.model_ = _imapa(y=y, h=1, fitted=False)
self._y = y
self._store_cs(y=y, X=X)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
):
r"""Predict with fitted IMAPA.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
mean = _repeat_val(val=self.model_['mean'][0], h=h)
res = {'mean': mean}
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_predict_conformal_intervals(res, level)
else:
raise Exception(
"You have to instantiate the class with `prediction_intervals`"
"to calculate them"
)
return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted IMAPA insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
fitted = _imapa(y=self._y, h=1, fitted=True)['fitted']
res = {'fitted': fitted}
if level is not None:
sigma = _calculate_sigma(self._y - fitted, self._y.size)
res = _add_fitted_pi(res=res, se=sigma, level=level)
return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Memory Efficient IMAPA predictions.
This method avoids memory burden due to object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
res = _imapa(y=y, h=h, fitted=fitted)
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
raise Exception(
"You have to instantiate the class with `prediction_intervals`"
"to calculate them"
)
if fitted:
sigma = _calculate_sigma(y - res['fitted'], y.size)
res = _add_fitted_pi(res=res, se=sigma, level=level)
return res
imapa = IMAPA(prediction_intervals=ConformalIntervals(2, 1))
test_class(imapa, x=ap, h=12, skip_insample=False, level=[80])
test_class(imapa, x=deg_ts, h=12, skip_insample=False, level=[80])
fcst_imapa = imapa.forecast(ap, 12)
_test_fitted_sparse(IMAPA)
# Test conformal prediction
imapa_c = IMAPA(prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(imapa_c, x=ap, h=13, level=[90, 80], skip_insample=False)
fcst_imapa_c = imapa_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
fcst_imapa_c['mean'][:12],
fcst_imapa['mean'],
)
_plot_insample_pi(fcst_imapa_c)
_plot_fcst(fcst_imapa_c)
# Test the alias argument
test_eq(
repr(IMAPA()),
'IMAPA'
)
test_eq(
repr(IMAPA(alias='IMAPA_custom')),
'IMAPA_custom'
)
show_doc(IMAPA, title_level=3)
show_doc(IMAPA.forecast, title_level=3)
show_doc(IMAPA.fit, title_level=3)
show_doc(IMAPA.predict, title_level=3)
from statsforecast.models import IMAPA
from statsforecast.utils import AirPassengers as ap
# IMAPA usage example
model = IMAPA()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
TSB
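TSB, implemented below, smooths the demand probability (1 when a period has demand, 0 otherwise) and the non-zero demand sizes separately, then multiplies the two. A minimal sketch with fixed smoothing parameters (illustrative helper, not the library API):
import numpy as np

def tsb_sketch(y: np.ndarray, h: int, alpha_d: float = 0.5, alpha_p: float = 0.5) -> np.ndarray:
    demand = y[y != 0]                   # non-zero demand sizes z_t
    if not demand.size:
        return np.zeros(h)
    occurrence = (y != 0).astype(float)  # demand indicator d_t
    z_hat = demand[0]
    for z in demand[1:]:
        z_hat = alpha_d * z + (1 - alpha_d) * z_hat
    p_hat = occurrence[0]
    for d in occurrence[1:]:
        p_hat = alpha_p * d + (1 - alpha_p) * p_hat
    return np.full(h, p_hat * z_hat)     # forecast = smoothed probability * smoothed size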
def _tsb(
y: np.ndarray, # time series
h: int, # forecast horizon
fitted: bool, # fitted values
alpha_d: float,
alpha_p: float,
) -> Dict[str, np.ndarray]:
if (y == 0).all():
res = {'mean': np.zeros(h, dtype=y.dtype)}
if fitted:
res['fitted'] = np.zeros_like(y)
res['fitted'][0] = np.nan
return res
y = _ensure_float(y)
yd = _demand(y)
yp = _probability(y)
ypf, ypft = _ses_forecast(yp, alpha_p)
ydf, ydft = _ses_forecast(yd, alpha_d)
res = {'mean': _repeat_val(val=ypf * ydf, h=h)}
if fitted:
ydft = _expand_fitted_demand(np.append(ydft, ydf), y)
res['fitted'] = ypft * ydft
return res
class TSB(_TS):
def __init__(
self,
alpha_d: float,
alpha_p: float,
alias: str = 'TSB',
prediction_intervals: Optional[ConformalIntervals] = None,
):
r"""TSB model.
Teunter-Syntetos-Babai: A modification of Croston's method that replaces the inter-demand
intervals with the demand probability $d_t$, which is defined as follows.
$$
d_t = \begin{cases}
1 & \text{if demand occurs at time t} \\
0 & \text{otherwise.}
\end{cases}
$$
Hence, the forecast is given by
$$\hat{y}_t = \hat{d}_t \hat{z}_t$$
Both $d_t$ and $z_t$ are forecasted using SES. The smoothing parameters of each may differ,
like in the optimized Croston's method.
References
----------
[Teunter, R. H., Syntetos, A. A., & Babai, M. Z. (2011). Intermittent demand: Linking forecasting to inventory obsolescence. European Journal of Operational Research, 214(3), 606-615.](https://www.sciencedirect.com/science/article/abs/pii/S0377221711004437)
Parameters
----------
alpha_d : float
Smoothing parameter for demand.
alpha_p : float
Smoothing parameter for probability.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
This is required for generating future prediction intervals.
"""
self.alpha_d = alpha_d
self.alpha_p = alpha_p
self.alias = alias
self.prediction_intervals = prediction_intervals
self.only_conformal_intervals = True
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None,
):
r"""Fit the TSB model.
Fit a TSB to a time series (numpy array) `y`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
Returns
-------
self :
TSB fitted model.
"""
y = _ensure_float(y)
self.model_ = _tsb(
y=y,
h=1,
fitted=True,
alpha_d=self.alpha_d,
alpha_p=self.alpha_p
)
self.model_['sigma'] = _calculate_sigma(y - self.model_['fitted'], y.size)
self._store_cs(y=y, X=X)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
):
r"""Predict with fitted TSB.
Parameters
----------
h : int
Forecast horizon.
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
mean = _repeat_val(self.model_['mean'][0], h=h)
res = {'mean': mean}
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_predict_conformal_intervals(res, level)
else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted TSB insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
res = {'fitted': self.model_['fitted']}
if level is not None:
res = _add_fitted_pi(res=res, se=self.model_['sigma'], level=level)
return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Memory Efficient TSB predictions.
This method avoids memory burden due to object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
res = _tsb(
y=y, h=h,
fitted=fitted,
alpha_d=self.alpha_d,
alpha_p=self.alpha_p
)
res = dict(res)
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
raise Exception("You must pass `prediction_intervals` to compute them.")
if fitted:
sigma = _calculate_sigma(y - res['fitted'], y.size)
res = _add_fitted_pi(res=res, se=sigma, level=level)
return res
tsb = TSB(alpha_d=0.9, alpha_p=0.1, prediction_intervals=ConformalIntervals(2, 1))
test_class(tsb, x=ap, h=12, skip_insample=False, level=[80])
test_class(tsb, x=deg_ts, h=12, skip_insample=False, level=[80])
fcst_tsb = tsb.forecast(ap, 12)
_test_fitted_sparse(lambda: TSB(alpha_d=0.9, alpha_p=0.1))
# Test conformal prediction
tsb_c = TSB(alpha_d=0.9, alpha_p=0.1,prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(tsb_c, x=ap, h=13, level=[90, 80], skip_insample=True)
fcst_tsb_c = tsb_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
fcst_tsb_c['mean'][:12],
fcst_tsb['mean'],
)
_plot_insample_pi(fcst_tsb_c)
_plot_fcst(fcst_tsb_c)
# Test the alias argument
test_eq(
repr(TSB(0.9, 0.1)),
'TSB'
)
test_eq(
repr(TSB(0.9, 0.1, alias='TSB_custom')),
'TSB_custom'
)
show_doc(TSB, title_level=3)
show_doc(TSB.forecast, title_level=3)
show_doc(TSB.fit, title_level=3)
show_doc(TSB.predict, title_level=3)
from statsforecast.models import TSB
from statsforecast.utils import AirPassengers as ap
# TSB usage example
model = TSB(alpha_d=0.5, alpha_p=0.5)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
Multiple Seasonalities
MSTL
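The MSTL wrapper defined below decomposes the series into a trend plus one seasonal component per period, forecasts the deseasonalized part with the trend forecaster, and extends each seasonal component with a seasonal naive. A minimal sketch of the recombination step, assuming the components have already been computed as arrays (the names here are illustrative, not the library API):
import numpy as np

def mstl_recombine_sketch(trend_fcst: np.ndarray, seasonal_components: dict, h: int) -> np.ndarray:
    # each seasonal component is extended with a seasonal naive: tile its last full cycle
    total_seas = np.zeros(h)
    for period, comp in seasonal_components.items():
        reps = int(np.ceil(h / period))
        total_seas += np.tile(np.asarray(comp)[-period:], reps)[:h]
    # final forecast = trend forecast from the non-seasonal model + seasonal extensions
    return np.asarray(trend_fcst) + total_seas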
def _predict_mstl_components(mstl_ob, h, season_length):
seasoncolumns = mstl_ob.filter(regex='seasonal*').columns
nseasons = len(seasoncolumns)
seascomp = np.full((h, nseasons), np.nan)
seasonal_periods = [season_length] if isinstance(season_length, int) else season_length
for i in range(nseasons):
mp = seasonal_periods[i]
colname = seasoncolumns[i]
seascomp[:, i] = np.tile(mstl_ob[colname].values[-mp:], trunc(1 + (h-1)/mp))[:h]
return seascomp
def _predict_mstl_seas(mstl_ob, h, season_length):
seascomp = _predict_mstl_components(mstl_ob, h, season_length)
return seascomp.sum(axis=1)
class MSTL(_TS):
r"""MSTL model.
The MSTL (Multiple Seasonal-Trend decomposition using LOESS) decomposes the time series
in multiple seasonalities using LOESS. Then forecasts the trend using
a custom non-seasonal model and each seasonality using a SeasonalNaive model.
References
----------
[Bandara, Kasun & Hyndman, Rob & Bergmeir, Christoph. (2021). "MSTL: A Seasonal-Trend Decomposition Algorithm for Time Series with Multiple Seasonal Patterns".](https://arxiv.org/abs/2107.13462).
Parameters
----------
season_length : Union[int, List[int]]
Number of observations per unit of time. For multiple seasonalities use a list.
trend_forecaster : model, default=AutoETS(model='ZZN')
StatsForecast model used to forecast the trend component.
stl_kwargs : dict
Extra arguments to pass to [`statsmodels.tsa.seasonal.STL`](https://www.statsmodels.org/dev/generated/statsmodels.tsa.seasonal.STL.html#statsmodels.tsa.seasonal.STL).
The `period` and `seasonal` arguments are reserved.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
def __init__(
self,
season_length: Union[int, List[int]],
trend_forecaster = AutoETS(model='ZZN'),
stl_kwargs: Optional[Dict] = None,
alias: str = 'MSTL',
prediction_intervals: Optional[ConformalIntervals] = None,
):
# Check that the ETS model is not seasonal
if repr(trend_forecaster) == 'AutoETS':
if trend_forecaster.model[2] != 'N':
raise Exception(
'Trend forecaster should not adjust '
'seasonal models.'
)
# Check that the trend forecaster uses season_length=1
if hasattr(trend_forecaster, 'season_length'):
if trend_forecaster.season_length != 1:
raise Exception(
'Trend forecaster should not adjust '
'seasonal models. Please pass `season_length=1` '
'to your trend forecaster'
)
if isinstance(season_length, int):
season_length = [season_length]
else:
season_length = sorted(season_length)
self.season_length = season_length
self.trend_forecaster = trend_forecaster
self.prediction_intervals = prediction_intervals
self.alias = alias
if self.trend_forecaster.prediction_intervals is None and (self.prediction_intervals is not None):
self.trend_forecaster.prediction_intervals = prediction_intervals
self.stl_kwargs = dict() if stl_kwargs is None else stl_kwargs
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None,
):
r"""Fit the MSTL model.
Fit MSTL to a time series (numpy array) `y`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X: array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self :
MSTL fitted model.
"""
y = _ensure_float(y)
self.model_ = mstl(
x=y,
period=self.season_length,
stl_kwargs=self.stl_kwargs,
)
x_sa = self.model_[['trend', 'remainder']].sum(axis=1).values
self.trend_forecaster = self.trend_forecaster.new().fit(y=x_sa, X=X)
self._store_cs(y=x_sa, X=X)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
):
r"""Predict with fitted MSTL.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
kwargs: Dict[str, Any] = {'h': h, 'X': X}
if self.trend_forecaster.prediction_intervals is None:
kwargs['level'] = level
res = self.trend_forecaster.predict(**kwargs)
seas = _predict_mstl_seas(self.model_, h=h, season_length=self.season_length)
res = {key: val + seas for key, val in res.items()}
if level is None or self.trend_forecaster.prediction_intervals is None:
return res
level = sorted(level)
if self.trend_forecaster.prediction_intervals is not None:
res = self.trend_forecaster._add_predict_conformal_intervals(res, level)
else:
raise Exception(
"You have to instantiate either the trend forecaster class or MSTL class with `prediction_intervals` to calculate them"
)
return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted MSTL insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
res = self.trend_forecaster.predict_in_sample(level=level)
seas = self.model_.filter(regex='seasonal*').sum(axis=1).values
res = {key: val + seas for key, val in res.items()}
return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Memory Efficient MSTL predictions.
This method avoids the memory burden due to object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
model_ = mstl(
x=y,
period=self.season_length,
stl_kwargs=self.stl_kwargs,
)
x_sa = model_[['trend', 'remainder']].sum(axis=1).values
kwargs = {
'y': x_sa,
'h': h,
'X': X,
'X_future': X_future,
'fitted': fitted
}
if fitted or self.trend_forecaster.prediction_intervals is None:
kwargs['level'] = level
res = self.trend_forecaster.forecast(**kwargs)
if level is not None:
level = sorted(level)
if self.trend_forecaster.prediction_intervals is not None:
res = self.trend_forecaster._add_conformal_intervals(fcst=res, y=x_sa, X=X, level=level)
elif f'lo-{level[0]}' not in res:
raise Exception(
"You have to instantiate either the trend forecaster class or MSTL class with `prediction_intervals` to calculate them"
)
# Reseasonalize the results
seas_h = _predict_mstl_seas(model_, h=h, season_length=self.season_length)
seas_insample = model_.filter(regex='seasonal*').sum(axis=1).values
res = {
key: val + (seas_insample if 'fitted' in key else seas_h) \
for key, val in res.items()
}
return res
def forward(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Apply fitted MSTL model to a new time series.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
if not hasattr(self.trend_forecaster, 'model_'):
raise Exception('You have to use the `fit` method first')
y = _ensure_float(y)
model_ = mstl(
x=y,
period=self.season_length,
stl_kwargs=self.stl_kwargs,
)
x_sa = model_[['trend', 'remainder']].sum(axis=1).values
kwargs = {
'y': x_sa,
'h': h,
'X': X,
'X_future': X_future,
'fitted': fitted
}
if fitted or self.trend_forecaster.prediction_intervals is None:
kwargs['level'] = level
res = self.trend_forecaster.forward(**kwargs)
if level is not None:
level = sorted(level)
if self.trend_forecaster.prediction_intervals is not None:
res = self.trend_forecaster._add_conformal_intervals(fcst=res, y=x_sa, X=X, level=level)
# Reseasonalize the results
seas_h = _predict_mstl_seas(model_, h=h, season_length=self.season_length)
seas_insample = model_.filter(regex='seasonal*').sum(axis=1).values
res = {
key: val + (seas_insample if 'fitted' in key else seas_h) \
for key, val in res.items()
}
return res
trend_forecasters = [
AutoARIMA(),
AutoCES(),
AutoETS(model='ZZN'),
Naive(),
CrostonClassic(),
]
skip_insamples = [False, True, False, False, True]
test_forwards = [False, True, True, False, False]
for trend_forecaster, skip_insample, test_forward in zip(trend_forecasters, skip_insamples, test_forwards):
for stl_kwargs in [None, dict(trend=25)]:
mstl_model = MSTL(
season_length=[12, 14],
trend_forecaster=trend_forecaster,
stl_kwargs=stl_kwargs,
)
test_class(mstl_model, x=ap, h=12,
skip_insample=skip_insample,
level=None,
test_forward=test_forward)# Intervals with and without conformal prediction
# The trend forecaster supports level: use native intervals
mstl_native = MSTL(season_length=12, trend_forecaster=ARIMA(order=(0, 1, 0)))
res_native_fp = pd.DataFrame(mstl_native.fit(y=ap).predict(h=24, level=[80, 95]))
res_native_fc = pd.DataFrame(mstl_native.forecast(y=ap, h=24, level=[80, 95]))
pd.testing.assert_frame_equal(res_native_fp, res_native_fc)
# The trend forecaster supports level: use the conformal method
mstl_conformal = MSTL(
season_length=12,
trend_forecaster=ARIMA(
order=(0, 1, 0),
prediction_intervals=ConformalIntervals(h=24),
),
)
res_conformal_fp = pd.DataFrame(mstl_conformal.fit(y=ap).predict(h=24, level=[80, 95]))
res_conformal_fc = pd.DataFrame(mstl_conformal.forecast(y=ap, h=24, level=[80, 95]))
pd.testing.assert_frame_equal(res_conformal_fp, res_conformal_fc)
test_fail(lambda: pd.testing.assert_frame_equal(res_native_fp, res_conformal_fp))
# trend fcst doesn't support level
mstl_bad = MSTL(season_length=12, trend_forecaster=CrostonClassic())
test_fail(lambda: mstl_bad.fit(y=ap).predict(h=24, level=[80, 95]), contains='prediction_intervals')
test_fail(lambda: mstl_bad.forecast(y=ap, h=24, level=[80, 95]), contains='prediction_intervals')# Conformal prediction
# Define the prediction intervals in the trend forecaster
trend_forecasters = [
AutoARIMA(prediction_intervals=ConformalIntervals(h=13, n_windows=2)),
AutoCES(prediction_intervals=ConformalIntervals(h=13, n_windows=2)),
]
skip_insamples = [False, True]
test_forwards = [False, True]
for trend_forecaster, skip_insample, test_forward in zip(trend_forecasters, skip_insamples, test_forwards):
for stl_kwargs in [None, dict(trend=25)]:
mstl_model = MSTL(
season_length=[12, 14],
trend_forecaster=trend_forecaster,
stl_kwargs=stl_kwargs,
)
test_class(mstl_model, x=ap, h=13,
skip_insample=skip_insample,
level=[80, 90] if not skip_insample else None,
test_forward=test_forward)
_plot_fcst(mstl_model.forecast(ap, 13, None, None, (80,95), False))# Conformal prediction
# Define prediction_intervals in MSTL
trend_forecasters = [
AutoCES()
]
for trend_forecaster in trend_forecasters:
for stl_kwargs in [None, dict(trend=25)]:
mstl_model = MSTL(
season_length=[12, 14],
trend_forecaster=trend_forecaster,
stl_kwargs=stl_kwargs,
prediction_intervals=ConformalIntervals(h=13, n_windows=2)
)
test_class(mstl_model, x=ap, h=13,
skip_insample=False,
level=[80, 90],
test_forward=True)
_plot_fcst(mstl_model.forecast(ap, 13, None, None, (80,95), False))# Seasonal trend forecasters should fail
test_fail(
MSTL,
contains='should not adjust seasonal',
args=([3, 12], AutoETS(model='ZZZ'))
)
test_fail(
MSTL,
contains='should not adjust seasonal',
args=([3, 12], AutoARIMA(season_length=12))
)# Test the alias argument
test_eq(
repr(MSTL(season_length=7)),
'MSTL'
)
test_eq(
repr(MSTL(season_length=7, alias='MSTL_custom')),
'MSTL_custom'
)show_doc(MSTL, title_level=3)show_doc(MSTL.fit, title_level=3)show_doc(MSTL.predict, title_level=3)show_doc(MSTL.predict_in_sample, title_level=3)show_doc(MSTL.forecast, title_level=3)show_doc(MSTL.forward, title_level=3)from statsforecast.models import MSTL
from statsforecast.utils import AirPassengers as ap# Usage example for MSTL
mstl_model = MSTL(season_length=[3, 12], trend_forecaster=AutoARIMA(prediction_intervals=ConformalIntervals(h=4, n_windows=2)))
mstl_model = mstl_model.fit(y=ap)
y_hat_dict = mstl_model.predict(h=4, level=[80])
y_hat_dict
TBATS
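A minimal usage sketch, added for orientation and mirroring the fit/predict examples shown for the other models in this notebook (the 4-step horizon is arbitrary):
from statsforecast.models import TBATS
from statsforecast.utils import AirPassengers as ap
# Illustrative usage example for TBATS
model = TBATS(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict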
class TBATS(_TS):
r"""Trigonometric Box-Cox transform, ARMA errors, Trend and Seasonal components (TBATS) model.
TBATS is an innovations state space model framework used for forecasting time series with multiple seasonalities. It uses a Box-Cox transformation, ARMA errors, and a trigonometric representation of the seasonal patterns based on Fourier series.
The name TBATS is an acronym for the key features of the model: Trigonometric, Box-Cox transform, ARMA errors, Trend, and Seasonal components.
References
----------
- [De Livera, A. M., Hyndman, R. J., & Snyder, R. D. (2011). Forecasting time series with complex seasonal patterns using exponential smoothing. Journal of the American statistical association, 106(496), 1513-1527.](https://citeseerx.ist.psu.edu/document?repid=rep1&type=pdf&doi=f3de25596ab60ef0e886366826bf58a02b35a44f)
- [De Livera, Alysha M (2017). Modeling time series with complex seasonal patterns using exponential smoothing. Monash University. Thesis.](https://doi.org/10.4225/03/589299681de3d)
Parameters
----------
season_length : int or list of int.
Number of observations per unit of time. Ex: 24 Hourly data.
use_boxcox : bool (default=True)
Whether or not to use a Box-Cox transformation.
bc_lower_bound : float (default=0.0)
Lower bound for the Box-Cox transformation.
bc_upper_bound : float (default=1.0)
Upper bound for the Box-Cox transformation.
use_trend : bool (default=True)
Whether or not to use a trend component.
use_damped_trend : bool (default=False)
Whether or not to dampen the trend component.
use_arma_errors : bool (default=False)
Whether or not to use ARMA errors.
alias : str
Custom name of the model.
"""
@_old_kw_to_pos(['seasonal_periods'], [1])
def __init__(
self,
season_length: Union[int, List[int]],
use_boxcox: Optional[bool] = True,
bc_lower_bound: float = 0.0,
bc_upper_bound: float = 1.0,
use_trend: Optional[bool] = True,
use_damped_trend: Optional[bool] = False,
use_arma_errors: bool = False,
alias: str = 'TBATS',
*,
seasonal_periods=None, # noqa:ARG002
):
if isinstance(season_length, int):
season_length = [season_length]
self.season_length = list(season_length)
self.use_boxcox = use_boxcox
self.bc_lower_bound = bc_lower_bound
self.bc_upper_bound = bc_upper_bound
self.use_trend = use_trend
self.use_damped_trend = use_damped_trend
self.use_arma_errors = use_arma_errors
self.alias = alias
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None
):
r"""Fit TBATS model.
Fit TBATS model to a time series (numpy array) `y`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : numpy.array, optional (default=None)
Ignored
Returns
-------
self :
TBATS model.
"""
y = _ensure_float(y)
self.model_ = tbats_selection(
y=y,
seasonal_periods=self.season_length,
use_boxcox=self.use_boxcox,
bc_lower_bound=self.bc_lower_bound,
bc_upper_bound=self.bc_upper_bound,
use_trend=self.use_trend,
use_damped_trend=self.use_damped_trend,
use_arma_errors=self.use_arma_errors
)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None
):
r"""Predict with fitted TBATS model.
Parameters
----------
h : int
Forecast horizon.
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
fcst = tbats_forecast(self.model_, h)
res = {'mean': fcst['mean']}
if level is not None:
level = sorted(level)
sigmah = _compute_sigmah(self.model_, h)
pred_int = _calculate_intervals(res, level, h, sigmah)
res = {**res, **pred_int}
if self.model_['BoxCox_lambda'] is not None:
res_trans = {k: inv_boxcox(v, self.model_['BoxCox_lambda']) for k, v in res.items()}
for k, v in res_trans.items():
res_trans[k] = np.where(np.isnan(v), res[k], v)
else:
res_trans = res
return res_trans
def predict_in_sample(self, level: Optional[Tuple[int]] = None):
r"""Access fitted TBATS model predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
res = {'fitted': self.model_['fitted'].ravel()}
if level is not None:
se = _calculate_sigma(self.model_['errors'], self.model_['errors'].shape[1])
fitted_pred_int = _add_fitted_pi(res, se, level)
res = {**res, **fitted_pred_int}
if self.model_['BoxCox_lambda'] is not None:
res_trans = {k: inv_boxcox(v, self.model_['BoxCox_lambda']) for k, v in res.items()}
for k, v in res_trans.items():
res_trans[k] = np.where(np.isnan(v), res[k], v)
else:
res_trans = res
return res_trans
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool=False
):
r"""Memory Efficient TBATS model.
This method avoids the memory burden due to object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
mod = tbats_selection(
y=y,
seasonal_periods=self.season_length,
use_boxcox=self.use_boxcox,
bc_lower_bound=self.bc_lower_bound,
bc_upper_bound=self.bc_upper_bound,
use_trend=self.use_trend,
use_damped_trend=self.use_damped_trend,
use_arma_errors=self.use_arma_errors
)
fcst = tbats_forecast(mod, h)
res = {'mean': fcst['mean']}
if fitted:
res['fitted'] = mod['fitted'].ravel()
if level is not None:
level = sorted(level)
sigmah = _compute_sigmah(mod, h)
pred_int = _calculate_intervals(res, level, h, sigmah)
res = {**res, **pred_int}
if fitted:
se = _calculate_sigma(mod['errors'], mod['errors'].shape[1])
fitted_pred_int = _add_fitted_pi(res, se, level)
res = {**res, **fitted_pred_int}
if mod['BoxCox_lambda'] is not None:
res_trans = {k : inv_boxcox(v, mod['BoxCox_lambda']) for k, v in res.items()}
for k, v in res_trans.items():
res_trans[k] = np.where(np.isnan(v), res[k], v)
else:
res_trans = res
return res_trans
tbats = TBATS(season_length=12)
test_class(tbats, x=ap, h=12, level=[90, 80])show_doc(TBATS, title_level=3)show_doc(TBATS.fit, title_level=3)show_doc(TBATS.predict, title_level=3)show_doc(TBATS.predict_in_sample, title_level=3)show_doc(TBATS.forecast, title_level=3)
AutoTBATS
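A minimal usage sketch for the class defined below, mirroring the other examples in this notebook (illustrative only):
from statsforecast.models import AutoTBATS
from statsforecast.utils import AirPassengers as ap
# Illustrative usage example for AutoTBATS
model = AutoTBATS(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict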
class AutoTBATS(TBATS):
r"""AutoTBATS model.
Automatically selects the best TBATS model from all feasible combinations of the parameters use_boxcox, use_trend, use_damped_trend, and use_arma_errors.
Selection is made using the AIC.
Default value for use_arma_errors is True since this enables the evaluation of models with and without ARMA errors.
References
----------
- [De Livera, A. M., Hyndman, R. J., & Snyder, R. D. (2011). Forecasting time series with complex seasonal patterns using exponential smoothing. Journal of the American statistical association, 106(496), 1513-1527.](https://citeseerx.ist.psu.edu/document?repid=rep1&type=pdf&doi=f3de25596ab60ef0e886366826bf58a02b35a44f)
- [De Livera, Alysha M (2017). Modeling time series with complex seasonal patterns using exponential smoothing. Monash University. Thesis.](https://doi.org/10.4225/03/589299681de3d)
Parameters
----------
season_length : int or list of int.
Number of observations per unit of time. Ex: 24 Hourly data.
use_boxcox : bool (default=None)
Whether or not to use a Box-Cox transformation. By default tries both.
bc_lower_bound : float (default=0.0)
Lower bound for the Box-Cox transformation.
bc_upper_bound : float (default=1.0)
Upper bound for the Box-Cox transformation.
use_trend : bool (default=None)
Whether or not to use a trend component. By default tries both.
use_damped_trend : bool (default=None)
Whether or not to dampen the trend component. By default tries both.
use_arma_errors : bool (default=True)
Whether or not to use ARMA errors. The default is True, which evaluates models both with and without ARMA errors.
alias : str
Custom name of the model.
"""
@_old_kw_to_pos(['seasonal_periods'], [1])
def __init__(
self,
season_length: Union[int, List[int]],
use_boxcox: Optional[bool] = None,
bc_lower_bound: float = 0.0,
bc_upper_bound: float = 1.0,
use_trend: Optional[bool] = None,
use_damped_trend: Optional[bool] = None,
use_arma_errors: bool = True,
alias: str = 'AutoTBATS',
*,
seasonal_periods=None # noqa:ARG002
):
super().__init__(
season_length=season_length,
use_boxcox=use_boxcox,
bc_lower_bound=bc_lower_bound,
bc_upper_bound=bc_upper_bound,
use_trend=use_trend,
use_damped_trend=use_damped_trend,
use_arma_errors=use_arma_errors,
alias=alias
)tbats = AutoTBATS(season_length=12)
test_class(tbats, x=ap, h=12, level=[90, 80])
fcst_tbats = tbats.forecast(ap, 13, None, None, (80,95), True)
_plot_fcst(fcst_tbats)_plot_insample_pi(fcst_tbats)show_doc(AutoTBATS, title_level=3)show_doc(AutoTBATS.fit, title_level=3)show_doc(AutoTBATS.predict, title_level=3)show_doc(AutoTBATS.predict_in_sample, title_level=3)show_doc(AutoTBATS.forecast, title_level=3)
Theta Family
Standard Theta Method
class Theta(AutoTheta):
r""" Standard Theta Method.
References
----------
[Jose A. Fiorucci, Tiago R. Pellegrini, Francisco Louzada, Fotios Petropoulos, Anne B. Koehler (2016). "Models for optimising the theta method and their relationship to state space models". International Journal of Forecasting](https://www.sciencedirect.com/science/article/pii/S0169207016300243)
Parameters
----------
season_length : int
Number of observations per unit of time. Ex: 24 Hourly data.
decomposition_type : str
Seasonal decomposition type, 'multiplicative' (default) or 'additive'.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
def __init__(
self,
season_length: int = 1,
decomposition_type: str = 'multiplicative',
alias: str = 'Theta',
prediction_intervals: Optional[ConformalIntervals] = None,
):
super().__init__(
season_length=season_length,
model='STM',
decomposition_type=decomposition_type,
alias=alias,
prediction_intervals=prediction_intervals,
)stm = Theta(season_length=12)
fcast_stm = stm.forecast(ap,12)
theta = AutoTheta(season_length=12, model='STM')
fcast_theta = theta.forecast(ap,12)
np.testing.assert_equal(
fcast_theta,
fcast_stm
)# Test conformal prediction
stm_c = Theta(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=5))
test_class(stm_c, x=ap, h=13, level=[90, 80], test_forward=True)
fcst_stm_c = stm_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
fcst_stm_c['mean'][:12],
fcast_stm['mean'],
)stm = Theta(season_length=12)
stm.fit(ap)
fcast_stm = stm.predict(12)
forward_stm = stm.forward(y=ap, h=12)
theta = AutoTheta(season_length=12, model='STM')
fcast_theta = theta.forecast(ap,12)
theta.fit(ap)
forward_autotheta = theta.forward(y=ap, h=12)
np.testing.assert_equal(
fcast_theta,
fcast_stm,
)
np.testing.assert_equal(
forward_stm,
forward_autotheta
)# Test the alias argument
test_eq(
repr(Theta()),
'Theta'
)
test_eq(
repr(Theta(alias='Theta_custom')),
'Theta_custom'
)show_doc(Theta, title_level=3)show_doc(Theta.forecast, name='Theta.forecast', title_level=3)show_doc(Theta.fit, name='Theta.fit', title_level=3)show_doc(Theta.predict, name='Theta.predict', title_level=3)show_doc(Theta.predict_in_sample, name='Theta.predict_in_sample', title_level=3)show_doc(Theta.forward, name='Theta.forward', title_level=3)from statsforecast.models import Theta
from statsforecast.utils import AirPassengers as ap# Usage example for Theta
model = Theta(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
Optimized Theta Method
class OptimizedTheta(AutoTheta):
r""" Optimized Theta Method.
References
----------
[Jose A. Fiorucci, Tiago R. Pellegrini, Francisco Louzada, Fotios Petropoulos, Anne B. Koehler (2016). "Models for optimising the theta method and their relationship to state space models". International Journal of Forecasting](https://www.sciencedirect.com/science/article/pii/S0169207016300243)
Parameters
----------
season_length : int
Number of observations per unit of time. Ex: 24 Hourly data.
decomposition_type : str
Seasonal decomposition type, 'multiplicative' (default) or 'additive'.
alias : str
Custom name of the model. Default `OptimizedTheta`.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
def __init__(
self,
season_length: int = 1,
decomposition_type: str = 'multiplicative',
alias: str = 'OptimizedTheta',
prediction_intervals: Optional[ConformalIntervals] = None,
):
super().__init__(
season_length=season_length,
model='OTM',
decomposition_type=decomposition_type,
alias=alias,
prediction_intervals=prediction_intervals,
)otm = OptimizedTheta(season_length=12)
fcast_otm = otm.forecast(ap,12)
otm.fit(ap)
forward_otm = otm.forward(y=ap, h=12)
theta = AutoTheta(season_length=12, model='OTM')
fcast_theta = theta.forecast(ap,12)
theta.fit(ap)
forward_autotheta = theta.forward(y=ap, h=12)
np.testing.assert_equal(
fcast_theta,
fcast_otm
)
np.testing.assert_equal(
forward_autotheta,
forward_otm
)# Test conformal prediction
otm_c = OptimizedTheta(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=5))
test_class(otm_c, x=ap, h=13, level=[90, 80], test_forward=True)
fcst_otm_c = otm_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
fcst_otm_c['mean'][:12],
fcast_otm['mean'],
)# hide
otm = OptimizedTheta(season_length=12)
otm.fit(ap)
fcast_otm = otm.predict(12)
theta = AutoTheta(season_length=12, model='OTM')
fcast_theta = theta.forecast(ap,12)
np.testing.assert_equal(
fcast_theta,
fcast_otm
)# Test the alias argument
test_eq(
repr(OptimizedTheta()),
'OptimizedTheta'
)
test_eq(
repr(OptimizedTheta(alias='OptimizedTheta_custom')),
'OptimizedTheta_custom'
)show_doc(OptimizedTheta, title_level=3)show_doc(OptimizedTheta.forecast, name='OptimizedTheta.forecast', title_level=3)show_doc(OptimizedTheta.fit, name='OptimizedTheta.fit', title_level=3)show_doc(OptimizedTheta.predict, name='OptimizedTheta.predict', title_level=3)show_doc(OptimizedTheta.predict_in_sample, name='OptimizedTheta.predict_in_sample', title_level=3)show_doc(OptimizedTheta.forward, name='OptimizedTheta.forward', title_level=3)from statsforecast.models import OptimizedTheta
from statsforecast.utils import AirPassengers as ap# Usage example for OptimizedTheta
model = OptimizedTheta(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
Dynamic Standard Theta Method
class DynamicTheta(AutoTheta):
r""" Dynamic Standard Theta Method.
References
----------
[Jose A. Fiorucci, Tiago R. Pellegrini, Francisco Louzada, Fotios Petropoulos, Anne B. Koehler (2016). "Models for optimising the theta method and their relationship to state space models". International Journal of Forecasting](https://www.sciencedirect.com/science/article/pii/S0169207016300243)
Parameters
----------
season_length : int
Number of observations per unit of time. Ex: 24 Hourly data.
decomposition_type : str
Seasonal decomposition type, 'multiplicative' (default) or 'additive'.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
def __init__(
self,
season_length: int = 1,
decomposition_type: str = 'multiplicative',
alias: str = 'DynamicTheta',
prediction_intervals: Optional[ConformalIntervals] = None,
):
super().__init__(
season_length=season_length,
model='DSTM',
decomposition_type=decomposition_type,
alias=alias,
prediction_intervals=prediction_intervals,
)dstm = DynamicTheta(season_length=12)
fcast_dstm = dstm.forecast(ap,12)
dstm.fit(ap)
forward_dstm = dstm.forward(y=ap, h=12)
theta = AutoTheta(season_length=12, model='DSTM')
fcast_theta = theta.forecast(ap,12)
theta.fit(ap)
forward_autotheta = theta.forward(y=ap, h=12)
np.testing.assert_equal(
fcast_theta,
fcast_dstm
)
np.testing.assert_equal(
forward_autotheta,
forward_dstm
)dstm = DynamicTheta(season_length=12)
dstm.fit(ap)
fcast_dstm = dstm.predict(12)
theta = AutoTheta(season_length=12, model='DSTM')
fcast_theta = theta.forecast(ap,12)
np.testing.assert_equal(
fcast_theta,
fcast_dstm
)# Test conformal prediction
dstm_c = DynamicTheta(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=5))
test_class(dstm_c, x=ap, h=13, level=[90, 80], test_forward=True)
fcst_dstm_c = dstm_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
fcst_dstm_c['mean'][:12],
fcast_dstm['mean'],
)# Test the alias argument
test_eq(
repr(DynamicTheta()),
'DynamicTheta'
)
test_eq(
repr(DynamicTheta(alias='DynamicTheta_custom')),
'DynamicTheta_custom'
)show_doc(DynamicTheta, title_level=3)show_doc(DynamicTheta.forecast, name='DynamicTheta.forecast', title_level=3)show_doc(DynamicTheta.fit, name='DynamicTheta.fit', title_level=3)show_doc(DynamicTheta.predict, name='DynamicTheta.predict', title_level=3)show_doc(DynamicTheta.predict_in_sample, name='DynamicTheta.predict_in_sample', title_level=3)show_doc(DynamicTheta.forward, name='DynamicTheta.forward', title_level=3)from statsforecast.models import DynamicTheta
from statsforecast.utils import AirPassengers as ap# Usage example for DynamicTheta
model = DynamicTheta(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
Dynamic Optimized Theta Method
class DynamicOptimizedTheta(AutoTheta):
r""" Dynamic Optimized Theta Method.
References
----------
[Jose A. Fiorucci, Tiago R. Pellegrini, Francisco Louzada, Fotios Petropoulos, Anne B. Koehler (2016). "Models for optimising the theta method and their relationship to state space models". International Journal of Forecasting](https://www.sciencedirect.com/science/article/pii/S0169207016300243)
Parameters
----------
season_length : int
Number of observations per unit of time. Ex: 24 Hourly data.
decomposition_type : str
Seasonal decomposition type, 'multiplicative' (default) or 'additive'.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
def __init__(
self,
season_length: int = 1,
decomposition_type: str = 'multiplicative',
alias: str = 'DynamicOptimizedTheta',
prediction_intervals: Optional[ConformalIntervals] = None,
):
super().__init__(
season_length=season_length,
model='DOTM',
decomposition_type=decomposition_type,
alias=alias,
prediction_intervals=prediction_intervals,
)dotm = DynamicOptimizedTheta(season_length=12)
fcast_dotm = dotm.forecast(ap,12)
dotm.fit(ap)
forward_dotm = dotm.forward(y=ap, h=12)
theta = AutoTheta(season_length=12, model='DOTM')
fcast_theta = theta.forecast(ap,12)
theta.fit(ap)
forward_autotheta = theta.forward(y=ap, h=12)
np.testing.assert_equal(
fcast_theta,
fcast_dotm
)
np.testing.assert_equal(
forward_autotheta,
forward_dotm
)dotm = DynamicOptimizedTheta(season_length=12)
dotm.fit(ap)
fcast_dotm = dotm.predict(12)
theta = AutoTheta(season_length=12, model='DOTM')
fcast_theta = theta.forecast(ap,12)
np.testing.assert_equal(
fcast_theta,
fcast_dotm
)# Test conformal prediction
dotm_c = DynamicOptimizedTheta(season_length=12, prediction_intervals=ConformalIntervals(h=13, n_windows=5))
test_class(dotm_c, x=ap, h=13, level=[90, 80], test_forward=True)
fcst_dotm_c = dotm_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
fcst_dotm_c['mean'][:12],
fcast_dotm['mean'],
)# Test the alias argument
test_eq(
repr(DynamicOptimizedTheta()),
'DynamicOptimizedTheta'
)
test_eq(
repr(DynamicOptimizedTheta(alias='DynamicOptimizedTheta_custom')),
'DynamicOptimizedTheta_custom'
)show_doc(DynamicOptimizedTheta, title_level=3)show_doc(DynamicOptimizedTheta.forecast, name='DynamicOptimizedTheta.forecast', title_level=3)show_doc(DynamicOptimizedTheta.fit, name='DynamicOptimizedTheta.fit', title_level=3)show_doc(DynamicOptimizedTheta.predict, name='DynamicOptimizedTheta.predict', title_level=3)show_doc(DynamicOptimizedTheta.predict_in_sample, name='DynamicOptimizedTheta.predict_in_sample', title_level=3)show_doc(DynamicOptimizedTheta.forward, name='DynamicOptimizedTheta.forward', title_level=3)from statsforecast.models import DynamicOptimizedTheta
from statsforecast.utils import AirPassengers as ap# Usage example for DynamicOptimizedTheta
model = DynamicOptimizedTheta(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
ARCH Family
GARCH Model
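Before the implementation, a minimal usage sketch in the same fit/predict style as the other examples in this notebook (AirPassengers is used only because it is already loaded here; GARCH is normally applied to return-like series):
from statsforecast.models import GARCH
from statsforecast.utils import AirPassengers as ap
# Illustrative usage example for GARCH(1,1)
model = GARCH(p=1, q=1)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict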
class GARCH(_TS):
r"""Generalized Autoregressive Conditional Heteroskedasticity (GARCH) model.
A method for modeling time series that exhibit non-constant volatility over time.
The GARCH model assumes that at time $t$, $y_t$ is given by:
$$y_t = v_t \sigma_t$$
with
$$\sigma_t^2 = w + \sum_{i=1}^p a_i y_{t-i}^2 + \sum_{j=1}^q b_j \sigma_{t-j}^2$$.
Here $v_t$ is a sequence of iid random variables with zero mean and unit variance.
The coefficients $w$, $a_i$, $i=1,...,p$, and $b_j$, $j=1,...,q$ must satisfy the following conditions:
1. $w > 0$ and $a_i, b_j \geq 0$ for all $i$ and $j$.
2. $\sum_{k=1}^{\max(p,q)} (a_k + b_k) < 1$. Here it is assumed that $a_i=0$ for $i>p$ and $b_j=0$ for $j>q$.
The ARCH model is a particular case of the GARCH model when $q=0$.
References
----------
[Engle, R. F. (1982). Autoregressive conditional heteroscedasticity with estimates of the variance of United Kingdom inflation. Econometrica: Journal of the econometric society, 987-1007.](http://www.econ.uiuc.edu/~econ508/Papers/engle82.pdf)
[Bollerslev, T. (1986). Generalized autoregressive conditional heteroskedasticity. Journal of econometrics, 31(3), 307-327.](https://citeseerx.ist.psu.edu/document?repid=rep1&type=pdf&doi=7da8bfa5295375c1141d797e80065a599153c19d)
[James D. Hamilton. Time Series Analysis Princeton University Press, Princeton, New Jersey, 1st Edition, 1994.](https://press.princeton.edu/books/hardcover/9780691042893/time-series-analysis)
Parameters
----------
p : int
Number of lagged versions of the series.
q: int
Number of lagged versions of the volatility.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
def __init__(
self,
p: int = 1,
q: int = 1,
alias: str = 'GARCH',
prediction_intervals: Optional[ConformalIntervals] = None,
):
self.p = p
self.q = q
if q !=0:
self.alias = alias+'('+str(p)+','+str(q)+')'
else:
self.alias = alias+'('+str(p)+')'
self.prediction_intervals = prediction_intervals
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None
):
r"""Fit GARCH model.
Fit GARCH model to a time series (numpy array) `y`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
Returns
-------
self :
GARCH model.
"""
y = _ensure_float(y)
self.model_ = garch_model(y, p=self.p, q=self.q)
self.model_['actual_residuals'] = y - self.model_['fitted']
self._store_cs(y, X)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None
):
r"""Predict with fitted GARCH model.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
fcst = garch_forecast(self.model_, h)
res = {'mean': fcst['mean'], 'sigma2': fcst['sigma2']}
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_predict_conformal_intervals(res, level)
else:
quantiles = _quantiles(level)
lo = res['mean'].reshape(-1, 1) - quantiles * res['sigma2'].reshape(-1, 1)
hi = res['mean'].reshape(-1, 1) + quantiles * res['sigma2'].reshape(-1, 1)
lo = lo[:, ::-1]
lo = {f'lo-{l}': lo[:, i] for i, l in enumerate(reversed(level))}
hi = {f'hi-{l}': hi[:, i] for i, l in enumerate(level)}
res = {**res, **lo, **hi}
return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted GARCH model predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
res = {'fitted': self.model_['fitted']}
if level is not None:
residuals = self.model_['actual_residuals']
se = _calculate_sigma(residuals, len(residuals) - 1)
res = _add_fitted_pi(res=res, se=se, level=level)
return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool=False
):
r"""Memory Efficient GARCH model.
This method avoids the memory burden due to object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
mod = garch_model(y, p=self.p, q=self.q)
fcst = garch_forecast(mod, h)
keys = ['mean', 'sigma2']
if fitted:
keys.append('fitted')
res = {key: fcst[key] for key in keys}
if level is not None:
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_predict_conformal_intervals(res, level)
else:
quantiles = _quantiles(level)
lo = res['mean'].reshape(-1, 1) - quantiles * res['sigma2'].reshape(-1, 1)
hi = res['mean'].reshape(-1, 1) + quantiles * res['sigma2'].reshape(-1, 1)
lo = lo[:, ::-1]
lo = {f'lo-{l}': lo[:, i] for i, l in enumerate(reversed(level))}
hi = {f'hi-{l}': hi[:, i] for i, l in enumerate(level)}
res = {**res, **lo, **hi}
if fitted:
se = _calculate_sigma(y - mod['fitted'], len(y) - 1)
res = _add_fitted_pi(res=res, se=se, level=level)
return res# Generate GARCH(2,2) data
n = 1000
w = 0.5
alpha = np.array([0.1, 0.2])
beta = np.array([0.4, 0.2])
y = generate_garch_data(n, w, alpha, beta)
plt.figure(figsize=(10,4))
plt.plot(y)garch = GARCH(2,2)
test_class(garch, x=y, h=12, skip_insample=False, level=[90,80])
fcst_garch = garch.forecast(ap, 12)# Test conformal prediction
garch_c = GARCH(2,2,prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(garch_c, x=ap, h=13, level=[90, 80], test_forward=False)
fcst_garch_c = garch_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
fcst_garch_c['mean'][:12],
fcst_garch['mean']
)
_plot_fcst(fcst_garch_c)h=100
fcst = garch.forecast(y, h=h, level=[80,95], fitted=True)fig, ax = plt.subplots(1, 1, figsize = (20,7))
#plt.plot(np.arange(0, len(y)), y)
plt.plot(np.arange(len(y), len(y) + h), fcst['mean'], label='mean')
plt.plot(np.arange(len(y), len(y) + h), fcst['sigma2'], color = 'c', label='sigma2')
plt.plot(np.arange(len(y), len(y) + h), fcst['lo-95'], color = 'r', label='lo-95')
plt.plot(np.arange(len(y), len(y) + h), fcst['hi-95'], color = 'r', label='hi-95')
plt.plot(np.arange(len(y), len(y) + h), fcst['lo-80'], color = 'g', label='lo-80')
plt.plot(np.arange(len(y), len(y) + h), fcst['hi-80'], color = 'g', label='hi-80')
plt.legend()fig, ax = plt.subplots(1, 1, figsize = (20,7))
plt.plot(np.arange(0, len(y)), y)
plt.plot(np.arange(0, len(y)), fcst['fitted'], label='fitted')
plt.plot(np.arange(0, len(y)), fcst['fitted-lo-95'], color = 'r', label='fitted-lo-95')
plt.plot(np.arange(0, len(y)), fcst['fitted-hi-95'], color = 'r', label='fitted-hi-95')
plt.plot(np.arange(0, len(y)), fcst['fitted-lo-80'], color = 'g', label='fitted-lo-80')
plt.plot(np.arange(0, len(y)), fcst['fitted-hi-80'], color = 'g', label='fitted-hi-80')
plt.xlim(len(y)-50, len(y))
plt.legend()show_doc(GARCH, title_level=3)show_doc(GARCH.fit, title_level=3)show_doc(GARCH.predict, title_level=3)show_doc(GARCH.predict_in_sample, title_level=3)show_doc(GARCH.forecast, title_level=3)
ARCH Model
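A minimal usage sketch for the class defined below, mirroring the other examples in this notebook (illustrative only):
from statsforecast.models import ARCH
from statsforecast.utils import AirPassengers as ap
# Illustrative usage example for ARCH(1)
model = ARCH(p=1)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict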
class ARCH(GARCH):
r"""Autoregressive Conditional Heteroskedasticity (ARCH) model.
A particular case of the GARCH(p,q) model where $q=0$.
It assumes that at time $t$, $y_t$ is given by:
$$y_t = \epsilon_t \sigma_t$$
with
$$\sigma_t^2 = w + \sum_{i=1}^p a_i y_{t-i}^2$$.
Here $\epsilon_t$ is a sequence of iid random variables with zero mean and unit variance.
The coefficients $w$ and $a_i$, $i=1,...,p$ must be nonnegative and $\sum_{k=1}^p a_k < 1$.
References
----------
[Engle, R. F. (1982). Autoregressive conditional heteroscedasticity with estimates of the variance of United Kingdom inflation. Econometrica: Journal of the econometric society, 987-1007.](http://www.econ.uiuc.edu/~econ508/Papers/engle82.pdf)
[James D. Hamilton. Time Series Analysis Princeton University Press, Princeton, New Jersey, 1st Edition, 1994.](https://press.princeton.edu/books/hardcover/9780691042893/time-series-analysis)
Parameters
----------
p : int
Number of lagged versions of the series.
alias : str
Custom name of the model.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
By default, the model will compute the native prediction
intervals.
"""
def __init__(
self,
p: int = 1,
alias: str = 'ARCH',
prediction_intervals: Optional[ConformalIntervals] = None
):
self.p = p
self.alias = alias
super().__init__(p, q=0, alias=alias, prediction_intervals=prediction_intervals)
arch = ARCH(1)
test_class(arch, x=y, h=12, skip_insample=False, level=[90,80])
fcst_arch = arch.forecast(ap, 12)# Test conformal prediction
arch_c = ARCH(1,prediction_intervals=ConformalIntervals(h=13, n_windows=2))
test_class(arch_c, x=ap, h=13, level=[90, 80], test_forward=False)
fcst_arch_c = arch_c.forecast(ap, 13, None, None, (80,95), True)
test_eq(
fcst_arch_c['mean'][:12],
fcst_arch['mean']
)
_plot_fcst(fcst_arch_c)garch = GARCH(p=1, q=0)
fcast_garch = garch.forecast(y, h=12, level=[90,80], fitted=True)
arch = ARCH(p=1)
fcast_arch = arch.forecast(y, h=12, level=[90,80], fitted=True)
np.testing.assert_equal(
fcast_garch,
fcast_arch
)show_doc(ARCH, title_level=3)show_doc(ARCH.fit, name='ARCH.fit', title_level=3)show_doc(ARCH.predict, name='ARCH.predict', title_level=3)show_doc(ARCH.predict_in_sample, name='ARCH.predict_in_sample', title_level=3)show_doc(ARCH.forecast, name='ARCH.forecast', title_level=3)
Machine Learning Models
Sklearn Models
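A minimal usage sketch of the wrapper defined below, based on the Ridge test further down; using the time index as the single exogenous feature is just an illustrative choice:
from sklearn.linear_model import Ridge
import numpy as np
from statsforecast.models import SklearnModel
from statsforecast.utils import AirPassengers as ap
# Illustrative usage example for SklearnModel
X = np.arange(ap.size).reshape(-1, 1)  # insample exogenous: a simple time index
X_future = ap.size + np.arange(4).reshape(-1, 1)  # future exogenous for a 4-step horizon
model = SklearnModel(Ridge())
model = model.fit(y=ap, X=X)
y_hat_dict = model.predict(h=4, X=X_future)
y_hat_dict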
class SklearnModel(_TS):
r"""scikit-learn model wrapper
Parameters
----------
model : sklearn.base.BaseEstimator
scikit-learn estimator
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
This is required for generating future prediction intervals.
alias : str, optional (default=None)
Custom name of the model. If `None` will use the model's class.
"""
uses_exog = True
def __init__(
self,
model,
prediction_intervals: Optional[ConformalIntervals] = None,
alias: Optional[str] = None,
):
self.model = model
self.prediction_intervals = prediction_intervals
self.alias = alias if alias is not None else model.__class__.__name__
def fit(
self,
y: np.ndarray,
X: np.ndarray,
) -> 'SklearnModel':
r"""Fit the model.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Exogenous of shape (t, n_x).
Returns
-------
self : SklearnModel
Fitted SklearnModel object.
r"""
from sklearn.base import clone
self.model_ = {'model': clone(self.model)}
self.model_['model'].fit(X, y)
self._store_cs(y=y, X=X)
self.model_['fitted'] = self.model_['model'].predict(X)
residuals = y - self.model_['fitted']
self.model_['sigma'] = _calculate_sigma(residuals, y.size)
return self
def predict(
self,
h: int,
X: np.ndarray,
level: Optional[List[int]] = None,
) -> Dict[str, Any]:
r"""Predict with fitted SklearnModel.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Exogenous of shape (h, n_x).
level: List[int]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
r"""
res = {'mean': self.model_['model'].predict(X)}
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_predict_conformal_intervals(res, level)
else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
def predict_in_sample(self, level: Optional[List[int]] = None) -> Dict[str, Any]:
r"""Access fitted SklearnModel insample predictions.
Parameters
----------
level : List[int]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
res = {'fitted': self.model_['fitted']}
if level is not None:
level = sorted(level)
res = _add_fitted_pi(res=res, se=self.model_['sigma'], level=level)
return res
def forecast(
self,
y: np.ndarray,
h: int,
X: np.ndarray,
X_future: np.ndarray,
level: Optional[List[int]] = None,
fitted: bool = False,
) -> Dict[str, Any]:
r"""Memory Efficient SklearnModel predictions.
This method avoids the memory burden due to object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
h : int
Forecast horizon.
X : array-like
Insample exogenous of shape (t, n_x).
X_future : array-like
Exogenous of shape (h, n_x).
level : List[int]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
r"""
from sklearn.base import clone
model = clone(self.model)
model.fit(X, y)
res = {'mean': model.predict(X_future)}
if fitted:
res['fitted'] = model.predict(X)
if level is not None:
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
raise Exception("You must pass `prediction_intervals` to compute them.")
if fitted:
residuals = y - res['fitted']
sigma = _calculate_sigma(residuals, y.size)
res = _add_fitted_pi(res=res, se=sigma, level=level)
return res
def forward(
self,
y: np.ndarray,
h: int,
X: np.ndarray,
X_future: np.ndarray,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Apply fitted SklearnModel to a new/updated time series.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
h : int
Forecast horizon.
X : array-like
Insample exogenous of shape (t, n_x).
X_future : array-like
Exogenous of shape (h, n_x).
level : List[float]
Confidence levels for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
if not hasattr(self, "model_"):
raise Exception("You have to use the `fit` method first")
res = {'mean': self.model_['model'].predict(X_future)}
if fitted:
res['fitted'] = self.model_['model'].predict(X)
if level is not None:
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
raise Exception("You must pass `prediction_intervals` to compute them.")
if fitted:
se = _calculate_sigma(y - res['fitted'], y.size)
res = _add_fitted_pi(res=res, se=se, level=level)
return res
from sklearn.linear_model import Ridge
h = 12
skm = SklearnModel(Ridge(), prediction_intervals=ConformalIntervals(h=h))
X = np.arange(ap.size).reshape(-1, 1)
X_future = ap.size + np.arange(h).reshape(-1, 1)
test_class(skm, x=ap, X=X, X_future=X_future, h=h, skip_insample=False, level=[80, 95], test_forward=True)
fcst_skm = skm.forecast(ap, h, X=X, X_future=X_future, fitted=True, level=[80, 95])
_plot_insample_pi(fcst_skm)# Test the alias argument
test_eq(
repr(SklearnModel(Ridge())),
'Ridge'
)
test_eq(
repr(SklearnModel(Ridge(), alias='my_ridge')),
'my_ridge'
)show_doc(SklearnModel)show_doc(SklearnModel.fit)show_doc(SklearnModel.predict)show_doc(SklearnModel.predict_in_sample)show_doc(SklearnModel.forecast)
MFLES
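A minimal usage sketch for the class defined below, mirroring the other examples in this notebook (the monthly season_length of 12 is an illustrative choice for AirPassengers):
from statsforecast.models import MFLES
from statsforecast.utils import AirPassengers as ap
# Illustrative usage example for MFLES
model = MFLES(season_length=12)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict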
class MFLES(_TS):
r"""MFLES model.
A method to forecast time series based on Gradient Boosted Time Series Decomposition
which treats traditional decomposition as the base estimator in the boosting
process. Unlike normal gradient boosting, slight learning rates are applied at the
component level (trend/seasonality/exogenous).
The method derives its name from some of the underlying estimators that can
enter into the boosting procedure, specifically: a simple Median, Fourier
functions for seasonality, a simple/piecewise Linear trend, and Exponential
Smoothing.
Parameters
----------
season_length : int or list of int, optional (default=None)
Number of observations per unit of time. Ex: 24 Hourly data.
fourier_order : int, optional (default=None)
How many Fourier sin/cos pairs to create; the larger the number, the more complex the seasonal pattern that can be fitted.
A lower number leads to smoother results.
This is auto-set based on seasonal_period.
max_rounds : int (default=50)
The max number of boosting rounds. The boosting will auto-stop but depending on other parameters such as rs_lr you may want more rounds.
Generally more rounds means a smoother fit.
ma : int, optional (default=None)
The moving average order to use, this is auto-set based on internal logic.
Passing 4 would fit a 4 period moving average on the residual component.
alpha : float (default=1.0)
The alpha which is used in fitting the underlying LASSO when using piecewise functions.
decay : float (default=-1.0)
Affects the slopes of the piecewise-linear basis function.
changepoints : boolean (default=True)
Whether to fit for changepoints if all other logic allows for it. If False, MFLES will not ever fit a piecewise trend.
n_changepoints : int or float (default=0.25)
Number (if int) or proportion (if float) of changepoint knots to place. The default of 0.25 will place 0.25 * (series length) number of knots.
seasonal_lr : float (default=0.9)
A shrinkage parameter (0 < seasonal_lr <= 1) which penalizes the seasonal fit.
A value of 0.9 will flatly multiply the seasonal fit by 0.9 each boosting round, this can be used to allow more signal to the exogenous component.
trend_lr : float (default=0.9)
A shrinkage parameter (0 < trend_lr <= 1) which penalizes the linear trend fit.
A value of 0.9 will flatly multiply the linear fit by 0.9 each boosting round; this can be used to allow more signal to the seasonality or exogenous components.
exogenous_lr : float (default=1.0)
The shrinkage parameter (0 < exogenous_lr <= 1) which controls how much of the exogenous signal is carried to the next round.
residuals_lr : float (default=1.0)
A shrinkage parameter (0 < residuals_lr <= 1) which penalizes the residual smoothing.
A value of 0.9 will flatly multiply the residual fit by 0.9 each boosting round, this can be used to allow more signal to the seasonality or linear components.
cov_threshold : float (default=0.7)
The deseasonalized cov is used to auto-set some logic, lowering the cov_threshold will result in simpler and less complex residual smoothing.
If you pass something like 1000 then there will be no safeguards applied.
moving_medians : bool (default=False)
The default behavior is to fit an initial median to the time series. If True, then it will fit a median per seasonal period.
min_alpha : float (default=0.05)
The minimum alpha in the SES ensemble.
max_alpha : float (default=1.0)
The maximum alpha used in the SES ensemble.
trend_penalty : bool (default=True)
Whether to apply a simple penalty to the linear trend component, very useful for dealing with the potentially dangerous piecewise trend.
multiplicative : bool, optional (default=None)
Auto-set based on internal logic. If True, it will simply take the log of the time series.
smoother : bool (default=False)
If True, then a simple exponential ensemble will be used rather than auto settings.
robust : bool, optional (default=None)
If True then MFLES will fit using more reserved methods, i.e. not using piecewise trend or moving average residual smoother.
Auto-set based on internal logic.
verbose : bool (default=False)
Print debugging information.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
This is required for generating future prediction intervals.
alias : str (default='MFLES')
Custom name of the model.
"""
uses_exog = True
def __init__(
self,
season_length: Optional[Union[int, List[int]]] = None,
fourier_order: Optional[int] = None,
max_rounds: int = 50,
ma: Optional[int] = None,
alpha: float = 1.0,
decay: float = -1.0,
changepoints: bool = True,
n_changepoints: Union[float, int] = 0.25,
seasonal_lr: float = 0.9,
trend_lr: float = 0.9,
exogenous_lr: float = 1.0,
residuals_lr: float = 1.0,
cov_threshold: float = 0.7,
moving_medians: bool = False,
min_alpha: float = 0.05,
max_alpha: float = 1.0,
trend_penalty: bool = True,
multiplicative: Optional[bool] = None,
smoother: bool = False,
robust: Optional[bool] = None,
verbose: bool = False,
prediction_intervals: Optional[ConformalIntervals] = None,
alias: str = 'MFLES',
):
try:
import sklearn # noqa:F401
except ImportError:
raise ImportError("MFLES requires scikit-learn.") from None
self.season_length = season_length
self.fourier_order = fourier_order
self.max_rounds = max_rounds
self.ma = ma
self.alpha = alpha
self.decay = decay
self.changepoints = changepoints
self.n_changepoints = n_changepoints
self.seasonal_lr = seasonal_lr
self.trend_lr = trend_lr
self.exogenous_lr = exogenous_lr
self.residuals_lr = residuals_lr
self.cov_threshold = cov_threshold
self.moving_medians = moving_medians
self.min_alpha = min_alpha
self.max_alpha = max_alpha
self.trend_penalty = trend_penalty
self.multiplicative = multiplicative
self.smoother = smoother
self.robust = robust
self.verbose = verbose
self.prediction_intervals = prediction_intervals
self.alias = alias
def _fit(self, y: np.ndarray, X: Optional[np.ndarray]) -> Dict[str, Any]:
model = _MFLES(verbose=self.verbose, robust=self.robust)
fitted = model.fit(
y=y,
X=X,
seasonal_period=self.season_length,
fourier_order=self.fourier_order,
ma=self.ma,
alpha=self.alpha,
decay=self.decay,
n_changepoints=self.n_changepoints,
seasonal_lr=self.seasonal_lr,
linear_lr=self.trend_lr,
exogenous_lr=self.exogenous_lr,
rs_lr=self.residuals_lr,
cov_threshold=self.cov_threshold,
moving_medians=self.moving_medians,
max_rounds=self.max_rounds,
min_alpha=self.min_alpha,
max_alpha=self.max_alpha,
trend_penalty=self.trend_penalty,
multiplicative=self.multiplicative,
changepoints=self.changepoints,
smoother=self.smoother,
)
return {'model': model, 'fitted': fitted}
def fit(self, y: np.ndarray, X: Optional[np.ndarray] = None) -> 'MFLES':
r"""Fit the model
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like, optional (default=None)
Exogenous of shape (t, n_x).
Returns
-------
self : MFLES
Fitted MFLES object.
"""
y = _ensure_float(y)
self.model_ = self._fit(y=y, X=X)
self._store_cs(y=y, X=X)
residuals = y - self.model_['fitted']
self.model_['sigma'] = _calculate_sigma(residuals, y.size)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
) -> Dict[str, Any]:
r"""Predict with fitted MFLES.
Parameters
----------
h : int
Forecast horizon.
X : array-like, optional (default=None)
Exogenous of shape (h, n_x).
level: List[int]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
res = {"mean": self.model_["model"].predict(forecast_horizon=h, X=X)}
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_predict_conformal_intervals(res, level)
else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
def predict_in_sample(self, level: Optional[List[int]] = None) -> Dict[str, Any]:
r"""Access fitted SklearnModel insample predictions.
Parameters
----------
level : List[int]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
res = {'fitted': self.model_['fitted']}
if level is not None:
level = sorted(level)
res = _add_fitted_pi(res=res, se=self.model_['sigma'], level=level)
return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
) -> Dict[str, Any]:
r"""Memory Efficient MFLES predictions.
This method avoids the memory burden due to object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
h : int
Forecast horizon.
X : array-like
Insample exogenous of shape (t, n_x).
X_future : array-like
Exogenous of shape (h, n_x).
level : List[int]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
model = self._fit(y=y, X=X)
res = {"mean": model['model'].predict(forecast_horizon=h, X=X_future)}
if fitted:
res["fitted"] = model['fitted']
if level is not None:
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
raise Exception("You must pass `prediction_intervals` to compute them.")
if fitted:
residuals = y - res["fitted"]
sigma = _calculate_sigma(residuals, y.size)
res = _add_fitted_pi(res=res, se=sigma, level=level)
return res
show_doc(MFLES)
show_doc(MFLES.fit)
show_doc(MFLES.predict)
show_doc(MFLES.predict_in_sample)
show_doc(MFLES.forecast)
h = 12
X = np.random.rand(ap.size, 2)
X_future = np.random.rand(h, 2)
mfles = MFLES()
test_class(mfles, x=deg_ts, X=X, X_future=X_future, h=h, skip_insample=False, test_forward=False)
mfles = MFLES(prediction_intervals=ConformalIntervals(h=h, n_windows=2))
test_class(mfles, x=ap, X=X, X_future=X_future, h=h, skip_insample=False, level=[80, 95], test_forward=False)
fcst_mfles = mfles.forecast(ap, h, X=X, X_future=X_future, fitted=True, level=[80, 95])
_plot_insample_pi(fcst_mfles)
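Before moving on, here is a minimal usage sketch for MFLES in the same fit/predict style used for the other models in this notebook; the specific season_length and ConformalIntervals settings are illustrative assumptions, and conformal intervals are required if you want future prediction intervals.
# Usage example of MFLES (a minimal sketch; the settings below are illustrative)
from statsforecast.models import MFLES
from statsforecast.utils import AirPassengers as ap, ConformalIntervals
mfles_example = MFLES(season_length=12, prediction_intervals=ConformalIntervals(h=12, n_windows=2))
mfles_example = mfles_example.fit(y=ap)
y_hat_dict = mfles_example.predict(h=12, level=[80, 95])
y_hat_dict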
AutoMFLES
class AutoMFLES(_TS):
r"""AutoMFLES
Parameters
----------
test_size : int
Forecast horizon used during cross validation.
season_length : int or list of int, optional (default=None)
Number of observations per unit of time. Ex: 24 Hourly data.
n_windows : int (default=2)
Number of windows used for cross validation.
config : dict, optional (default=None)
Mapping from parameter name (from the init arguments of MFLES) to a list of values to try.
If `None`, will use defaults.
step_size : int, optional (default=None)
Step size between each cross validation window. If `None` will be set to test_size.
metric : str (default='smape')
Metric used to select the best model. Possible options are: 'smape', 'mape', 'mse' and 'mae'.
verbose : bool (default=False)
Print debugging information.
prediction_intervals : Optional[ConformalIntervals]
Information to compute conformal prediction intervals.
This is required for generating future prediction intervals.
alias : str (default='AutoMFLES')
Custom name of the model.
"""
def __init__(
self,
test_size: int,
season_length: Optional[Union[int, List[int]]] = None,
n_windows: int = 2,
config: Optional[Dict[str, Any]] = None,
step_size: Optional[int] = None,
metric: str = 'smape',
verbose: bool = False,
prediction_intervals: Optional[ConformalIntervals] = None,
alias: str = 'AutoMFLES',
):
try:
import sklearn # noqa:F401
except ImportError:
raise ImportError("MFLES requires scikit-learn.") from None
self.season_length = season_length
self.n_windows = n_windows
self.test_size = test_size
self.config = config
self.step_size = step_size if step_size is not None else test_size
self.metric = metric
self.verbose = verbose
self.prediction_intervals = prediction_intervals
self.alias = alias
def _fit(self, y: np.ndarray, X: Optional[np.ndarray] = None) -> Dict[str, Any]:
model = _MFLES(verbose=self.verbose)
optim_params = model.optimize(
y=y,
X=X,
test_size=self.test_size,
n_steps=self.n_windows,
step_size=self.step_size,
seasonal_period=self.season_length,
metric=self.metric,
params=self.config,
)
# the seasonal_period may've been found during the optimization
seasonal_period = optim_params.pop('seasonal_period', self.season_length)
fitted = model.fit(
y=y,
X=X,
seasonal_period=seasonal_period,
**optim_params,
)
return {'model': model, 'fitted': fitted}
def fit(self, y: np.ndarray, X: Optional[np.ndarray] = None) -> 'AutoMFLES':
r"""Fit the model
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like, optional (default=None)
Exogenous of shape (t, n_x).
Returns
-------
self : AutoMFLES
Fitted AutoMFLES object.
"""
y = _ensure_float(y)
self.model_ = self._fit(y=y, X=X)
self._store_cs(y=y, X=X)
residuals = y - self.model_["fitted"]
self.model_["sigma"] = _calculate_sigma(residuals, y.size)
return self
def predict(
self,
h: int,
X: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
) -> Dict[str, Any]:
r"""Predict with fitted AutoMFLES.
Parameters
----------
h : int
Forecast horizon.
X : array-like, optional (default=None)
Exogenous of shape (h, n_x).
level: List[int]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
r"""
res = {"mean": self.model_["model"].predict(forecast_horizon=h, X=X)}
if level is None:
return res
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_predict_conformal_intervals(res, level)
else:
raise Exception("You must pass `prediction_intervals` to compute them.")
return res
def predict_in_sample(self, level: Optional[List[int]] = None) -> Dict[str, Any]:
r"""Access fitted AutoMFLES insample predictions.
Parameters
----------
level : List[int]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
res = {"fitted": self.model_["fitted"]}
if level is not None:
level = sorted(level)
res = _add_fitted_pi(res=res, se=self.model_["sigma"], level=level)
return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
) -> Dict[str, Any]:
r"""Memory Efficient AutoMFLES predictions.
This method avoids the memory burden of object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
h : int
Forecast horizon.
X : array-like
Insample exogenous of shape (t, n_x).
X_future : array-like
Exogenous of shape (h, n_x).
level : List[int]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
model = self._fit(y=y, X=X)
res = {"mean": model["model"].predict(forecast_horizon=h, X=X_future)}
if fitted:
res["fitted"] = model["fitted"]
if level is not None:
level = sorted(level)
if self.prediction_intervals is not None:
res = self._add_conformal_intervals(fcst=res, y=y, X=X, level=level)
else:
raise Exception("You must pass `prediction_intervals` to compute them.")
if fitted:
residuals = y - res["fitted"]
sigma = _calculate_sigma(residuals, y.size)
res = _add_fitted_pi(res=res, se=sigma, level=level)
return res
h = 12
X = np.random.rand(ap.size, 2)
X_future = np.random.rand(h, 2)
auto_mfles = AutoMFLES(test_size=h, season_length=12)
test_class(auto_mfles, x=deg_ts, X=X, X_future=X_future, h=h, skip_insample=False, test_forward=False)
auto_mfles = AutoMFLES(test_size=h, season_length=12, prediction_intervals=ConformalIntervals(h=h, n_windows=2))
test_class(auto_mfles, x=ap, X=X, X_future=X_future, h=h, skip_insample=False, level=[80, 95], test_forward=False)
fcst_auto_mfles = auto_mfles.forecast(ap, h, X=X, X_future=X_future, fitted=True, level=[80, 95])
_plot_insample_pi(fcst_auto_mfles)
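To make the cross-validated parameter search concrete, the sketch below passes a small custom `config` grid to AutoMFLES; per the docstring above, keys are MFLES parameter names and values are the candidates to evaluate. The particular parameters and values shown are illustrative assumptions, not a recommended search space.
# Usage example of AutoMFLES with a custom search space (illustrative grid; treat keys and values as assumptions)
from statsforecast.models import AutoMFLES
from statsforecast.utils import AirPassengers as ap
custom_config = {
    'smoother': [True, False],  # assumed MFLES parameter to search over
    'ma': [None, 3],            # assumed MFLES parameter to search over
}
auto_mfles_cfg = AutoMFLES(test_size=12, season_length=12, n_windows=2, config=custom_config, metric='smape')
fcst_cfg = auto_mfles_cfg.forecast(y=ap, h=12)
fcst_cfg['mean']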
Fallback models
Constant model
class ConstantModel(_TS):
def __init__(self, constant: float, alias: str = 'ConstantModel'):
r"""Constant Model.
Returns Constant values.
Parameters
----------
constant: float
Custom value to return as forecast.
alias: str
Custom name of the model.
"""
self.constant = constant
self.alias = alias
def fit(
self,
y: np.ndarray,
X: Optional[np.ndarray] = None,
):
r"""Fit the Constant model.
Fit a Constant Model to a time series (numpy.array) `y`.
Parameters
----------
y : numpy.array
Clean time series of shape (t, ).
X : array-like
Optional exogenous of shape (t, n_x).
Returns
-------
self:
Constant fitted model.
"""
y = _ensure_float(y)
self.n_y = len(y)
self._dtype = y.dtype
return self
def predict(
self,
h: int, # forecast horizon
X: Optional[np.ndarray] = None, # exogenous regressors
level: Optional[List[int]] = None # confidence levels
):
r"""Predict with fitted ConstantModel.
Parameters
----------
h : int
Forecast horizon.
X : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
mean = np.full(h, self.constant, dtype=self._dtype)
res = {'mean': mean}
if level is not None:
for lv in sorted(level):
res[f'lo-{lv}'] = mean
res[f'hi-{lv}'] = mean
return res
def predict_in_sample(self, level: Optional[List[int]] = None):
r"""Access fitted Constant Model insample predictions.
Parameters
----------
level : List[float]
Confidence levels (0-100) for prediction intervals.
Returns
-------
forecasts : dict
Dictionary with entries `fitted` for point predictions and `level_*` for probabilistic predictions.
"""
fitted = np.full(self.n_y, self.constant, dtype=self._dtype)
res = {'fitted': fitted}
if level is not None:
for lv in sorted(level):
res[f'fitted-lo-{lv}'] = fitted
res[f'fitted-hi-{lv}'] = fitted
return res
def forecast(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Memory Efficient Constant Model predictions.
This method avoids the memory burden of object storage.
It is analogous to `fit_predict` without storing information.
It assumes you know the forecast horizon in advance.
Parameters
----------
y : numpy.array
Clean time series of shape (n,).
h: int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels (0-100) for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
y = _ensure_float(y)
mean = np.full(h, self.constant, dtype=y.dtype)
res = {'mean': mean}
if fitted:
fitted_vals = np.full_like(y, self.constant)
res['fitted'] = fitted_vals
if level is not None:
for lv in sorted(level):
res[f'lo-{lv}'] = mean
res[f'hi-{lv}'] = mean
if fitted:
res[f'fitted-lo-{lv}'] = fitted_vals
res[f'fitted-hi-{lv}'] = fitted_vals
return res
def forward(
self,
y: np.ndarray,
h: int,
X: Optional[np.ndarray] = None,
X_future: Optional[np.ndarray] = None,
level: Optional[List[int]] = None,
fitted: bool = False,
):
r"""Apply Constant model predictions to a new/updated time series.
Parameters
----------
y : numpy.array
Clean time series of shape (n, ).
h : int
Forecast horizon.
X : array-like
Optional insample exogenous of shape (t, n_x).
X_future : array-like
Optional exogenous of shape (h, n_x).
level : List[float]
Confidence levels for prediction intervals.
fitted : bool
Whether or not to return insample predictions.
Returns
-------
forecasts : dict
Dictionary with entries `mean` for point predictions and `level_*` for probabilistic predictions.
"""
res = self.forecast(y=y, h=h, X=X, X_future=X_future, level=level, fitted=fitted)
return res
constant_model = ConstantModel(constant=1)
test_class(constant_model, x=ap, h=12, level=[90, 80])
constant_model.forecast(ap, 12, level=[90, 80])
constant_model.forward(ap, 12, level=[90, 80])
# Test the alias argument
test_eq(
repr(ConstantModel(1)),
'ConstantModel'
)
test_eq(
repr(ConstantModel(1, alias='ConstantModel_custom')),
'ConstantModel_custom'
)
show_doc(ConstantModel, title_level=3)
show_doc(ConstantModel.forecast, title_level=3)
show_doc(ConstantModel.fit, title_level=3)
show_doc(ConstantModel.predict, title_level=3)
show_doc(ConstantModel.predict_in_sample, title_level=3)
show_doc(ConstantModel.forward, title_level=3)
from statsforecast.models import ConstantModel
from statsforecast.utils import AirPassengers as ap
# Usage example of ConstantModel
model = ConstantModel(1)
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
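The constant-valued models in this section are mostly useful as fallbacks when a primary model fails to fit or produces invalid forecasts. Below is a minimal sketch of that pattern, assuming the `fallback_model` argument of the `StatsForecast` class and a long-format dataframe with `unique_id`, `ds`, and `y` columns.
# Using ConstantModel as a fallback (a sketch; assumes StatsForecast's `fallback_model` argument)
import pandas as pd
from statsforecast import StatsForecast
from statsforecast.models import AutoARIMA, ConstantModel
from statsforecast.utils import AirPassengers as ap
df = pd.DataFrame({
    'unique_id': 1,
    'ds': pd.date_range('1949-01-01', periods=ap.size, freq='MS'),
    'y': ap,
})
sf = StatsForecast(models=[AutoARIMA(season_length=12)], freq='MS', fallback_model=ConstantModel(constant=0))
fcst_df = sf.forecast(df=df, h=12)  # if a model fails on a series, the fallback's forecast is used instead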
Zero model
class ZeroModel(ConstantModel):
def __init__(self, alias: str = 'ZeroModel'):
r"""返回零值预测。
返回零值。
参数
----------
alias: str
模型的自定义名称。
"""
super().__init__(constant=0, alias=alias)zero_model = ZeroModel()
test_class(zero_model, x=ap, h=12, level=[90, 80])
zero_model.forecast(ap, 12, level=[90, 80])
zero_model.forward(ap, 12, level=[90, 80])
# Test the alias argument
test_eq(
repr(ZeroModel()),
'ZeroModel'
)
test_eq(
repr(ZeroModel(alias='ZeroModel_custom')),
'ZeroModel_custom'
)
show_doc(ZeroModel, title_level=3)
show_doc(ZeroModel.forecast, title_level=3, name='ZeroModel.forecast')
show_doc(ZeroModel.fit, title_level=3, name='ZeroModel.fit')
show_doc(ZeroModel.predict, title_level=3, name='ZeroModel.predict')
show_doc(ZeroModel.predict_in_sample, title_level=3, name='ZeroModel.predict_in_sample')
show_doc(ZeroModel.forward, title_level=3, name='ZeroModel.forward')
from statsforecast.models import ZeroModel
from statsforecast.utils import AirPassengers as ap
# Usage example of ZeroModel
model = ZeroModel()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict
NaN model
class NaNModel(ConstantModel):
def __init__(self, alias: str = 'NaNModel'):
r"""NaN模型。
返回NaN值。
参数
----------
alias: str
模型的自定义名称。
"""
super().__init__(constant=np.nan, alias=alias)nanmodel = NaNModel()
nanmodel.forecast(ap, 12, level=[90, 80])
# Test the alias argument
test_eq(
repr(NaNModel()),
'NaNModel'
)
test_eq(
repr(NaNModel(alias='NaN_custom')),
'NaN_custom'
)
show_doc(NaNModel, title_level=3)
show_doc(NaNModel.forecast, title_level=3, name='NaNModel.forecast')
show_doc(NaNModel.fit, title_level=3, name='NaNModel.fit')
show_doc(NaNModel.predict, title_level=3, name='NaNModel.predict')
show_doc(NaNModel.predict_in_sample, title_level=3, name='NaNModel.predict_in_sample')
from statsforecast.models import NaNModel
from statsforecast.utils import AirPassengers as ap
# Usage example of NaNModel
model = NaNModel()
model = model.fit(y=ap)
y_hat_dict = model.predict(h=4)
y_hat_dict