%load_ext autoreload
%autoreload 2核心方法
拟合、预测、快速预测、交叉验证和绘图的方法
StatsForecast 的核心方法包括:
- `StatsForecast.fit`
- `StatsForecast.predict`
- `StatsForecast.forecast`
- `StatsForecast.cross_validation`
- `StatsForecast.plot`
import warnings
from nbdev.showdoc import add_docs, show_doc
from statsforecast.models import Naive

# Notebook test setup: hide FutureWarnings (such as the deprecation warnings
# emitted by this module) while always showing UserWarnings.
warnings.filterwarnings('ignore', category=FutureWarning)
warnings.filterwarnings('always', category=UserWarning)
import datetime as dt
import errno
import inspect
import logging
import os
import pickle
import re
import reprlib
import time
import warnings
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
import numpy as np
import pandas as pd
import utilsforecast.processing as ufp
from fugue.execution.factory import make_execution_engine, try_get_context_execution_engine
from threadpoolctl import ThreadpoolController
from tqdm.auto import tqdm
from triad import conditional_dispatcher
from utilsforecast.compat import DataFrame, pl_DataFrame, pl_Series
from utilsforecast.grouped_array import GroupedArray as BaseGroupedArray
from utilsforecast.validation import ensure_time_dtype, validate_freq
from statsforecast.utils import ConformalIntervals

# Configure a log format only when this module is executed as a script.
if __name__ == '__main__':
    logging.basicConfig(
        format='%(asctime)s %(name)s %(levelname)s: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
logger = logging.getLogger(__name__)
# Shared threadpoolctl controller; GroupedArray's `_single_threaded_*` helpers
# are decorated with `@_controller.wrap(limits=1)` to cap native thread pools.
_controller = ThreadpoolController()
# Only errors from this module's logger are reported.
logger.setLevel(logging.ERROR)
from fastcore.test import test_eq, test_fail, test_warns
from statsforecast.models import _TS
from statsforecast.utils import generate_series


class GroupedArray(BaseGroupedArray):
    """Many time series stored back to back in a single array.

    Series ``i`` occupies rows ``indptr[i]:indptr[i + 1]`` of ``data``. When
    ``data`` is 2-D, column 0 holds the target and any further columns hold
    exogenous features.
    """

    def __eq__(self, other):
        """Return True when ``other`` has approximately equal ``data`` and identical ``indptr``."""
        if not hasattr(other, 'data') or not hasattr(other, 'indptr'):
            return False
        return np.allclose(self.data, other.data) and np.array_equal(self.indptr, other.indptr)

    def fit(self, models, fallback_model=None):
        """Fit a fresh copy of every model on every series.

        Parameters
        ----------
        models : list
            Model instances; each is copied with ``new()`` before fitting.
        fallback_model : optional
            Model fitted instead when a model raises. The fallback copy is
            given the failing model's ``alias``.

        Returns
        -------
        np.ndarray of objects, shape (n_groups, n_models)
            ``fm[i, j]`` is model ``j`` fitted on series ``i``.

        Raises
        ------
        Exception
            Whatever a model raises when no ``fallback_model`` is given.
        """
        fm = np.full((self.n_groups, len(models)), np.nan, dtype=object)
        for i, grp in enumerate(self):
            y = grp[:, 0] if grp.ndim == 2 else grp
            X = grp[:, 1:] if (grp.ndim == 2 and grp.shape[1] > 1) else None
            for i_model, model in enumerate(models):
                try:
                    new_model = model.new()
                    fm[i, i_model] = new_model.fit(y=y, X=X)
                except Exception as error:
                    if fallback_model is not None:
                        new_fallback_model = fallback_model.new()
                        new_fallback_model.alias = model.alias
                        fm[i, i_model] = new_fallback_model.fit(y=y, X=X)
                    else:
                        raise error
        return fm

    def _get_cols(self, models, attr, h, X, level=tuple()):
        """Compute the output column layout of ``models``.

        Each model takes one column for the mean, plus two (lo, hi) per entry
        of ``level`` when its ``attr`` method accepts a ``level`` argument and
        ``level`` is non-empty. ``h`` and ``X`` are accepted but unused.

        Returns
        -------
        cuts : np.ndarray of int32, shape (n_models + 1,)
            Model ``j`` owns output columns ``cuts[j]:cuts[j + 1]``.
        has_level_models : np.ndarray of bool, shape (n_models,)
            Whether each model will be called with ``level``.
        """
        n_models = len(models)
        cuts = np.full(n_models + 1, fill_value=0, dtype=np.int32)
        has_level_models = np.full(n_models, fill_value=False, dtype=bool)
        cuts[0] = 0
        for i_model, model in enumerate(models):
            len_cols = 1  # mean
            has_level = 'level' in inspect.signature(getattr(model, attr)).parameters and len(level) > 0
            has_level_models[i_model] = has_level
            if has_level:
                len_cols += 2 * len(level)  # lo and hi for every level
            cuts[i_model + 1] = len_cols + cuts[i_model]
        return cuts, has_level_models

    def _output_fcst(self, models, attr, h, X, level=tuple()):
        """Allocate a NaN-filled buffer of shape (n_groups * h, total columns).

        Returns the buffer plus ``cuts`` and ``has_level_models`` from ``_get_cols``.
        """
        # The column layout depends on which method (`attr`) will be called.
        cuts, has_level_models = self._get_cols(models=models, attr=attr, h=h, X=X, level=level)
        out = np.full((self.n_groups * h, cuts[-1]), fill_value=np.nan, dtype=self.data.dtype)
        return out, cuts, has_level_models

    def predict(self, fm, h, X=None, level=tuple()):
        """Predict ``h`` steps ahead with already fitted models.

        Parameters
        ----------
        fm : np.ndarray of objects, shape (n_groups, n_models)
            Fitted models, as returned by ``fit``. The first row determines
            the column layout.
        h : int
            Forecast horizon.
        X : GroupedArray, optional
            Future exogenous values; ``X[i]`` is passed to series ``i``.
        level : sequence, optional
            Levels for prediction intervals.

        Returns
        -------
        fcsts : np.ndarray, shape (n_groups * h, n_cols)
        cols : list of str
        """
        fcsts, cuts, has_level_models = self._output_fcst(
            models=fm[0], attr='predict',
            h=h, X=X, level=level
        )
        matches = ['mean', 'lo', 'hi']
        cols = []
        for i_model in range(fm.shape[1]):
            has_level = has_level_models[i_model]
            kwargs = {}
            if has_level:
                kwargs['level'] = level
            for i, _ in enumerate(self):
                if X is not None:
                    X_ = X[i]
                else:
                    X_ = None
                res_i = fm[i, i_model].predict(h=h, X=X_, **kwargs)
                cols_m = [key for key in res_i.keys() if any(key.startswith(m) for m in matches)]
                fcsts_i = np.vstack([res_i[key] for key in cols_m]).T
                model_name = repr(fm[i, i_model])
                cols_m = [f'{model_name}' if col == 'mean' else f'{model_name}-{col}' for col in cols_m]
                if fcsts_i.ndim == 1:
                    fcsts_i = fcsts_i[:, None]
                fcsts[i * h : (i + 1) * h, cuts[i_model]:cuts[i_model + 1]] = fcsts_i
            cols += cols_m
        return fcsts, cols

    def fit_predict(self, models, h, X=None, level=tuple(), fallback_model=None):
        """Fit ``models`` on every series, then predict ``h`` steps ahead.

        ``fallback_model`` is forwarded to ``fit``. Its default (None) keeps
        the previous behavior of propagating fit errors.

        Returns
        -------
        (fm, fcsts, cols)
            The fitted models followed by the output of ``predict``.
        """
        # fit the models
        fm = self.fit(models=models, fallback_model=fallback_model)
        # predict
        fcsts, cols = self.predict(fm=fm, h=h, X=X, level=level)
        return fm, fcsts, cols

    def forecast(
        self,
        models,
        h,
        fallback_model=None,
        fitted=False,
        X=None,
        level=tuple(),
        verbose=False,
        target_col='y',
    ):
        """Fit and forecast every series in one pass, without storing fitted models.

        When a model's ``forecast`` raises, ``fallback_model.forecast`` is used
        instead (if given), otherwise the error propagates.

        Returns
        -------
        dict
            ``'forecasts'`` (n_groups * h, n_cols) array, ``'cols'`` names and
            ``'times'`` (cumulative seconds per model repr). When ``fitted`` is
            True, also ``'fitted'`` with ``'values'`` (target in column 0
            followed by the models' in-sample values) and ``'cols'``.
        """
        fcsts, cuts, has_level_models = self._output_fcst(
            models=models, attr='forecast', h=h, X=X, level=level
        )
        matches = ['mean', 'lo', 'hi']
        matches_fitted = ['fitted', 'fitted-lo', 'fitted-hi']
        if fitted:
            # The in-sample buffer reserves the same model columns as the
            # forecasts; column 0 holds the observed target.
            fitted_vals = np.full((self.data.shape[0], 1 + cuts[-1]), np.nan, dtype=self.data.dtype)
            if self.data.ndim == 1:
                fitted_vals[:, 0] = self.data
            else:
                fitted_vals[:, 0] = self.data[:, 0]
        iterable = tqdm(enumerate(self),
                        disable=(not verbose),
                        total=len(self),
                        desc='Forecast')
        times = {repr(m): 0.0 for m in models}
        # Bound before the loop so an empty GroupedArray doesn't raise
        # UnboundLocalError when the result is assembled.
        cols = []
        cols_fitted = []
        for i, grp in iterable:
            y_train = grp[:, 0] if grp.ndim == 2 else grp
            X_train = grp[:, 1:] if (grp.ndim == 2 and grp.shape[1] > 1) else None
            if X is not None:
                X_f = X[i]
            else:
                X_f = None
            # Column names are rebuilt for every series; the last ones are returned.
            cols = []
            cols_fitted = []
            for i_model, model in enumerate(models):
                has_level = has_level_models[i_model]
                kwargs = {}
                if has_level:
                    kwargs['level'] = level
                start = time.perf_counter()
                try:
                    res_i = model.forecast(h=h, y=y_train, X=X_train, X_future=X_f, fitted=fitted, **kwargs)
                except Exception as error:
                    if fallback_model is not None:
                        res_i = fallback_model.forecast(h=h, y=y_train, X=X_train, X_future=X_f, fitted=fitted, **kwargs)
                    else:
                        raise error
                times[repr(model)] += time.perf_counter() - start
                cols_m = [key for key in res_i.keys() if any(key.startswith(m) for m in matches)]
                fcsts_i = np.vstack([res_i[key] for key in cols_m]).T
                cols_m = [f'{repr(model)}' if col == 'mean' else f'{repr(model)}-{col}' for col in cols_m]
                if fcsts_i.ndim == 1:
                    fcsts_i = fcsts_i[:, None]
                fcsts[i * h : (i + 1) * h, cuts[i_model]:cuts[i_model + 1]] = fcsts_i
                cols += cols_m
                if fitted:
                    cols_m_fitted = [key for key in res_i.keys() if any(key.startswith(m) for m in matches_fitted)]
                    fitted_i = np.vstack([res_i[key] for key in cols_m_fitted]).T
                    cols_m_fitted = [f'{repr(model)}' \
                                     if col == 'fitted' else f"{repr(model)}-{col.replace('fitted-', '')}" \
                                     for col in cols_m_fitted]
                    # +1 skips the target column at position 0.
                    fitted_vals[self.indptr[i] : self.indptr[i + 1], (cuts[i_model] + 1):(cuts[i_model + 1] + 1)] = fitted_i
                    cols_fitted += cols_m_fitted
        result = {'forecasts': fcsts, 'cols': cols, 'times': times}
        if fitted:
            result['fitted'] = {'values': fitted_vals}
            result['fitted']['cols'] = [target_col] + cols_fitted
        return result

    def cross_validation(
        self,
        models,
        h,
        test_size,
        fallback_model=None,
        step_size=1,
        input_size=None,
        fitted=False,
        level=tuple(),
        refit=True,
        verbose=False,
        target_col='y',
    ):
        """Evaluate ``models`` on rolling windows at the end of every series.

        Windows start at ``-test_size`` and move by ``step_size`` until the
        last one ends at the final observation. With ``input_size=None`` the
        training set is every observation before the cutoff; otherwise only
        the last ``input_size`` ones.

        ``refit=True`` calls ``forecast`` in every window. Any other value fits
        with ``fit`` on the first window and, when ``refit > 0``, on every
        ``refit``-th window, producing forecasts with ``forward``.

        Returns
        -------
        dict
            ``'forecasts'`` of shape (n_groups * n_windows * h, 1 + n_cols)
            whose first column is the actual target, and ``'cols'``. When
            ``fitted`` is True, also ``'fitted'`` with ``'values'``
            (n_obs, n_windows, n_models + 1), ``'idxs'``, ``'last_idxs'`` and
            ``'cols'``.

        Raises
        ------
        Exception
            If ``test_size - h`` is not a multiple of ``step_size``.
        """
        # output shape: (ts, window, h)
        if (test_size - h) % step_size:
            raise Exception('`test_size - h` should be module `step_size`')
        n_windows = int((test_size - h) / step_size) + 1
        n_models = len(models)
        cuts, has_level_models = self._get_cols(models=models, attr='forecast', h=h, X=None, level=level)
        # the first column of out holds the actual values of y
        out = np.full((self.n_groups, n_windows, h, 1 + cuts[-1]), np.nan, dtype=self.data.dtype)
        if fitted:
            fitted_vals = np.full((self.data.shape[0], n_windows, n_models + 1), np.nan, dtype=self.data.dtype)
            fitted_idxs = np.full((self.data.shape[0], n_windows), False, dtype=bool)
            last_fitted_idxs = np.full_like(fitted_idxs, False, dtype=bool)
        matches = ['mean', 'lo', 'hi']
        steps = list(range(-test_size, -h + 1, step_size))
        # Bound before the loop so an empty GroupedArray still returns a result.
        cols = [target_col]
        for i_ts, grp in enumerate(self):
            iterable = tqdm(
                enumerate(steps, start=0),
                desc=f"Cross Validation Time Series {i_ts + 1}",
                disable=(not verbose),
                total=len(steps),
            )
            fitted_models = [None for _ in range(n_models)]
            for i_window, cutoff in iterable:
                should_fit = i_window == 0 or (refit > 0 and i_window % refit == 0)
                end_cutoff = cutoff + h
                # cutoff is negative; with input_size=None this slices grp[0:cutoff].
                in_size_disp = cutoff if input_size is None else input_size
                y = grp[(cutoff - in_size_disp):cutoff]
                y_train = y[:, 0] if y.ndim == 2 else y
                X_train = y[:, 1:] if (y.ndim == 2 and y.shape[1] > 1) else None
                # grp[cutoff:0] would be empty, so the last window slices to the end.
                y_test = grp[cutoff:] if end_cutoff == 0 else grp[cutoff:end_cutoff]
                X_future = y_test[:, 1:] if (y_test.ndim == 2 and y_test.shape[1] > 1) else None
                out[i_ts, i_window, :, 0] = y_test[:, 0] if y.ndim == 2 else y_test
                if fitted:
                    # The first indexing uses basic slices, so it returns a view
                    # and the chained assignments below write into the buffers.
                    fitted_vals[self.indptr[i_ts] : self.indptr[i_ts + 1], i_window, 0][
                        (cutoff - in_size_disp):cutoff
                    ] = y_train
                    fitted_idxs[self.indptr[i_ts] : self.indptr[i_ts + 1], i_window][
                        (cutoff - in_size_disp):cutoff
                    ] = True
                    last_fitted_idxs[
                        self.indptr[i_ts] : self.indptr[i_ts + 1], i_window
                    ][cutoff-1] = True
                cols = [target_col]
                for i_model, model in enumerate(models):
                    has_level = has_level_models[i_model]
                    kwargs = {}
                    if has_level:
                        kwargs['level'] = level
                    # This is implemented this way because not all models have a
                    # forward method, so we can't always do fit + forward.
                    if refit is True:
                        forecast_kwargs = dict(
                            h=h,
                            y=y_train,
                            X=X_train,
                            X_future=X_future,
                            fitted=fitted,
                            **kwargs,
                        )
                        try:
                            res_i = model.forecast(**forecast_kwargs)
                        except Exception as error:
                            if fallback_model is None:
                                raise error
                            res_i = fallback_model.forecast(**forecast_kwargs)
                    else:
                        if should_fit:
                            try:
                                fitted_models[i_model] = model.fit(y=y_train, X=X_train)
                            except Exception as error:
                                if fallback_model is None:
                                    raise error
                                fitted_models[i_model] = fallback_model.new().fit(y=y_train, X=X_train)
                        res_i = fitted_models[i_model].forward(
                            h=h,
                            y=y_train,
                            X=X_train,
                            X_future=X_future,
                            fitted=fitted,
                            **kwargs,
                        )
                    cols_m = [key for key in res_i.keys() if any(key.startswith(m) for m in matches)]
                    fcsts_i = np.vstack([res_i[key] for key in cols_m]).T
                    cols_m = [f'{repr(model)}' if col == 'mean' else f'{repr(model)}-{col}' for col in cols_m]
                    out[i_ts, i_window, :, (1 + cuts[i_model]):(1 + cuts[i_model + 1])] = fcsts_i
                    if fitted:
                        fitted_vals[self.indptr[i_ts] : self.indptr[i_ts + 1], i_window, i_model + 1][
                            (cutoff - in_size_disp):cutoff
                        ] = res_i['fitted']
                    cols += cols_m
        result = {'forecasts': out.reshape(-1, 1 + cuts[-1]), 'cols': cols}
        if fitted:
            result['fitted'] = {
                'values': fitted_vals,
                'idxs': fitted_idxs,
                'last_idxs': last_fitted_idxs,
                'cols': [target_col] + [repr(model) for model in models]
            }
        return result

    def take(self, idxs):
        """Return a new GroupedArray with only the series at ``idxs``."""
        data, indptr = super().take(idxs)
        return GroupedArray(data, indptr)

    def split(self, n_chunks):
        """Split the series into at most ``n_chunks`` GroupedArrays (never more chunks than series)."""
        n_chunks = min(n_chunks, self.n_groups)
        return [self.take(idxs) for idxs in np.array_split(range(self.n_groups), n_chunks)]

    def split_fm(self, fm, n_chunks):
        """Split the rows of fitted models ``fm`` into ``n_chunks`` pieces, dropping empty ones."""
        return [fm[idxs] for idxs in np.array_split(range(self.n_groups), n_chunks) if idxs.size]

    @_controller.wrap(limits=1)
    def _single_threaded_fit(self, models, fallback_model=None):
        """``fit`` with native thread pools limited to one thread."""
        return self.fit(models=models, fallback_model=fallback_model)

    @_controller.wrap(limits=1)
    def _single_threaded_predict(self, fm, h, X=None, level=tuple()):
        """``predict`` with native thread pools limited to one thread."""
        return self.predict(fm=fm, h=h, X=X, level=level)

    @_controller.wrap(limits=1)
    def _single_threaded_fit_predict(self, models, h, X=None, level=tuple(), fallback_model=None):
        """``fit_predict`` with native thread pools limited to one thread."""
        return self.fit_predict(models=models, h=h, X=X, level=level, fallback_model=fallback_model)

    @_controller.wrap(limits=1)
    def _single_threaded_forecast(
        self,
        models,
        h,
        fallback_model=None,
        fitted=False,
        X=None,
        level=tuple(),
        verbose=False,
        target_col='y',
    ):
        """``forecast`` with native thread pools limited to one thread."""
        return self.forecast(
            models=models,
            h=h,
            fallback_model=fallback_model,
            fitted=fitted,
            X=X,
            level=level,
            verbose=verbose,
            target_col=target_col,
        )

    @_controller.wrap(limits=1)
    def _single_threaded_cross_validation(
        self,
        models,
        h,
        test_size,
        fallback_model=None,
        step_size=1,
        input_size=None,
        fitted=False,
        level=tuple(),
        refit=True,
        verbose=False,
        target_col='y',
    ):
        """``cross_validation`` with native thread pools limited to one thread."""
        return self.cross_validation(
            models=models,
            h=h,
            test_size=test_size,
            fallback_model=fallback_model,
            step_size=step_size,
            input_size=input_size,
            fitted=fitted,
            level=level,
            refit=refit,
            verbose=verbose,
            target_col=target_col,
        )

# sum ahead just returns the last value
# SumAhead: deterministic test model that forecasts the last observed value
# plus 1, 2, ..., h.
class SumAhead:
    """Toy model used by the tests; its intervals are mean -/+ 1 for every level."""

    def __init__(self):
        pass

    def fit(self, y, X):
        """Remember the last value of ``y`` and build in-sample fitted values."""
        self.last_value = y[-1]
        # Same convention as `forecast`/`forward`: the first fitted value is
        # undefined and the rest equal the observations.
        self.fitted_values = np.full(y.size, np.nan, dtype=y.dtype)
        self.fitted_values[1:] = y[1:]
        return self

    def predict(self, h, X=None, level=None):
        """Return ``last_value + 1..h`` plus lo/hi bounds for every level."""
        mean = self.last_value + np.arange(1, h + 1)
        res = {'mean': mean}
        if level is not None:
            for lv in level:
                res[f'lo-{lv}'] = mean - 1.0
                res[f'hi-{lv}'] = mean + 1.0
        return res

    def __repr__(self):
        return 'SumAhead'

    def forecast(self, y, h, X=None, X_future=None, fitted=False, level=None):
        """Forecast ``y[-1] + 1..h`` without storing state."""
        mean = y[-1] + np.arange(1, h + 1)
        res = {'mean': mean}
        if fitted:
            fitted_values = np.full(y.size, np.nan, dtype=y.dtype)
            fitted_values[1:] = y[1:]
            res['fitted'] = fitted_values
        if level is not None:
            for lv in level:
                res[f'lo-{lv}'] = mean - 1.0
                res[f'hi-{lv}'] = mean + 1.0
        return res

    def forward(self, y, h, X=None, X_future=None, fitted=False, level=None):
        """Forecast from the ``last_value`` stored by ``fit``, ignoring the new ``y``'s end.

        Using the stale value is deliberate so tests can tell refit from
        non-refit cross validation.
        """
        mean = self.last_value + np.arange(1, h + 1)
        res = {'mean': mean}
        if fitted:
            fitted_values = np.full(y.size, np.nan, dtype=mean.dtype)
            fitted_values[1:] = y[1:]
            res['fitted'] = fitted_values
        if level is not None:
            for lv in level:
                res[f'lo-{lv}'] = mean - 1.0
                res[f'hi-{lv}'] = mean + 1.0
        return res

    def new(self):
        """Return a shallow copy sharing the current attribute values."""
        b = type(self).__new__(type(self))
        b.__dict__.update(self.__dict__)
        return b

# Data for testing
data = np.arange(12).reshape(-1, 1)
indptr = np.array([0, 4, 8, 12])
# Test that we can recover the number of series
ga = GroupedArray(data, indptr)
test_eq(len(ga), 3)
# Test splitting the data
splits = ga.split(2)
test_eq(splits[0], GroupedArray(data[:8], indptr[:3]))
test_eq(splits[1], GroupedArray(data[8:], np.array([0, 4])))
# Fit the models for every series
models = [Naive(), Naive()]
fm = ga.fit(models)
test_eq(fm.shape, (3, 2))
test_eq(len(ga.split_fm(fm, 2)), 2)
# Test predict: Naive repeats the last value of each series
exp_fcsts = np.vstack([2 * [data[i]] for i in indptr[1:] - 1])
fcsts, cols = ga.predict(fm=fm, h=2)
np.testing.assert_equal(
    fcsts,
    np.hstack([exp_fcsts, exp_fcsts]),
)
# Test the fit + predict pipeline
fm_fp, fcsts_fp, cols_fp = ga.fit_predict(models=models, h=2)
test_eq(fm_fp.shape, (3, 2))
np.testing.assert_equal(fcsts_fp, fcsts)
np.testing.assert_equal(cols_fp, cols)
# Test levels
fm_lv, fcsts_lv, cols_lv = ga.fit_predict(models=models, h=2, level=(50, 90))
test_eq(fcsts_lv.shape, (2 * len(ga), 10))
# Test forecast
fcst_f = ga.forecast(models=models, h=2, fitted=True)
test_eq(fcst_f['forecasts'], fcsts_fp)
test_eq(fcst_f['cols'], cols_fp)


class NullModel(_TS):
    """Model whose ``forecast`` accepts no arguments, so every real call fails."""

    def __init__(self):
        pass

    def forecast(self):
        pass

    def __repr__(self):
        return "NullModel"


# Test the fallback model
fcst_f = ga.forecast(models=[NullModel(), NullModel()], fallback_model=Naive(), h=2, fitted=True)
test_eq(fcst_f['forecasts'], fcsts_fp)
test_eq(fcst_f['cols'], ['NullModel', 'NullModel'])
# Without a fallback the model's error must propagate. `h` is passed so the
# call fails because of NullModel, not because of a missing argument.
test_fail(ga.forecast, kwargs={'models': [NullModel()], 'h': 2})
# Test levels
lv = (50, 60)
h = 2
# forecast with levels: one mean column plus a lo/hi pair per level
fcsts_lv = ga.forecast(models=[SumAhead()], h=h, fitted=True, level=lv)
test_eq(fcsts_lv['forecasts'].shape, (len(ga) * h, 1 + 2 * len(lv)))
test_eq(
    fcsts_lv['cols'],
    ['SumAhead'] + [f'SumAhead-{side}-{lvl}' for lvl in lv for side in ('lo', 'hi')],
)
# the fit + predict pipeline must agree with forecast
fm_lv_fp, fcsts_lv_fp, cols_lv_fp = ga.fit_predict(models=[SumAhead()], h=h, level=lv)
test_eq(fcsts_lv['forecasts'], fcsts_lv_fp)
test_eq(fcsts_lv['cols'], cols_lv_fp)
# Tests for cross validation
data = np.hstack([np.arange(10), np.arange(100, 200), np.arange(20, 40)])
indptr = np.array([0, 10, 110, 130])
ga = GroupedArray(data, indptr)
res_cv = ga.cross_validation(models=[SumAhead()], h=2, test_size=5, fitted=True)
fcsts_cv = res_cv['forecasts']
cols_cv = res_cv['cols']
# On arange series SumAhead's forecasts equal the actual values
test_eq(
    fcsts_cv[:, cols_cv.index('y')],
    fcsts_cv[:, cols_cv.index('SumAhead')]
)
# Levels
res_cv_lv = ga.cross_validation(models=[SumAhead(), Naive()], h=2, test_size=5, level=(50, 60))
# 3 series x 4 windows x h=2 rows; y plus (mean + 2 levels x lo/hi) per model
test_eq(res_cv_lv['forecasts'].shape, (3 * 4 * 2, 1 + 2 * (1 + 2 * 2)))
actual_step_size = np.unique(np.diff(fcsts_cv[:, cols_cv.index('SumAhead')].reshape((3, -1, 2)), axis=1))
test_eq(actual_step_size, 1)
horizons = [1, 2, 3, 2]
test_sizes = [3, 4, 6, 6]
step_sizes = [2, 2, 3, 4]
for h, test_size, step_size in zip(horizons, test_sizes, step_sizes):
    res_cv = ga.cross_validation(
        models=[SumAhead()], h=h,
        test_size=test_size,
        step_size=step_size,
        fitted=True
    )
    fcsts_cv = res_cv['forecasts']
    cols_cv = res_cv['cols']
    test_eq(
        fcsts_cv[:, cols_cv.index('y')],
        fcsts_cv[:, cols_cv.index('SumAhead')]
    )
    # (series, window, horizon) view of the SumAhead forecasts
    fcsts_cv = fcsts_cv[:, cols_cv.index('SumAhead')].reshape((3, -1, h))
    actual_step_size = np.unique(
        np.diff(fcsts_cv, axis=1)
    )
    test_eq(actual_step_size, step_size)
    # Axis 1 of the reshaped view is the window axis; the raw forecasts'
    # shape[1] is the number of columns, not the number of windows.
    actual_n_windows = fcsts_cv.shape[1]
    test_eq(actual_n_windows, int((test_size - h)/step_size) + 1)


def fail_cv(h, test_size, step_size):
    return ga.cross_validation(models=[SumAhead()], h=h, test_size=test_size, step_size=step_size)


test_fail(fail_cv, contains='module', kwargs=dict(h=2, test_size=5, step_size=2))
# Test the fallback model
# in cross validation: falling back to Naive must match running Naive directly
fcst_cv_f = ga.cross_validation(
    models=[NullModel(), NullModel()],
    fallback_model=Naive(),
    h=2,
    test_size=5,
    fitted=True,
)
fcst_cv_naive = ga.cross_validation(models=[Naive(), Naive()], h=2, test_size=5, fitted=True)
test_eq(fcst_cv_f['forecasts'], fcst_cv_naive['forecasts'])
np.testing.assert_array_equal(
    fcst_cv_f['fitted']['values'],
    fcst_cv_naive['fitted']['values'],
)
# Test the fallback model when fitting fails during cross validation
class FailedFit:
    """Model whose ``fit`` always raises, to exercise the fallback path."""

    def __init__(self):
        pass

    def forecast(self):
        pass

    def fit(self, y, X):
        raise Exception('Failed fit')

    def __repr__(self):
        return "FailedFit"


fcst_cv_f = ga.cross_validation(
    models=[FailedFit()], fallback_model=Naive(), h=2, test_size=5, refit=False, fitted=True,
)
fcst_cv_naive = ga.cross_validation(
    models=[Naive()], h=2, test_size=5, refit=False, fitted=True,
)
test_eq(fcst_cv_f['forecasts'], fcst_cv_naive['forecasts'])
np.testing.assert_array_equal(
    fcst_cv_f['fitted']['values'],
    fcst_cv_naive['fitted']['values'],
)
# Tests for cross validation without refit
# First output row of each series: 4 windows x h=2 = 8 rows per series.
cv_starts = np.array([0, 8, 16])
res_cv_wo_refit = ga.cross_validation(models=[SumAhead()], h=2, test_size=5, refit=False, level=(50, 60))
res_cv_refit = ga.cross_validation(models=[SumAhead()], h=2, test_size=5, refit=True, level=(50, 60))
test_fail(test_eq, args=(res_cv_wo_refit['forecasts'], res_cv_refit['forecasts']))
# The first forecasts must be equal
test_eq(
    res_cv_wo_refit['forecasts'][cv_starts],
    res_cv_refit['forecasts'][cv_starts]
)
# With refit=2 the first two windows must be the same
res_cv_refit2 = ga.cross_validation(models=[SumAhead()], h=2, test_size=5, refit=2)
test_eq(
    res_cv_refit2['forecasts'][np.hstack([cv_starts + 0, cv_starts + 1]), 1],
    res_cv_refit2['forecasts'][np.hstack([cv_starts + 2, cv_starts + 3]), 1],
)
# and so must the next two windows
test_eq(
    res_cv_refit2['forecasts'][np.hstack([cv_starts + 4, cv_starts + 5]), 1],
    res_cv_refit2['forecasts'][np.hstack([cv_starts + 6, cv_starts + 7]), 1],
)
# but the two pairs must differ from each other
test_fail(
    lambda: test_eq(
        res_cv_refit2['forecasts'][np.hstack([cv_starts + 0, cv_starts + 1]), 1],
        res_cv_refit2['forecasts'][np.hstack([cv_starts + 4, cv_starts + 5]), 1],
    )
)
from statsforecast.models import AutoCES

res_cv_wo_refit = ga.cross_validation(models=[AutoCES()], h=2, test_size=5, refit=False, level=(50, 60))
res_cv_refit = ga.cross_validation(models=[AutoCES()], h=2, test_size=5, refit=True, level=(50, 60))
test_fail(test_eq, args=(res_cv_wo_refit['forecasts'], res_cv_refit['forecasts']))
# The first forecasts must be equal
test_eq(
    res_cv_wo_refit['forecasts'][[0, 8, 16]],
    res_cv_refit['forecasts'][[0, 8, 16]]
)


def _get_n_jobs(n_groups, n_jobs):
    """Return the number of jobs to use for ``n_groups`` series.

    ``n_jobs`` of -1 or None means "use every CPU". The result never exceeds
    ``n_groups``.
    """
    if n_jobs == -1 or (n_jobs is None):
        # os.cpu_count() returns None when the count cannot be determined.
        actual_n_jobs = os.cpu_count() or 1
    else:
        actual_n_jobs = n_jobs
    return min(n_groups, actual_n_jobs)
# Tests with more series than resources
# Capped by the available CPUs (which may themselves exceed 100).
test_eq(_get_n_jobs(100, -1), min(100, os.cpu_count() or 1))
test_eq(_get_n_jobs(100, None), min(100, os.cpu_count() or 1))
test_eq(_get_n_jobs(100, 2), 2)
# Tests with more resources than series
test_eq(_get_n_jobs(1, -1), 1)
test_eq(_get_n_jobs(1, None), 1)
test_eq(_get_n_jobs(2, 10), 2)


def _warn_df_constructor():
    """Emit a FutureWarning: passing `df` to the constructor or reusing stored data is deprecated."""
    warnings.warn(
        "The `df` argument of the StatsForecast constructor as well as reusing stored "
        "dfs from other methods is deprecated and will raise an error in a future version. "
        "Please provide the `df` argument to the corresponding method instead, e.g. fit/forecast.",
        category=FutureWarning,
    )
def _maybe_warn_sort_df(sort_df):
if not sort_df:
warnings.warn(
"The `sort_df` argument is deprecated and will be removed in a future version. "
"You can leave it to its default value (True) to supress this warning",
category=FutureWarning,
)
def _warn_id_as_idx():
warnings.warn(
"In a future version the predictions will have the id as a column. "
"You can set the `NIXTLA_ID_AS_COL` environment variable "
"to adopt the new behavior and to suppress this warning.",
category=FutureWarning,
)
def _id_as_idx() -> bool:
return not bool(os.getenv('NIXTLA_ID_AS_COL', ''))_param_descriptions = {
'freq': """freq : str or int
Frequency of the data. Must be a valid pandas or polars offset alias, or an integer.""",
'df': """df : pandas or polars DataFrame, optional (default=None)
DataFrame with ids, times, targets and exogenous.""",
'sort_df': """sort_df : bool (default=True)
Sort `df` by ids and times.""",
'fallback_model': """fallback_model : Any, optional (default=None)
Any, optional (default=None)
Model to be used if a model fails.
Only works with the `forecast` and `cross_validation` methods.""",
'id_col': """id_col : str (default='unique_id')
Column that identifies each serie.""",
'time_col': """time_col : str (default='ds')
Column that identifies each timestep, its values can be timestamps or integers.""",
'target_col': """target_col : str (default='y')
Column that contains the target.""",
'h': """h : int
Forecast horizon.""",
'X_df': """X_df : pandas or polars DataFrame, optional (default=None)
DataFrame with ids, times and future exogenous.""",
'level': """level : List[float], optional (default=None)
Confidence levels between 0 and 100 for prediction intervals.""",
'prediction_intervals': """prediction_intervals : ConformalIntervals, optional (default=None)
Configuration to calibrate prediction intervals (Conformal Prediction).""",
'fitted': """fitted : bool (default=False)
Store in-sample predictions.""",
'n_jobs': """n_jobs : int (default=1)
Number of jobs used in the parallel processing, use -1 for all cores.""",
'verbose': """verbose : bool (default=True)
Prints TQDM progress bar when `n_jobs=1`.""",
'models': """models : List[Any]
List of instantiated objects models.StatsForecast.""",
'n_windows': """n_windows : int (default=1)
Number of windows used for cross validation.""",
'step_size': """step_size : int (default=1)
Step size between each window.""",
'test_size': """test_size : int, optional (default=None)
Length of test size. If passed, set `n_windows=None`.""",
'input_size': """input_size : int, optional (default=None)
Input size for each window, if not none rolled windows.""",
'refit': """refit : bool or int (default=True)
Wether or not refit the model for each window.
If int, train the models every `refit` windows.""",
}class _StatsForecast:
"""The `StatsForecast` class allows you to efficiently fit multiple `StatsForecast` models
for large sets of time series. It operates on a DataFrame `df` with at least three columns
ids, times and targets.
The class has memory-efficient `StatsForecast.forecast` method that avoids storing partial
model outputs. While the `StatsForecast.fit` and `StatsForecast.predict` methods with
Scikit-learn interface store the fitted models.
The `StatsForecast` class offers parallelization utilities with Dask, Spark and Ray back-ends.
See distributed computing example [here](https://github.com/Nixtla/statsforecast/tree/main/experiments/ray).
"""
def __init__(
    self,
    models: List[Any],
    freq: Union[str, int],
    n_jobs: int = 1,
    df: Optional[DataFrame] = None,
    sort_df: bool = True,
    fallback_model: Optional[Any] = None,
    verbose: bool = False,
):
    """Train statistical models.

    Parameters
    ----------
    {models}
    {freq}
    {n_jobs}
    {df}
    {sort_df}
    {fallback_model}
    {verbose}
    """
    # TODO @fede: needed for computing residuals, revisit later
    self.models = models
    self._validate_model_names()
    self.freq = freq
    self.n_jobs = n_jobs
    self.fallback_model = fallback_model
    self.verbose = verbose
    if df is None:
        _maybe_warn_sort_df(sort_df)
        return
    # Deprecated path: the data is prepared right away from the constructor.
    _warn_df_constructor()
    self._prepare_fit(df=df, sort_df=sort_df)
__init__.__doc__ = __init__.__doc__.format(**_param_descriptions)  # type: ignore[union-attr]
def _validate_model_names(self):
    """Raise ValueError when two models share the same `alias`.

    Models without an `alias` attribute are ignored.
    """
    # Some test models don't have alias. The default must be None (not a
    # callable) so the filter below actually drops them.
    names = [getattr(model, 'alias', None) for model in self.models]
    names = [x for x in names if x is not None]
    if len(names) != len(set(names)):
        raise ValueError('Model names must be unique. You can use `alias` to set a unique name for each model.')
def _prepare_fit(
    self,
    df: Optional[DataFrame],
    sort_df: bool = True,
    id_col: str = 'unique_id',
    time_col: str = 'ds',
    target_col: str = 'y',
) -> None:
    """Validate `df` and store it as a `GroupedArray` plus the metadata used later.

    When `df` is None, the data stored by a previous call is reused (emitting
    the deprecation warning).

    Raises
    ------
    ValueError
        If `df` is None and no data has been stored yet.
    """
    if df is None:
        if not hasattr(self, 'ga'):
            raise ValueError('You must provide the `df` argument.')
        _warn_df_constructor()
        return
    df = ensure_time_dtype(df, time_col)
    validate_freq(df[time_col], self.freq)
    # Legacy pandas input may carry the ids in the index.
    if isinstance(df, pd.DataFrame) and df.index.name == id_col:
        warnings.warn(
            "Passing unique_id as the index is deprecated. "
            "Please provide it as a column instead.",
            category=FutureWarning
        )
        df = df.reset_index()
    _maybe_warn_sort_df(sort_df)
    # NOTE(review): process_df presumably groups rows by id; `sort_idxs`, when
    # not None, is the row permutation it applied — confirm in utilsforecast.
    self.uids, last_times, data, indptr, sort_idxs = ufp.process_df(
        df, id_col, time_col, target_col
    )
    if isinstance(df, pd.DataFrame):
        self.last_dates = pd.Index(last_times, name=time_col)
    else:
        self.last_dates = pl_Series(last_times)
    self.ga = GroupedArray(data, indptr)
    # Time column reordered with the same permutation as the grouped data.
    self.og_dates = df[time_col].to_numpy()
    if sort_idxs is not None:
        self.og_dates = self.og_dates[sort_idxs]
    # NOTE: overwrites the user's n_jobs, capped at this df's number of series.
    self.n_jobs = _get_n_jobs(len(self.ga), self.n_jobs)
    self.df_constructor = type(df)
    self.id_col = id_col
    self.time_col = time_col
    self.target_col = target_col
    # Every column other than id/time/target is treated as exogenous.
    self._exog = [c for c in df.columns if c not in (id_col, time_col, target_col)]
def _validate_sizes_for_prediction_intervals(
    self,
    prediction_intervals: Optional[ConformalIntervals],
    offset: int = 0,
) -> None:
    """Check that every series is long enough to calibrate conformal intervals.

    `offset` is subtracted from each series length before the checks and added
    back in the messages.

    Raises
    ------
    ValueError
        If some series cannot provide even two calibration windows.
    """
    if prediction_intervals is not None:
        series_sizes = np.diff(self.ga.indptr) - offset
        pi_h = prediction_intervals.h
        # the absolute minimum requires two windows
        min_samples = 2 * pi_h + 1
        if (series_sizes < min_samples).any():
            raise ValueError(
                f'Minimum samples for computing prediction intervals are {min_samples + offset:,}, '
                'some series have less. Please remove them or adjust the horizon.'
            )
        # the configured number of windows only triggers a warning
        required_samples = prediction_intervals.n_windows * pi_h + 1
        if (series_sizes < required_samples).any():
            warnings.warn(
                f'Prediction intervals settings require at least {required_samples + offset:,} samples, '
                'some series have less and will use less windows.'
            )
def _set_prediction_intervals(
    self, prediction_intervals: Optional[ConformalIntervals]
) -> None:
    """Give `prediction_intervals` to every model that doesn't already have one."""
    models_without_intervals = (
        m for m in self.models if getattr(m, "prediction_intervals", None) is None
    )
    for m in models_without_intervals:
        setattr(m, "prediction_intervals", prediction_intervals)
def fit(
    self,
    df: Optional[DataFrame] = None,
    sort_df: bool = True,
    prediction_intervals: Optional[ConformalIntervals] = None,
    id_col: str = 'unique_id',
    time_col: str = 'ds',
    target_col: str = 'y',
):
    """Fit statistical models.

    Fit `models` to a large set of time series from DataFrame `df`
    and store the fitted models for later inspection.

    Parameters
    ----------
    {df}
        If None, the `StatsForecast` class should have been instantiated using `df`.
    {sort_df}
    {prediction_intervals}
    {id_col}
    {time_col}
    {target_col}

    Returns
    -------
    self : StatsForecast
        Returns with stored `StatsForecast` fitted `models`.
    """
    self._prepare_fit(
        df=df, sort_df=sort_df, id_col=id_col, time_col=time_col, target_col=target_col
    )
    self._validate_sizes_for_prediction_intervals(prediction_intervals)
    self._set_prediction_intervals(prediction_intervals=prediction_intervals)
    self.fitted_ = (
        self.ga.fit(models=self.models, fallback_model=self.fallback_model)
        if self.n_jobs == 1
        else self._fit_parallel()
    )
    return self
fit.__doc__ = fit.__doc__.format(**_param_descriptions)  # type: ignore[union-attr]
def _make_future_df(self, h: int):
    """Build a frame with the ids and the next `h` timestamps of every series."""
    first_future = ufp.offset_times(self.last_dates, freq=self.freq, n=1)
    future_dates = ufp.time_ranges(first_future, freq=self.freq, periods=h)
    repeated_ids = ufp.repeat(self.uids, n=h)
    future_df = self.df_constructor({self.id_col: repeated_ids, self.time_col: future_dates})
    if not isinstance(future_df, pd.DataFrame):
        return future_df
    # pandas output keeps the legacy id-as-index layout unless opted out.
    if _id_as_idx():
        _warn_id_as_idx()
        return future_df.set_index(self.id_col)
    return future_df.reset_index(drop=True)
def _parse_X_level(self, h: int, X: Optional[DataFrame], level: Optional[List[int]]):
    """Normalize `level` to a list and turn future exogenous `X` into a GroupedArray.

    Raises
    ------
    ValueError
        If `X` does not have shape (h * n_series, self.ga.data.shape[1] + 1).
    """
    level = [] if level is None else level
    if X is None:
        return None, level
    expected_shape = (h * len(self.ga), self.ga.data.shape[1] + 1)
    if X.shape != expected_shape:
        raise ValueError(f'Expected X to have shape {expected_shape}, but got {X.shape}')
    _, _, future_data, future_indptr, _ = ufp.process_df(X, self.id_col, self.time_col, None)
    return GroupedArray(future_data, future_indptr), level
def _validate_exog(self, X_df: Optional[DataFrame] = None) -> None:
    """Ensure `X_df` provides every exogenous column seen when preparing the data.

    Nothing is checked when no model uses exogenous features or the training
    data had none.

    Raises
    ------
    ValueError
        If exogenous features are needed and `X_df` is None or lacks some of them.
    """
    # Models that don't declare `uses_exog` (e.g. duck-typed/test models) are
    # treated as not using exogenous features instead of raising AttributeError.
    uses_exog = any(getattr(m, 'uses_exog', False) for m in self.models)
    if not uses_exog or not self._exog:
        return
    err_msg = (
        f'Models require the following exogenous features {self._exog} '
        'for the forecasting step. Please provide them through `X_df`.'
    )
    if X_df is None:
        raise ValueError(err_msg)
    missing_exog = [c for c in self._exog if c not in X_df.columns]
    if missing_exog:
        raise ValueError(err_msg)
def predict(
    self,
    h: int,
    X_df: Optional[DataFrame] = None,
    level: Optional[List[int]] = None,
):
    """Predict statistical models.

    Use the stored fitted `models` to predict a large set of time series.

    Parameters
    ----------
    {h}
    {X_df}
    {level}

    Returns
    -------
    fcsts_df : pandas or polars DataFrame
        DataFrame with `models` columns for point predictions and probabilistic
        predictions for all fitted `models`.
    """
    if not hasattr(self, 'fitted_'):
        raise ValueError('You must call the fit method before calling predict.')
    intervals_configured = any(
        getattr(m, 'prediction_intervals', None) is not None for m in self.models
    )
    if level is None and intervals_configured:
        warnings.warn(
            "Prediction intervals are set but `level` was not provided. "
            "Predictions won't have intervals."
        )
    self._validate_exog(X_df)
    X, level = self._parse_X_level(h=h, X=X_df, level=level)
    fcsts, cols = (
        self.ga.predict(fm=self.fitted_, h=h, X=X, level=level)
        if self.n_jobs == 1
        else self._predict_parallel(h=h, X=X, level=level)
    )
    out_df = self._make_future_df(h=h)
    out_df[cols] = fcsts
    return out_df
predict.__doc__ = predict.__doc__.format(**_param_descriptions)  # type: ignore[union-attr]
def fit_predict(
    self,
    h: int,
    df: Optional[DataFrame] = None,
    X_df: Optional[DataFrame] = None,
    level: Optional[List[int]] = None,
    sort_df: bool = True,
    prediction_intervals: Optional[ConformalIntervals] = None,
    id_col: str = 'unique_id',
    time_col: str = 'ds',
    target_col: str = 'y',
) -> DataFrame:
    """Fit and predict with statistical models.

    Analogous to Scikit-Learn `fit_predict`: it requires the forecast horizon
    `h` in advance. Unlike `StatsForecast.forecast`, this method keeps the
    fitted models (in `fitted_`), so it uses more memory.

    Parameters
    ----------
    {h}
    {df}
        If None, the `StatsForecast` class should have been instantiated using `df`.
    {X_df}
    {level}
    {sort_df}
    {prediction_intervals}
    {id_col}
    {time_col}
    {target_col}

    Returns
    -------
    fcsts_df : pandas or polars DataFrame
        DataFrame with `models` columns for point predictions and probabilistic
        predictions for all fitted `models`.
    """
    self._prepare_fit(
        df=df, sort_df=sort_df, id_col=id_col, time_col=time_col, target_col=target_col
    )
    self._validate_exog(X_df)
    if level is None and prediction_intervals is not None:
        raise ValueError('You must specify `level` when using `prediction_intervals`')
    self._validate_sizes_for_prediction_intervals(prediction_intervals)
    self._set_prediction_intervals(prediction_intervals=prediction_intervals)
    X, level = self._parse_X_level(h=h, X=X_df, level=level)
    if self.n_jobs != 1:
        self.fitted_, fcsts, cols = self._fit_predict_parallel(h=h, X=X, level=level)
    else:
        self.fitted_, fcsts, cols = self.ga.fit_predict(models=self.models, h=h, X=X, level=level)
    out_df = self._make_future_df(h=h)
    out_df[cols] = fcsts
    return out_df
fit_predict.__doc__ = fit_predict.__doc__.format(**_param_descriptions)  # type: ignore[union-attr]
def forecast(
    self,
    h: int,
    df: Optional[DataFrame] = None,
    X_df: Optional[DataFrame] = None,
    level: Optional[List[int]] = None,
    fitted: bool = False,
    sort_df: bool = True,
    prediction_intervals: Optional[ConformalIntervals] = None,
    id_col: str = 'unique_id',
    time_col: str = 'ds',
    target_col: str = 'y',
) -> DataFrame:
    """Memory Efficient predictions.

    Fits every model and immediately forecasts `h` steps ahead without keeping
    the fitted model objects, analogous to scikit-learn's `fit_predict`; the
    forecast horizon must therefore be known in advance. Pass `fitted=True` to
    keep the in-sample predictions, retrievable afterwards with
    `StatsForecast.forecast_fitted_values`.

    Parameters
    ----------
    {h}
    {df}
    {X_df}
    {level}
    {fitted}
    {sort_df}
    {prediction_intervals}
    {id_col}
    {time_col}
    {target_col}

    Returns
    -------
    fcsts_df : pandas or polars DataFrame
        DataFrame with `models` columns for point predictions and probabilistic
        predictions for all fitted `models`.

    Raises
    ------
    ValueError
        If `prediction_intervals` is given without `level`.
    """
    # Same validation as `fit_predict` and `cross_validation`; done first so a
    # bad call fails before any state on `self` is modified.
    if prediction_intervals is not None and level is None:
        raise ValueError('You must specify `level` when using `prediction_intervals`')
    # Drop in-sample values from a previous call so they can't be read as this run's.
    self.__dict__.pop('fcst_fitted_values_', None)
    self._prepare_fit(
        df=df, sort_df=sort_df, id_col=id_col, time_col=time_col, target_col=target_col
    )
    self._validate_exog(X_df)
    self._validate_sizes_for_prediction_intervals(prediction_intervals)
    self._set_prediction_intervals(prediction_intervals=prediction_intervals)
    X, level = self._parse_X_level(h=h, X=X_df, level=level)
    if self.n_jobs == 1:
        res_fcsts = self.ga.forecast(
            models=self.models,
            h=h,
            fallback_model=self.fallback_model,
            fitted=fitted,
            X=X,
            level=level,
            verbose=self.verbose,
            target_col=target_col,
        )
    else:
        res_fcsts = self._forecast_parallel(
            h=h,
            fitted=fitted,
            X=X,
            level=level,
            target_col=target_col,
        )
    if fitted:
        self.fcst_fitted_values_ = res_fcsts['fitted']
    fcsts = res_fcsts['forecasts']
    cols = res_fcsts['cols']
    fcsts_df = self._make_future_df(h=h)
    fcsts_df[cols] = fcsts
    self.forecast_times_ = res_fcsts['times']
    return fcsts_df

forecast.__doc__ = forecast.__doc__.format(**_param_descriptions) # type: ignore[union-attr]
def forecast_fitted_values(self):
    """Access insample predictions.

    Returns the in-sample predictions stored by the last call to
    `StatsForecast.forecast` made with `fitted=True`.

    Returns
    -------
    fcsts_df : pandas.DataFrame | polars.DataFrame
        DataFrame with insample `models` columns for point predictions and probabilistic
        predictions for all fitted `models`.

    Raises
    ------
    Exception
        If `forecast` was not run with `fitted=True`.
    """
    if not hasattr(self, "fcst_fitted_values_"):
        raise Exception("Please run `forecast` method using `fitted=True`")
    stored = self.fcst_fitted_values_
    rows_per_series = np.diff(self.ga.indptr)
    out = self.df_constructor(
        {
            self.id_col: ufp.repeat(self.uids, rows_per_series),
            self.time_col: self.og_dates,
        }
    )
    out[stored["cols"]] = stored["values"]
    if not isinstance(out, pd.DataFrame):
        return out
    if not _id_as_idx():
        return out.reset_index(drop=True)
    _warn_id_as_idx()
    return out.set_index(self.id_col)
def cross_validation(
    self,
    h: int,
    df: Optional[DataFrame] = None,
    n_windows: int = 1,
    step_size: int = 1,
    test_size: Optional[int] = None,
    input_size: Optional[int] = None,
    level: Optional[List[int]] = None,
    fitted: bool = False,
    refit: Union[bool, int] = True,
    sort_df: bool = True,
    prediction_intervals: Optional[ConformalIntervals] = None,
    id_col: str = 'unique_id',
    time_col: str = 'ds',
    target_col: str = 'y',
) -> DataFrame:
    """Temporal Cross-Validation.

    Efficiently fits a list of `StatsForecast` models through multiple
    training windows, in either chained or rolled manner. The speed of
    `StatsForecast.models` makes this otherwise expensive evaluation practical,
    and it gives better generalization measurements by increasing the length
    and diversity of the test set.

    Parameters
    ----------
    {h}
    {df}
        If None, the `StatsForecast` class should have been instantiated using `df`.
    {n_windows}
    {step_size}
    {test_size}
    {input_size}
    {level}
    {fitted}
    {refit}
    {sort_df}
    {prediction_intervals}
    {id_col}
    {time_col}
    {target_col}

    Returns
    -------
    fcsts_df : pandas or polars DataFrame
        DataFrame with a `cutoff` column and `models` columns holding the
        point and probabilistic predictions of each window.

    Raises
    ------
    ValueError
        If neither `n_windows` nor `test_size` is given, if `prediction_intervals`
        is given without `level`, if a non-True `refit` is used with models lacking
        a `forward` method, or if some series are too short.
    """
    if n_windows is None and test_size is None:
        raise ValueError('you must define `n_windows` or `test_size`')
    if test_size is None:
        test_size = h + step_size * (n_windows - 1)
    if prediction_intervals is not None and level is None:
        raise ValueError('You must specify `level` when using `prediction_intervals`')
    # `!=` (not `is not`) so that refit=1, which equals True, means "refit every window".
    if refit != True:
        no_forward = [m for m in self.models if not hasattr(m, 'forward')]
        if no_forward:
            raise ValueError(
                'Can only use integer refit or refit=False with models that implement the forward method. '
                f'The following models do not implement the forward method: {no_forward}.'
            )
        if self.fallback_model is not None and not hasattr(self.fallback_model, 'forward'):
            raise ValueError(
                'Can only use integer refit or refit=False with a fallback model that implements the forward method.'
            )
    self.__dict__.pop('cv_fitted_values_', None)
    self._prepare_fit(
        df=df, sort_df=sort_df, id_col=id_col, time_col=time_col, target_col=target_col
    )
    series_sizes = np.diff(self.ga.indptr)
    short_series = series_sizes <= test_size
    if short_series.any():
        short_ids = self.uids[short_series].to_numpy().tolist()
        raise ValueError(
            f"The following series are too short for the cross validation settings: {reprlib.repr(short_ids)}\n"
            "Please remove these series or change the settings, e.g. reducing the horizon or the number of windows."
        )
    self._validate_sizes_for_prediction_intervals(
        prediction_intervals=prediction_intervals, offset=test_size
    )
    self._set_prediction_intervals(prediction_intervals=prediction_intervals)
    _, level = self._parse_X_level(h=h, X=None, level=level)
    if self.n_jobs == 1:
        res_fcsts = self.ga.cross_validation(
            models=self.models, h=h, test_size=test_size,
            fallback_model=self.fallback_model,
            step_size=step_size,
            input_size=input_size,
            fitted=fitted,
            level=level,
            verbose=self.verbose,
            refit=refit,
            target_col=target_col,
        )
    else:
        res_fcsts = self._cross_validation_parallel(
            h=h,
            test_size=test_size,
            step_size=step_size,
            input_size=input_size,
            fitted=fitted,
            level=level,
            refit=refit,
            target_col=target_col,
        )
    if fitted:
        self.cv_fitted_values_ = res_fcsts['fitted']
        # Derive the window count from `test_size`: when the caller passes
        # `test_size` (with `n_windows=None` or the default 1), `n_windows`
        # doesn't reflect the windows actually evaluated, and
        # `cross_validation_fitted_values` relies on this number.
        self.n_cv_ = (test_size - h) // step_size + 1
    fcsts_df = ufp.cv_times(
        times=self.og_dates,
        uids=self.uids,
        indptr=self.ga.indptr,
        h=h,
        test_size=test_size,
        step_size=step_size,
        id_col=id_col,
        time_col=time_col,
    )
    # the cv_times is sorted by window and then id
    fcsts_df = ufp.sort(fcsts_df, [id_col, "cutoff", time_col])
    fcsts_df = ufp.assign_columns(fcsts_df, res_fcsts["cols"], res_fcsts["forecasts"])
    if isinstance(fcsts_df, pd.DataFrame) and _id_as_idx():
        _warn_id_as_idx()
        fcsts_df = fcsts_df.set_index(id_col)
    return fcsts_df

cross_validation.__doc__ = cross_validation.__doc__.format(**_param_descriptions) # type: ignore[union-attr]
def cross_validation_fitted_values(self) -> DataFrame:
    """Access insample cross validated predictions.

    After running `StatsForecast.cross_validation` with `fitted=True`, this
    method returns the in-sample predictions of every model for each
    cross-validation window, together with the window's `cutoff`.

    Returns
    -------
    fcsts_df : pandas or polars DataFrame
        DataFrame with insample `models` columns for point predictions
        and probabilistic predictions for all fitted `models`.

    Raises
    ------
    Exception
        If `cross_validation` was not run with `fitted=True`.
    """
    if not hasattr(self, 'cv_fitted_values_'):
        raise Exception('Please run `cross_validation` method using `fitted=True`')
    # Column-major flattening lays the per-window entries out one window after
    # another. NOTE(review): `idxs` presumably selects, per window, the training
    # rows that received fitted values; confirm against GroupedArray.cross_validation.
    idxs = self.cv_fitted_values_['idxs'].flatten(order='F')
    # Series id of every training row, repeated once per window, then filtered by idxs.
    train_uids = ufp.repeat(self.uids, np.diff(self.ga.indptr))
    cv_uids = ufp.vertical_concat([train_uids for _ in range(self.n_cv_)])
    used_uids = ufp.take_rows(cv_uids, idxs)
    dates = np.tile(self.og_dates, self.n_cv_)[idxs]
    # Flags (used as a boolean mask) marking the last training row of each series/window.
    cutoffs_mask = self.cv_fitted_values_['last_idxs'].flatten(order='F')[idxs]
    # Length of each run of rows ending at a flagged row; every row in that run
    # receives the flagged row's date as its cutoff.
    cutoffs_sizes = np.diff(np.append(0, np.where(cutoffs_mask)[0] + 1))
    cutoffs = np.repeat(dates[cutoffs_mask], cutoffs_sizes)
    df = self.df_constructor({
        self.id_col: used_uids,
        self.time_col: dates,
        'cutoff': cutoffs,
    })
    # NOTE(review): the extra column (+1) presumably holds the target stored next
    # to each model's fitted values; confirm against the stored 'cols'.
    fitted_vals = np.reshape(
        self.cv_fitted_values_['values'],
        (-1, len(self.models) + 1),
        order='F',
    )
    df = ufp.assign_columns(df, self.cv_fitted_values_['cols'], fitted_vals[idxs])
    df = ufp.drop_index_if_pandas(df)
    if isinstance(df, pd.DataFrame):
        if _id_as_idx():
            # Deprecated layout: ids as the index.
            _warn_id_as_idx()
            df = df.set_index(self.id_col)
        else:
            df = df.reset_index(drop=True)
    return df
def _get_pool(self):
    """Return the process-pool factory and the keyword arguments to build it with.

    Presumably a hook that subclasses can override to use another pool type.
    """
    from multiprocessing import Pool

    return Pool, {}
def _fit_parallel(self):
    """Fit `self.models` on chunks of the series in a process pool.

    Returns
    -------
    numpy.ndarray
        The per-chunk fitted-model arrays stacked vertically, in chunk order.
    """
    chunks = self.ga.split(self.n_jobs)
    Pool, pool_kwargs = self._get_pool()
    with Pool(self.n_jobs, **pool_kwargs) as pool:
        pending = [
            pool.apply_async(chunk._single_threaded_fit, (self.models, self.fallback_model))
            for chunk in chunks
        ]
        # Collect inside the `with`: leaving the pool terminates its workers.
        fitted_models = np.vstack([job.get() for job in pending])
    return fitted_models
def _get_gas_Xs(self, X, tasks_per_job=1):
    """Split the grouped series (and exogenous data, if any) into matching chunks.

    The number of chunks is `tasks_per_job * n_jobs`, capped at the number of
    series. When `X` is None, the second value is an endless iterator of None,
    so it can be zipped with the series chunks.
    """
    import itertools

    n_chunks = min(self.ga.n_groups, self.n_jobs * tasks_per_job)
    series_chunks = self.ga.split(n_chunks)
    exog_chunks = itertools.repeat(None) if X is None else X.split(n_chunks)
    return series_chunks, exog_chunks
def _predict_parallel(self, h, X, level):
    """Predict `h` steps with the stored fitted models, chunk by chunk, in a process pool.

    Returns
    -------
    tuple
        The forecasts stacked vertically in chunk order, and the column names
        reported by the first chunk.
    """
    gas, Xs = self._get_gas_Xs(X=X)
    fms = self.ga.split_fm(self.fitted_, self.n_jobs)
    Pool, pool_kwargs = self._get_pool()
    with Pool(self.n_jobs, **pool_kwargs) as pool:
        jobs = [
            pool.apply_async(chunk._single_threaded_predict, (chunk_fm, h, chunk_X, level))
            for chunk, chunk_fm, chunk_X in zip(gas, fms, Xs)
        ]
        outputs = [job.get() for job in jobs]
    stacked = np.vstack([chunk_fcsts for chunk_fcsts, _ in outputs])
    return stacked, outputs[0][1]
def _fit_predict_parallel(self, h, X, level):
    """Fit and predict chunks of the series in a process pool.

    Returns
    -------
    tuple
        Fitted-model arrays stacked vertically, forecasts stacked vertically
        (both in chunk order), and the column names reported by the first chunk.
    """
    gas, Xs = self._get_gas_Xs(X=X)
    Pool, pool_kwargs = self._get_pool()
    with Pool(self.n_jobs, **pool_kwargs) as pool:
        jobs = [
            pool.apply_async(chunk._single_threaded_fit_predict, (self.models, h, chunk_X, level))
            for chunk, chunk_X in zip(gas, Xs)
        ]
        outputs = [job.get() for job in jobs]
    fitted_models = np.vstack([out[0] for out in outputs])
    forecasts = np.vstack([out[1] for out in outputs])
    return fitted_models, forecasts, outputs[0][2]
def _forecast_parallel(self, h, fitted, X, level, target_col):
    """Run `forecast` on chunks of the series in a `ProcessPoolExecutor`.

    Up to 100 chunks per worker are created (capped at the number of series),
    presumably for finer load balancing and progress reporting. A tqdm bar is
    shown when `self.verbose` is set. Results are reassembled in chunk order,
    and per-model timings are summed over chunks.
    """
    gas, Xs = self._get_gas_Xs(X=X, tasks_per_job=100)
    results = [None] * len(gas)
    with ProcessPoolExecutor(self.n_jobs) as executor:
        future2pos = {}
        for pos, (chunk, chunk_X) in enumerate(zip(gas, Xs)):
            future = executor.submit(
                chunk._single_threaded_forecast,
                h=h,
                models=self.models,
                fallback_model=self.fallback_model,
                fitted=fitted,
                X=chunk_X,
                level=level,
                verbose=False,
                target_col=target_col,
            )
            future2pos[future] = pos
        progress = tqdm(
            as_completed(future2pos),
            disable=not self.verbose,
            total=len(future2pos),
            desc="Forecast",
            bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [Elapsed: {elapsed}{postfix}]",
        )
        # Futures complete out of order; store each result at its chunk position.
        for future in progress:
            results[future2pos[future]] = future.result()
    model_names = [repr(model) for model in self.models]
    result = {
        'cols': results[0]['cols'],
        'forecasts': np.vstack([chunk_res['forecasts'] for chunk_res in results]),
        'times': {
            name: sum(chunk_res['times'][name] for chunk_res in results)
            for name in model_names
        },
    }
    if fitted:
        result['fitted'] = {
            'cols': results[0]['fitted']['cols'],
            'values': np.vstack([chunk_res['fitted']['values'] for chunk_res in results]),
        }
    return result
def _cross_validation_parallel(self, h, test_size, step_size, input_size, fitted, level, refit, target_col):
    """Run cross validation on chunks of the series in a process pool.

    Returns
    -------
    dict
        'forecasts' stacked vertically in chunk order and 'cols' from the first
        chunk; when `fitted` is True, also 'fitted' with 'values', 'last_idxs'
        and 'idxs' concatenated across chunks, plus 'cols' from the first chunk.
    """
    cv_kwargs = dict(
        models=self.models,
        h=h,
        test_size=test_size,
        fallback_model=self.fallback_model,
        step_size=step_size,
        input_size=input_size,
        fitted=fitted,
        level=level,
        refit=refit,
        verbose=self.verbose,
        target_col=target_col,
    )
    chunks = self.ga.split(self.n_jobs)
    Pool, pool_kwargs = self._get_pool()
    with Pool(self.n_jobs, **pool_kwargs) as pool:
        jobs = [
            pool.apply_async(chunk._single_threaded_cross_validation, (), cv_kwargs)
            for chunk in chunks
        ]
        outputs = [job.get() for job in jobs]
    result = {
        'forecasts': np.vstack([out['forecasts'] for out in outputs]),
        'cols': outputs[0]['cols'],
    }
    if fitted:
        result['fitted'] = {
            'values': np.concatenate([out['fitted']['values'] for out in outputs]),
            'last_idxs': np.concatenate([out['fitted']['last_idxs'] for out in outputs]),
            'idxs': np.concatenate([out['fitted']['idxs'] for out in outputs]),
            'cols': outputs[0]['fitted']['cols'],
        }
    return result
@staticmethod
def plot(
    df: DataFrame,
    forecasts_df: Optional[DataFrame] = None,
    unique_ids: Union[Optional[List[str]], np.ndarray] = None,
    plot_random: bool = True,
    models: Optional[List[str]] = None,
    level: Optional[List[float]] = None,
    max_insample_length: Optional[int] = None,
    plot_anomalies: bool = False,
    engine: str = 'matplotlib',
    id_col: str = 'unique_id',
    time_col: str = 'ds',
    target_col: str = 'y',
    resampler_kwargs: Optional[Dict] = None
):
    """Plot forecasts and insample values.

    Parameters
    ----------
    {df}
    forecasts_df : pandas or polars DataFrame, optional (default=None)
        DataFrame with ids, times and models.
    unique_ids : list of str, optional (default=None)
        ids to plot. If None, they're selected randomly.
    plot_random : bool (default=True)
        Select time series to plot randomly.
    models : List[str], optional (default=None)
        List of models to plot.
    level : List[float], optional (default=None)
        List of prediction intervals to plot if passed.
    max_insample_length : int, optional (default=None)
        Max number of train/insample observations to be plotted.
    plot_anomalies : bool (default=False)
        Plot anomalies for each prediction interval.
    engine : str (default='matplotlib')
        Library used to plot. 'plotly', 'plotly-resampler' or 'matplotlib'.
    {id_col}
    {time_col}
    {target_col}
    resampler_kwargs : dict
        Kwargs to be passed to plotly-resampler constructor.
        For further customization ("show_dash") call the method,
        store the plotting object and add the extra arguments to
        its `show_dash` method.
    """
    from utilsforecast.plotting import plot_series

    history = ensure_time_dtype(df, time_col)
    if isinstance(history, pd.DataFrame) and history.index.name == id_col:
        warnings.warn(
            "Passing the ids as the index is deprecated. "
            "Please provide them as a column instead.",
            category=FutureWarning
        )
        history = history.reset_index()
    predictions = forecasts_df
    if predictions is not None:
        predictions = ensure_time_dtype(predictions, time_col)
        if isinstance(predictions, pd.DataFrame) and predictions.index.name == id_col:
            warnings.warn(
                "Passing the ids as the index is deprecated. "
                "Please provide them as a column instead.",
                category=FutureWarning
            )
            predictions = predictions.reset_index()
    return plot_series(
        df=history,
        forecasts_df=predictions,
        ids=unique_ids,
        plot_random=plot_random,
        models=models,
        level=level,
        max_insample_length=max_insample_length,
        plot_anomalies=plot_anomalies,
        engine=engine,
        resampler_kwargs=resampler_kwargs,
        palette='tab20b',
        id_col=id_col,
        time_col=time_col,
        target_col=target_col,
    )
def save(
    self,
    path: Optional[Union[Path, str]] = None,
    max_size: Optional[str] = None,
    trim: bool = False,
):
    """Save the StatsForecast object with `pickle` so it can be reloaded later.

    Parameters
    ----------
    path : str or pathlib.Path, optional (default=None)
        Path of the file to be saved. If `None` will create one in the current
        directory using the current UTC timestamp.
    max_size : str, optional (default = None)
        StatsForecast object should not exceed this size.
        Available byte naming: ['B', 'KB', 'MB', 'GB']
    trim : bool (default = False)
        Delete any attributes not needed for inference (the stored in-sample
        values of `forecast` and `cross_validation`). They are removed from
        this object as well, not only from the saved copy.

    Raises
    ------
    OSError
        With errno EFBIG when the pickled object is at least `max_size`.
    """
    # Will be used to find the size of the fitted models
    # Never expecting anything higher than GB (even that's a lot)
    bytes_hmap = {
        "B": 1,
        "KB": 2**10,
        "MB": 2**20,
        "GB": 2**30,
    }
    # Removing unnecessary attributes
    # @jmoralez decide future implementation
    trim_attr: list = ["fcst_fitted_values_", "cv_fitted_values_"]
    if trim:
        for attr in trim_attr:
            self.__dict__.pop(attr, None)
    # Serialize once: the same bytes are used for the size check and written to disk.
    pickled = pickle.dumps(self)
    sf_size = len(pickled)
    if max_size is not None:
        cap_size = self._get_cap_size(max_size, bytes_hmap)
        if sf_size >= cap_size:
            err_messg = "StatsForecast is larger than the specified max_size"
            raise OSError(errno.EFBIG, err_messg)
    # Report the size in the largest unit it reaches (keys go from smallest to largest).
    converted_size, sf_byte = None, None
    for key in reversed(list(bytes_hmap)):
        x_byte = bytes_hmap[key]
        if sf_size >= x_byte:
            converted_size = sf_size / x_byte
            sf_byte = key
            break
    if converted_size is None or sf_byte is None:
        err_messg = "Internal Error, this shouldn't happen, please open an issue"
        raise RuntimeError(err_messg)
    print(f"Saving StatsForecast object of size {converted_size:.2f}{sf_byte}.")
    if path is None:
        # Timezone-aware replacement for the deprecated datetime.utcnow(); same UTC timestamp.
        datetime_record = dt.datetime.now(dt.timezone.utc).strftime("%Y-%m-%d_%H-%M-%S")
        path = f"StatsForecast_{datetime_record}.pkl"
    with open(path, "wb") as m_file:
        m_file.write(pickled)
    print("StatsForecast object saved")
def _get_cap_size(self, max_size, bytes_hmap):
    """Convert a size such as "1.5 GB" into a number of bytes.

    Parameters
    ----------
    max_size : str
        A number followed by a unit; case and spaces are ignored.
    bytes_hmap : dict
        Mapping from unit name to its size in bytes.

    Returns
    -------
    float
        `max_size` expressed in bytes.

    Raises
    ------
    ValueError
        If `max_size` can't be parsed or its unit is not a key of `bytes_hmap`.
    """
    max_size = max_size.upper().replace(" ", "")
    match = re.match(r'(\d+\.\d+|\d+)(\w+)', max_size)
    if match is None or match[2] not in bytes_hmap:
        # Parenthesised implicit concatenation keeps this a single string
        # (a trailing comma between the parts would build a tuple instead).
        parsing_error = (
            "Couldn't parse `max_size`, it should be `None`"
            f" or a number followed by one of the following units: {list(bytes_hmap)}"
        )
        raise ValueError(parsing_error)
    return float(match[1]) * bytes_hmap[match[2]]
@staticmethod
def load(path:Union[Path, str]):
"""
Automatically loads the model into ready StatsForecast.
Parameters
----------
path : str or pathlib.Path
Path to saved StatsForecast file.
Returns
-------
sf: StatsForecast
Previously saved StatsForecast
"""
if not Path(path).exists():
raise ValueError("Specified path does not exist, check again and retry.")
with open(path, "rb") as f:
return pickle.load(f)
def __repr__(self):
    # Comma-joined (no spaces) reprs of the configured models.
    model_reprs = ','.join(repr(model) for model in self.models)
    return f"StatsForecast(models=[{model_reprs}])"
_StatsForecast.plot.__doc__ = _StatsForecast.plot.__doc__.format(**_param_descriptions) # type: ignore[union-attr]


class ParallelBackend:
    """Local execution backend.

    Each call builds a fresh `_StatsForecast` from the given models and
    delegates to it, so nothing is kept on the backend between calls.
    """

    def forecast(
        self,
        *,
        models,
        fallback_model,
        freq,
        h,
        df,
        X_df,
        level,
        fitted,
        prediction_intervals,
        id_col,
        time_col,
        target_col,
    ) -> Any:
        """Forecast `df` with a new `_StatsForecast` built from `models`."""
        local_sf = _StatsForecast(
            models=models,
            freq=freq,
            fallback_model=fallback_model,
        )
        return local_sf.forecast(
            df=df,
            h=h,
            X_df=X_df,
            level=level,
            fitted=fitted,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )

    def cross_validation(
        self,
        *,
        df,
        models,
        freq,
        fallback_model,
        h,
        n_windows,
        step_size,
        test_size,
        input_size,
        level,
        refit,
        fitted,
        prediction_intervals,
        id_col,
        time_col,
        target_col,
    ) -> Any:
        """Cross-validate `df` with a new `_StatsForecast` built from `models`."""
        local_sf = _StatsForecast(
            models=models,
            freq=freq,
            fallback_model=fallback_model,
        )
        return local_sf.cross_validation(
            df=df,
            h=h,
            n_windows=n_windows,
            step_size=step_size,
            test_size=test_size,
            input_size=input_size,
            level=level,
            refit=refit,
            fitted=fitted,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
@conditional_dispatcher
def make_backend(obj:Any, *args:Any, **kwargs:Any) -> ParallelBackend:
    """Return the execution backend for `obj` (an execution engine).

    This default returns the local `ParallelBackend`. Presumably it is used
    when no other candidate registered on the dispatcher matches `obj`.
    """
    return ParallelBackend()


class StatsForecast(_StatsForecast):
    # These overrides carry no docstrings, so `inspect.getdoc` / `help()` fall
    # back to the documented `_StatsForecast` implementations.

    def forecast(
        self,
        h: int,
        df: Any = None,
        X_df: Optional[DataFrame] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
        sort_df: bool = True,
        prediction_intervals: Optional[ConformalIntervals] = None,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
    ):
        if prediction_intervals is not None and level is None:
            raise ValueError('You must specify `level` when using `prediction_intervals`')
        if self._is_native(df=df):
            # A backend stored by an earlier distributed call would otherwise
            # intercept `forecast_fitted_values` after this local run.
            self.__dict__.pop('_backend', None)
            return super().forecast(
                df=df,
                h=h,
                X_df=X_df,
                level=level,
                fitted=fitted,
                sort_df=sort_df,
                prediction_intervals=prediction_intervals,
                id_col=id_col,
                time_col=time_col,
                target_col=target_col,
            )
        assert df is not None
        engine = make_execution_engine(infer_by=[df])
        # Kept so that `forecast_fitted_values` can query the same backend.
        self._backend = make_backend(engine)
        return self._backend.forecast(
            models=self.models,
            fallback_model=self.fallback_model,
            freq=self.freq,
            df=df,
            h=h,
            X_df=X_df,
            level=level,
            fitted=fitted,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )

    def forecast_fitted_values(self):
        # Use the backend of the last distributed `forecast`, if any.
        if hasattr(self, '_backend'):
            res = self._backend.forecast_fitted_values()
        else:
            res = super().forecast_fitted_values()
        return res

    def cross_validation(
        self,
        h: int,
        df: Any = None,
        n_windows: int = 1,
        step_size: int = 1,
        test_size: Optional[int] = None,
        input_size: Optional[int] = None,
        level: Optional[List[int]] = None,
        fitted: bool = False,
        refit: Union[bool, int] = True,
        sort_df: bool = True,
        prediction_intervals: Optional[ConformalIntervals] = None,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
    ):
        if self._is_native(df=df):
            return super().cross_validation(
                h=h,
                df=df,
                n_windows=n_windows,
                step_size=step_size,
                test_size=test_size,
                input_size=input_size,
                level=level,
                fitted=fitted,
                refit=refit,
                sort_df=sort_df,
                prediction_intervals=prediction_intervals,
                id_col=id_col,
                time_col=time_col,
                target_col=target_col,
            )
        assert df is not None
        engine = make_execution_engine(infer_by=[df])
        backend = make_backend(engine)
        return backend.cross_validation(
            df=df,
            models=self.models,
            freq=self.freq,
            fallback_model=self.fallback_model,
            h=h,
            n_windows=n_windows,
            step_size=step_size,
            test_size=test_size,
            input_size=input_size,
            level=level,
            refit=refit,
            fitted=fitted,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )

    def _is_native(self, df) -> bool:
        # Local execution: no execution engine is active in the current context,
        # and the data is absent or an in-memory pandas/polars frame.
        engine = try_get_context_execution_engine()
        return engine is None and (df is None or isinstance(df, (pd.DataFrame, pl_DataFrame)))


show_doc(StatsForecast, title_level=2, name='StatsForecast')
# Usage examples of the StatsForecast class
# from statsforecast.core import StatsForecast
from statsforecast.models import (
    ADIDA,
    AutoARIMA,
    CrostonClassic,
    CrostonOptimized,
    CrostonSBA,
    HistoricAverage,
    IMAPA,
    Naive,
    RandomWalkWithDrift,
    SeasonalExponentialSmoothing,
    SeasonalNaive,
    SeasonalWindowAverage,
    SimpleExponentialSmoothing,
    TSB,
    WindowAverage,
    DynamicOptimizedTheta,
    AutoETS,
    AutoCES,
)

# Generate a synthetic panel DataFrame for the examples
panel_df = generate_series(n_series=9, equal_ends=False, engine='pandas')
panel_df.groupby('unique_id').tail(4)

# Unset the flag so the deprecated default (ids as the index) is exercised.
os.environ.pop('NIXTLA_ID_AS_COL', None)

# Every method returning ids as the index must emit the deprecation warning.
fcst = StatsForecast(models=[Naive()], freq='D')
fcst.fit(df=panel_df)
with warnings.catch_warnings(record=True) as issued_warnings:
    warnings.simplefilter('always', category=FutureWarning)
    std_preds = fcst.predict(h=1)
    std_preds2 = fcst.fit_predict(df=panel_df, h=1)
    std_fcst = fcst.forecast(df=panel_df, h=1, fitted=True)
    std_fitted = fcst.forecast_fitted_values()
    std_cv = fcst.cross_validation(df=panel_df, h=1, fitted=True)
    std_fitted_cv = fcst.cross_validation_fitted_values()
assert len(issued_warnings) == 6
assert all(
    'the predictions will have the id as a column' in str(w.message)
    for w in issued_warnings
)
os.environ['NIXTLA_ID_AS_COL'] = '1'

# If a model is trained with exogenous features, they must be provided through X_df.
# Otherwise an error pointing this out is raised.
panel_with_exog = panel_df[panel_df['unique_id'] == 0].copy()
panel_with_exog['month'] = panel_df['ds'].dt.month
sf = StatsForecast(
    models=[AutoARIMA(season_length=12)],
    freq='M',
)
sf.fit(panel_with_exog)
expected_msg = "['month'] for the forecasting step. Please provide them through `X_df`"
for failing_call in (
    lambda: sf.predict(h=12),
    lambda: sf.forecast(df=panel_with_exog, h=12),
    lambda: sf.fit_predict(df=panel_with_exog, h=12),
):
    test_fail(failing_call, contains=expected_msg)

# if the models don't use exog then it continues
sf = StatsForecast(
    models=[SeasonalNaive(season_length=10), Naive()],
    freq='M',
)
sf.fit(panel_with_exog)
_ = sf.predict(h=12)

# Check the series sizes required by prediction intervals
fcst = StatsForecast(models=[Naive()], freq='D')
intervals = ConformalIntervals(n_windows=4, h=10)

# The intervals need 41 samples; with only 30 just 2 windows fit, so a warning is expected.
with warnings.catch_warnings(record=True) as issued_warnings:
    fcst.fit(df=panel_df.head(30), prediction_intervals=intervals)
assert 'will use less windows' in str(issued_warnings[0].message)
assert fcst.fitted_[0, 0]._cs.shape[0] == 2

# With fewer than 21 samples (two windows of h=10, plus 1 for training) it must fail.
test_fail(
    lambda: fcst.fit(df=panel_df.head(20), prediction_intervals=intervals),
    contains='Please remove them or adjust the horizon',
)

# In CV the test size also counts (20 for the CV windows + 21 for the intervals).
test_fail(
    lambda: fcst.cross_validation(
        df=panel_df.head(40),
        n_windows=2,
        step_size=10,
        h=10,
        prediction_intervals=intervals,
        level=[80],
    ),
    contains='Minimum samples for computing prediction intervals are 41',
)

# Integer refit or refit=False raises an error for models that don't support it
# A model without `forward` cannot be used with integer refit or refit=False.
fcst = StatsForecast(models=[Naive(), SeasonalNaive(season_length=7)], freq='D')
for refit_value in (2, False):
    test_fail(
        lambda: fcst.cross_validation(df=panel_df, h=8, n_windows=4, refit=refit_value),
        contains='implement the forward method: [SeasonalNaive]'
    )

# The same restriction applies to the fallback model.
fcst = StatsForecast(models=[Naive()], freq='D', fallback_model=SeasonalNaive(season_length=7))
for refit_value in (2, False):
    test_fail(
        lambda: fcst.cross_validation(df=panel_df, h=8, n_windows=4, refit=refit_value),
        contains='a fallback model that implements the forward method.'
    )

# Non-standard column names
renamer = {'unique_id': 'uid', 'ds': 'time', 'y': 'target'}
kwargs = dict(
    id_col=renamer['unique_id'],
    time_col=renamer['ds'],
    target_col=renamer['y'],
)
inverse_renamer = {new: old for old, new in renamer.items()}
non_std_df = panel_df.rename(columns=renamer)


def assert_equal_results(df1, df2):
    """Compare a result with ids in the index against one using the custom column names."""
    pd.testing.assert_frame_equal(
        df1.reset_index(),
        df2.rename(columns=inverse_renamer),
    )


fcst = StatsForecast(models=[Naive()], freq='D')
fcst.fit(df=non_std_df, **kwargs)
non_std_preds = fcst.predict(h=1)
non_std_preds2 = fcst.fit_predict(df=non_std_df, h=1, **kwargs)
non_std_fcst = fcst.forecast(df=non_std_df, h=1, fitted=True, **kwargs)
non_std_fitted = fcst.forecast_fitted_values()
non_std_cv = fcst.cross_validation(df=non_std_df, h=1, fitted=True, **kwargs)
non_std_fitted_cv = fcst.cross_validation_fitted_values()
for std_res, non_std_res in [
    (std_preds, non_std_preds),
    (std_preds2, non_std_preds2),
    (std_fcst, non_std_fcst),
    (std_fitted, non_std_fitted),
    (std_cv, non_std_cv),
    (std_fitted_cv, non_std_fitted_cv),
]:
    assert_equal_results(std_res, non_std_res)

# Declare the list of StatsForecast estimator instances to fit
# You can try other estimator's hyperparameters
# You can try other methods from the `models.StatsForecast` collection
# Check them here: https://nixtla.github.io/statsforecast/models.html
models = [
    AutoARIMA(),
    Naive(),
    AutoETS(),
    AutoARIMA(allowmean=True, alias='MeanAutoARIMA'),
]

# Instantiate StatsForecast class
fcst = StatsForecast(models=models, freq='D', n_jobs=1, verbose=True)

# Efficiently predict
fcsts_df = fcst.forecast(df=panel_df, h=4, fitted=True)
fcsts_df.groupby('unique_id').tail(4)

# Test saving and loading
import tempfile
from polars.testing import assert_frame_equal

with tempfile.TemporaryDirectory() as td:
    f_path = Path(td) / "sf_test.pickle"
    test_df = generate_series(n_series=9, equal_ends=False, engine='polars')
    test_frcs = StatsForecast(models=models, freq='1d', n_jobs=1, verbose=True)
    origin_df = test_frcs.forecast(df=test_df, h=4, fitted=True)
    # A reloaded object must produce exactly the same forecasts.
    test_frcs.save(f_path)
    sf_test = StatsForecast.load(f_path)
    load_df = sf_test.forecast(df=test_df, h=4, fitted=True)
assert_frame_equal(origin_df, load_df)

# Test custom model names (aliases)
# The alias is used as the column name of the last model.
test_eq(fcsts_df.columns[-1], 'MeanAutoARIMA')

# Duplicate model names are rejected; an alias disambiguates them.
test_fail(lambda: StatsForecast(models=[Naive(), Naive()], freq="D"))
StatsForecast(models=[Naive(), Naive(alias="Naive2")], freq="D")

fig = StatsForecast.plot(panel_df, max_insample_length=10)
fig

# `level` must be a list.
test_fail(
    StatsForecast.plot,
    contains='Please use a list',
    kwargs={'df': panel_df, 'level': 90},
)
fcst.plot(panel_df, fcsts_df, engine='matplotlib')

# Plotting with `ds` stored as strings (object dtype)
panel_df['ds'] = panel_df['ds'].astype(str)
fcst.plot(panel_df, fcsts_df)

fcsts_df = fcst.forecast(df=panel_df, h=4, fitted=True, level=[90, 80, 30])
fcsts_df.groupby('unique_id').tail(4)
fcst.plot(panel_df, fcsts_df, models=['AutoARIMA', 'AutoETS'], level=[90, 80], max_insample_length=28)
fcst.plot(
    fcst.forecast_fitted_values(),
    forecasts_df=fcsts_df,
    models=['AutoARIMA', 'AutoETS'],
    level=[80],
    max_insample_length=20,
    plot_anomalies=True,
)
fcst.plot(panel_df, fcsts_df, models=['AutoARIMA', 'Naive'])
fcst.plot(panel_df, fcsts_df, models=['AutoARIMA', 'Naive'], max_insample_length=28)

# Restricting the plotted series through the data or through `unique_ids`
fcst.plot(panel_df.query('unique_id in [0, 1]'), fcsts_df, models=['AutoARIMA', 'Naive'], level=[90])
fcst.plot(panel_df, fcsts_df, unique_ids=[0, 1], models=['AutoARIMA', 'Naive'], level=[90])
fcst.plot(panel_df.query('unique_id == 0'), fcsts_df, models=['AutoARIMA', 'Naive'], level=[90])
fcst.plot(panel_df, fcsts_df.query('unique_id == 0'), models=['AutoARIMA', 'Naive'], level=[90])
fcst.plot(panel_df, fcsts_df, unique_ids=[0], models=['AutoARIMA', 'Naive'], level=[90])
fcst.plot(panel_df.query('unique_id in [0, 1, 3]'), fcsts_df, models=['AutoARIMA', 'Naive'], level=[90])
fcst.plot(panel_df, fcsts_df, unique_ids=[0, 1, 2], level=[90])

fig = fcst.plot(panel_df, fcsts_df, unique_ids=[0, 1, 2, 3, 4], models=['AutoARIMA', 'Naive'], level=[90])
fig
fig = fcst.plot(
    panel_df,
    fcsts_df,
    unique_ids=[0, 1, 2, 3, 4],
    models=['AutoARIMA', 'Naive'],
    level=[90],
    engine='matplotlib',
)
fig

# Model-level prediction intervals must not be overridden
# Intervals configured on a model survive `_set_prediction_intervals(None)`.
models = [SimpleExponentialSmoothing(alpha=0.1, prediction_intervals=ConformalIntervals(h=24, n_windows=2))]
fcst = StatsForecast(models=models, freq='D', n_jobs=1)
fcst._set_prediction_intervals(None)
assert models[0].prediction_intervals is not None

fcst = StatsForecast(
    models=[AutoARIMA(season_length=7)],
    freq='D',
    n_jobs=1,
    verbose=True,
)
fcsts_df = fcst.forecast(df=panel_df, h=4, fitted=True, level=[90])
fcsts_df.groupby('unique_id').tail(4)
fitted_vals = fcst.forecast_fitted_values()
fcst.plot(panel_df, fitted_vals.drop(columns='y'), level=[90])

show_doc(_StatsForecast.fit, title_level=2, name='StatsForecast.fit')
# Display name fixed: previously misspelled as 'SatstForecast.predict'.
show_doc(_StatsForecast.predict, title_level=2, name='StatsForecast.predict')
show_doc(_StatsForecast.fit_predict, title_level=2, name='StatsForecast.fit_predict')
show_doc(_StatsForecast.forecast, title_level=2, name='StatsForecast.forecast')
# Usage example of the StatsForecast.forecast method
# from statsforecast.core import StatsForecast
from statsforecast.utils import AirPassengersDF as panel_df
from statsforecast.models import AutoARIMA, Naive

# Instantiate the StatsForecast class
fcst = StatsForecast(models=[AutoARIMA(), Naive()], freq='D', n_jobs=1)

# Efficient prediction, without storing the fitted models
fcsts_df = fcst.forecast(df=panel_df, h=4, fitted=True)
fcsts_df.groupby('unique_id').tail(4)

series = generate_series(100, n_static_features=2, equal_ends=False)
models = [
    ADIDA(),
    CrostonClassic(),
    CrostonOptimized(),
    CrostonSBA(),
    HistoricAverage(),
    IMAPA(),
    Naive(),
    RandomWalkWithDrift(),
    SeasonalExponentialSmoothing(season_length=7, alpha=0.1),
    SeasonalNaive(season_length=7),
    SeasonalWindowAverage(season_length=7, window_size=4),
    SimpleExponentialSmoothing(alpha=0.1),
    TSB(alpha_d=0.1, alpha_p=0.3),
    WindowAverage(window_size=4),
]
fcst = StatsForecast(models=models, freq='D', n_jobs=1, verbose=True)
res = fcst.forecast(df=series, h=14)

# Series whose `ds` is given as strings instead of datetimes must give the same result
series_wo_dt = series.copy()
series_wo_dt['ds'] = series_wo_dt['ds'].astype(str)
fcst = StatsForecast(models=models, freq='D')
fcsts_wo_dt = fcst.forecast(df=series_wo_dt, h=14)
test_eq(res, fcsts_wo_dt)

# Forecasts start the day after each series ends and span 14 days.
test_eq(res['unique_id'].unique(), fcst.uids.values)
last_dates = series.groupby('unique_id')['ds'].max()
res_ds_by_id = res.groupby('unique_id')['ds']
test_eq(res_ds_by_id.min().values, last_dates + pd.offsets.Day())
test_eq(res_ds_by_id.max().values, last_dates + 14 * pd.offsets.Day())

# Monthly data tests
monthly_series = generate_series(10_000, freq='M', min_length=10, max_length=20, equal_ends=True)
monthly_series
fcst = StatsForecast(models=[Naive()], freq='M')
monthly_res = fcst.forecast(df=monthly_series, h=4)
monthly_res
last_dates = monthly_series.groupby('unique_id')['ds'].max()

# Forecasts start one month-end after each series' last date and cover 4 month-ends.
monthly_ds_by_id = monthly_res.groupby('unique_id')['ds']
test_eq(monthly_ds_by_id.min().values, pd.Series(fcst.last_dates) + pd.offsets.MonthEnd())
test_eq(monthly_ds_by_id.max().values, pd.Series(fcst.last_dates) + 4 * pd.offsets.MonthEnd())

show_doc(
    _StatsForecast.forecast_fitted_values,
    title_level=2,
    name='StatsForecast.forecast_fitted_values',
)
# Usage example of the StatsForecast.forecast_fitted_values method
# from statsforecast.core import StatsForecast
from statsforecast.utils import AirPassengersDF as panel_df
from statsforecast.models import Naive
# Instantiate the StatsForecast class
# NOTE(review): `Naive` is imported here but the example uses `AutoARIMA`
# (imported in an earlier cell).
fcst = StatsForecast(models=[AutoARIMA()], freq='D', n_jobs=1)
# fitted=True keeps the in-sample fitted values so they can be retrieved below
fcsts_df = fcst.forecast(df=panel_df, h=12, fitted=True, level=(90, 10))
# Access the in-sample predictions
insample_fcsts_df = fcst.forecast_fitted_values()
insample_fcsts_df.tail(4)
# Tests for fitted values
def test_fcst_fitted(series, n_jobs=1, str_ds=False):
    """Check that `forecast_fitted_values` echoes back the training `ds` and `y`.

    When ``str_ds`` is True the ``ds`` column is first cast to strings (on a
    copy), and the returned timestamps are compared with the parsed dates.
    """
    if str_ds:
        series = series.copy()
        series['ds'] = series['ds'].astype(str)
    naive_fcst = StatsForecast(models=[Naive()], freq='D', n_jobs=n_jobs)
    naive_fcst.forecast(df=series, h=14, fitted=True)
    insample = naive_fcst.forecast_fitted_values()
    expected_ds = pd.to_datetime(series['ds']) if str_ds else series['ds']
    test_eq(expected_ds, insample['ds'])
    test_eq(series['y'], insample['y'])


test_fcst_fitted(series)
test_fcst_fitted(series, str_ds=True)
# Tests for the fallback model
def test_fcst_fallback_model(n_jobs=1):
    """Check that `fallback_model` takes over when the primary model fails.

    With ``Naive`` as fallback, the fitted values of ``NullModel`` must
    reproduce the training ``ds`` and ``y``; without a fallback, ``forecast``
    must raise.
    """
    with_fallback = StatsForecast(
        models=[NullModel()],
        freq='D',
        n_jobs=n_jobs,
        fallback_model=Naive(),
    )
    with_fallback.forecast(df=series, h=14, fitted=True)
    insample = with_fallback.forecast_fitted_values()
    for col in ['ds', 'y']:
        test_eq(series[col], insample[col])
    # Test that the null model actually fails when no fallback is given.
    without_fallback = StatsForecast(models=[NullModel()], freq='D', n_jobs=n_jobs)
    test_fail(lambda: without_fallback.forecast(df=series, h=14))


test_fcst_fallback_model()
show_doc(_StatsForecast.cross_validation,
         title_level=2,
         name='StatsForecast.cross_validation')
# Usage example of the StatsForecast.cross_validation method
# from statsforecast.core import StatsForecast
from statsforecast.utils import AirPassengersDF as panel_df
from statsforecast.models import Naive
# Instantiate the StatsForecast class
fcst = StatsForecast(models=[Naive()],
                     freq='D', n_jobs=1, verbose=True)
# Rolling-window cross-validation: 2 windows, each forecasting 14 steps ahead
rolled_fcsts_df = fcst.cross_validation(df=panel_df, h=14, n_windows=2)
rolled_fcsts_df.head(4)
# Cross-validation tests
# Three series of lengths 10, 100 and 20 that end on different dates.
series_cv = pd.DataFrame({
    'unique_id': np.array(10 * ['id_0'] + 100 * ['id_1'] + 20 * ['id_2']),
    'ds': np.hstack([
        pd.date_range(end='2021-01-01', freq='D', periods=10),
        pd.date_range(end='2022-01-01', freq='D', periods=100),
        pd.date_range(end='2020-01-01', freq='D', periods=20)
    ]),
    'y': np.hstack([np.arange(10.), np.arange(100, 200), np.arange(20, 40)]),
})
fcst = StatsForecast(
    models=[SumAhead(), Naive()],
    freq='D',
    verbose=True,
)
res_cv = fcst.cross_validation(df=series_cv, h=2, test_size=5, n_windows=None, level=(50, 60))
# SumAhead (defined in an earlier cell) is expected to have zero mean error here.
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))
# With h=2 and n_windows=2 every series gets 2 * 2 rows.
n_windows = fcst.cross_validation(df=series_cv, h=2, n_windows=2).groupby('unique_id').size().unique()
test_eq(n_windows, 2 * 2)
# NOTE(review): this re-checks the first `res_cv`, not the frame computed just above.
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))
# With h=3, n_windows=3 and step_size=3 every series gets 3 * 3 rows.
n_windows = fcst.cross_validation(df=series_cv, h=3, n_windows=3, step_size=3, fitted=True).groupby('unique_id').size().unique()
test_eq(n_windows, 3 * 3)
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))
# Series too short for the requested settings must be reported by id.
test_fail(lambda: fcst.cross_validation(df=series_cv, h=10), contains="The following series are too short for the cross validation settings: ['id_0']")
test_fail(lambda: fcst.cross_validation(df=series_cv, h=20), contains="The following series are too short for the cross validation settings: ['id_0', 'id_2']")
# Test cross-validation without refitting
fcst = StatsForecast(
    models=[SumAhead()],
    freq='D',
    verbose=True
)
res_cv_wo_refit = fcst.cross_validation(
    df=series_cv,
    h=2,
    test_size=5,
    n_windows=None,
    level=(50, 60),
    refit=False,
)
# Without refitting, the output must differ from the refitted `res_cv` overall...
test_fail(test_eq, args=(res_cv_wo_refit, res_cv))
cols_wo_refit = res_cv_wo_refit.columns
# ...but the first row of each series must match between the two runs.
test_eq(res_cv_wo_refit.groupby('unique_id').head(1), res_cv[cols_wo_refit].groupby('unique_id').head(1))
n_windows = fcst.cross_validation(
    df=series_cv,
    h=2,
    n_windows=2,
    refit=False,
).groupby('unique_id').size().unique()
test_eq(n_windows, 2 * 2)
n_windows = fcst.cross_validation(
    df=series_cv,
    h=3,
    n_windows=3,
    step_size=3,
    fitted=True,
    refit=False,
).groupby('unique_id').size().unique()
test_eq(n_windows, 3 * 3)
# Test cross-validation without refitting the models.
fcst = StatsForecast(
    models=[DynamicOptimizedTheta(), AutoCES(),
            DynamicOptimizedTheta(season_length=7, alias='test')],
    freq='D',
    verbose=True
)
res_cv_wo_refit = fcst.cross_validation(
    df=series_cv,
    h=2,
    test_size=5,
    n_windows=None,
    level=(50, 60),
    refit=False,
)
res_cv_w_refit = fcst.cross_validation(
    df=series_cv,
    h=2,
    test_size=5,
    n_windows=None,
    level=(50, 60),
    refit=True,
)
# Refitting per window must change the results overall...
test_fail(test_eq, args=(res_cv_wo_refit, res_cv_w_refit))
# ...while the first row of each series is identical in both runs.
test_eq(
    res_cv_wo_refit.groupby('unique_id').head(1),
    res_cv_w_refit.groupby('unique_id').head(1)
)
# Test series whose `ds` column is not a datetime (dates stored as strings)
series_cv_wo_dt = series_cv.copy()
series_cv_wo_dt['ds'] = series_cv_wo_dt['ds'].astype(str)
fcst = StatsForecast(
    models=[SumAhead(), Naive()],
    freq='D',
    verbose=False
)
res_cv_wo_dt = fcst.cross_validation(
    df=series_cv_wo_dt,
    h=2,
    test_size=5,
    n_windows=None,
    level=(50, 60),
)
# String dates must give the same cross-validation output as datetime dates.
test_eq(res_cv, res_cv_wo_dt)
# Test cross-validation on series with equal ends
# All three series end on 2022-01-01 and `unique_id` is an int64 column.
# NOTE: this rebinds `series_cv`; later cells use this equal-ends version.
series_cv = pd.DataFrame({
    'unique_id': np.hstack([np.zeros(10), np.zeros(100) + 1, np.zeros(20) + 2]).astype('int64'),
    'ds': np.hstack([
        pd.date_range(end='2022-01-01', freq='D', periods=10),
        pd.date_range(end='2022-01-01', freq='D', periods=100),
        pd.date_range(end='2022-01-01', freq='D', periods=20)
    ]),
    'y': np.hstack([np.arange(10), np.arange(100, 200), np.arange(20, 40)]),
})
fcst = StatsForecast(
    models=[SumAhead()],
    freq='D',
)
res_cv = fcst.cross_validation(
    df=series_cv,
    h=2,
    test_size=5,
    n_windows=None,
    level=(50,60),
    fitted=True,
)
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))
n_windows = fcst.cross_validation(
    df=series_cv,
    h=2,
    n_windows=2,
).groupby('unique_id').size().unique()
test_eq(n_windows, 2 * 2)
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))
n_windows = fcst.cross_validation(
    df=series_cv,
    h=3,
    n_windows=3,
    step_size=3,
).groupby('unique_id').size().unique()
test_eq(n_windows, 3 * 3)
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))
show_doc(_StatsForecast.cross_validation_fitted_values,
         title_level=2,
         name='StatsForecast.cross_validation_fitted_values')
