%load_ext autoreload
%autoreload 2自动模型
from os import cpu_count
import torch
from ray import tune
from ray.tune.search.basic_variant import BasicVariantGenerator
from neuralforecast.common._base_auto import BaseAuto
from neuralforecast.common._base_auto import MockTrial
from neuralforecast.models.rnn import RNN
from neuralforecast.models.gru import GRU
from neuralforecast.models.tcn import TCN
from neuralforecast.models.lstm import LSTM
from neuralforecast.models.deepar import DeepAR
from neuralforecast.models.dilated_rnn import DilatedRNN
from neuralforecast.models.bitcn import BiTCN
from neuralforecast.models.mlp import MLP
from neuralforecast.models.nbeats import NBEATS
from neuralforecast.models.nbeatsx import NBEATSx
from neuralforecast.models.nhits import NHITS
from neuralforecast.models.dlinear import DLinear
from neuralforecast.models.nlinear import NLinear
from neuralforecast.models.tide import TiDE
from neuralforecast.models.deepnpts import DeepNPTS
from neuralforecast.models.tft import TFT
from neuralforecast.models.vanillatransformer import VanillaTransformer
from neuralforecast.models.informer import Informer
from neuralforecast.models.autoformer import Autoformer
from neuralforecast.models.fedformer import FEDformer
from neuralforecast.models.patchtst import PatchTST
from neuralforecast.models.timesnet import TimesNet
from neuralforecast.models.itransformer import iTransformer
from neuralforecast.models.kan import KAN
from neuralforecast.models.rmok import RMoK
from neuralforecast.models.stemgnn import StemGNN
from neuralforecast.models.hint import HINT
from neuralforecast.models.tsmixer import TSMixer
from neuralforecast.models.tsmixerx import TSMixerx
from neuralforecast.models.mlpmultivariate import MLPMultivariate
from neuralforecast.models.softs import SOFTS
from neuralforecast.models.timemixer import TimeMixer
from neuralforecast.losses.pytorch import MAE, MQLoss, DistributionLossimport matplotlib.pyplot as plt
from fastcore.test import test_eq
from nbdev.showdoc import show_doc
import logging
import warnings
import inspect
from neuralforecast.losses.pytorch import MSElogging.getLogger("pytorch_lightning").setLevel(logging.ERROR)
warnings.filterwarnings("ignore")
plt.rcParams["axes.grid"]=True
plt.rcParams['font.family'] = 'serif'
plt.rcParams["figure.figsize"] = (6,4)# 单元测试以验证Auto*模型包含BaseAuto类中的所有必需参数。
# 适用于Python 3.11的获取参数规范补丁
# Getargspec patch for Python 3.11+: `inspect.getargspec` was removed, so fall
# back to `inspect.getfullargspec` (its [0] element is the same `args` list).
getargspec = getattr(inspect, 'getargspec', inspect.getfullargspec)
def test_args(auto_model, exclude_args=None):
    """Assert that `auto_model.__init__` accepts every required `BaseAuto` argument.

    Parameters
    ----------
    auto_model : type
        Auto* model class whose signature is checked.
    exclude_args : list of str, optional
        BaseAuto argument names to ignore (e.g. ``['cls_model']``).

    Raises
    ------
    AssertionError
        If any required BaseAuto argument is missing from `auto_model`.
    """
    base_auto_args = getargspec(BaseAuto)[0]
    auto_model_args = getargspec(auto_model)[0]
    if exclude_args is not None:
        base_auto_args = [arg for arg in base_auto_args if arg not in exclude_args]
    args_diff = set(base_auto_args) - set(auto_model_args)
    assert not args_diff, f"__init__ of {auto_model.__name__} does not contain the following required variables from BaseAuto class:\n\t\t{args_diff}"

# NeuralForecast ships user-friendly neural forecasting models that make it easy to
# switch compute (GPU/CPU), parallelize, and tune hyperparameters.
所有的NeuralForecast模型都是“全局”的,因为我们使用输入的pd.DataFrame数据Y_df中的所有系列进行训练,但当前的优化目标是“单变量”的,因为它没有考虑跨时间序列的输出预测之间的相互作用。像StatsForecast库一样,core.NeuralForecast允许您高效地探索模型集合,并包含方便处理输入和输出pd.DataFrames预测的函数。
首先,我们加载AirPassengers数据集,以便您可以运行所有示例。
%%capture
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from neuralforecast.tsdataset import TimeSeriesDataset
from neuralforecast.utils import AirPassengersDF as Y_df%%capture
# 拆分训练集/测试集并声明时间序列数据集
Y_train_df = Y_df[Y_df.ds<='1959-12-31'] # 132次列车
Y_test_df = Y_df[Y_df.ds>'1959-12-31'] # 12项测试
dataset, *_ = TimeSeriesDataset.from_df(Y_train_df)1. 自动预测
A. 基于RNN的
class AutoRNN(BaseAuto):
default_config = {
"input_size_multiplier": [-1, 4, 16, 64],
"inference_input_size_multiplier": [-1],
"h": None,
"encoder_hidden_size": tune.choice([50, 100, 200, 300]),
"encoder_n_layers": tune.randint(1, 4),
"context_size": tune.choice([5, 10, 50]),
"decoder_hidden_size": tune.choice([64, 128, 256, 512]),
"learning_rate": tune.loguniform(1e-4, 1e-1),
"max_steps": tune.choice([500, 1000]),
"batch_size": tune.choice([16, 32]),
"loss": None,
"random_seed": tune.randint(1, 20)
}
def __init__(self,
h,
loss=MAE(),
valid_loss=None,
config=None,
search_alg=BasicVariantGenerator(random_state=1),
num_samples=10,
refit_with_val=False,
cpus=cpu_count(),
gpus=torch.cuda.device_count(),
verbose=False,
alias=None,
backend='ray',
callbacks=None,
):
""" 自动循环神经网络
**参数:**<br>
"""
# 定义搜索空间、输入/输出大小
if config is None:
config = self.get_default_config(h=h, backend=backend)
super(AutoRNN, self).__init__(
cls_model=RNN,
h=h,
loss=loss,
valid_loss=valid_loss,
config=config,
search_alg=search_alg,
num_samples=num_samples,
refit_with_val=refit_with_val,
cpus=cpus,
gpus=gpus,
verbose=verbose,
alias=alias,
backend=backend,
callbacks=callbacks,
)
@classmethod
def get_default_config(cls, h, backend, n_series=None):
config = cls.default_config.copy()
config['input_size'] = tune.choice([h*x \
for x in config['input_size_multiplier']])
config['inference_input_size'] = tune.choice([h*x \
for x in config['inference_input_size_multiplier']])
del config['input_size_multiplier'], config['inference_input_size_multiplier']
if backend == 'optuna':
config = cls._ray_config_to_optuna(config)
return configshow_doc(AutoRNN, title_level=3)%%capture
# Use your own config or AutoRNN.default_config
config = dict(max_steps=2, val_check_steps=1, input_size=-1, encoder_hidden_size=8)
model = AutoRNN(h=12, config=config, num_samples=1, cpus=1)
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoRNN(h=12, config=None, num_samples=1, cpus=1, backend='optuna')  # Check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that the Auto* model contains all required parameters from BaseAuto
test_args(AutoRNN, exclude_args=['cls_model'])
# Unit test for the Optuna case with an updated default config
my_config = AutoRNN.get_default_config(h=12, backend='optuna')
def my_config_new(trial):
    config = {**my_config(trial)}
    config.update({'max_steps': 1, 'val_check_steps': 1, 'input_size': -1, 'encoder_hidden_size': 8})
    return config
