使用 DataTransformerPipeline 进行数据(预)处理

在这个笔记本中,我们将演示如何使用 darts 执行一些常见的预处理任务。

作为一个简单的示例,我们将使用 Monthly Milk Production dataset 数据集。

DataTransformer 抽象

DataTransformer 旨在提供一种统一的方式来处理 TimeSeries 的转换:

  • transform() 由所有转换器实现。此方法接收一个 TimeSeries 或一系列 TimeSeries,应用转换并将其作为新的 TimeSeries/一系列 TimeSeries 返回。

  • inverse_transform() 由存在逆变换函数的转换器实现。它的工作方式与 transform() 类似。

  • fit() 允许转换器在调用 transform()inverse_transform() 之前首先从时间序列中提取一些信息。

设置示例

[1]:
# fix python path if working locally
from utils import fix_pythonpath_if_working_locally

fix_pythonpath_if_working_locally()

%load_ext autoreload
%autoreload 2
%matplotlib inline
[2]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

from darts import TimeSeries
from darts.models import ExponentialSmoothing
from darts.dataprocessing.transformers import (
    Scaler,
    MissingValuesFiller,
    Mapper,
    InvertibleMapper,
)
from darts.dataprocessing import Pipeline
from darts.metrics import mape
from darts.utils.statistics import check_seasonality, plot_acf, plot_residuals_analysis
from darts.utils.timeseries_generation import linear_timeseries
from darts.datasets import MonthlyMilkDataset, MonthlyMilkIncompleteDataset

import warnings

warnings.filterwarnings("ignore")
import logging

logging.disable(logging.CRITICAL)

读取数据并创建时间序列

[3]:
series = MonthlyMilkDataset().load()

print(series)
series.plot()
<TimeSeries (DataArray) (Month: 168, component: 1, sample: 1)>
array([[[589.]],

       [[561.]],

       [[640.]],

       [[656.]],

       [[727.]],

       [[697.]],

       [[640.]],

       [[599.]],

       [[568.]],

       [[577.]],

...

       [[892.]],

       [[903.]],

       [[966.]],

       [[937.]],

       [[896.]],

       [[858.]],

       [[817.]],

       [[827.]],

       [[797.]],

       [[843.]]])
Coordinates:
  * Month      (Month) datetime64[ns] 1962-01-01 1962-02-01 ... 1975-12-01
  * component  (component) object 'Pounds per cow'
Dimensions without coordinates: sample
../_images/examples_02-data-processing_6_1.png

使用转换器:使用 Scaler 对时间序列进行缩放。

某些应用程序可能要求您的数据点介于0和1之间(例如,将时间序列输入基于神经网络的预测模型)。这可以通过使用默认的 Scaler 轻松实现,它是 sklearn.preprocessing.MinMaxScaler(feature_range=(0, 1)) 的包装器。

[4]:
scaler = Scaler()
rescaled = scaler.fit_transform(series)
print(rescaled)
<TimeSeries (DataArray) (Month: 168, component: 1, sample: 1)>
array([[[0.08653846]],

       [[0.01923077]],

       [[0.20913462]],

       [[0.24759615]],

       [[0.41826923]],

       [[0.34615385]],

       [[0.20913462]],

       [[0.11057692]],

       [[0.03605769]],

       [[0.05769231]],

...

       [[0.81490385]],

       [[0.84134615]],

       [[0.99278846]],

       [[0.92307692]],

       [[0.82451923]],

       [[0.73317308]],

       [[0.63461538]],

       [[0.65865385]],

       [[0.58653846]],

       [[0.69711538]]])
Coordinates:
  * Month      (Month) datetime64[ns] 1962-01-01 1962-02-01 ... 1975-12-01
  * component  (component) <U1 '0'
Dimensions without coordinates: sample

这种缩放也可以通过调用 inverse_transform() 轻松反转。

[5]:
back = scaler.inverse_transform(rescaled)
print(back)
<TimeSeries (DataArray) (Month: 168, component: 1, sample: 1)>
array([[[589.]],

       [[561.]],

       [[640.]],

       [[656.]],

       [[727.]],

       [[697.]],

       [[640.]],

       [[599.]],

       [[568.]],

       [[577.]],

...

       [[892.]],

       [[903.]],

       [[966.]],

       [[937.]],

       [[896.]],

       [[858.]],

       [[817.]],

       [[827.]],

       [[797.]],

       [[843.]]])