# Usage example of the StatsForecast.cross_validation_fitted_values method
# from statsforecast.core import StatsForecast
from statsforecast.utils import AirPassengersDF as panel_df
from statsforecast.models import Naive
# Instantiate the StatsForecast class
fcst = StatsForecast(models=[Naive()],
                     freq='D', n_jobs=1)
# Cross-validate with fitted=True so each window's in-sample fitted values are kept
rolled_fcsts_df = fcst.cross_validation(df=panel_df, h=12, n_windows=2, fitted=True)
# Access the in-sample fitted values of the cross-validation windows
insample_rolled_fcsts_df = fcst.cross_validation_fitted_values()
insample_rolled_fcsts_df.tail(4)
# Tests for cross-validation fitted values
def test_cv_fitted(series_cv, n_jobs=1, str_ds=False):
    """Check `cross_validation_fitted_values` against the raw training data.

    For every series and every cutoff, the fitted-values frame must contain
    exactly the rows of ``series_cv`` with ``ds <= cutoff``. When ``str_ds`` is
    True the ``ds`` column is first cast to strings (on a copy) and parsed back
    before the comparison.
    """
    if str_ds:
        series_cv = series_cv.copy()
        series_cv['ds'] = series_cv['ds'].astype(str)
    cv_fcst = StatsForecast(models=[SumAhead(), Naive()], freq='D', n_jobs=n_jobs)
    cv_out = cv_fcst.cross_validation(df=series_cv, h=2, n_windows=4, fitted=True)
    fitted_vals = cv_fcst.cross_validation_fitted_values()
    # Both frames must describe the same set of cutoffs.
    np.testing.assert_array_equal(fitted_vals['cutoff'].unique(), cv_out['cutoff'].unique())
    if str_ds:
        series_cv['ds'] = pd.to_datetime(series_cv['ds'])
    key_cols = ['unique_id', 'ds', 'y']
    # `uid` and `cutoff` are referenced by name inside the query strings below.
    for uid in fitted_vals['unique_id'].unique():
        resids_uid = fitted_vals[fitted_vals['unique_id'].eq(uid)]
        for cutoff in resids_uid['cutoff'].unique():
            got = resids_uid.query('cutoff == @cutoff')[key_cols].reset_index(drop=True)
            expected = series_cv.query('ds <= @cutoff & unique_id == @uid')[key_cols].reset_index(drop=True)
            pd.testing.assert_frame_equal(got, expected, check_dtype=False)


