并行化
特征提取、特征选择以及滚动,都提供了并行化的可能性。默认情况下,tsfresh 会并行化所有这些任务。在这里,我们讨论控制并行化的不同设置。为了在使用场景中获得最佳结果,您应该尝试这些参数。
备注
本文档描述了并行化以加快处理时间。如果您正在处理大量数据(可能无法放入内存),请查看 大数据。
请告知我们关于调整以下参数的结果!这将有助于改进文档以及默认设置。
特征选择的并行化
我们使用 multiprocessing.Pool 来并行计算每个特征的 p 值。在实例化时,我们将 Pool 的工作进程数设置为 n_jobs。此字段默认为当前系统上的处理器数量。我们建议将其设置为可用(且空闲)处理器的最大数量。
Pool 的 map 函数的 chunksize 是另一个需要考虑的重要参数。它可以通过 chunksize 字段设置。默认情况下,它由 multiprocessing.Pool 的并行化参数决定。一个数据块被定义为一个 id 和一种类型的单一时间序列。chunksize 是指作为一项任务提交给一个工作进程的块数。如果你将 chunksize 设置为 10,那么这意味着一个工作任务对应于计算 10 个 id/kind 时间序列组合的所有特征。如果设置为 None,根据分发器,将使用启发式方法来找到最佳的 chunksize。chunksize 对最佳集群性能有至关重要的影响,应该针对当前问题在基准测试中进行优化。
特征提取的并行化
对于特征提取,tsfresh 暴露了参数 n_jobs 和 chunksize。两者的行为类似于特征选择中的参数。
要进行性能研究和分析,有时关闭并行化是有用的。这可以通过将参数 n_jobs 设置为 0 来实现。
单机之外的并行化
大量时间序列数据可能需要进行大规模分析。因此,时间序列需要在计算单元组上进行处理,而不是在单个机器上。
因此,可能需要将时间序列特征的提取分布到一个集群中。可以使用 tsfresh 以分布式方式提取特征。在以下段落中,我们将讨论如何设置分布式 tsfresh。
为了分发特征的计算,我们使用一个特定的对象,即 Distributor 类(位于 tsfresh.utilities.distribution 模块中)。
本质上,一个分发器组织特征计算器对数据块的应用。它将特征计算器映射到数据块,然后进行归约,这意味着它将各个映射的结果合并为一个对象,即特征矩阵。
因此,分销商将按以下顺序进行,
基于时间序列数据的特点计算一个最优的
chunk_size(通过calculate_best_chunk_size())将时间序列数据分割成块(通过
partition())将特征计算器的应用分配给数据块(通过
distribute())将结果合并到特征矩阵中(通过
map_reduce())关闭所有连接,关闭所有资源并清理所有内容(通过
close())
那么,你如何使用 Distributor 通过 tsfresh 提取特征呢?你需要将 distributor 作为参数传递给 extract_features() 方法。
以下示例展示了如何定义 MultiprocessingDistributor,它将把计算分配给本地线程池:
from tsfresh.examples.robot_execution_failures import \
download_robot_execution_failures, \
load_robot_execution_failures
from tsfresh.feature_extraction import extract_features
from tsfresh.utilities.distribution import MultiprocessingDistributor
# download and load some time series data
download_robot_execution_failures()
df, y = load_robot_execution_failures()
# We construct a Distributor that will spawn the calculations
# over four threads on the local machine
Distributor = MultiprocessingDistributor(n_workers=4,
disable_progressbar=False,
progressbar_title="Feature Extraction")
# just to pass the Distributor object to
# the feature extraction, along with the other parameters
X = extract_features(timeseries_container=df,
column_id='id',
column_sort='time',
distributor=Distributor)
以下示例对应于现有的 multiprocessing tsfresh API,您只需指定作业数量,无需构建分发器:
from tsfresh.examples.robot_execution_failures import \
download_robot_execution_failures, \
load_robot_execution_failures
from tsfresh.feature_extraction import extract_features
download_robot_execution_failures()
df, y = load_robot_execution_failures()
X = extract_features(timeseries_container=df,
column_id='id',
column_sort='time',
n_jobs=4)
使用dask来分布计算
我们为 dask 框架 提供了一个分发器,其中 “Dask 是一个用于分析计算的灵活并行计算库。”
备注
本部分文档仅处理使用 dask 集群并行计算。输入和输出仍然是 pandas 对象。如果你想利用 dask 的能力来扩展到超出本地内存的数据,请查看 大数据。
Dask 是一个用于将分析计算分布到集群中的优秀框架。它可以上下扩展,这意味着你也可以在单个机器上使用它。在 Dask 集群上运行 tsfresh 所需的唯一信息是 dask-scheduler 的 IP 地址和端口号。
假设你的dask调度器运行在 192.168.0.1:8786,那么我们可以构建一个 ClusterDaskDistributor,它连接到调度器并将时间序列数据和计算分发到一个集群:
from tsfresh.examples.robot_execution_failures import \
download_robot_execution_failures, \
load_robot_execution_failures
from tsfresh.feature_extraction import extract_features
from tsfresh.utilities.distribution import ClusterDaskDistributor
download_robot_execution_failures()
df, y = load_robot_execution_failures()
Distributor = ClusterDaskDistributor(address="192.168.0.1:8786")
X = extract_features(timeseries_container=df,
column_id='id',
column_sort='time',
distributor=Distributor)
与上面的 MultiprocessingDistributor 示例相比,我们只需更改一行代码即可从一台机器切换到整个集群。就是这么简单。通过更改分发器,您可以轻松地将应用程序部署到集群上运行,而不是在您的个人工作站上。
你也可以在你的本地机器上使用一个本地 Dask 集群来模拟一个 Dask 网络。以下示例展示了如何在 3 个工作者的本地集群上设置一个 LocalDaskDistributor:
from tsfresh.examples.robot_execution_failures import \
download_robot_execution_failures, \
load_robot_execution_failures
from tsfresh.feature_extraction import extract_features
from tsfresh.utilities.distribution import LocalDaskDistributor
download_robot_execution_failures()
df, y = load_robot_execution_failures()
Distributor = LocalDaskDistributor(n_workers=3)
X = extract_features(timeseries_container=df,
column_id='id',
column_sort='time',
distributor=Distributor)
编写自己的分发器
如果你想使用其他框架而不是 Dask,你将需要编写自己的 Distributor。要构建你的自定义 Distributor,你需要定义一个继承自抽象基类 tsfresh.utilities.distribution.DistributorBaseClass 的对象。tsfresh.utilities.distribution 模块包含了更多你需要实现的信息。
高效并行化的注意事项
默认情况下,tsfresh 使用并行化将单线程的 Python 代码分发到主机上的多个核心。
然而,这可能会导致一个被称为过度配置的问题。许多用于特征计算器的底层Python库(例如numpy)在其底层处理中使用了C代码实现。这些`也`试图在尽可能多的核心之间分配它们的工作负载——这与tsfresh进行的并行化相冲突。
过度配置是低效的,因为重复的上下文切换会产生开销。
这个问题可以通过将C库限制为单线程来解决,使用以下环境变量:
import os
os.environ['OMP_NUM_THREADS'] = "1"
os.environ['MKL_NUM_THREADS'] = "1"
os.environ['OPENBLAS_NUM_THREADS'] = "1"
在你的笔记本/Python脚本的开头插入这些行 - 在你调用任何 tsfresh 代码或导入任何其他模块之前。
你的主机计算机拥有的核心越多,通过实施这些环境变化,处理速度的提升就越大。根据主机类型的不同,速度提升在6倍到26倍之间。
备注
如果你打算在特征提取后运行机器学习管道,强烈建议恢复这些更改。这些设置已经在GPU加速的XGBoost分类器训练任务中进行了测试,速度降低了9倍。许多流行的机器学习库,如scikit-learn和Tensorflow/PyTorch,依赖于CPU核心的并行化来加速依赖于CPU的低级计算。通过强制这些库只使用单个线程,即使你使用GPU加速其他下游任务,也会在这些任务中产生瓶颈。例如,在每次PyTorch神经网络前向传递后,损失函数的CPU计算将只在一个线程上运行,而GPU则空闲并等待其完成后再开始其反向传播任务。