大数据
如果你正在处理大型时间序列数据,你可能面临多个问题。其中最重要的两个是:
特征提取的长时间执行
大内存消耗,甚至超出单台机器的处理能力
要解决第一个问题,您可以按照 并行化 中的描述并行化计算。请注意,在您的本地计算机上,并行化功能默认已经开启。
然而,对于较大的数据集,您需要同时处理这两个问题。您有多种选择来做到这一点,我们将在以下段落中讨论。
Dask - 简单的方式
tsfresh 接受一个 dask dataframe 作为 tsfresh.extract_features()
函数的输入,而不是 pandas dataframe。Dask dataframes 允许你将计算扩展到本地内存之外(通过内部数据分区),甚至扩展到大型机器集群。它的 dataframe API 与 pandas dataframes 非常相似,甚至可能是一个即插即用的替代品。
在 数据格式 中讨论的所有参数对 dask 数据帧同样有效。输入数据将使用 dask 方法转换为 tsfresh 所需的正确格式,特征提取将被添加为计算图中的额外计算。然后,您可以像往常一样向结果添加额外的计算,或使用 .compute()
触发计算。
备注
特征提取的最后一步是将所有特征转换为表格格式。特别是对于非常大的数据样本,这种计算可能成为性能瓶颈。因此,如果您并不真正需要它,我们建议关闭透视功能,并尽可能多地使用未透视的数据。
例如,要从parquet读取数据并进行特征提取:
import dask.dataframe as dd
from tsfresh import extract_features
df = dd.read_parquet(...)
X = extract_features(df,
column_id="id",
column_sort="time",
pivot=False)
result = X.compute()
Dask - 更多控制
特征提取方法在调用实际的特征计算器之前需要执行一些数据转换。如果你想优化数据流,你可能希望对特征计算如何添加到你的dask计算图中进行更多控制。
因此,直接添加特征提取功能也是可能的:
from tsfresh.convenience.bindings import dask_feature_extraction_on_chunk
features = dask_feature_extraction_on_chunk(df_grouped,
column_id="id",
column_kind="kind",
column_sort="time",
column_value="value")
然而在这种情况下,df_grouped
必须已经是正确的格式。更多信息请查看 tsfresh.convenience.bindings.dask_feature_extraction_on_chunk()
的文档。在这种情况下不会进行数据透视。
PySpark
与 dask 类似,也可以将特征提取传递到 Spark 计算图中。你可以在 tsfresh.convenience.bindings.spark_feature_extraction_on_chunk()
的文档中找到更多信息。