test_cv_fitted(series_cv)
test_cv_fitted(series_cv, str_ds=True)
# Tests for the fallback model
def test_cv_fallback_model(n_jobs=1):
    """Exercise the fallback model in cross-validation.

    With ``Naive`` as fallback, cross-validating ``NullModel`` must run without
    raising; without a fallback it must fail with an "unexpected keyword
    argument" error.
    """
    fitted_fcst = StatsForecast(
        models=[NullModel()],
        freq='D',
        n_jobs=n_jobs,
        fallback_model=Naive()
    )
    # Only checks that these calls succeed; their results are not inspected.
    fitted_res = fitted_fcst.cross_validation(df=series, h=2, n_windows=4, fitted=True)
    fitted = fitted_fcst.cross_validation_fitted_values()
    # Test that the null model actually fails
    fitted_fcst = StatsForecast(
        models=[NullModel()],
        freq='D',
        n_jobs=n_jobs,
    )
    test_fail(lambda: fitted_fcst.cross_validation(df=series, h=12, n_windows=4),
              contains='got an unexpected keyword argument')
test_cv_fallback_model()
show_doc(_StatsForecast.plot,
         title_level=2,
         name='StatsForecast.plot')
show_doc(StatsForecast.save, title_level=2, name='StatsForecast.save')
show_doc(StatsForecast.load, title_level=2, name='StatsForecast.load')
# fit + predict, and fit_predict, must both match a single forecast call.
fcst.fit(df=series)
test_eq(
    fcst.predict(h=12),
    fcst.forecast(df=series, h=12)
)
test_eq(
    fcst.fit_predict(df=series, h=12),
    fcst.forecast(df=series, h=12)
)
# Tests for conformal prediction
# Take the first 10 series ids from the `unique_id` column. The row index is a
# RangeIndex here, so its values would only coincide with ids by accident.
uids = series['unique_id'].unique()[:10]
series_subset = series.query('unique_id in @uids')[['unique_id', 'ds', 'y']]
sf = StatsForecast(
    models=[SeasonalNaive(season_length=7)],
    freq='D',
    n_jobs=1,
)
sf = sf.fit(df=series_subset, prediction_intervals=ConformalIntervals(h=12))
# predict, fit_predict and forecast must agree under the same conformal settings.
test_eq(
    sf.predict(h=12, level=[80, 90]),
    sf.fit_predict(df=series_subset, h=12, level=[80, 90], prediction_intervals=ConformalIntervals(h=12)),
)
test_eq(
    sf.predict(h=12, level=[80, 90]),
    sf.forecast(df=series_subset, h=12, level=[80, 90], prediction_intervals=ConformalIntervals(h=12)),
)
# Test that a warning/error is raised when `level` is not specified
intervals = ConformalIntervals(h=12)
sf2 = StatsForecast(
    models=[ADIDA()],
    freq='D',
    n_jobs=1,
)
sf2.fit(df=series_subset, prediction_intervals=intervals)
test_warns(lambda: sf2.predict(h=12))
test_fail(lambda: sf2.forecast(df=series_subset, h=12, prediction_intervals=intervals))
test_fail(lambda: sf2.fit_predict(df=series_subset, h=12, prediction_intervals=intervals))
test_fail(lambda: sf2.cross_validation(df=series_subset, h=12, prediction_intervals=intervals))

