使用大型数据集

关于如何在无法全部加载到内存中的数据集上训练神经预测模型的教程

标准的DataLoader类用于NeuralForecast,期望数据集由单个DataFrame表示,该DataFrame在拟合模型时会完全加载到内存中。然而,当数据集太大而无法如此处理时,我们可以选择使用自定义的大规模DataLoader。这个自定义加载器假定每个时间序列分布在一系列Parquet文件中,并确保在任何时间点只加载一个批次到内存中。

在这个笔记本中,我们将演示这些文件的预期格式、如何训练模型以及如何使用这个大规模DataLoader进行推断。

加载库

import logging
import os
import tempfile

import numpy as nppackage_name
import pandas as pd

from neuralforecast import NeuralForecast
from neuralforecast.models import NHITS
from utilsforecast.evaluation import evaluate
from utilsforecast.losses import mae, rmse, smape
from neuralforecast.utils import AirPassengersPanel, AirPassengersStatic
logging.getLogger('pytorch_lightning').setLevel(logging.ERROR)
os.environ['NIXTLA_ID_AS_COL'] = '1'

数据

每个时间序列应存储在一个名为 unique_id=timeseries_id 的目录中。在该目录内,时间序列可以完全包含在一个 Parquet 文件中,或者分散在多个 Parquet 文件中。无论格式如何,时间序列必须按时间排列。

例如,以下代码将 AirPassengers 数据框(每个时间序列已经按时间排序)拆分为以下格式:

>  数据
    >  unique_id=Airline1
         -  a59945617fdb40d1bc6caa4aadad881c-0.parquet
    >  unique_id=Airline2
         -  a59945617fdb40d1bc6caa4aadad881c-0.parquet


然后我们只需输入这些目录的路径列表。

Y_df = AirPassengersPanel.copy()
Y_df
unique_id ds y trend y_[lag12]
0 Airline1 1949-01-31 112.0 0 112.0
1 Airline1 1949-02-28 118.0 1 118.0
2 Airline1 1949-03-31 132.0 2 132.0
3 Airline1 1949-04-30 129.0 3 129.0
4 Airline1 1949-05-31 121.0 4 121.0
... ... ... ... ... ...
283 Airline2 1960-08-31 906.0 283 859.0
284 Airline2 1960-09-30 808.0 284 763.0
285 Airline2 1960-10-31 761.0 285 707.0
286 Airline2 1960-11-30 690.0 286 662.0
287 Airline2 1960-12-31 732.0 287 705.0

288 rows × 5 columns

valid = Y_df.groupby('unique_id').tail(72)
# 从现在起,我们将使用id_col作为时间序列的唯一标识符(这是因为我们使用unique_id列将数据分区存储为Parquet文件)。
valid = valid.rename(columns={'unique_id': 'id_col'})

train = Y_df.drop(valid.index)
train['id_col'] = train['unique_id'].copy()

# 我们在此使用临时目录生成文件,以展示预期的文件结构。
tmpdir = tempfile.TemporaryDirectory()
train.to_parquet(tmpdir.name, partition_cols=['unique_id'], index=False)
files_list = [f"{tmpdir.name}/{dir}" for dir in os.listdir(tmpdir.name)]
files_list
['/tmp/tmp9cy2qa36/unique_id=Airline2', '/tmp/tmp9cy2qa36/unique_id=Airline1']

您还可以使用以下内容通过spark数据框创建该目录结构:

"""
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
(
  spark_df
  .repartition(id_col)
  .sortWithinPartitions(id_col, time_col)
  .write
  .partitionBy(id_col)
  .parquet(out_dir)
)
"""

DataLoader类仍然期望静态数据作为单个DataFrame传入,每个时间序列对应一行。

static = AirPassengersStatic.rename(columns={'unique_id': 'id_col'})
static
id_col airline1 airline2
0 Airline1 0 1
1 Airline2 1 0

模型训练

我们现在在上述数据集上训练一个 NHITS 模型。值得注意的是,NeuralForecast 当前在使用此 DataLoader 时不支持缩放。如果您想对时间序列进行缩放,应该在传递给 fit 方法之前完成。

horizon = 12
stacks = 3
models = [NHITS(input_size=5 * horizon,
                h=horizon,
                futr_exog_list=['trend', 'y_[lag12]'],
                stat_exog_list=['airline1', 'airline2'],
                max_steps=100,
                stack_types = stacks*['identity'],
                n_blocks = stacks*[1],
                mlp_units = [[256,256] for _ in range(stacks)],
                n_pool_kernel_size = stacks*[1])]
nf = NeuralForecast(models=models, freq='ME')
nf.fit(df=files_list, static_df=static, id_col='id_col')
Seed set to 1
2024-07-23 11:46:22.369481: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-07-23 11:46:22.402269: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI AVX512_BF16 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-07-23 11:46:22.934767: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Could not find TensorRT

预测

在处理大型数据集时,我们需要提供一个单一的DataFrame,其中包含所有时间序列的输入时间步。如果我们有未来的外生特征,我们也应该在单独的futr_df DataFrame中包含这些特征的未来值。

对于下面的预测,我们假设我们只想预测航空公司2的下12个时间步。

valid_df = valid[valid['id_col'] == 'Airline2']
# 我们在拟合模型时设置了输入大小为60,预测范围为12。
pred_df = valid_df[:60]
futr_df = valid_df[60:72]
futr_df = futr_df.drop(["y"], axis=1)

predictions = nf.predict(df=pred_df, futr_df=futr_df, static_df=static)
predictions
id_col ds NHITS
0 Airline2 1960-01-31 710.602417
1 Airline2 1960-02-29 688.900879
2 Airline2 1960-03-31 758.637573
3 Airline2 1960-04-30 748.974365
4 Airline2 1960-05-31 753.558655
5 Airline2 1960-06-30 801.517822
6 Airline2 1960-07-31 863.835449
7 Airline2 1960-08-31 847.854980
8 Airline2 1960-09-30 797.115845
9 Airline2 1960-10-31 748.879761
10 Airline2 1960-11-30 707.076233
11 Airline2 1960-12-31 747.851685

评估

target = valid_df[60:72]
evaluate(
    predictions.merge(target.drop(["trend", "y_[lag12]"], axis=1), on=['id_col', 'ds']),
    metrics=[mae, rmse, smape],
    id_col='id_col',
    agg_fn='mean',
)
metric NHITS
0 mae 23.693777
1 rmse 29.992256
2 smape 0.014734

Give us a ⭐ on Github