model = AutoRNN(h=12, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Unit test for the ray case with an updated default config
my_config = AutoRNN.get_default_config(h=12, backend='ray')
my_config['max_steps'] = 1
my_config['val_check_steps'] = 1
my_config['input_size'] = -1
my_config['encoder_hidden_size'] = 8
model = AutoRNN(h=12, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)class AutoLSTM(BaseAuto):
default_config = {
"input_size_multiplier": [-1, 4, 16, 64],
"inference_input_size_multiplier": [-1],
"h": None,
"encoder_hidden_size": tune.choice([50, 100, 200, 300]),
"encoder_n_layers": tune.randint(1, 4),
"context_size": tune.choice([5, 10, 50]),
"decoder_hidden_size": tune.choice([64, 128, 256, 512]),
"learning_rate": tune.loguniform(1e-4, 1e-1),
"max_steps": tune.choice([500, 1000]),
"batch_size": tune.choice([16, 32]),
"loss": None,
"random_seed": tune.randint(1, 20)
}
def __init__(self,
h,
loss=MAE(),
valid_loss=None,
config=None,
search_alg=BasicVariantGenerator(random_state=1),
num_samples=10,
refit_with_val=False,
cpus=cpu_count(),
gpus=torch.cuda.device_count(),
verbose=False,
alias=None,
backend='ray',
callbacks=None):
# 定义搜索空间、输入/输出大小
if config is None:
config = self.get_default_config(h=h, backend=backend)
super(AutoLSTM, self).__init__(
cls_model=LSTM,
h=h,
loss=loss,
valid_loss=valid_loss,
config=config,
search_alg=search_alg,
num_samples=num_samples,
refit_with_val=refit_with_val,
cpus=cpus,
gpus=gpus,
verbose=verbose,
alias=alias,
backend=backend,
callbacks=callbacks,
)
@classmethod
def get_default_config(cls, h, backend, n_series=None):
config = cls.default_config.copy()
config['input_size'] = tune.choice([h*x \
for x in config['input_size_multiplier']])
config['inference_input_size'] = tune.choice([h*x \
for x in config['inference_input_size_multiplier']])
del config['input_size_multiplier'], config['inference_input_size_multiplier']
if backend == 'optuna':
config = cls._ray_config_to_optuna(config)
return configshow_doc(AutoLSTM, title_level=3)%%capture
# Use your own config or AutoLSTM.default_config
config = dict(max_steps=2, val_check_steps=1, input_size=-1, encoder_hidden_size=8)
model = AutoLSTM(h=12, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoLSTM(h=12, config=None, backend='optuna')  # Check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that the Auto* model contains all required parameters from BaseAuto
test_args(AutoLSTM, exclude_args=['cls_model'])
# Unit test for the Optuna case with an updated default config
my_config = AutoLSTM.get_default_config(h=12, backend='optuna')
def my_config_new(trial):
    config = {**my_config(trial)}
    config.update({'max_steps': 1, 'val_check_steps': 1, 'input_size': -1, 'encoder_hidden_size': 8})
    return config
model = AutoLSTM(h=12, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Unit test for the ray case with an updated default config
my_config = AutoLSTM.get_default_config(h=12, backend='ray')
my_config['max_steps'] = 1
my_config['val_check_steps'] = 1
my_config['input_size'] = -1
my_config['encoder_hidden_size'] = 8
model = AutoLSTM(h=12, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)class AutoGRU(BaseAuto):
default_config = {
"input_size_multiplier": [-1, 4, 16, 64],
"inference_input_size_multiplier": [-1],
"h": None,
"encoder_hidden_size": tune.choice([50, 100, 200, 300]),
"encoder_n_layers": tune.randint(1, 4),
"context_size": tune.choice([5, 10, 50]),
"decoder_hidden_size": tune.choice([64, 128, 256, 512]),
"learning_rate": tune.loguniform(1e-4, 1e-1),
"max_steps": tune.choice([500, 1000]),
"batch_size": tune.choice([16, 32]),
"loss": None,
"random_seed": tune.randint(1, 20)
}
def __init__(self,
h,
loss=MAE(),
valid_loss=None,
config=None,
search_alg=BasicVariantGenerator(random_state=1),
num_samples=10,
refit_with_val=False,
cpus=cpu_count(),
gpus=torch.cuda.device_count(),
verbose=False,
alias=None,
backend='ray',
callbacks=None):
# 定义搜索空间、输入/输出大小
if config is None:
config = self.get_default_config(h=h, backend=backend)
super(AutoGRU, self).__init__(
cls_model=GRU,
h=h,
loss=loss,
valid_loss=valid_loss,
config=config,
search_alg=search_alg,
num_samples=num_samples,
refit_with_val=refit_with_val,
cpus=cpus,
gpus=gpus,
verbose=verbose,
alias=alias,
backend=backend,
callbacks=callbacks,
)
@classmethod
def get_default_config(cls, h, backend, n_series=None):
config = cls.default_config.copy()
config['input_size'] = tune.choice([h*x \
for x in config['input_size_multiplier']])
config['inference_input_size'] = tune.choice([h*x \
for x in config['inference_input_size_multiplier']])
del config['input_size_multiplier'], config['inference_input_size_multiplier']
if backend == 'optuna':
config = cls._ray_config_to_optuna(config)
return config show_doc(AutoGRU, title_level=3)%%capture
# Use your own config or AutoGRU.default_config
config = dict(max_steps=2, val_check_steps=1, input_size=-1, encoder_hidden_size=8)
model = AutoGRU(h=12, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoGRU(h=12, config=None, backend='optuna')  # Check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that the Auto* model contains all required parameters from BaseAuto
test_args(AutoGRU, exclude_args=['cls_model'])
# Unit test for the Optuna case with an updated default config
my_config = AutoGRU.get_default_config(h=12, backend='optuna')
def my_config_new(trial):
    config = {**my_config(trial)}
    config.update({'max_steps': 1, 'val_check_steps': 1, 'input_size': -1, 'encoder_hidden_size': 8})
    return config
model = AutoGRU(h=12, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Unit test for the ray case with an updated default config
my_config = AutoGRU.get_default_config(h=12, backend='ray')
my_config['max_steps'] = 1
my_config['val_check_steps'] = 1
my_config['input_size'] = -1
my_config['encoder_hidden_size'] = 8
model = AutoGRU(h=12, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)class AutoTCN(BaseAuto):
default_config = {
"input_size_multiplier": [-1, 4, 16, 64],
"inference_input_size_multiplier": [-1],
"h": None,
"encoder_hidden_size": tune.choice([50, 100, 200, 300]),
"context_size": tune.choice([5, 10, 50]),
"decoder_hidden_size": tune.choice([64, 128]),
"learning_rate": tune.loguniform(1e-4, 1e-1),
"max_steps": tune.choice([500, 1000]),
"batch_size": tune.choice([16, 32]),
"loss": None,
"random_seed": tune.randint(1, 20)
}
def __init__(self,
h,
loss=MAE(),
valid_loss=None,
config=None,
search_alg=BasicVariantGenerator(random_state=1),
num_samples=10,
refit_with_val=False,
cpus=cpu_count(),
gpus=torch.cuda.device_count(),
verbose=False,
alias=None,
backend='ray',
callbacks=None):
# 定义搜索空间、输入/输出大小
if config is None:
config = self.get_default_config(h=h, backend=backend)
super(AutoTCN, self).__init__(
cls_model=TCN,
h=h,
loss=loss,
valid_loss=valid_loss,
config=config,
search_alg=search_alg,
num_samples=num_samples,
refit_with_val=refit_with_val,
cpus=cpus,
gpus=gpus,
verbose=verbose,
alias=alias,
backend=backend,
callbacks=callbacks,
)
@classmethod
def get_default_config(cls, h, backend, n_series=None):
config = cls.default_config.copy()
config['input_size'] = tune.choice([h*x \
for x in config['input_size_multiplier']])
config['inference_input_size'] = tune.choice([h*x \
for x in config['inference_input_size_multiplier']])
del config['input_size_multiplier'], config['inference_input_size_multiplier']
if backend == 'optuna':
config = cls._ray_config_to_optuna(config)
return config show_doc(AutoTCN, title_level=3)%%capture
# Use your own config or AutoTCN.default_config
config = dict(max_steps=2, val_check_steps=1, input_size=-1, encoder_hidden_size=8)
model = AutoTCN(h=12, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoTCN(h=12, config=None, backend='optuna')  # Check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that the Auto* model contains all required parameters from BaseAuto
test_args(AutoTCN, exclude_args=['cls_model'])
# Unit test for the Optuna case with an updated default config
my_config = AutoTCN.get_default_config(h=12, backend='optuna')
def my_config_new(trial):
    config = {**my_config(trial)}
    config.update({'max_steps': 1, 'val_check_steps': 1, 'input_size': -1, 'encoder_hidden_size': 8})
    return config
model = AutoTCN(h=12, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Unit test for the ray case with an updated default config
my_config = AutoTCN.get_default_config(h=12, backend='ray')
my_config['max_steps'] = 1
my_config['val_check_steps'] = 1
my_config['input_size'] = -1
my_config['encoder_hidden_size'] = 8
model = AutoTCN(h=12, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)class AutoDeepAR(BaseAuto):
default_config = {
"input_size_multiplier": [1, 2, 3, 4, 5],
"h": None,
"lstm_hidden_size": tune.choice([32, 64, 128, 256]),
"lstm_n_layers": tune.randint(1, 4),
"lstm_dropout": tune.uniform(0.0, 0.5),
"learning_rate": tune.loguniform(1e-4, 1e-1),
"scaler_type": tune.choice(['robust', 'minmax1']),
"max_steps": tune.choice([500, 1000, 2000]),
"batch_size": tune.choice([32, 64, 128, 256]),
"windows_batch_size": tune.choice([128, 256, 512, 1024]),
"loss": None,
"random_seed": tune.randint(1, 20),
}
def __init__(self,
h,
loss=DistributionLoss(distribution='StudentT', level=[80, 90], return_params=False),
valid_loss=MQLoss(level=[80, 90]),
config=None,
search_alg=BasicVariantGenerator(random_state=1),
num_samples=10,
refit_with_val=False,
cpus=cpu_count(),
gpus=torch.cuda.device_count(),
verbose=False,
alias=None,
backend='ray',
callbacks=None):
# 定义搜索空间、输入/输出大小
if config is None:
config = self.get_default_config(h=h, backend=backend)
super(AutoDeepAR, self).__init__(
cls_model=DeepAR,
h=h,
loss=loss,
valid_loss=valid_loss,
config=config,
search_alg=search_alg,
num_samples=num_samples,
refit_with_val=refit_with_val,
cpus=cpus,
gpus=gpus,
verbose=verbose,
alias=alias,
backend=backend,
callbacks=callbacks,
)
@classmethod
def get_default_config(cls, h, backend, n_series=None):
config = cls.default_config.copy()
config['input_size'] = tune.choice([h*x \
for x in config['input_size_multiplier']])
config['step_size'] = tune.choice([1, h])
del config['input_size_multiplier']
if backend == 'optuna':
config = cls._ray_config_to_optuna(config)
return config show_doc(AutoDeepAR, title_level=3)%%capture
# Use your own config or AutoDeepAR.default_config
config = dict(max_steps=1, val_check_steps=1, input_size=12, lstm_hidden_size=8)
model = AutoDeepAR(h=12, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoDeepAR(h=12, config=None, backend='optuna')  # Check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that the Auto* model contains all required parameters from BaseAuto
test_args(AutoDeepAR, exclude_args=['cls_model'])
# Unit test for the Optuna case with an updated default config
my_config = AutoDeepAR.get_default_config(h=12, backend='optuna')
def my_config_new(trial):
    config = {**my_config(trial)}
    config.update({'max_steps': 1, 'val_check_steps': 1, 'input_size': 12, 'lstm_hidden_size': 8})
    return config
model = AutoDeepAR(h=12, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Unit test for the ray case with an updated default config
my_config = AutoDeepAR.get_default_config(h=12, backend='ray')
my_config['max_steps'] = 1
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
my_config['lstm_hidden_size'] = 8
model = AutoDeepAR(h=12, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)class AutoDilatedRNN(BaseAuto):
default_config = {
"input_size_multiplier": [-1, 4, 16, 64],
"inference_input_size_multiplier": [-1],
"h": None,
"cell_type": tune.choice(['LSTM', 'GRU']),
"encoder_hidden_size": tune.choice([50, 100, 200, 300]),
"dilations": tune.choice([ [[1, 2], [4, 8]], [[1, 2, 4, 8]] ]),
"context_size": tune.choice([5, 10, 50]),
"decoder_hidden_size": tune.choice([64, 128, 256, 512]),
"learning_rate": tune.loguniform(1e-4, 1e-1),
"max_steps": tune.choice([500, 1000]),
"batch_size": tune.choice([16, 32]),
"loss": None,
"random_seed": tune.randint(1, 20)
}
def __init__(self,
h,
loss=MAE(),
valid_loss=None,
config=None,
search_alg=BasicVariantGenerator(random_state=1),
num_samples=10,
refit_with_val=False,
cpus=cpu_count(),
gpus=torch.cuda.device_count(),
verbose=False,
alias=None,
backend='ray',
callbacks=None):
# 定义搜索空间、输入/输出大小
if config is None:
config = self.get_default_config(h=h, backend=backend)
super(AutoDilatedRNN, self).__init__(
cls_model=DilatedRNN,
h=h,
loss=loss,
valid_loss=valid_loss,
config=config,
search_alg=search_alg,
num_samples=num_samples,
refit_with_val=refit_with_val,
cpus=cpus,
gpus=gpus,
verbose=verbose,
alias=alias,
backend=backend,
callbacks=callbacks,
)
@classmethod
def get_default_config(cls, h, backend, n_series=None):
config = cls.default_config.copy()
config['input_size'] = tune.choice([h*x \
for x in config['input_size_multiplier']])
config['inference_input_size'] = tune.choice([h*x \
for x in config['inference_input_size_multiplier']])
del config['input_size_multiplier'], config['inference_input_size_multiplier']
if backend == 'optuna':
config = cls._ray_config_to_optuna(config)
return config show_doc(AutoDilatedRNN, title_level=3)%%capture
# Use your own config or AutoDilatedRNN.default_config
config = dict(max_steps=2, val_check_steps=1, input_size=-1, encoder_hidden_size=8)
model = AutoDilatedRNN(h=12, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoDilatedRNN(h=12, config=None, backend='optuna')  # Check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that the Auto* model contains all required parameters from BaseAuto
test_args(AutoDilatedRNN, exclude_args=['cls_model'])
# Unit test for the Optuna case with an updated default config
my_config = AutoDilatedRNN.get_default_config(h=12, backend='optuna')
def my_config_new(trial):
    config = {**my_config(trial)}
    config.update({'max_steps': 1, 'val_check_steps': 1, 'input_size': -1, 'encoder_hidden_size': 8})
    return config
model = AutoDilatedRNN(h=12, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Unit test for the ray case with an updated default config
my_config = AutoDilatedRNN.get_default_config(h=12, backend='ray')
my_config['max_steps'] = 1
my_config['val_check_steps'] = 1
my_config['input_size'] = -1
my_config['encoder_hidden_size'] = 8
model = AutoDilatedRNN(h=12, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)class AutoBiTCN(BaseAuto):
default_config = {
"input_size_multiplier": [1, 2, 3, 4, 5],
"h": None,
"hidden_size": tune.choice([16, 32]),
"dropout": tune.uniform(0.0, 0.99),
"learning_rate": tune.loguniform(1e-4, 1e-1),
"scaler_type": tune.choice([None, 'robust', 'standard']),
"max_steps": tune.choice([500, 1000, 2000]),
"batch_size": tune.choice([32, 64, 128, 256]),
"windows_batch_size": tune.choice([128, 256, 512, 1024]),
"loss": None,
"random_seed": tune.randint(1, 20),
}
def __init__(self,
h,
loss=MAE(),
valid_loss=None,
config=None,
search_alg=BasicVariantGenerator(random_state=1),
num_samples=10,
refit_with_val=False,
cpus=cpu_count(),
gpus=torch.cuda.device_count(),
verbose=False,
alias=None,
backend='ray',
callbacks=None):
# 定义搜索空间、输入/输出大小
if config is None:
config = self.get_default_config(h=h, backend=backend)
super(AutoBiTCN, self).__init__(
cls_model=BiTCN,
h=h,
loss=loss,
valid_loss=valid_loss,
config=config,
search_alg=search_alg,
num_samples=num_samples,
refit_with_val=refit_with_val,
cpus=cpus,
gpus=gpus,
verbose=verbose,
alias=alias,
backend=backend,
callbacks=callbacks,
)
@classmethod
def get_default_config(cls, h, backend, n_series=None):
config = cls.default_config.copy()
config['input_size'] = tune.choice([h*x \
for x in config['input_size_multiplier']])
config['step_size'] = tune.choice([1, h])
del config['input_size_multiplier']
if backend == 'optuna':
config = cls._ray_config_to_optuna(config)
return config show_doc(AutoBiTCN, title_level=3)%%capture
# Use your own config or AutoBiTCN.default_config
config = dict(max_steps=1, val_check_steps=1, input_size=12, hidden_size=8)
model = AutoBiTCN(h=12, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoBiTCN(h=12, config=None, backend='optuna')  # Check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that the Auto* model contains all required parameters from BaseAuto
test_args(AutoBiTCN, exclude_args=['cls_model'])
# Unit test for the Optuna case with an updated default config
my_config = AutoBiTCN.get_default_config(h=12, backend='optuna')
def my_config_new(trial):
    config = {**my_config(trial)}
    config.update({'max_steps': 1, 'val_check_steps': 1, 'input_size': 12, 'hidden_size': 8})
    return config
model = AutoBiTCN(h=12, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Unit test for the ray case with an updated default config
my_config = AutoBiTCN.get_default_config(h=12, backend='ray')
my_config['max_steps'] = 1
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
my_config['hidden_size'] = 8
model = AutoBiTCN(h=12, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)

# B. MLP-based
class AutoMLP(BaseAuto):
default_config = {
"input_size_multiplier": [1, 2, 3, 4, 5],
"h": None,
"hidden_size": tune.choice( [256, 512, 1024] ),
"num_layers": tune.randint(2, 6),
"learning_rate": tune.loguniform(1e-4, 1e-1),
"scaler_type": tune.choice([None, 'robust', 'standard']),
"max_steps": tune.choice([500, 1000]),
"batch_size": tune.choice([32, 64, 128, 256]),
"windows_batch_size": tune.choice([128, 256, 512, 1024]),
"loss": None,
"random_seed": tune.randint(1, 20),
}
def __init__(self,
h,
loss=MAE(),
valid_loss=None,
config=None,
search_alg=BasicVariantGenerator(random_state=1),
num_samples=10,
refit_with_val=False,
cpus=cpu_count(),
gpus=torch.cuda.device_count(),
verbose=False,
alias=None,
backend='ray',
callbacks=None):
# 定义搜索空间、输入/输出大小
if config is None:
config = self.get_default_config(h=h, backend=backend)
super(AutoMLP, self).__init__(
cls_model=MLP,
h=h,
loss=loss,
valid_loss=valid_loss,
config=config,
search_alg=search_alg,
num_samples=num_samples,
refit_with_val=refit_with_val,
cpus=cpus,
gpus=gpus,
verbose=verbose,
alias=alias,
backend=backend,
callbacks=callbacks,
)
@classmethod
def get_default_config(cls, h, backend, n_series=None):
config = cls.default_config.copy()
config['input_size'] = tune.choice([h*x \
for x in config['input_size_multiplier']])
config['step_size'] = tune.choice([1, h])
del config['input_size_multiplier']
if backend == 'optuna':
config = cls._ray_config_to_optuna(config)
return config show_doc(AutoMLP, title_level=3)%%capture
# Use your own config or AutoMLP.default_config
config = dict(max_steps=2, val_check_steps=1, input_size=12, hidden_size=8)
model = AutoMLP(h=12, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoMLP(h=12, config=None, backend='optuna')  # Check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that the Auto* model contains all required parameters from BaseAuto
test_args(AutoMLP, exclude_args=['cls_model'])
# Unit test for the Optuna case with an updated default config
my_config = AutoMLP.get_default_config(h=12, backend='optuna')
def my_config_new(trial):
    config = {**my_config(trial)}
    config.update({'max_steps': 2, 'val_check_steps': 1, 'input_size': 12, 'hidden_size': 8})
    return config
model = AutoMLP(h=12, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Unit test for the ray case with an updated default config
my_config = AutoMLP.get_default_config(h=12, backend='ray')
my_config['max_steps'] = 2
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
my_config['hidden_size'] = 8
model = AutoMLP(h=12, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)class AutoNBEATS(BaseAuto):
default_config = {
"input_size_multiplier": [1, 2, 3, 4, 5],
"h": None,
"learning_rate": tune.loguniform(1e-4, 1e-1),
"scaler_type": tune.choice([None, 'robust', 'standard']),
"max_steps": tune.choice([500, 1000]),
"batch_size": tune.choice([32, 64, 128, 256]),
"windows_batch_size": tune.choice([128, 256, 512, 1024]),
"loss": None,
"random_seed": tune.randint(1, 20),
}
def __init__(self,
h,
loss=MAE(),
valid_loss=None,
config=None,
search_alg=BasicVariantGenerator(random_state=1),
num_samples=10,
refit_with_val=False,
cpus=cpu_count(),
gpus=torch.cuda.device_count(),
verbose=False,
alias=None,
backend='ray',
callbacks=None):
# 定义搜索空间、输入/输出大小
if config is None:
config = self.get_default_config(h=h, backend=backend)
super(AutoNBEATS, self).__init__(
cls_model=NBEATS,
h=h,
loss=loss,
valid_loss=valid_loss,
config=config,
search_alg=search_alg,
num_samples=num_samples,
refit_with_val=refit_with_val,
cpus=cpus,
gpus=gpus,
verbose=verbose,
alias=alias,
backend=backend,
callbacks=callbacks,
)
@classmethod
def get_default_config(cls, h, backend, n_series=None):
config = cls.default_config.copy()
config['input_size'] = tune.choice([h*x \
for x in config['input_size_multiplier']])
config['step_size'] = tune.choice([1, h])
del config['input_size_multiplier']
if backend == 'optuna':
config = cls._ray_config_to_optuna(config)
return config show_doc(AutoNBEATS, title_level=3)%%capture
# Use your own config or AutoNBEATS.default_config
config = dict(max_steps=2, val_check_steps=1, input_size=12,
              mlp_units=3*[[8, 8]])
model = AutoNBEATS(h=12, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoNBEATS(h=12, config=None, backend='optuna')  # Check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that the Auto* model contains all required parameters from BaseAuto
test_args(AutoNBEATS, exclude_args=['cls_model'])
# Unit test for the Optuna case with an updated default config
my_config = AutoNBEATS.get_default_config(h=12, backend='optuna')
def my_config_new(trial):
    config = {**my_config(trial)}
    config.update({'max_steps': 2, 'val_check_steps': 1, 'input_size': 12, 'mlp_units': 3 * [[8, 8]]})
    return config
model = AutoNBEATS(h=12, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Unit test for the ray case with an updated default config
my_config = AutoNBEATS.get_default_config(h=12, backend='ray')
my_config['max_steps'] = 2
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
my_config['mlp_units'] = 3 * [[8, 8]]
model = AutoNBEATS(h=12, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)class AutoNBEATSx(BaseAuto):
default_config = {
"input_size_multiplier": [1, 2, 3, 4, 5],
"h": None,
"learning_rate": tune.loguniform(1e-4, 1e-1),
"scaler_type": tune.choice([None, 'robust', 'standard']),
"max_steps": tune.choice([500, 1000]),
"batch_size": tune.choice([32, 64, 128, 256]),
"windows_batch_size": tune.choice([128, 256, 512, 1024]),
"loss": None,
"random_seed": tune.randint(1, 20),
}
def __init__(self,
h,
loss=MAE(),
valid_loss=None,
config=None,
search_alg=BasicVariantGenerator(random_state=1),
num_samples=10,
refit_with_val=False,
cpus=cpu_count(),
gpus=torch.cuda.device_count(),
verbose=False,
alias=None,
backend='ray',
callbacks=None):
# 定义搜索空间、输入/输出大小
if config is None:
config = self.get_default_config(h=h, backend=backend)
super(AutoNBEATSx, self).__init__(
cls_model=NBEATSx,
h=h,
loss=loss,
valid_loss=valid_loss,
config=config,
search_alg=search_alg,
num_samples=num_samples,
refit_with_val=refit_with_val,
cpus=cpus,
gpus=gpus,
verbose=verbose,
alias=alias,
backend=backend,
callbacks=callbacks,
)
@classmethod
def get_default_config(cls, h, backend, n_series=None):
config = cls.default_config.copy()
config['input_size'] = tune.choice([h*x \
for x in config['input_size_multiplier']])
config['step_size'] = tune.choice([1, h])
del config['input_size_multiplier']
if backend == 'optuna':
config = cls._ray_config_to_optuna(config)
return config show_doc(AutoNBEATSx, title_level=3)%%capture
# Use your own config or AutoNBEATSx.default_config
config = dict(max_steps=2, val_check_steps=1, input_size=12,
              mlp_units=3*[[8, 8]])
model = AutoNBEATSx(h=12, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoNBEATSx(h=12, config=None, backend='optuna')  # Check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that the Auto* model contains all required parameters from BaseAuto
test_args(AutoNBEATSx, exclude_args=['cls_model'])
# Unit test for the Optuna case with an updated default config
my_config = AutoNBEATSx.get_default_config(h=12, backend='optuna')
def my_config_new(trial):
    config = {**my_config(trial)}
    config.update({'max_steps': 2, 'val_check_steps': 1, 'input_size': 12, 'mlp_units': 3 * [[8, 8]]})
    return config
model = AutoNBEATSx(h=12, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Unit test for the ray case with an updated default config
my_config = AutoNBEATSx.get_default_config(h=12, backend='ray')
my_config['max_steps'] = 2
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
my_config['mlp_units'] = 3 * [[8, 8]]
model = AutoNBEATSx(h=12, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)class AutoNHITS(BaseAuto):
    # Ray Tune search space shared by every AutoNHITS instance; `h`/`loss`
    # are filled in at construction time and `input_size_multiplier` is
    # expanded into `input_size` by `get_default_config`.
    default_config = {
        "input_size_multiplier": [1, 2, 3, 4, 5],
        "h": None,
        "n_pool_kernel_size": tune.choice([[2, 2, 1], 3*[1], 3*[2], 3*[4],
                                           [8, 4, 1], [16, 8, 1]]),
        "n_freq_downsample": tune.choice([[168, 24, 1], [24, 12, 1],
                                          [180, 60, 1], [60, 8, 1],
                                          [40, 20, 1], [1, 1, 1]]),
        "learning_rate": tune.loguniform(1e-4, 1e-1),
        "scaler_type": tune.choice([None, 'robust', 'standard']),
        "max_steps": tune.quniform(lower=500, upper=1500, q=100),
        "batch_size": tune.choice([32, 64, 128, 256]),
        "windows_batch_size": tune.choice([128, 256, 512, 1024]),
        "loss": None,
        "random_seed": tune.randint(lower=1, upper=20),
    }

    def __init__(self,
                 h,
                 loss=MAE(),
                 valid_loss=None,
                 config=None,
                 search_alg=BasicVariantGenerator(random_state=1),
                 num_samples=10,
                 refit_with_val=False,
                 cpus=cpu_count(),
                 gpus=torch.cuda.device_count(),
                 verbose=False,
                 alias=None,
                 backend='ray',
                 callbacks=None,
                 ):
        """Hyperparameter-search wrapper around NHITS; see BaseAuto for argument semantics."""
        # Define search space, input/output sizes
        if config is None:
            config = self.get_default_config(h=h, backend=backend)
        super(AutoNHITS, self).__init__(
            cls_model=NHITS,
            h=h,
            loss=loss,
            valid_loss=valid_loss,
            config=config,
            search_alg=search_alg,
            num_samples=num_samples,
            refit_with_val=refit_with_val,
            cpus=cpus,
            gpus=gpus,
            verbose=verbose,
            alias=alias,
            backend=backend,
            callbacks=callbacks,
        )

    @classmethod
    def get_default_config(cls, h, backend, n_series=None):
        """Build the default search space for horizon `h`, converted for the chosen backend."""
        config = cls.default_config.copy()
        config['input_size'] = tune.choice([h*x \
                                            for x in config['input_size_multiplier']])
        config['step_size'] = tune.choice([1, h])
        del config['input_size_multiplier']
        if backend == 'optuna':
            config = cls._ray_config_to_optuna(config)
return config show_doc(AutoNHITS, title_level=3)%%capture
# Use your own config or AutoNHITS.default_config
config = dict(max_steps=2, val_check_steps=1, input_size=12,
              mlp_units=3 * [[8, 8]])
model = AutoNHITS(h=12, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoNHITS(h=12, config=None, backend='optuna')  # check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that Auto* model contains all required parameters of BaseAuto
test_args(AutoNHITS, exclude_args=['cls_model'])
# Case unit test: Optuna with an updated default config
my_config = AutoNHITS.get_default_config(h=12, backend='optuna')
def my_config_new(trial):
    config = {**my_config(trial)}
    config.update({'max_steps': 2, 'val_check_steps': 1, 'input_size': 12, 'mlp_units': 3 * [[8, 8]]})
    return config
model = AutoNHITS(h=12, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Case unit test: Ray with an updated default config
my_config = AutoNHITS.get_default_config(h=12, backend='ray')
my_config['max_steps'] = 2
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
my_config['mlp_units'] = 3 * [[8, 8]]
model = AutoNHITS(h=12, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)class AutoDLinear(BaseAuto):
    # Ray Tune search space shared by every AutoDLinear instance;
    # `input_size_multiplier` is expanded into `input_size` by `get_default_config`.
    default_config = {
        "input_size_multiplier": [1, 2, 3, 4, 5],
        "h": None,
        "moving_avg_window": tune.choice([11, 25, 51]),
        "learning_rate": tune.loguniform(1e-4, 1e-1),
        "scaler_type": tune.choice([None, 'robust', 'standard']),
        "max_steps": tune.quniform(lower=500, upper=1500, q=100),
        "batch_size": tune.choice([32, 64, 128, 256]),
        "windows_batch_size": tune.choice([128, 256, 512, 1024]),
        "loss": None,
        "random_seed": tune.randint(lower=1, upper=20),
    }

    def __init__(self,
                 h,
                 loss=MAE(),
                 valid_loss=None,
                 config=None,
                 search_alg=BasicVariantGenerator(random_state=1),
                 num_samples=10,
                 refit_with_val=False,
                 cpus=cpu_count(),
                 gpus=torch.cuda.device_count(),
                 verbose=False,
                 alias=None,
                 backend='ray',
                 callbacks=None,
                 ):
        """Hyperparameter-search wrapper around DLinear; see BaseAuto for argument semantics."""
        # Define search space, input/output sizes
        if config is None:
            config = self.get_default_config(h=h, backend=backend)
        super(AutoDLinear, self).__init__(
            cls_model=DLinear,
            h=h,
            loss=loss,
            valid_loss=valid_loss,
            config=config,
            search_alg=search_alg,
            num_samples=num_samples,
            refit_with_val=refit_with_val,
            cpus=cpus,
            gpus=gpus,
            verbose=verbose,
            alias=alias,
            backend=backend,
            callbacks=callbacks,
        )

    @classmethod
    def get_default_config(cls, h, backend, n_series=None):
        """Build the default search space for horizon `h`, converted for the chosen backend."""
        config = cls.default_config.copy()
        config['input_size'] = tune.choice([h*x \
                                            for x in config['input_size_multiplier']])
        config['step_size'] = tune.choice([1, h])
        del config['input_size_multiplier']
        if backend == 'optuna':
            config = cls._ray_config_to_optuna(config)
return config show_doc(AutoDLinear, title_level=3)%%capture
# Use your own config or AutoDLinear.default_config
config = dict(max_steps=2, val_check_steps=1, input_size=12)
model = AutoDLinear(h=12, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoDLinear(h=12, config=None, backend='optuna')  # check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that Auto* model contains all required parameters of BaseAuto
test_args(AutoDLinear, exclude_args=['cls_model'])
# Case unit test: Optuna with an updated default config
my_config = AutoDLinear.get_default_config(h=12, backend='optuna')
def my_config_new(trial):
    config = {**my_config(trial)}
    config.update({'max_steps': 2, 'val_check_steps': 1, 'input_size': 12})
    return config
model = AutoDLinear(h=12, config=my_config_new, backend='optuna', cpus=1)
model.fit(dataset=dataset)
# Case unit test: Ray with an updated default config
my_config = AutoDLinear.get_default_config(h=12, backend='ray')
my_config['max_steps'] = 2
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
model = AutoDLinear(h=12, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)class AutoNLinear(BaseAuto):
    # Ray Tune search space shared by every AutoNLinear instance;
    # `input_size_multiplier` is expanded into `input_size` by `get_default_config`.
    default_config = {
        "input_size_multiplier": [1, 2, 3, 4, 5],
        "h": None,
        "learning_rate": tune.loguniform(1e-4, 1e-1),
        "scaler_type": tune.choice([None, 'robust', 'standard']),
        "max_steps": tune.quniform(lower=500, upper=1500, q=100),
        "batch_size": tune.choice([32, 64, 128, 256]),
        "windows_batch_size": tune.choice([128, 256, 512, 1024]),
        "loss": None,
        "random_seed": tune.randint(lower=1, upper=20),
    }

    def __init__(self,
                 h,
                 loss=MAE(),
                 valid_loss=None,
                 config=None,
                 search_alg=BasicVariantGenerator(random_state=1),
                 num_samples=10,
                 refit_with_val=False,
                 cpus=cpu_count(),
                 gpus=torch.cuda.device_count(),
                 verbose=False,
                 alias=None,
                 backend='ray',
                 callbacks=None,
                 ):
        """Hyperparameter-search wrapper around NLinear; see BaseAuto for argument semantics."""
        # Define search space, input/output sizes
        if config is None:
            config = self.get_default_config(h=h, backend=backend)
        super(AutoNLinear, self).__init__(
            cls_model=NLinear,
            h=h,
            loss=loss,
            valid_loss=valid_loss,
            config=config,
            search_alg=search_alg,
            num_samples=num_samples,
            refit_with_val=refit_with_val,
            cpus=cpus,
            gpus=gpus,
            verbose=verbose,
            alias=alias,
            backend=backend,
            callbacks=callbacks,
        )

    @classmethod
    def get_default_config(cls, h, backend, n_series=None):
        """Build the default search space for horizon `h`, converted for the chosen backend."""
        config = cls.default_config.copy()
        config['input_size'] = tune.choice([h*x \
                                            for x in config['input_size_multiplier']])
        config['step_size'] = tune.choice([1, h])
        del config['input_size_multiplier']
        if backend == 'optuna':
            config = cls._ray_config_to_optuna(config)
return config show_doc(AutoNLinear, title_level=3)%%capture
# Use your own config or AutoNLinear.default_config
config = dict(max_steps=2, val_check_steps=1, input_size=12)
model = AutoNLinear(h=12, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoNLinear(h=12, config=None, backend='optuna')  # check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that Auto* model contains all required parameters of BaseAuto
test_args(AutoNLinear, exclude_args=['cls_model'])
# Case unit test: Optuna with an updated default config
my_config = AutoNLinear.get_default_config(h=12, backend='optuna')
def my_config_new(trial):
    config = {**my_config(trial)}
    config.update({'max_steps': 2, 'val_check_steps': 1, 'input_size': 12})
    return config
model = AutoNLinear(h=12, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Case unit test: Ray with an updated default config
my_config = AutoNLinear.get_default_config(h=12, backend='ray')
my_config['max_steps'] = 2
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
model = AutoNLinear(h=12, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)class AutoTiDE(BaseAuto):
    # Ray Tune search space shared by every AutoTiDE instance;
    # `input_size_multiplier` is expanded into `input_size` by `get_default_config`.
    default_config = {
        "input_size_multiplier": [1, 2, 3, 4, 5],
        "h": None,
        "hidden_size": tune.choice([256, 512, 1024]),
        "decoder_output_dim": tune.choice([8, 16, 32]),
        "temporal_decoder_dim": tune.choice([32, 64, 128]),
        "num_encoder_layers": tune.choice([1, 2, 3]),
        "num_decoder_layers": tune.choice([1, 2, 3]),
        "temporal_width": tune.choice([4, 8, 16]),
        "dropout": tune.choice([0.0, 0.1, 0.2, 0.3, 0.5]),
        "layernorm": tune.choice([True, False]),
        "learning_rate": tune.loguniform(1e-5, 1e-2),
        "scaler_type": tune.choice([None, 'robust', 'standard']),
        "max_steps": tune.quniform(lower=500, upper=1500, q=100),
        "batch_size": tune.choice([32, 64, 128, 256]),
        "windows_batch_size": tune.choice([128, 256, 512, 1024]),
        "loss": None,
        "random_seed": tune.randint(lower=1, upper=20),
    }

    def __init__(self,
                 h,
                 loss=MAE(),
                 valid_loss=None,
                 config=None,
                 search_alg=BasicVariantGenerator(random_state=1),
                 num_samples=10,
                 refit_with_val=False,
                 cpus=cpu_count(),
                 gpus=torch.cuda.device_count(),
                 verbose=False,
                 alias=None,
                 backend='ray',
                 callbacks=None,
                 ):
        """Hyperparameter-search wrapper around TiDE; see BaseAuto for argument semantics."""
        # Define search space, input/output sizes
        if config is None:
            config = self.get_default_config(h=h, backend=backend)
        super(AutoTiDE, self).__init__(
            cls_model=TiDE,
            h=h,
            loss=loss,
            valid_loss=valid_loss,
            config=config,
            search_alg=search_alg,
            num_samples=num_samples,
            refit_with_val=refit_with_val,
            cpus=cpus,
            gpus=gpus,
            verbose=verbose,
            alias=alias,
            backend=backend,
            callbacks=callbacks,
        )

    @classmethod
    def get_default_config(cls, h, backend, n_series=None):
        """Build the default search space for horizon `h`, converted for the chosen backend."""
        config = cls.default_config.copy()
        config['input_size'] = tune.choice([h*x \
                                            for x in config['input_size_multiplier']])
        config['step_size'] = tune.choice([1, h])
        del config['input_size_multiplier']
        if backend == 'optuna':
            config = cls._ray_config_to_optuna(config)
return config show_doc(AutoTiDE, title_level=3)%%capture
# Use your own config or AutoTiDE.default_config
config = dict(max_steps=2, val_check_steps=1, input_size=12)
model = AutoTiDE(h=12, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoTiDE(h=12, config=None, backend='optuna')  # check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that Auto* model contains all required parameters of BaseAuto
test_args(AutoTiDE, exclude_args=['cls_model'])
# Case unit test: Optuna with an updated default config
my_config = AutoTiDE.get_default_config(h=12, backend='optuna')
def my_config_new(trial):
    config = {**my_config(trial)}
    config.update({'max_steps': 2, 'val_check_steps': 1, 'input_size': 12})
    return config
model = AutoTiDE(h=12, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Case unit test: Ray with an updated default config
my_config = AutoTiDE.get_default_config(h=12, backend='ray')
my_config['max_steps'] = 2
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
model = AutoTiDE(h=12, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)class AutoDeepNPTS(BaseAuto):
    # Ray Tune search space shared by every AutoDeepNPTS instance;
    # `input_size_multiplier` is expanded into `input_size` by `get_default_config`.
    default_config = {
        "input_size_multiplier": [1, 2, 3, 4, 5],
        "h": None,
        "hidden_size": tune.choice([16, 32, 64]),
        "dropout": tune.uniform(0.0, 0.99),
        "n_layers": tune.choice([1, 2, 4]),
        "learning_rate": tune.loguniform(1e-4, 1e-1),
        "scaler_type": tune.choice([None, 'robust', 'standard']),
        "max_steps": tune.quniform(lower=500, upper=1500, q=100),
        "batch_size": tune.choice([32, 64, 128, 256]),
        "windows_batch_size": tune.choice([128, 256, 512, 1024]),
        "loss": None,
        "random_seed": tune.randint(lower=1, upper=20),
    }

    def __init__(self,
                 h,
                 loss=MAE(),
                 valid_loss=None,
                 config=None,
                 search_alg=BasicVariantGenerator(random_state=1),
                 num_samples=10,
                 refit_with_val=False,
                 cpus=cpu_count(),
                 gpus=torch.cuda.device_count(),
                 verbose=False,
                 alias=None,
                 backend='ray',
                 callbacks=None,
                 ):
        """Hyperparameter-search wrapper around DeepNPTS; see BaseAuto for argument semantics."""
        # Define search space, input/output sizes
        if config is None:
            config = self.get_default_config(h=h, backend=backend)
        super(AutoDeepNPTS, self).__init__(
            cls_model=DeepNPTS,
            h=h,
            loss=loss,
            valid_loss=valid_loss,
            config=config,
            search_alg=search_alg,
            num_samples=num_samples,
            refit_with_val=refit_with_val,
            cpus=cpus,
            gpus=gpus,
            verbose=verbose,
            alias=alias,
            backend=backend,
            callbacks=callbacks,
        )

    @classmethod
    def get_default_config(cls, h, backend, n_series=None):
        """Build the default search space for horizon `h`, converted for the chosen backend."""
        config = cls.default_config.copy()
        config['input_size'] = tune.choice([h*x \
                                            for x in config['input_size_multiplier']])
        config['step_size'] = tune.choice([1, h])
        del config['input_size_multiplier']
        if backend == 'optuna':
            config = cls._ray_config_to_optuna(config)
return config show_doc(AutoDeepNPTS, title_level=3)%%capture
# Use your own config or AutoDeepNPTS.default_config
config = dict(max_steps=2, val_check_steps=1, input_size=12)
model = AutoDeepNPTS(h=12, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoDeepNPTS(h=12, config=None, backend='optuna')  # check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that Auto* model contains all required parameters of BaseAuto
test_args(AutoDeepNPTS, exclude_args=['cls_model'])
# Case unit test: Optuna with an updated default config
my_config = AutoDeepNPTS.get_default_config(h=12, backend='optuna')
def my_config_new(trial):
    config = {**my_config(trial)}
    config.update({'max_steps': 2, 'val_check_steps': 1, 'input_size': 12})
    return config
model = AutoDeepNPTS(h=12, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Case unit test: Ray with an updated default config
my_config = AutoDeepNPTS.get_default_config(h=12, backend='ray')
my_config['max_steps'] = 2
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
model = AutoDeepNPTS(h=12, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)C. 基于KAN的方法
class AutoKAN(BaseAuto):
    """Hyperparameter-search wrapper around KAN; see BaseAuto for the search workflow."""

    # Ray Tune search space shared by every AutoKAN instance;
    # `input_size_multiplier` is expanded into `input_size` by `get_default_config`.
    default_config = {
        "input_size_multiplier": [1, 2, 3, 4, 5],
        "h": None,
        "grid_size": tune.choice([5, 10, 15]),
        "spline_order": tune.choice([2, 3, 4]),
        "hidden_size": tune.choice([64, 128, 256, 512]),
        "learning_rate": tune.loguniform(1e-4, 1e-1),
        "scaler_type": tune.choice([None, 'robust', 'standard']),
        "max_steps": tune.quniform(lower=500, upper=1500, q=100),
        "batch_size": tune.choice([32, 64, 128, 256]),
        "windows_batch_size": tune.choice([128, 256, 512, 1024]),
        "loss": None,
        "random_seed": tune.randint(lower=1, upper=20),
    }

    def __init__(self,
                 h,
                 loss=MAE(),
                 valid_loss=None,
                 config=None,
                 search_alg=BasicVariantGenerator(random_state=1),
                 num_samples=10,
                 refit_with_val=False,
                 cpus=cpu_count(),
                 gpus=torch.cuda.device_count(),
                 verbose=False,
                 alias=None,
                 backend='ray',
                 callbacks=None,
                 ):
        # Define search space, input/output sizes
        if config is None:
            config = self.get_default_config(h=h, backend=backend)
        super(AutoKAN, self).__init__(
            cls_model=KAN,
            h=h,
            loss=loss,
            valid_loss=valid_loss,
            config=config,
            search_alg=search_alg,
            num_samples=num_samples,
            refit_with_val=refit_with_val,
            cpus=cpus,
            gpus=gpus,
            verbose=verbose,
            alias=alias,
            backend=backend,
            callbacks=callbacks,
        )

    @classmethod
    def get_default_config(cls, h, backend, n_series=None):
        """Build the default search space for horizon `h`, converted for the chosen backend."""
        config = cls.default_config.copy()
        config['input_size'] = tune.choice([h*x \
                                            for x in config['input_size_multiplier']])
        config['step_size'] = tune.choice([1, h])
        del config['input_size_multiplier']
        if backend == 'optuna':
            config = cls._ray_config_to_optuna(config)
return config show_doc(AutoKAN, title_level=3)%%capture
# Use your own config or AutoKAN.default_config
config = dict(max_steps=2, val_check_steps=1, input_size=12)
model = AutoKAN(h=12, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoKAN(h=12, config=None, backend='optuna')  # check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that Auto* model contains all required parameters of BaseAuto
test_args(AutoKAN, exclude_args=['cls_model'])
# Case unit test: Optuna with an updated default config
my_config = AutoKAN.get_default_config(h=12, backend='optuna')
def my_config_new(trial):
    config = {**my_config(trial)}
    config.update({'max_steps': 2, 'val_check_steps': 1, 'input_size': 12})
    return config
model = AutoKAN(h=12, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Case unit test: Ray with an updated default config
my_config = AutoKAN.get_default_config(h=12, backend='ray')
my_config['max_steps'] = 2
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
model = AutoKAN(h=12, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)D. 基于变换器的方法
class AutoTFT(BaseAuto):
    """Hyperparameter-search wrapper around TFT; see BaseAuto for the search workflow."""

    # Ray Tune search space shared by every AutoTFT instance;
    # `input_size_multiplier` is expanded into `input_size` by `get_default_config`.
    default_config = {
        "input_size_multiplier": [1, 2, 3, 4, 5],
        "h": None,
        "hidden_size": tune.choice([64, 128, 256]),
        "n_head": tune.choice([4, 8]),
        "learning_rate": tune.loguniform(1e-4, 1e-1),
        "scaler_type": tune.choice([None, 'robust', 'standard']),
        "max_steps": tune.choice([500, 1000, 2000]),
        "batch_size": tune.choice([32, 64, 128, 256]),
        "windows_batch_size": tune.choice([128, 256, 512, 1024]),
        "loss": None,
        "random_seed": tune.randint(1, 20),
    }

    def __init__(self,
                 h,
                 loss=MAE(),
                 valid_loss=None,
                 config=None,
                 search_alg=BasicVariantGenerator(random_state=1),
                 num_samples=10,
                 refit_with_val=False,
                 cpus=cpu_count(),
                 gpus=torch.cuda.device_count(),
                 verbose=False,
                 alias=None,
                 backend='ray',
                 callbacks=None):
        # Define search space, input/output sizes
        if config is None:
            config = self.get_default_config(h=h, backend=backend)
        super(AutoTFT, self).__init__(
            cls_model=TFT,
            h=h,
            loss=loss,
            valid_loss=valid_loss,
            config=config,
            search_alg=search_alg,
            num_samples=num_samples,
            refit_with_val=refit_with_val,
            cpus=cpus,
            gpus=gpus,
            verbose=verbose,
            alias=alias,
            backend=backend,
            callbacks=callbacks,
        )

    @classmethod
    def get_default_config(cls, h, backend, n_series=None):
        """Build the default search space for horizon `h`, converted for the chosen backend."""
        config = cls.default_config.copy()
        config['input_size'] = tune.choice([h*x \
                                            for x in config['input_size_multiplier']])
        config['step_size'] = tune.choice([1, h])
        del config['input_size_multiplier']
        if backend == 'optuna':
            config = cls._ray_config_to_optuna(config)
return configshow_doc(AutoTFT, title_level=3)%%capture
# Use your own config or AutoTFT.default_config
config = dict(max_steps=1, val_check_steps=1, input_size=12, hidden_size=8)
model = AutoTFT(h=12, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoTFT(h=12, config=None, backend='optuna')  # check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that Auto* model contains all required parameters of BaseAuto
test_args(AutoTFT, exclude_args=['cls_model'])
# Case unit test: Optuna with an updated default config
my_config = AutoTFT.get_default_config(h=12, backend='optuna')
def my_config_new(trial):
    config = {**my_config(trial)}
    config.update({'max_steps': 1, 'val_check_steps': 1, 'input_size': 12, 'hidden_size': 8})
    return config
model = AutoTFT(h=12, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Case unit test: Ray with an updated default config
my_config = AutoTFT.get_default_config(h=12, backend='ray')
my_config['max_steps'] = 1
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
my_config['hidden_size'] = 8
model = AutoTFT(h=12, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)class AutoVanillaTransformer(BaseAuto):
    # Ray Tune search space shared by every AutoVanillaTransformer instance;
    # `input_size_multiplier` is expanded into `input_size` by `get_default_config`.
    default_config = {
        "input_size_multiplier": [1, 2, 3, 4, 5],
        "h": None,
        "hidden_size": tune.choice([64, 128, 256]),
        "n_head": tune.choice([4, 8]),
        "learning_rate": tune.loguniform(1e-4, 1e-1),
        "scaler_type": tune.choice([None, 'robust', 'standard']),
        "max_steps": tune.choice([500, 1000, 2000]),
        "batch_size": tune.choice([32, 64, 128, 256]),
        "windows_batch_size": tune.choice([128, 256, 512, 1024]),
        "loss": None,
        "random_seed": tune.randint(1, 20),
    }

    def __init__(self,
                 h,
                 loss=MAE(),
                 valid_loss=None,
                 config=None,
                 search_alg=BasicVariantGenerator(random_state=1),
                 num_samples=10,
                 refit_with_val=False,
                 cpus=cpu_count(),
                 gpus=torch.cuda.device_count(),
                 verbose=False,
                 alias=None,
                 backend='ray',
                 callbacks=None):
        """Hyperparameter-search wrapper around VanillaTransformer; see BaseAuto for argument semantics."""
        # Define search space, input/output sizes
        if config is None:
            config = self.get_default_config(h=h, backend=backend)
        super(AutoVanillaTransformer, self).__init__(
            cls_model=VanillaTransformer,
            h=h,
            loss=loss,
            valid_loss=valid_loss,
            config=config,
            search_alg=search_alg,
            num_samples=num_samples,
            refit_with_val=refit_with_val,
            cpus=cpus,
            gpus=gpus,
            verbose=verbose,
            alias=alias,
            backend=backend,
            callbacks=callbacks,
        )

    @classmethod
    def get_default_config(cls, h, backend, n_series=None):
        """Build the default search space for horizon `h`, converted for the chosen backend."""
        config = cls.default_config.copy()
        config['input_size'] = tune.choice([h*x \
                                            for x in config['input_size_multiplier']])
        config['step_size'] = tune.choice([1, h])
        del config['input_size_multiplier']
        if backend == 'optuna':
            config = cls._ray_config_to_optuna(config)
return config show_doc(AutoVanillaTransformer, title_level=3)%%capture
# Use your own config or AutoVanillaTransformer.default_config
config = dict(max_steps=1, val_check_steps=1, input_size=12, hidden_size=8)
model = AutoVanillaTransformer(h=12, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoVanillaTransformer(h=12, config=None, backend='optuna')  # check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that Auto* model contains all required parameters of BaseAuto
test_args(AutoVanillaTransformer, exclude_args=['cls_model'])
# Case unit test: Optuna with an updated default config
my_config = AutoVanillaTransformer.get_default_config(h=12, backend='optuna')
def my_config_new(trial):
    config = {**my_config(trial)}
    config.update({'max_steps': 1, 'val_check_steps': 1, 'input_size': 12, 'hidden_size': 8})
    return config
model = AutoVanillaTransformer(h=12, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Case unit test: Ray with an updated default config
my_config = AutoVanillaTransformer.get_default_config(h=12, backend='ray')
my_config['max_steps'] = 1
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
my_config['hidden_size'] = 8
model = AutoVanillaTransformer(h=12, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)class AutoInformer(BaseAuto):
    # Ray Tune search space shared by every AutoInformer instance;
    # `input_size_multiplier` is expanded into `input_size` by `get_default_config`.
    default_config = {
        "input_size_multiplier": [1, 2, 3, 4, 5],
        "h": None,
        "hidden_size": tune.choice([64, 128, 256]),
        "n_head": tune.choice([4, 8]),
        "learning_rate": tune.loguniform(1e-4, 1e-1),
        "scaler_type": tune.choice([None, 'robust', 'standard']),
        "max_steps": tune.choice([500, 1000, 2000]),
        "batch_size": tune.choice([32, 64, 128, 256]),
        "windows_batch_size": tune.choice([128, 256, 512, 1024]),
        "loss": None,
        "random_seed": tune.randint(1, 20),
    }

    def __init__(self,
                 h,
                 loss=MAE(),
                 valid_loss=None,
                 config=None,
                 search_alg=BasicVariantGenerator(random_state=1),
                 num_samples=10,
                 refit_with_val=False,
                 cpus=cpu_count(),
                 gpus=torch.cuda.device_count(),
                 verbose=False,
                 alias=None,
                 backend='ray',
                 callbacks=None):
        """Hyperparameter-search wrapper around Informer; see BaseAuto for argument semantics."""
        # Define search space, input/output sizes
        if config is None:
            config = self.get_default_config(h=h, backend=backend)
        super(AutoInformer, self).__init__(
            cls_model=Informer,
            h=h,
            loss=loss,
            valid_loss=valid_loss,
            config=config,
            search_alg=search_alg,
            num_samples=num_samples,
            refit_with_val=refit_with_val,
            cpus=cpus,
            gpus=gpus,
            verbose=verbose,
            alias=alias,
            backend=backend,
            callbacks=callbacks,
        )

    @classmethod
    def get_default_config(cls, h, backend, n_series=None):
        """Build the default search space for horizon `h`, converted for the chosen backend."""
        config = cls.default_config.copy()
        config['input_size'] = tune.choice([h*x \
                                            for x in config['input_size_multiplier']])
        config['step_size'] = tune.choice([1, h])
        del config['input_size_multiplier']
        if backend == 'optuna':
            config = cls._ray_config_to_optuna(config)
return config show_doc(AutoInformer, title_level=3)%%capture
# Use your own config or AutoInformer.default_config
config = dict(max_steps=1, val_check_steps=1, input_size=12, hidden_size=8)
model = AutoInformer(h=12, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoInformer(h=12, config=None, backend='optuna')  # check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that Auto* model contains all required parameters of BaseAuto
test_args(AutoInformer, exclude_args=['cls_model'])
# Case unit test: Optuna with an updated default config
my_config = AutoInformer.get_default_config(h=12, backend='optuna')
def my_config_new(trial):
    config = {**my_config(trial)}
    config.update({'max_steps': 1, 'val_check_steps': 1, 'input_size': 12, 'hidden_size': 8})
    return config
model = AutoInformer(h=12, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Case unit test: Ray with an updated default config
my_config = AutoInformer.get_default_config(h=12, backend='ray')
my_config['max_steps'] = 1
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
my_config['hidden_size'] = 8
model = AutoInformer(h=12, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)class AutoAutoformer(BaseAuto):
    # Ray Tune search space shared by every AutoAutoformer instance;
    # `input_size_multiplier` is expanded into `input_size` by `get_default_config`.
    default_config = {
        "input_size_multiplier": [1, 2, 3, 4, 5],
        "h": None,
        "hidden_size": tune.choice([64, 128, 256]),
        "n_head": tune.choice([4, 8]),
        "learning_rate": tune.loguniform(1e-4, 1e-1),
        "scaler_type": tune.choice([None, 'robust', 'standard']),
        "max_steps": tune.choice([500, 1000, 2000]),
        "batch_size": tune.choice([32, 64, 128, 256]),
        "windows_batch_size": tune.choice([128, 256, 512, 1024]),
        "loss": None,
        "random_seed": tune.randint(1, 20),
    }

    def __init__(self,
                 h,
                 loss=MAE(),
                 valid_loss=None,
                 config=None,
                 search_alg=BasicVariantGenerator(random_state=1),
                 num_samples=10,
                 refit_with_val=False,
                 cpus=cpu_count(),
                 gpus=torch.cuda.device_count(),
                 verbose=False,
                 alias=None,
                 backend='ray',
                 callbacks=None):
        """Hyperparameter-search wrapper around Autoformer; see BaseAuto for argument semantics."""
        # Define search space, input/output sizes
        if config is None:
            config = self.get_default_config(h=h, backend=backend)
        super(AutoAutoformer, self).__init__(
            cls_model=Autoformer,
            h=h,
            loss=loss,
            valid_loss=valid_loss,
            config=config,
            search_alg=search_alg,
            num_samples=num_samples,
            refit_with_val=refit_with_val,
            cpus=cpus,
            gpus=gpus,
            verbose=verbose,
            alias=alias,
            backend=backend,
            callbacks=callbacks,
        )

    @classmethod
    def get_default_config(cls, h, backend, n_series=None):
        """Build the default search space for horizon `h`, converted for the chosen backend."""
        config = cls.default_config.copy()
        config['input_size'] = tune.choice([h*x \
                                            for x in config['input_size_multiplier']])
        config['step_size'] = tune.choice([1, h])
        del config['input_size_multiplier']
        if backend == 'optuna':
            config = cls._ray_config_to_optuna(config)
return config show_doc(AutoAutoformer, title_level=3)%%capture
# Use your own config or AutoAutoformer.default_config
config = dict(max_steps=1, val_check_steps=1, input_size=12, hidden_size=8)
model = AutoAutoformer(h=12, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoAutoformer(h=12, config=None, backend='optuna')  # check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that Auto* model contains all required parameters of BaseAuto
test_args(AutoAutoformer, exclude_args=['cls_model'])
# Case unit test: Optuna with an updated default config
my_config = AutoAutoformer.get_default_config(h=12, backend='optuna')
def my_config_new(trial):
    config = {**my_config(trial)}
    config.update({'max_steps': 1, 'val_check_steps': 1, 'input_size': 12, 'hidden_size': 8})
    return config
model = AutoAutoformer(h=12, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Case unit test: Ray with an updated default config
my_config = AutoAutoformer.get_default_config(h=12, backend='ray')
my_config['max_steps'] = 1
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
my_config['hidden_size'] = 8
model = AutoAutoformer(h=12, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)class AutoFEDformer(BaseAuto):
    # Ray Tune search space shared by every AutoFEDformer instance;
    # `input_size_multiplier` is expanded into `input_size` by `get_default_config`.
    default_config = {
        "input_size_multiplier": [1, 2, 3, 4, 5],
        "h": None,
        "hidden_size": tune.choice([64, 128, 256]),
        "learning_rate": tune.loguniform(1e-4, 1e-1),
        "scaler_type": tune.choice([None, 'robust', 'standard']),
        "max_steps": tune.choice([500, 1000, 2000]),
        "batch_size": tune.choice([32, 64, 128, 256]),
        "windows_batch_size": tune.choice([128, 256, 512, 1024]),
        "loss": None,
        "random_seed": tune.randint(1, 20),
    }

    def __init__(self,
                 h,
                 loss=MAE(),
                 valid_loss=None,
                 config=None,
                 search_alg=BasicVariantGenerator(random_state=1),
                 num_samples=10,
                 refit_with_val=False,
                 cpus=cpu_count(),
                 gpus=torch.cuda.device_count(),
                 verbose=False,
                 alias=None,
                 backend='ray',
                 callbacks=None):
        """Hyperparameter-search wrapper around FEDformer; see BaseAuto for argument semantics."""
        # Define search space, input/output sizes
        if config is None:
            config = self.get_default_config(h=h, backend=backend)
        super(AutoFEDformer, self).__init__(
            cls_model=FEDformer,
            h=h,
            loss=loss,
            valid_loss=valid_loss,
            config=config,
            search_alg=search_alg,
            num_samples=num_samples,
            refit_with_val=refit_with_val,
            cpus=cpus,
            gpus=gpus,
            verbose=verbose,
            alias=alias,
            backend=backend,
            callbacks=callbacks,
        )

    @classmethod
    def get_default_config(cls, h, backend, n_series=None):
        """Build the default search space for horizon `h`, converted for the chosen backend."""
        config = cls.default_config.copy()
        config['input_size'] = tune.choice([h*x \
                                            for x in config['input_size_multiplier']])
        config['step_size'] = tune.choice([1, h])
        del config['input_size_multiplier']
        if backend == 'optuna':
            config = cls._ray_config_to_optuna(config)
return config show_doc(AutoFEDformer, title_level=3)%%capture
# Use your own config or AutoFEDformer.default_config
config = dict(max_steps=1, val_check_steps=1, input_size=12, hidden_size=64)
model = AutoFEDformer(h=12, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoFEDformer(h=12, config=None, backend='optuna')  # check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that Auto* model contains all required parameters of BaseAuto
test_args(AutoFEDformer, exclude_args=['cls_model'])
# Case unit test: Optuna with an updated default config
my_config = AutoFEDformer.get_default_config(h=12, backend='optuna')
def my_config_new(trial):
    config = {**my_config(trial)}
    config.update({'max_steps': 1, 'val_check_steps': 1, 'input_size': 12, 'hidden_size': 64})
    return config
model = AutoFEDformer(h=12, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Case unit test: Ray with an updated default config
my_config = AutoFEDformer.get_default_config(h=12, backend='ray')
my_config['max_steps'] = 1
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
my_config['hidden_size'] = 64
model = AutoFEDformer(h=12, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)class AutoPatchTST(BaseAuto):
    # Ray Tune search space shared by every AutoPatchTST instance;
    # `input_size_multiplier` is expanded into `input_size` by `get_default_config`.
    default_config = {
        "input_size_multiplier": [1, 2, 3],
        "h": None,
        "hidden_size": tune.choice([16, 128, 256]),
        "n_heads": tune.choice([4, 16]),
        "patch_len": tune.choice([16, 24]),
        "learning_rate": tune.loguniform(1e-4, 1e-1),
        "scaler_type": tune.choice([None, 'robust', 'standard']),
        "revin": tune.choice([False, True]),
        "max_steps": tune.choice([500, 1000, 5000]),
        "batch_size": tune.choice([32, 64, 128, 256]),
        "windows_batch_size": tune.choice([128, 256, 512, 1024]),
        "loss": None,
        "random_seed": tune.randint(1, 20),
    }

    def __init__(self,
                 h,
                 loss=MAE(),
                 valid_loss=None,
                 config=None,
                 search_alg=BasicVariantGenerator(random_state=1),
                 num_samples=10,
                 refit_with_val=False,
                 cpus=cpu_count(),
                 gpus=torch.cuda.device_count(),
                 verbose=False,
                 alias=None,
                 backend='ray',
                 callbacks=None):
        """Hyperparameter-search wrapper around PatchTST; see BaseAuto for argument semantics."""
        # Define search space, input/output sizes
        if config is None:
            config = self.get_default_config(h=h, backend=backend)
        super(AutoPatchTST, self).__init__(
            cls_model=PatchTST,
            h=h,
            loss=loss,
            valid_loss=valid_loss,
            config=config,
            search_alg=search_alg,
            num_samples=num_samples,
            refit_with_val=refit_with_val,
            cpus=cpus,
            gpus=gpus,
            verbose=verbose,
            alias=alias,
            backend=backend,
            callbacks=callbacks,
        )

    @classmethod
    def get_default_config(cls, h, backend, n_series=None):
        """Build the default search space for horizon `h`, converted for the chosen backend."""
        config = cls.default_config.copy()
        config['input_size'] = tune.choice([h * x \
                                            for x in config['input_size_multiplier']])
        config['step_size'] = tune.choice([1, h])
        del config['input_size_multiplier']
        if backend == 'optuna':
            config = cls._ray_config_to_optuna(config)
return config show_doc(AutoPatchTST, title_level=3)%%capture
# Use your own config or AutoPatchTST.default_config
config = dict(max_steps=1, val_check_steps=1, input_size=12, hidden_size=16)
model = AutoPatchTST(h=12, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoPatchTST(h=12, config=None, backend='optuna')  # check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that Auto* model contains all required parameters of BaseAuto
test_args(AutoPatchTST, exclude_args=['cls_model'])
# Case unit test: Optuna with an updated default config
my_config = AutoPatchTST.get_default_config(h=12, backend='optuna')
def my_config_new(trial):
    config = {**my_config(trial)}
    config.update({'max_steps': 1, 'val_check_steps': 1, 'input_size': 12, 'hidden_size': 16})
    return config
model = AutoPatchTST(h=12, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Case unit test: Ray with an updated default config
my_config = AutoPatchTST.get_default_config(h=12, backend='ray')
my_config['max_steps'] = 1
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
my_config['hidden_size'] = 16
model = AutoPatchTST(h=12, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)class AutoiTransformer(BaseAuto):
    # Ray Tune search space shared by every AutoiTransformer instance;
    # `n_series` is a model requirement (multivariate) filled in at construction.
    default_config = {
        "input_size_multiplier": [1, 2, 3, 4, 5],
        "h": None,
        "n_series": None,
        "hidden_size": tune.choice([64, 128, 256]),
        "n_heads": tune.choice([4, 8]),
        "learning_rate": tune.loguniform(1e-4, 1e-1),
        "scaler_type": tune.choice([None, 'robust', 'standard']),
        "max_steps": tune.choice([500, 1000, 2000]),
        "batch_size": tune.choice([32, 64, 128, 256]),
        "loss": None,
        "random_seed": tune.randint(1, 20),
    }

    def __init__(self,
                 h,
                 n_series,
                 loss=MAE(),
                 valid_loss=None,
                 config=None,
                 search_alg=BasicVariantGenerator(random_state=1),
                 num_samples=10,
                 refit_with_val=False,
                 cpus=cpu_count(),
                 gpus=torch.cuda.device_count(),
                 verbose=False,
                 alias=None,
                 backend='ray',
                 callbacks=None):
        """Hyperparameter-search wrapper around the multivariate iTransformer; see BaseAuto."""
        # Define search space, input/output sizes
        if config is None:
            config = self.get_default_config(h=h, backend=backend, n_series=n_series)

        # Always use n_series from parameters, raise exception with Optuna because we can't enforce it
        if backend == 'ray':
            config['n_series'] = n_series
        elif backend == 'optuna':
            # Optuna configs are callables, so we can only inspect a sample via MockTrial.
            mock_trial = MockTrial()
            if ('n_series' in config(mock_trial) and config(mock_trial)['n_series'] != n_series) or ('n_series' not in config(mock_trial)):
                raise Exception(f"config needs 'n_series': {n_series}")

        super(AutoiTransformer, self).__init__(
            cls_model=iTransformer,
            h=h,
            loss=loss,
            valid_loss=valid_loss,
            config=config,
            search_alg=search_alg,
            num_samples=num_samples,
            refit_with_val=refit_with_val,
            cpus=cpus,
            gpus=gpus,
            verbose=verbose,
            alias=alias,
            backend=backend,
            callbacks=callbacks,
        )

    @classmethod
    def get_default_config(cls, h, backend, n_series):
        """Build the default search space for horizon `h` and `n_series`, converted for the backend."""
        config = cls.default_config.copy()
        config['input_size'] = tune.choice([h * x \
                                            for x in config["input_size_multiplier"]])

        # Rolling windows with stride 1 or stride h
        # See `BaseWindows` and `BaseRNN`'s create_windows
        config['step_size'] = tune.choice([1, h])
        del config["input_size_multiplier"]
        if backend == 'optuna':
            # Always use n_series from parameters
            config['n_series'] = n_series
            config = cls._ray_config_to_optuna(config)
return config show_doc(AutoiTransformer, title_level=3)%%capture
# Use your own config or AutoiTransformer.default_config
config = dict(max_steps=1, val_check_steps=1, input_size=12, hidden_size=16)
model = AutoiTransformer(h=12, n_series=1, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoiTransformer(h=12, n_series=1, config=None, backend='optuna')# Check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that the Auto* model contains all required parameters from BaseAuto
test_args(AutoiTransformer, exclude_args=['cls_model'])
# Unit test for the optuna case with an updated default config
my_config = AutoiTransformer.get_default_config(h=12, n_series=1, backend='optuna')
def my_config_new(trial):
config = {**my_config(trial)}
config.update({'max_steps': 1, 'val_check_steps': 1, 'input_size': 12, 'hidden_size': 16})
return config
model = AutoiTransformer(h=12, n_series=1, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Unit test for the ray case with an updated default config
my_config = AutoiTransformer.get_default_config(h=12, n_series=1, backend='ray')
my_config['max_steps'] = 1
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
my_config['hidden_size'] = 16
model = AutoiTransformer(h=12, n_series=1, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)E. 基于卷积神经网络(CNN)
class AutoTimesNet(BaseAuto):
default_config = {
"input_size_multiplier": [1, 2, 3, 4, 5],
"h": None,
"hidden_size": tune.choice([32, 64, 128]),
"conv_hidden_size": tune.choice([32, 64, 128]),
"learning_rate": tune.loguniform(1e-4, 1e-1),
"scaler_type": tune.choice(['robust', 'standard']),
"max_steps": tune.choice([500, 1000, 2000]),
"batch_size": tune.choice([32, 64, 128]),
"windows_batch_size": tune.choice([32, 64, 128, 256]),
"loss": None,
"random_seed": tune.randint(1, 20),
}
def __init__(self,
h,
loss=MAE(),
valid_loss=None,
config=None,
search_alg=BasicVariantGenerator(random_state=1),
num_samples=10,
refit_with_val=False,
cpus=cpu_count(),
gpus=torch.cuda.device_count(),
verbose=False,
alias=None,
backend='ray',
callbacks=None):
# 定义搜索空间、输入/输出大小
if config is None:
config = self.get_default_config(h=h, backend=backend)
super(AutoTimesNet, self).__init__(
cls_model=TimesNet,
h=h,
loss=loss,
valid_loss=valid_loss,
config=config,
search_alg=search_alg,
num_samples=num_samples,
refit_with_val=refit_with_val,
cpus=cpus,
gpus=gpus,
verbose=verbose,
alias=alias,
backend=backend,
callbacks=callbacks,
)
@classmethod
def get_default_config(cls, h, backend, n_series=None):
config = cls.default_config.copy()
config['input_size'] = tune.choice([h*x \
for x in config['input_size_multiplier']])
config['step_size'] = tune.choice([1, h])
del config['input_size_multiplier']
if backend == 'optuna':
config = cls._ray_config_to_optuna(config)
return config show_doc(AutoTimesNet, title_level=3)%%capture
# Use your own config or AutoTimesNet.default_config
config = dict(max_steps=1, val_check_steps=1, input_size=12, hidden_size=32)
model = AutoTimesNet(h=12, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoTimesNet(h=12, config=None, backend='optuna')# Check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that the Auto* model contains all required parameters from BaseAuto
test_args(AutoTimesNet, exclude_args=['cls_model'])
# Unit test for the optuna case with an updated default config
my_config = AutoTimesNet.get_default_config(h=12, backend='optuna')
def my_config_new(trial):
config = {**my_config(trial)}
config.update({'max_steps': 2, 'val_check_steps': 1, 'input_size': 12, 'hidden_size': 32})
return config
model = AutoTimesNet(h=12, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Unit test for the ray case with an updated default config
my_config = AutoTimesNet.get_default_config(h=12, backend='ray')
my_config['max_steps'] = 2
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
my_config['hidden_size'] = 32
model = AutoTimesNet(h=12, config=my_config, num_samples=1, cpus=1)
model.fit(dataset=dataset)F. 多变量
class AutoStemGNN(BaseAuto):
default_config = {
"input_size_multiplier": [1, 2, 3, 4],
"h": None,
"n_series": None,
"n_stacks": tune.choice([2]),
"multi_layer": tune.choice([3, 5, 7]),
"learning_rate": tune.loguniform(1e-4, 1e-1),
"scaler_type": tune.choice([None, 'robust', 'standard']),
"max_steps": tune.choice([500, 1000, 2000]),
"batch_size": tune.choice([32, 64, 128, 256]),
"loss": None,
"random_seed": tune.randint(1, 20),
}
def __init__(self,
h,
n_series,
loss=MAE(),
valid_loss=None,
config=None,
search_alg=BasicVariantGenerator(random_state=1),
num_samples=10,
refit_with_val=False,
cpus=cpu_count(),
gpus=torch.cuda.device_count(),
verbose=False,
alias=None,
backend='ray',
callbacks=None):
# 定义搜索空间、输入/输出大小
if config is None:
config = self.get_default_config(h=h, backend=backend, n_series=n_series)
# Always use n_series from parameters, raise exception with Optuna because we can't enforce it
if backend == 'ray':
config['n_series'] = n_series
elif backend == 'optuna':
mock_trial = MockTrial()
if ('n_series' in config(mock_trial) and config(mock_trial)['n_series'] != n_series) or ('n_series' not in config(mock_trial)):
raise Exception(f"config needs 'n_series': {n_series}")
super(AutoStemGNN, self).__init__(
cls_model=StemGNN,
h=h,
loss=loss,
valid_loss=valid_loss,
config=config,
search_alg=search_alg,
num_samples=num_samples,
refit_with_val=refit_with_val,
cpus=cpus,
gpus=gpus,
verbose=verbose,
alias=alias,
backend=backend,
callbacks=callbacks,
)
@classmethod
def get_default_config(cls, h, backend, n_series):
config = cls.default_config.copy()
config['input_size'] = tune.choice([h * x \
for x in config["input_size_multiplier"]])
# 以步长为1或h的滚动窗口
# See `BaseWindows` and `BaseRNN`'s create_windows
config['step_size'] = tune.choice([1, h])
del config["input_size_multiplier"]
if backend == 'optuna':
# Always use n_series from parameters
config['n_series'] = n_series
config = cls._ray_config_to_optuna(config)
return config show_doc(AutoStemGNN, title_level=3)%%capture
# Use your own config or AutoStemGNN.default_config
config = dict(max_steps=1, val_check_steps=1, input_size=12)
model = AutoStemGNN(h=12, n_series=1, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoStemGNN(h=12, n_series=1, config=None, backend='optuna')# Check Optuna
assert model.config(MockTrial())['h'] == 12
assert model.config(MockTrial())['n_series'] == 1
# Unit test to check that the Auto* model contains all required parameters from BaseAuto
test_args(AutoStemGNN, exclude_args=['cls_model'])
# Unit test for the optuna case with an updated default config
my_config = AutoStemGNN.get_default_config(h=12, backend='optuna', n_series=1)
def my_config_new(trial):
config = {**my_config(trial)}
config.update({'max_steps': 1, 'val_check_steps': 1, 'input_size': 12})
return config
model = AutoStemGNN(h=12, n_series=1, config=my_config_new, backend='optuna')
model.fit(dataset=dataset)
# Unit test for the ray case with an updated default config
my_config = AutoStemGNN.get_default_config(h=12, backend='ray', n_series=1)
my_config['max_steps'] = 1
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
model = AutoStemGNN(h=12, n_series=1, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)

class AutoHINT(BaseAuto):
    """Hyperparameter search for hierarchically-reconciled forecasts via `HINT`.

    Tunes a user-given base model class together with the reconciliation
    strategy (part of the config's search space). Only the ray backend is
    supported. ``S`` is the hierarchical aggregation-constraints matrix.
    """

    def __init__(self, cls_model, h, loss, valid_loss, S, config,
                 search_alg=BasicVariantGenerator(random_state=1), num_samples=10,
                 cpus=cpu_count(), gpus=torch.cuda.device_count(),
                 refit_with_val=False, verbose=False, alias=None, backend='ray',
                 callbacks=None):
        # Fail fast: validate the backend and the reconciliation search-space
        # parameter BEFORE the (expensive) BaseAuto setup, not after.
        if backend == 'optuna':
            raise Exception("Optuna is not supported for AutoHINT.")
        if 'reconciliation' not in config:
            raise Exception("config needs reconciliation, \
try tune.choice(['BottomUp', 'MinTraceOLS', 'MinTraceWLS'])")
        super(AutoHINT, self).__init__(
            cls_model=cls_model, h=h, loss=loss, valid_loss=valid_loss,
            config=config, search_alg=search_alg, num_samples=num_samples,
            refit_with_val=refit_with_val, cpus=cpus, gpus=gpus, verbose=verbose,
            alias=alias, backend=backend, callbacks=callbacks,
        )
        self.S = S

    def _fit_model(self, cls_model, config,
                   dataset, val_size, test_size, distributed_config=None):
        # Overwrite _fit_model for HINT's two-stage instantiation:
        # first the base model, then the HINT reconciliation wrapper.
        # Copy the config so popping 'reconciliation' does not mutate the
        # caller's trial dict.
        config = dict(config)
        reconciliation = config.pop('reconciliation')
        base_model = cls_model(**config)
        model = HINT(h=base_model.h, model=base_model,
                     S=self.S, reconciliation=reconciliation)
        model.test_size = test_size
        model = model.fit(
            dataset,
            val_size=val_size,
            test_size=test_size,
            distributed_config=distributed_config,
        )
        return model

    @classmethod
    def get_default_config(cls, h, backend, n_series=None):
        """AutoHINT has no default search space; a config is always required."""
        raise Exception("AutoHINT has no default configuration.")
show_doc(AutoHINT, title_level=3)

def sort_df_hier(Y_df, S_df):
    """Sort `Y_df` in place so its unique_id order matches `S_df`'s index.

    NeuralForecast's core sorts unique_id lexicographically by default; making
    unique_id a categorical with S_df's ordering aligns Y_df (and hence
    Y_hat_df) with the hierarchy matrix.
    """
    Y_df.unique_id = Y_df.unique_id.astype('category')
    Y_df.unique_id = Y_df.unique_id.cat.set_categories(S_df.index)
    return Y_df.sort_values(by=['unique_id', 'ds'])
# ----- Create a synthetic hierarchical dataset -----
np.random.seed(123)
train_steps = 20
num_levels = 7
level = np.arange(0, 100, 0.1)
qs = [[50-lv/2, 50+lv/2] for lv in level]
quantiles = np.sort(np.concatenate(qs)/100)
levels = ['Top', 'Mid1', 'Mid2', 'Bottom1', 'Bottom2', 'Bottom3', 'Bottom4']
unique_ids = np.repeat(levels, train_steps)
S = np.array([[1., 1., 1., 1.],
[1., 1., 0., 0.],
[0., 0., 1., 1.],
[1., 0., 0., 0.],
[0., 1., 0., 0.],
[0., 0., 1., 0.],
[0., 0., 0., 1.]])
S_dict = {col: S[:, i] for i, col in enumerate(levels[3:])}
S_df = pd.DataFrame(S_dict, index=levels)
ds = pd.date_range(start='2018-03-31', periods=train_steps, freq='Q').tolist() * num_levels
# Create Y_df
y_lists = [S @ np.random.uniform(low=100, high=500, size=4) for i in range(train_steps)]
y = [elem for tup in zip(*y_lists) for elem in tup]
Y_df = pd.DataFrame({'unique_id': unique_ids, 'ds': ds, 'y': y})
Y_df = sort_df_hier(Y_df, S_df)
hint_dataset, *_ = TimeSeriesDataset.from_df(df=Y_df)%%capture
# Run a simple hyperparameter optimization of
# NHITS, then reconcile with HINT
from neuralforecast.losses.pytorch import GMM, sCRPS
base_config = dict(max_steps=1, val_check_steps=1, input_size=8)
base_model = AutoNHITS(h=4, loss=GMM(n_components=2, quantiles=quantiles),
config=base_config, num_samples=1, cpus=1)
model = HINT(h=4, S=S_df.values,
model=base_model, reconciliation='MinTraceOLS')
model.fit(dataset=dataset)
y_hat = model.predict(dataset=hint_dataset)
# Run a joint hyperparameter optimization of the
# NHITS + HINT reconciliation configuration
nhits_config = {
"learning_rate": tune.choice([1e-3]), # Initial learning rate
"max_steps": tune.choice([1]), # Number of SGD steps
"val_check_steps": tune.choice([1]), # Steps between validations
"input_size": tune.choice([5 * 12]), # input_size = multiplier * horizon
"batch_size": tune.choice([7]), # Number of series in windows
"windows_batch_size": tune.choice([256]), # Number of windows in batch
"n_pool_kernel_size": tune.choice([[2, 2, 2], [16, 8, 1]]), # MaxPool's Kernelsize
"n_freq_downsample": tune.choice([[168, 24, 1], [24, 12, 1], [1, 1, 1]]), # Interpolation expressivity ratios
"activation": tune.choice(['ReLU']), # Type of non-linear activation
"n_blocks": tune.choice([[1, 1, 1]]), # Blocks per each 3 stacks
"mlp_units": tune.choice([[[512, 512], [512, 512], [512, 512]]]), # 2 512-Layers per block for each stack
"interpolation_mode": tune.choice(['linear']), # Type of multi-step interpolation
"random_seed": tune.randint(1, 10),
"reconciliation": tune.choice(['BottomUp', 'MinTraceOLS', 'MinTraceWLS'])
}
model = AutoHINT(h=4, S=S_df.values,
cls_model=NHITS,
config=nhits_config,
loss=GMM(n_components=2, level=[80, 90]),
valid_loss=sCRPS(level=[80, 90]),
num_samples=1, cpus=1)
model.fit(dataset=dataset)
y_hat = model.predict(dataset=hint_dataset)# Unit test to check that the Auto* model contains all required parameters from BaseAuto
test_args(AutoHINT) class AutoTSMixer(BaseAuto):
default_config = {
"input_size_multiplier": [1, 2, 3, 4],
"h": None,
"n_series": None,
"n_block": tune.choice([1, 2, 4, 6, 8]),
"learning_rate": tune.loguniform(1e-4, 1e-2),
"ff_dim": tune.choice([32, 64, 128]),
"scaler_type": tune.choice(['identity', 'robust', 'standard']),
"max_steps": tune.choice([500, 1000, 2000]),
"batch_size": tune.choice([32, 64, 128, 256]),
"dropout": tune.uniform(0.0, 0.99),
"loss": None,
"random_seed": tune.randint(1, 20),
}
def __init__(self,
h,
n_series,
loss=MAE(),
valid_loss=None,
config=None,
search_alg=BasicVariantGenerator(random_state=1),
num_samples=10,
refit_with_val=False,
cpus=cpu_count(),
gpus=torch.cuda.device_count(),
verbose=False,
alias=None,
backend='ray',
callbacks=None):
# 定义搜索空间、输入/输出大小
if config is None:
config = self.get_default_config(h=h, backend=backend, n_series=n_series)
# Always use n_series from parameters, raise exception with Optuna because we can't enforce it
if backend == 'ray':
config['n_series'] = n_series
elif backend == 'optuna':
mock_trial = MockTrial()
if ('n_series' in config(mock_trial) and config(mock_trial)['n_series'] != n_series) or ('n_series' not in config(mock_trial)):
raise Exception(f"config needs 'n_series': {n_series}")
super(AutoTSMixer, self).__init__(
cls_model=TSMixer,
h=h,
loss=loss,
valid_loss=valid_loss,
config=config,
search_alg=search_alg,
num_samples=num_samples,
refit_with_val=refit_with_val,
cpus=cpus,
gpus=gpus,
verbose=verbose,
alias=alias,
backend=backend,
callbacks=callbacks,
)
@classmethod
def get_default_config(cls, h, backend, n_series):
config = cls.default_config.copy()
config['input_size'] = tune.choice([h * x \
for x in config["input_size_multiplier"]])
# 以步长为1或h的滚动窗口
# See `BaseWindows` and `BaseRNN`'s create_windows
config['step_size'] = tune.choice([1, h])
del config["input_size_multiplier"]
if backend == 'optuna':
# Always use n_series from parameters
config['n_series'] = n_series
config = cls._ray_config_to_optuna(config)
return configshow_doc(AutoTSMixer, title_level=3)%%capture
# Use your own config or AutoTSMixer.default_config
config = dict(max_steps=1, val_check_steps=1, input_size=12)
model = AutoTSMixer(h=12, n_series=1, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoTSMixer(h=12, n_series=1, config=None, backend='optuna')# Check Optuna
assert model.config(MockTrial())['h'] == 12
assert model.config(MockTrial())['n_series'] == 1
# Unit test to check that the Auto* model contains all required parameters from BaseAuto
test_args(AutoTSMixer, exclude_args=['cls_model'])
# Unit test for the optuna case with an updated default config
my_config = AutoTSMixer.get_default_config(h=12, backend='optuna', n_series=1)
def my_config_new(trial):
config = {**my_config(trial)}
config.update({'max_steps': 1, 'val_check_steps': 1, 'input_size': 12})
return config
model = AutoTSMixer(h=12, n_series=1, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Unit test for the ray case with an updated default config
my_config = AutoTSMixer.get_default_config(h=12, backend='ray', n_series=1)
my_config['max_steps'] = 1
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
model = AutoTSMixer(h=12, n_series=1, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)class AutoTSMixerx(BaseAuto):
default_config = {
"input_size_multiplier": [1, 2, 3, 4],
"h": None,
"n_series": None,
"n_block": tune.choice([1, 2, 4, 6, 8]),
"learning_rate": tune.loguniform(1e-4, 1e-2),
"ff_dim": tune.choice([32, 64, 128]),
"scaler_type": tune.choice(['identity', 'robust', 'standard']),
"max_steps": tune.choice([500, 1000, 2000]),
"batch_size": tune.choice([32, 64, 128, 256]),
"dropout": tune.uniform(0.0, 0.99),
"loss": None,
"random_seed": tune.randint(1, 20),
}
def __init__(self,
h,
n_series,
loss=MAE(),
valid_loss=None,
config=None,
search_alg=BasicVariantGenerator(random_state=1),
num_samples=10,
refit_with_val=False,
cpus=cpu_count(),
gpus=torch.cuda.device_count(),
verbose=False,
alias=None,
backend='ray',
callbacks=None):
# 定义搜索空间、输入/输出大小
if config is None:
config = self.get_default_config(h=h, backend=backend, n_series=n_series)
# Always use n_series from parameters, raise exception with Optuna because we can't enforce it
if backend == 'ray':
config['n_series'] = n_series
elif backend == 'optuna':
mock_trial = MockTrial()
if ('n_series' in config(mock_trial) and config(mock_trial)['n_series'] != n_series) or ('n_series' not in config(mock_trial)):
raise Exception(f"config needs 'n_series': {n_series}")
super(AutoTSMixerx, self).__init__(
cls_model=TSMixerx,
h=h,
loss=loss,
valid_loss=valid_loss,
config=config,
search_alg=search_alg,
num_samples=num_samples,
refit_with_val=refit_with_val,
cpus=cpus,
gpus=gpus,
verbose=verbose,
alias=alias,
backend=backend,
callbacks=callbacks,
)
@classmethod
def get_default_config(cls, h, backend, n_series):
config = cls.default_config.copy()
config['input_size'] = tune.choice([h * x \
for x in config["input_size_multiplier"]])
# 以步长为1或h的滚动窗口
# See `BaseWindows` and `BaseRNN`'s create_windows
config['step_size'] = tune.choice([1, h])
del config["input_size_multiplier"]
if backend == 'optuna':
# Always use n_series from parameters
config['n_series'] = n_series
config = cls._ray_config_to_optuna(config)
return configshow_doc(AutoTSMixerx, title_level=3)%%capture
# Use your own config or AutoTSMixerx.default_config
config = dict(max_steps=1, val_check_steps=1, input_size=12)
model = AutoTSMixerx(h=12, n_series=1, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoTSMixerx(h=12, n_series=1, config=None, backend='optuna')# Check Optuna
assert model.config(MockTrial())['h'] == 12
assert model.config(MockTrial())['n_series'] == 1
# Unit test to check that the Auto* model contains all required parameters from BaseAuto
test_args(AutoTSMixerx, exclude_args=['cls_model'])
# Unit test for the optuna case with an updated default config
my_config = AutoTSMixerx.get_default_config(h=12, backend='optuna', n_series=1)
def my_config_new(trial):
config = {**my_config(trial)}
config.update({'max_steps': 1, 'val_check_steps': 1, 'input_size': 12})
return config
model = AutoTSMixerx(h=12, n_series=1, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Unit test for the ray case with an updated default config
my_config = AutoTSMixerx.get_default_config(h=12, backend='ray', n_series=1)
my_config['max_steps'] = 1
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
model = AutoTSMixerx(h=12, n_series=1, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)
class AutoMLPMultivariate(BaseAuto):
default_config = {
"input_size_multiplier": [1, 2, 3, 4, 5],
"h": None,
"n_series": None,
"hidden_size": tune.choice( [256, 512, 1024] ),
"num_layers": tune.randint(2, 6),
"learning_rate": tune.loguniform(1e-4, 1e-1),
"scaler_type": tune.choice([None, 'robust', 'standard']),
"max_steps": tune.choice([500, 1000]),
"batch_size": tune.choice([32, 64, 128, 256]),
"loss": None,
"random_seed": tune.randint(1, 20),
}
def __init__(self,
h,
n_series,
loss=MAE(),
valid_loss=None,
config=None,
search_alg=BasicVariantGenerator(random_state=1),
num_samples=10,
refit_with_val=False,
cpus=cpu_count(),
gpus=torch.cuda.device_count(),
verbose=False,
alias=None,
backend='ray',
callbacks=None):
# 定义搜索空间、输入/输出大小
if config is None:
config = self.get_default_config(h=h, backend=backend, n_series=n_series)
# Always use n_series from parameters, raise exception with Optuna because we can't enforce it
if backend == 'ray':
config['n_series'] = n_series
elif backend == 'optuna':
mock_trial = MockTrial()
if ('n_series' in config(mock_trial) and config(mock_trial)['n_series'] != n_series) or ('n_series' not in config(mock_trial)):
raise Exception(f"config needs 'n_series': {n_series}")
super(AutoMLPMultivariate, self).__init__(
cls_model=MLPMultivariate,
h=h,
loss=loss,
valid_loss=valid_loss,
config=config,
search_alg=search_alg,
num_samples=num_samples,
refit_with_val=refit_with_val,
cpus=cpus,
gpus=gpus,
verbose=verbose,
alias=alias,
backend=backend,
callbacks=callbacks,
)
@classmethod
def get_default_config(cls, h, backend, n_series):
config = cls.default_config.copy()
config['input_size'] = tune.choice([h * x \
for x in config["input_size_multiplier"]])
# 以步长为1或h的滚动窗口
# See `BaseWindows` and `BaseRNN`'s create_windows
config['step_size'] = tune.choice([1, h])
del config["input_size_multiplier"]
if backend == 'optuna':
# Always use n_series from parameters
config['n_series'] = n_series
config = cls._ray_config_to_optuna(config)
return configshow_doc(AutoMLPMultivariate, title_level=3)%%capture
# Use your own config or AutoMLPMultivariate.default_config
config = dict(max_steps=1, val_check_steps=1, input_size=12)
model = AutoMLPMultivariate(h=12, n_series=1, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoMLPMultivariate(h=12, n_series=1, config=None, backend='optuna')# Check Optuna
assert model.config(MockTrial())['h'] == 12
assert model.config(MockTrial())['n_series'] == 1
# Unit test to check that the Auto* model contains all required parameters from BaseAuto
test_args(AutoMLPMultivariate, exclude_args=['cls_model'])
# Unit test for the optuna case with an updated default config
my_config = AutoMLPMultivariate.get_default_config(h=12, backend='optuna', n_series=1)
def my_config_new(trial):
config = {**my_config(trial)}
config.update({'max_steps': 1, 'val_check_steps': 1, 'input_size': 12})
return config
model = AutoMLPMultivariate(h=12, n_series=1, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Unit test for the ray case with an updated default config
my_config = AutoMLPMultivariate.get_default_config(h=12, backend='ray', n_series=1)
my_config['max_steps'] = 1
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
model = AutoMLPMultivariate(h=12, n_series=1, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)class AutoSOFTS(BaseAuto):
default_config = {
"input_size_multiplier": [1, 2, 3, 4, 5],
"h": None,
"n_series": None,
"hidden_size": tune.choice([64, 128, 256, 512]),
"d_core": tune.choice([64, 128, 256, 512]),
"learning_rate": tune.loguniform(1e-4, 1e-1),
"scaler_type": tune.choice([None, 'robust', 'standard', 'identity']),
"max_steps": tune.choice([500, 1000, 2000]),
"batch_size": tune.choice([32, 64, 128, 256]),
"loss": None,
"random_seed": tune.randint(1, 20),
}
def __init__(self,
h,
n_series,
loss=MAE(),
valid_loss=None,
config=None,
search_alg=BasicVariantGenerator(random_state=1),
num_samples=10,
refit_with_val=False,
cpus=cpu_count(),
gpus=torch.cuda.device_count(),
verbose=False,
alias=None,
backend='ray',
callbacks=None):
# 定义搜索空间、输入/输出大小
if config is None:
config = self.get_default_config(h=h, backend=backend, n_series=n_series)
# Always use n_series from parameters, raise exception with Optuna because we can't enforce it
if backend == 'ray':
config['n_series'] = n_series
elif backend == 'optuna':
mock_trial = MockTrial()
if ('n_series' in config(mock_trial) and config(mock_trial)['n_series'] != n_series) or ('n_series' not in config(mock_trial)):
raise Exception(f"config needs 'n_series': {n_series}")
super(AutoSOFTS, self).__init__(
cls_model=SOFTS,
h=h,
loss=loss,
valid_loss=valid_loss,
config=config,
search_alg=search_alg,
num_samples=num_samples,
refit_with_val=refit_with_val,
cpus=cpus,
gpus=gpus,
verbose=verbose,
alias=alias,
backend=backend,
callbacks=callbacks,
)
@classmethod
def get_default_config(cls, h, backend, n_series):
config = cls.default_config.copy()
config['input_size'] = tune.choice([h * x \
for x in config["input_size_multiplier"]])
# 以步长为1或h的滚动窗口
# See `BaseWindows` and `BaseRNN`'s create_windows
config['step_size'] = tune.choice([1, h])
del config["input_size_multiplier"]
if backend == 'optuna':
# Always use n_series from parameters
config['n_series'] = n_series
config = cls._ray_config_to_optuna(config)
return config show_doc(AutoSOFTS, title_level=3)%%capture
# Use your own config or AutoSOFTS.default_config
config = dict(max_steps=1, val_check_steps=1, input_size=12, hidden_size=16)
model = AutoSOFTS(h=12, n_series=1, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoSOFTS(h=12, n_series=1, config=None, backend='optuna')# Check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that the Auto* model contains all required parameters from BaseAuto
test_args(AutoSOFTS, exclude_args=['cls_model'])
# Unit test for the optuna case with an updated default config
my_config = AutoSOFTS.get_default_config(h=12, n_series=1, backend='optuna')
def my_config_new(trial):
config = {**my_config(trial)}
config.update({'max_steps': 1, 'val_check_steps': 1, 'input_size': 12, 'hidden_size': 16})
return config
model = AutoSOFTS(h=12, n_series=1, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Unit test for the ray case with an updated default config
my_config = AutoSOFTS.get_default_config(h=12, n_series=1, backend='ray')
my_config['max_steps'] = 1
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
my_config['hidden_size'] = 16
model = AutoSOFTS(h=12, n_series=1, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)class AutoTimeMixer(BaseAuto):
default_config = {
"input_size_multiplier": [1, 2, 3, 4, 5],
"h": None,
"n_series": None,
"d_model": tune.choice([16, 32, 64]),
"d_ff": tune.choice([16, 32, 64]),
"down_sampling_layers": tune.choice([1, 2]),
"learning_rate": tune.loguniform(1e-4, 1e-1),
"scaler_type": tune.choice([None, 'robust', 'standard', 'identity']),
"max_steps": tune.choice([500, 1000, 2000]),
"batch_size": tune.choice([32, 64, 128, 256]),
"loss": None,
"random_seed": tune.randint(1, 20),
}
def __init__(self,
h,
n_series,
loss=MAE(),
valid_loss=None,
config=None,
search_alg=BasicVariantGenerator(random_state=1),
num_samples=10,
refit_with_val=False,
cpus=cpu_count(),
gpus=torch.cuda.device_count(),
verbose=False,
alias=None,
backend='ray',
callbacks=None):
# 定义搜索空间、输入/输出大小
if config is None:
config = self.get_default_config(h=h, backend=backend, n_series=n_series)
# Always use n_series from parameters, raise exception with Optuna because we can't enforce it
if backend == 'ray':
config['n_series'] = n_series
elif backend == 'optuna':
mock_trial = MockTrial()
if ('n_series' in config(mock_trial) and config(mock_trial)['n_series'] != n_series) or ('n_series' not in config(mock_trial)):
raise Exception(f"config needs 'n_series': {n_series}")
super(AutoTimeMixer, self).__init__(
cls_model=TimeMixer,
h=h,
loss=loss,
valid_loss=valid_loss,
config=config,
search_alg=search_alg,
num_samples=num_samples,
refit_with_val=refit_with_val,
cpus=cpus,
gpus=gpus,
verbose=verbose,
alias=alias,
backend=backend,
callbacks=callbacks,
)
@classmethod
def get_default_config(cls, h, backend, n_series):
config = cls.default_config.copy()
config['input_size'] = tune.choice([h * x \
for x in config["input_size_multiplier"]])
# 以步长为1或h的滚动窗口
# See `BaseWindows` and `BaseRNN`'s create_windows
config['step_size'] = tune.choice([1, h])
del config["input_size_multiplier"]
if backend == 'optuna':
# Always use n_series from parameters
config['n_series'] = n_series
config = cls._ray_config_to_optuna(config)
return config show_doc(AutoTimeMixer, title_level=3)%%capture
# Use your own config or AutoTimeMixer.default_config
config = dict(max_steps=1, val_check_steps=1, input_size=12, d_model=16)
model = AutoTimeMixer(h=12, n_series=1, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoTimeMixer(h=12, n_series=1, config=None, backend='optuna')# Check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that the Auto* model contains all required parameters from BaseAuto
test_args(AutoTimeMixer, exclude_args=['cls_model'])
# Unit test for the optuna case with an updated default config
my_config = AutoTimeMixer.get_default_config(h=12, n_series=1, backend='optuna')
def my_config_new(trial):
config = {**my_config(trial)}
config.update({'max_steps': 1, 'val_check_steps': 1, 'input_size': 12, 'd_model': 16})
return config
model = AutoTimeMixer(h=12, n_series=1, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Unit test for the ray case with an updated default config
my_config = AutoTimeMixer.get_default_config(h=12, n_series=1, backend='ray')
my_config['max_steps'] = 1
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
my_config['d_model'] = 16
model = AutoTimeMixer(h=12, n_series=1, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)class AutoRMoK(BaseAuto):
default_config = {
"input_size_multiplier": [1, 2, 3, 4, 5],
"h": None,
"n_series": None,
"taylor_order": tune.choice([3, 4, 5]),
"jacobi_degree": tune.choice([4, 5, 6]),
"wavelet_function": tune.choice(['mexican_hat', 'morlet', 'dog', 'meyer', 'shannon']),
"learning_rate": tune.loguniform(1e-4, 1e-1),
"scaler_type": tune.choice([None, 'robust', 'standard', 'identity']),
"max_steps": tune.choice([500, 1000, 2000]),
"batch_size": tune.choice([32, 64, 128, 256]),
"loss": None,
"random_seed": tune.randint(1, 20),
}
def __init__(self,
h,
n_series,
loss=MAE(),
valid_loss=None,
config=None,
search_alg=BasicVariantGenerator(random_state=1),
num_samples=10,
refit_with_val=False,
cpus=cpu_count(),
gpus=torch.cuda.device_count(),
verbose=False,
alias=None,
backend='ray',
callbacks=None):
# 定义搜索空间、输入/输出大小
if config is None:
config = self.get_default_config(h=h, backend=backend, n_series=n_series)
# Always use n_series from parameters, raise exception with Optuna because we can't enforce it
if backend == 'ray':
config['n_series'] = n_series
elif backend == 'optuna':
mock_trial = MockTrial()
if ('n_series' in config(mock_trial) and config(mock_trial)['n_series'] != n_series) or ('n_series' not in config(mock_trial)):
raise Exception(f"config needs 'n_series': {n_series}")
super(AutoRMoK, self).__init__(
cls_model=RMoK,
h=h,
loss=loss,
valid_loss=valid_loss,
config=config,
search_alg=search_alg,
num_samples=num_samples,
refit_with_val=refit_with_val,
cpus=cpus,
gpus=gpus,
verbose=verbose,
alias=alias,
backend=backend,
callbacks=callbacks,
)
@classmethod
def get_default_config(cls, h, backend, n_series):
config = cls.default_config.copy()
config['input_size'] = tune.choice([h * x \
for x in config["input_size_multiplier"]])
# 以步长为1或h的滚动窗口
# See `BaseWindows` and `BaseRNN`'s create_windows
config['step_size'] = tune.choice([1, h])
del config["input_size_multiplier"]
if backend == 'optuna':
# Always use n_series from parameters
config['n_series'] = n_series
config = cls._ray_config_to_optuna(config)
return config show_doc(AutoRMoK, title_level=3)%%capture
# Use your own config or AutoRMoK.default_config
config = dict(max_steps=1, val_check_steps=1, input_size=12, learning_rate=1e-2)
model = AutoRMoK(h=12, n_series=1, config=config, num_samples=1, cpus=1)
# Fit and predict
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Optuna
model = AutoRMoK(h=12, n_series=1, config=None, backend='optuna')# Check Optuna
assert model.config(MockTrial())['h'] == 12
# Unit test to check that the Auto* model contains all required parameters from BaseAuto
test_args(AutoRMoK, exclude_args=['cls_model'])
# Unit test for the optuna case with an updated default config
my_config = AutoRMoK.get_default_config(h=12, n_series=1, backend='optuna')
def my_config_new(trial):
config = {**my_config(trial)}
config.update({'max_steps': 1, 'val_check_steps': 1, 'input_size': 12, 'learning_rate': 1e-1})
return config
model = AutoRMoK(h=12, n_series=1, config=my_config_new, backend='optuna', num_samples=1, cpus=1)
model.fit(dataset=dataset)
# Unit test for the ray case with an updated default config
my_config = AutoRMoK.get_default_config(h=12, n_series=1, backend='ray')
my_config['max_steps'] = 1
my_config['val_check_steps'] = 1
my_config['input_size'] = 12
my_config['learning_rate'] = 1e-1
model = AutoRMoK(h=12, n_series=1, config=my_config, backend='ray', num_samples=1, cpus=1)
model.fit(dataset=dataset)测试
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from neuralforecast.tsdataset import TimeSeriesDataset
from neuralforecast.utils import AirPassengersDF as Y_df
# Split train/test and declare the time-series dataset.
Y_train_df = Y_df[Y_df.ds<='1959-12-31'] # 132 train observations
Y_test_df = Y_df[Y_df.ds>'1959-12-31'] # 12 test observations
dataset, *_ = TimeSeriesDataset.from_df(Y_train_df)
# Tiny config so the smoke test finishes in a single step.
config = dict(max_steps=1, val_check_steps=1, input_size=12)
model = AutoNHITS(h=12, config=config, num_samples=1, cpus=1)
# Fit and predict.
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Tests
# Unit test: AutoNHITS with an explicit validation loss (MSE).
nhits_config = {
    "learning_rate": tune.choice([1e-3]),                                    # Initial learning rate
    "max_steps": tune.choice([1]),                                           # Number of SGD steps
    "val_check_steps": tune.choice([1]),                                     # Steps between validations
    "input_size": tune.choice([5 * 12]),                                     # input_size = multiplier * horizon
    "batch_size": tune.choice([7]),                                          # Number of series per windows batch
    "windows_batch_size": tune.choice([256]),                                # Number of windows per batch
    "n_pool_kernel_size": tune.choice([[2, 2, 2], [16, 8, 1]]),              # MaxPool's kernel size
    "n_freq_downsample": tune.choice([[168, 24, 1], [24, 12, 1], [1, 1, 1]]),# Interpolation expressivity ratios
    "activation": tune.choice(['ReLU']),                                     # Type of non-linear activation
    "n_blocks": tune.choice([[1, 1, 1]]),                                    # Blocks per each of the 3 stacks
    "mlp_units": tune.choice([[[512, 512], [512, 512], [512, 512]]]),        # Two 512-unit layers per block per stack
    "interpolation_mode": tune.choice(['linear']),                           # Type of multi-step interpolation
    "random_seed": tune.randint(1, 10),
}
model = AutoNHITS(h=12, loss=MAE(), valid_loss=MSE(), config=nhits_config, num_samples=1, cpus=1)
# Fit and predict.
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Test equality: the validation loss the model stores is the MSE we passed.
test_eq(str(type(model.valid_loss)), "<class 'neuralforecast.losses.pytorch.MSE'>")

# NOTE(review): this import was fused onto the test_eq line by the notebook
# export (a syntax error); it is restored on its own line here.
from neuralforecast.losses.pytorch import GMM, sCRPS
# TODO: add unit tests for the interaction between loss/valid_loss types.
# TODO: unit tests (2 network types x 2 loss types x 2 valid-loss types).
# Check that base recurrent methods run correctly with a point validation loss.
tcn_config = {
    "learning_rate": tune.choice([1e-3]),   # Initial learning rate
    "max_steps": tune.choice([1]),          # Number of SGD steps
    "val_check_steps": tune.choice([1]),    # Steps between validations
    "input_size": tune.choice([5 * 12]),    # input_size = multiplier * horizon
    "batch_size": tune.choice([7]),         # Number of series per window
    "random_seed": tune.randint(1, 10),
}
model = AutoTCN(h=12,
                loss=MAE(),
                valid_loss=MSE(),
                config=tcn_config, num_samples=1, cpus=1)
# Fit and predict.
model.fit(dataset=dataset)
y_hat = model.predict(dataset=dataset)
# Check that base recurrent methods run correctly with a quantiled validation loss.
model = AutoTCN(h=12,
                loss=GMM(n_components=2, level=[80, 90]),
                valid_loss=sCRPS(level=[80, 90]),
                config=tcn_config, num_samples=1, cpus=1)
# Fit and predict.
model.fit(dataset=dataset)
# NOTE(review): the markdown footer below was fused onto this line by the
# notebook export (a syntax error); it is restored as a comment.
y_hat = model.predict(dataset=dataset)
# Give us a ⭐ on Github