# Test conformal cross-validation
cv_conformal = sf.cross_validation(
    df=series_subset,
    h=12,
    n_windows=2,
    level=[80, 90],
    prediction_intervals=ConformalIntervals(h=12),
)
cv_no_conformal = sf.cross_validation(
    df=series_subset,
    h=12,
    n_windows=2,
    level=[80, 90],
)
# Conformal intervals only change the interval columns: the column set and the
# point forecasts of the model actually used (SeasonalNaive) must be identical.
test_eq(
    cv_conformal.columns,
    cv_no_conformal.columns,
)
test_eq(
    cv_conformal.filter(regex='ds|cutoff|y|SeasonalNaive$'),
    cv_no_conformal.filter(regex='ds|cutoff|y|SeasonalNaive$')
)

# Sequential (n_jobs=1) baseline forecast.
fcst = StatsForecast(
    models=[ADIDA(), SimpleExponentialSmoothing(0.1),
            HistoricAverage(), CrostonClassic()],
    freq='D',
    n_jobs=1
)
res = fcst.forecast(df=series, h=14)
#| eval: false
# Parallel processing tests
fcst = StatsForecast(
    models=[ADIDA(), SimpleExponentialSmoothing(0.1),
            HistoricAverage(), CrostonClassic()],
    freq='D',
    n_jobs=-1
)
res = fcst.forecast(df=series, h=14)
res_cv = fcst.cross_validation(df=series, h=3, test_size=10, n_windows=None)
# NOTE(review): this instance does not pass n_jobs=-1, so the n_windows /
# step_size checks below do not request parallel execution.
fcst = StatsForecast(
    models=[SumAhead()],
    freq='D',
)
res_cv = fcst.cross_validation(df=series_cv, h=2, test_size=5, n_windows=None)
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))
test_fcst_fitted(series, n_jobs=-1)
test_cv_fitted(series_cv, n_jobs=-1)
test_fcst_fitted(series, n_jobs=-1, str_ds=True)
test_cv_fitted(series_cv, n_jobs=-1, str_ds=True)
# Check the n_windows parameter
n_windows = fcst.cross_validation(df=series_cv, h=2, n_windows=2).groupby('unique_id').size().unique()
test_eq(n_windows, 2 * 2)
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))
# Check the step_size parameter
n_windows = fcst.cross_validation(df=series_cv, h=3, n_windows=3, step_size=3).groupby('unique_id').size().unique()
test_eq(n_windows, 3 * 3)
test_eq(0., np.mean(res_cv['y'] - res_cv['SumAhead']))
# Miscellaneous
整数日期戳
StatsForecast 类还可以接收整数作为日期戳,以下示例展示了如何做到这一点。
# from statsforecast.core import StatsForecast
from statsforecast.utils import AirPassengers as ap
from statsforecast.models import HistoricAverage
int_ds_df = pd.DataFrame({'ds': np.arange(1, len(ap) + 1), 'y': ap})
int_ds_df.insert(0, 'unique_id', 'AirPassengers')
int_ds_df.head()
int_ds_df.tail()
int_ds_df
# With integer timestamps, `freq` is an integer step (here 1) instead of an offset string.
fcst = StatsForecast(models=[HistoricAverage()], freq=1)
horizon = 7
forecast = fcst.forecast(df=int_ds_df, h=horizon)
forecast.head()
# Forecast timestamps continue the integer sequence right after the last observed ds.
last_date = int_ds_df['ds'].max()
test_eq(forecast['ds'].values, np.arange(last_date + 1, last_date + 1 + horizon))
int_ds_cv = fcst.cross_validation(df=int_ds_df, h=7, test_size=8, n_windows=None)
int_ds_cv
# Exogenous regressors
每个 y 之后的列都被视为外部回归变量,并会传递给支持使用它们的模型。如果使用了外部回归变量,您必须通过 `X_df` 参数向 StatsForecast.forecast 方法提供这些变量的未来值。
class LinearRegression(_TS):
    """Minimal least-squares model used to exercise exogenous regressors.

    The target is regressed on the exogenous columns only.
    """

    def __init__(self):
        pass

    def fit(self, y, X):
        """Fit ``y ~ X`` by least squares and store the coefficients in ``coefs_``."""
        self.coefs_, *_ = np.linalg.lstsq(X, y, rcond=None)
        return self

    def predict(self, h, X):
        """Predict with the coefficients learned in ``fit``.

        ``h`` is unused: the horizon is implied by the number of rows of ``X``.
        """
        # Use the coefficients stored by `fit` (a bare `coefs` is undefined here).
        mean = X @ self.coefs_
        return mean

    def __repr__(self):
        return 'LinearRegression()'

    def forecast(self, y, h, X=None, X_future=None, fitted=False):
        """Fit on ``(X, y)`` and return ``{'mean': X_future @ coefs}`` in one call.

        ``h`` and ``fitted`` are accepted for interface compatibility but unused.
        """
        coefs, *_ = np.linalg.lstsq(X, y, rcond=None)
        return {'mean': X_future @ coefs}

    def new(self):
        """Return a new, uninitialized instance with a shallow copy of this instance's attributes."""
        b = type(self).__new__(type(self))
        b.__dict__.update(self.__dict__)
        return b