Coordinates:
  * Month      (Month) datetime64[ns] 1962-01-01 1962-02-01 ... 1975-12-01
  * component  (component) <U1 '0'
Dimensions without coordinates: sample

注意,Scaler 的构造函数也允许指定其他缩放器,只要它们在 TimeSeries 上实现了 fit()transform()inverse_transform() 方法(通常是来自 scikit-learn 的缩放器)。

另一个例子 : MissingValuesFiller

让我们来看看如何处理数据集中的缺失值。

[6]:
incomplete_series = MonthlyMilkIncompleteDataset().load()
incomplete_series.plot()
../_images/examples_02-data-processing_13_0.png
[7]:
filler = MissingValuesFiller()
filled = filler.transform(incomplete_series, method="quadratic")

filled.plot()
../_images/examples_02-data-processing_14_0.png

由于 MissingValuesFiller 默认封装了 pd.interpolate ,因此我们可以在调用 transform() 时向 pd.interpolate() 函数提供参数。

[8]:
filled = filler.transform(incomplete_series, method="quadratic")
filled.plot()
../_images/examples_02-data-processing_16_0.png

MapperInvertibleMapper: 一种特殊的转换器

有时你可能想对数据执行一个简单的 map() 函数。这也可以使用数据转换器来完成。Mapper 接受一个函数,并在调用 transform() 时逐元素地应用于数据。

InvertibleMapper 还允许在创建时指定一个反函数(如果有的话),并提供 inverse_transform() 方法。

[9]:
lin_series = linear_timeseries(start_value=0, end_value=2, length=10)

squarer = Mapper(lambda x: x**2)
squared = squarer.transform(lin_series)

lin_series.plot(label="original")
squared.plot(label="squared")
plt.legend()
[9]:
<matplotlib.legend.Legend at 0x7ff5e3826c10>
../_images/examples_02-data-processing_18_1.png

更复杂(且有用)的转换

在之前使用的月度牛奶生产数据集中,月份之间的差异部分是由于某些月份的天数比其他月份多,导致这些月份的牛奶产量更大。这使得时间序列更加复杂,从而更难以预测。

[10]:
training, validation = series.split_before(pd.Timestamp("1973-01-01"))

model = ExponentialSmoothing()
model.fit(training)
forecast = model.predict(36)

plt.title("MAPE = {:.2f}%".format(mape(forecast, validation)))
series.plot(label="actual")
forecast.plot(label="forecast")
plt.legend()
[10]:
<matplotlib.legend.Legend at 0x7ff5e3bcc790>
../_images/examples_02-data-processing_20_1.png

为了考虑这一事实并实现更好的性能,我们可以改为:

  1. 将时间序列转换为表示每个月的平均每日牛奶产量(而不是每月的总产量)

  2. 做出预测

  3. 逆变换

让我们看看如何使用 InvertibleMapperpd.timestamp.days_in_month 来实现这一点

