大数据

如果你正在处理大型时间序列数据,你可能面临多个问题。其中最重要的两个是:

  • 特征提取的长时间执行

  • 大内存消耗,甚至超出单台机器的处理能力

要解决第一个问题,您可以按照 并行化 中的描述并行化计算。请注意,在您的本地计算机上,并行化功能默认已经开启。

然而,对于较大的数据集,您需要同时处理这两个问题。您有多种选择来做到这一点,我们将在以下段落中讨论。

Dask - 简单的方式

tsfresh 接受一个 dask dataframe 作为 tsfresh.extract_features() 函数的输入,而不是 pandas dataframe。Dask dataframes 允许你将计算扩展到本地内存之外(通过内部数据分区),甚至扩展到大型机器集群。它的 dataframe API 与 pandas dataframes 非常相似,甚至可能是一个即插即用的替代品。

数据格式 中讨论的所有参数对 dask 数据帧同样有效。输入数据将使用 dask 方法转换为 tsfresh 所需的正确格式,特征提取将被添加为计算图中的额外计算。然后,您可以像往常一样向结果添加额外的计算,或使用 .compute() 触发计算。

备注

特征提取的最后一步是将所有特征转换为表格格式。特别是对于非常大的数据样本,这种计算可能成为性能瓶颈。因此,如果您并不真正需要它,我们建议关闭透视功能,并尽可能多地使用未透视的数据。

例如,要从parquet读取数据并进行特征提取:

import dask.dataframe as dd
from tsfresh import extract_features

df = dd.read_parquet(...)

X = extract_features(df,
                     column_id="id",
                     column_sort="time",
                     pivot=False)

result = X.compute()

Dask - 更多控制

特征提取方法在调用实际的特征计算器之前需要执行一些数据转换。如果你想优化数据流,你可能希望对特征计算如何添加到你的dask计算图中进行更多控制。

因此,直接添加特征提取功能也是可能的:

from tsfresh.convenience.bindings import dask_feature_extraction_on_chunk
features = dask_feature_extraction_on_chunk(df_grouped,
                                            column_id="id",
                                            column_kind="kind",
                                            column_sort="time",
                                            column_value="value")

然而在这种情况下,df_grouped 必须已经是正确的格式。更多信息请查看 tsfresh.convenience.bindings.dask_feature_extraction_on_chunk() 的文档。在这种情况下不会进行数据透视。

PySpark

与 dask 类似,也可以将特征提取传递到 Spark 计算图中。你可以在 tsfresh.convenience.bindings.spark_feature_extraction_on_chunk() 的文档中找到更多信息。