# NOTE: this also rebinds `series`; the two column assignments below mutate
# that same frame before `series_xreg` is rebound by get_dummies.
series_xreg = series = generate_series(10_000, equal_ends=True)
series_xreg['intercept'] = 1
series_xreg['dayofweek'] = series_xreg['ds'].dt.dayofweek
# One-hot encode the weekday; drop_first avoids collinearity with `intercept`.
series_xreg = pd.get_dummies(series_xreg, columns=['dayofweek'], drop_first=True)
series_xreg
# Hold out the last 14 dates as the validation period.
dates = sorted(series_xreg['ds'].unique())
valid_start = dates[-14]
train_mask = series_xreg['ds'] < valid_start
series_train = series_xreg[train_mask]
series_valid = series_xreg[~train_mask]
# Future values of the exogenous columns: everything except the target.
X_valid = series_valid.drop(columns=['y'])
fcst = StatsForecast(
    models=[LinearRegression()],
    freq='D',
)
xreg_res = fcst.forecast(df=series_train, h=14, X_df=X_valid)
xreg_res['y'] = series_valid['y'].values
xreg_res.drop(columns='unique_id').groupby('ds').mean().plot()
xreg_res_cv = fcst.cross_validation(df=series_train, h=3, test_size=5, n_windows=None)
# The following cells contain tests for exogenous regressors
class ReturnX(_TS):
    """Test model whose forecast is simply the future exogenous values."""

    def __init__(self):
        pass

    def fit(self, y, X):
        return self

    def predict(self, h, X):
        # NOTE(review): `mean` is assigned but unused; X is returned directly.
        mean = X
        return X

    def __repr__(self):
        return 'ReturnX'

    def forecast(self, y, h, X=None, X_future=None, fitted=False):
        # Flatten X_future to 1-D (the test data below has a single exogenous column).
        return {'mean': X_future.flatten()}

    def new(self):
        b = type(self).__new__(type(self))
        b.__dict__.update(self.__dict__)
        return b
# Two series of 10 integer timestamps with one exogenous column `x`.
df = pd.DataFrame(
    {
        'unique_id': [0] * 10 + [1] * 10,
        'ds': np.hstack([np.arange(10), np.arange(10)]),
        'y': np.random.rand(20),
        'x': np.arange(20, dtype=np.float64),
    }
)
# Train on ds 0..5 and test on ds 6..9 (4 steps per series).
train_mask = df['ds'] < 6
train_df = df[train_mask]
test_df = df[~train_mask]
def test_x_vars(n_jobs=1):
    """ReturnX forecasts must equal the future `x` values passed through X_df."""
    fcst = StatsForecast(
        models=[ReturnX()],
        freq=1,
        n_jobs=n_jobs,
    )
    xreg = test_df.drop(columns='y')
    res = fcst.forecast(df=train_df, h=4, X_df=xreg)
    expected_res = xreg.rename(columns={'x': 'ReturnX'})
    pd.testing.assert_frame_equal(
        res,
        expected_res.reset_index(drop=True),
        check_dtype=False,
    )
test_x_vars(n_jobs=1)
#| eval: false
test_x_vars(n_jobs=2)
# Prediction intervals
您可以将参数 level 传递给 StatsForecast.forecast 方法,以计算预测区间。并非所有模型目前都能计算预测区间,因此我们只能获得那些已实现此功能模型的区间。
ap_df = pd.DataFrame({'ds': np.arange(ap.size), 'y': ap})
ap_df['unique_id'] = 0
sf = StatsForecast(
    models=[
        SeasonalNaive(season_length=12),
        AutoARIMA(season_length=12)
    ],
    freq=1,
    n_jobs=1
)
# Request 80% and 95% intervals; only models that implement intervals return them.
ap_ci = sf.forecast(df=ap_df, h=12, level=(80, 95))
# Plot with `sf`, the instance (freq=1, integer ds) that produced `ap_ci`.
sf.plot(ap_df, ap_ci, level=[80], engine="matplotlib")
# Conformal prediction intervals
您还可以使用以下代码为模型添加保形(conformal)预测区间。
from statsforecast.utils import ConformalIntervals
sf = StatsForecast(
    models=[
        AutoARIMA(season_length=12),
        # Same model, with conformal intervals built from 2 windows of 12 steps.
        AutoARIMA(
            season_length=12,
            prediction_intervals=ConformalIntervals(n_windows=2, h=12),
            alias='ConformalAutoARIMA'
        ),
    ],
    freq=1,
    n_jobs=1
)
ap_ci = sf.forecast(df=ap_df, h=12, level=(80, 95))
# Plot with `sf`, the instance that produced `ap_ci`.
sf.plot(ap_df, ap_ci, level=[80], engine="plotly")
# You can also compute conformal intervals for all supported models as follows:
sf = StatsForecast(
    models=[
        AutoARIMA(season_length=12),
    ],
    freq=1,
    n_jobs=1
)
# Passing prediction_intervals to forecast requests conformal intervals for the models.
ap_ci = sf.forecast(
    df=ap_df,
    h=12,
    level=(50, 80, 95),
    prediction_intervals=ConformalIntervals(h=12),
)
# Plot with `sf`, the instance that produced `ap_ci`.
sf.plot(ap_df, ap_ci, level=[80], engine="matplotlib")
def test_conf_intervals(n_jobs=1):
    """Forecast AirPassengers with 80/95 interval levels and plot the result."""
    ap_df = pd.DataFrame(
        {
            'unique_id': [0] * ap.size,
            'ds': np.arange(ap.size),
            'y': ap
        }
    )
    fcst = StatsForecast(
        models=[
            SeasonalNaive(season_length=12),
            AutoARIMA(season_length=12)
        ],
        freq=1,
        n_jobs=n_jobs
    )
    ap_ci = fcst.forecast(df=ap_df, h=12, level=(80, 95))
    ap_ci.drop(columns='unique_id').set_index('ds').plot(marker='.', figsize=(10, 6))
test_conf_intervals(n_jobs=1)
#| eval: false
# Test with more jobs than available cores
test_conf_intervals(n_jobs=101)
# Give us a ⭐ on Github