(这个想法来自 Hyndman 和 Athanasopoulos 的 “Forecasting: principles and Practice”

要转换时间序列,我们必须将月度值(数据点)除以由该值对应的时间戳给出的月份中的天数。

map()``(以及 ``Mapper / InvertibleMapper)通过允许应用一个使用值及其时间戳来计算新值的转换函数,使得这一过程变得方便:f(timestamp, value) = new_value

[11]:
# Transform the time series
toDailyAverage = InvertibleMapper(
    fn=lambda timestamp, x: x / timestamp.days_in_month,
    inverse_fn=lambda timestamp, x: x * timestamp.days_in_month,
)

dailyAverage = toDailyAverage.transform(series)

dailyAverage.plot()
../_images/examples_02-data-processing_23_0.png
[12]:
# Make a forecast
dailyavg_train, dailyavg_val = dailyAverage.split_after(pd.Timestamp("1973-01-01"))

model = ExponentialSmoothing()
model.fit(dailyavg_train)
dailyavg_forecast = model.predict(36)

plt.title("MAPE = {:.2f}%".format(mape(dailyavg_forecast, dailyavg_val)))
dailyAverage.plot()
dailyavg_forecast.plot()
plt.legend()
[12]:
<matplotlib.legend.Legend at 0x7ff5e3dd1ac0>
../_images/examples_02-data-processing_24_1.png
[13]:
# Inverse the transformation
# Here the forecast is stochastic; so we take the median value
forecast = toDailyAverage.inverse_transform(dailyavg_forecast)
[14]:
plt.title("MAPE = {:.2f}%".format(mape(forecast, validation)))
series.plot(label="actual")
forecast.plot(label="forecast")
plt.legend()
[14]:
<matplotlib.legend.Legend at 0x7ff5e3fbb280>
../_images/examples_02-data-processing_26_1.png

链接转换 : 介绍 Pipeline

现在假设我们双方都希望应用上述变换(每日平均),并将数据集重新缩放到0到1之间,以使用基于神经网络的预测模型。与其分别应用这两个变换,然后再分别反转它们,我们可以使用一个 Pipeline

[15]:
pipeline = Pipeline([toDailyAverage, scaler])
transformed = pipeline.fit_transform(training)
transformed.plot()
../_images/examples_02-data-processing_28_0.png

如果管道中的所有变换都是可逆的,那么 Pipeline 对象也是可逆的。

[16]:
back = pipeline.inverse_transform(transformed)
back.plot()
../_images/examples_02-data-processing_30_0.png

现在回想一下来自 monthly-milk-incomplete.csv 的不完整系列。假设我们想将所有预处理步骤封装到一个管道中,该管道包括:一个用于填充缺失值的 MissingValuesFiller ,以及一个用于将数据集缩放到 0 到 1 之间的 Scaler

[17]:
incomplete_series = MonthlyMilkIncompleteDataset().load()

filler = MissingValuesFiller()
scaler = Scaler()

pipeline = Pipeline([filler, scaler])
transformed = pipeline.fit_transform(incomplete_series)

假设我们已经训练了一个神经网络并生成了一些预测。现在,我们想要缩放回我们的数据。不幸的是,由于 MissingValuesFiller 不是一个 InvertibleDataTransformer (为什么有人会想在结果中插入缺失值呢!?),逆变换将会引发一个异常:ValueError: 管道中的所有变换器都不能执行 inverse_transform

很烦人吧?幸运的是,你不必从头开始重新运行所有内容,只需从 Pipeline 中排除 MissingValuesFiller 。相反,你可以将 inverse_transform 方法的 partial 参数设置为 True。在这种情况下,逆变换将跳过不可逆的转换器进行。

[18]:
back = pipeline.inverse_transform(transformed, partial=True)

处理多个时间序列

通常,我们需要处理多个时间序列。DARTS 支持将时间序列序列作为输入传递给转换器和管道,因此您不必单独处理每个样本。此外,它将负责在转换不同时间序列时存储每个缩放器使用的参数(例如,使用缩放器时)。

[19]:
series = MonthlyMilkDataset().load()
incomplete_series = MonthlyMilkIncompleteDataset().load()

multiple_ts = [incomplete_series, series[:10]]

filler = MissingValuesFiller()
scaler = Scaler()

pipeline = Pipeline([filler, scaler])
transformed = pipeline.fit_transform(multiple_ts)

for ts in transformed:
    ts.plot()
../_images/examples_02-data-processing_36_0.png
[20]:
back = pipeline.inverse_transform(transformed, partial=True)
for ts in back:
    ts.plot()
../_images/examples_02-data-processing_37_0.png

监控与并行数据处理

有时,我们可能还需要处理庞大的数据集。在这种情况下,按顺序处理每个样本可能会花费相当长的时间。Darts 不仅可以帮助监控转换过程,还可以在可能的情况下并行处理多个样本。

在每个转换器或管道中设置 verbose 参数时,会创建一些进度条:

[21]:
series = MonthlyMilkIncompleteDataset().load()

huge_number_of_series = [series] * 10000

scaler = Scaler(verbose=True, name="Basic")

transformed = scaler.fit_transform(huge_number_of_series)

我们现在知道需要等待多久。但由于没有人喜欢浪费时间等待,我们可以利用机器的多个核心来并行处理数据。我们可以通过设置 n_jobs 参数(与 sklearn 中的用法相同)来实现这一点。

注意:加速效果紧密依赖于可用核心的数量和转换的‘CPU密集度’。

[22]:
# setting n_jobs to -1 will make the library using all the cores available in the machine
scaler = Scaler(verbose=True, n_jobs=-1, name="Faster")
scaler.fit(huge_number_of_series)
back = scaler.transform(huge_number_of_series)
[ ]: