tsfresh.utilities 包

子模块

tsfresh.utilities.dataframe_functions 模块

用于处理 DataFrame 转换为内部标准化格式(参见 normalize_input_to_internal_representation)或如何处理 DataFrame 中的 NaNinf 的实用函数。

tsfresh.utilities.dataframe_functions.add_sub_time_series_index(df_or_dict, sub_length, column_id=None, column_sort=None, column_kind=None)[源代码]

添加一个包含以下内容的“id”列:

  • 如果 column_id 为 None:对于每种类型(或者如果 column_kind 为 None 则对整个数据框),通过将数据按长度为 “sub_length” 的包进行 “子打包” 来构建一个新的索引。例如,如果你有长度为 11 的数据,且 sub_length 为 2,你将得到 6 个新的包:0, 0; 1, 1; 2, 2; 3, 3; 4, 4; 5。

  • 如果 column_id 不是 None:与之前相同,只是针对每个 id 单独处理。旧的 column_id 值会添加到新的“id”列之后,用逗号分隔。

你可以使用这些函数将一个长测量值转换为子包,在这些子包中你可以提取特征。

参数:
  • df_or_dict (pandas.DataFrame or dict) – 一个 pandas DataFrame 或一个字典。对象所需的形状/形式取决于传递的其他参数。

  • column_id (basestring or None) – 它必须存在于 pandas DataFrame 中,或者存在于字典中的所有 DataFrame 中。此列中不允许有 NaN 值。

  • column_sort (basestring or None) – 如果不是 None,则按此列对行进行排序。此列中不允许有 NaN 值。

  • column_kind (basestring or None) – 它只能在传递 pandas DataFrame 时使用(假设字典已经按种类分组)。它必须存在于 DataFrame 中,并且不允许有 NaN 值。如果未传递种类列,则假定 pandas DataFrame 中的每一列(除了 id 或排序列)都是可能的种类。

返回:

添加了“id”列的数据框或数据框字典

返回类型:

the one from df_or_dict

tsfresh.utilities.dataframe_functions.check_for_nans_in_columns(df, columns=None)[源代码]

用于检查数据框中是否存在 NaN 并如果有则引发 ValueError 的辅助函数。

参数:
  • df (pandas.DataFrame) – 用于测试 NaN 的 pandas DataFrame

  • columns (list) – 要测试是否为 NaN 的列的列表。如果留空,将测试 DataFrame 的所有列。

返回:

返回类型:

None

Raise:

在 DataFrame 中发现了 NaNsValueError

tsfresh.utilities.dataframe_functions.get_ids(df_or_dict, column_id)[源代码]

从时间序列容器中聚合列 column_id 中的所有 ID

参数:
  • df_or_dict (pandas.DataFrame or dict) – 一个 pandas DataFrame 或一个字典。

  • column_id (basestring) – 它必须存在于 pandas DataFrame 中,或者存在于字典中的所有 DataFrame 中。此列中不允许有 NaN 值。

返回:

如在 energy_ratio_by_chunks 中设置的所有现有 id

返回类型:

Set

Raise:

如果 df_or_dict 不是 dict 或 pandas.DataFrame 类型,则抛出 TypeError

tsfresh.utilities.dataframe_functions.get_range_values_per_column(df)[源代码]

检索 DataFrame df 中每列的有限最大值、最小值和平均值,并将它们存储在三个字典中。这些字典 col_to_maxcol_to_mincol_to_median 将列名映射到该列的最大值、最小值或中位数值。

如果一列不包含任何有限值,则存储 0 代替。

参数:

df (pandas.DataFrame) – 获取数据框中列的最大值、最小值和中位数

返回:

将列名映射到最大值、最小值、平均值的字典

返回类型:

(dict, dict, dict)

tsfresh.utilities.dataframe_functions.impute(df_impute)[源代码]

按列替换 DataFrame df_impute 中的所有 NaNsinfs 为相同列的平均值/极值。具体操作如下:df_impute 中出现的每个 infNaN 都被替换为

  • -inf -> min

  • +inf -> max

  • NaN -> 中位数

如果该列完全不包含有限值,则用零填充。

此函数会就地修改 df_impute。之后,df_impute 将保证不包含任何非有限值。此外,所有列都将保证为 np.float64 类型。

参数:

df_impute (pandas.DataFrame) – DataFrame 用于插补

Return df_impute:

插补后的 DataFrame

Rtype df_impute:

pandas.DataFrame

tsfresh.utilities.dataframe_functions.impute_dataframe_range(df_impute, col_to_max, col_to_min, col_to_median)[源代码]

按列替换 DataFrame df_impute 中的所有 NaN-inf+inf,使用提供的字典中的平均值/极值。

这是通过以下方式完成的:在 df_impute 中出现的每个 infNaN 都被替换为

  • -inf -> 按 col_to_min 列的值

  • +inf -> 按 col_to_max 中的值

  • NaN -> 按 col_to_median 列中的中值填充

如果在 df_impute 的某一列在任一词典中未找到,此方法将引发 ValueError。此外,如果替换的值之一不是有限的,则会返回 ValueError。

此函数会就地修改 df_impute。之后,df_impute 将保证不包含任何非有限值。此外,所有列都将保证为 np.float64 类型。

参数:
  • df_impute (pandas.DataFrame) – DataFrame 用于插补

  • col_to_max (dict) – 字典映射列名到最大值

  • col_to_min – 字典映射列名到最小值

  • col_to_median – 字典将列名映射到中值

Return df_impute:

插补后的 DataFrame

Rtype df_impute:

pandas.DataFrame

抛出:

ValueError – 如果 df_impute 的一列在 col_to_max、col_to_min 或 col_to_median 中缺失,或者替换值为非有限值

tsfresh.utilities.dataframe_functions.impute_dataframe_zero(df_impute)[源代码]

将 DataFrame df_impute 中的所有 NaN-infs+infs 替换为 0。df_impute 将被原地修改。其所有列将被转换为 dtype np.float64

参数:

df_impute (pandas.DataFrame) – DataFrame 用于插补

Return df_impute:

插补后的 DataFrame

Rtype df_impute:

pandas.DataFrame

tsfresh.utilities.dataframe_functions.make_forecasting_frame(x, kind, max_timeshift, rolling_direction)[源代码]

接受一个单一的时间序列 x 并构建一个 DataFrame df 和目标向量 y,可用于时间序列预测任务。

返回的 df 将包含,对于 x 中的每个时间戳,最后 max_timeshift 个数据点作为一个新的时间序列,这可以用于拟合时间序列预测模型。

有关滚动过程的详细描述以及如何推导特征矩阵和目标向量的信息,请参阅 滚动/时间序列预测

返回的时间序列容器 df 将包含滚动后的时间序列作为一个扁平数据框,即 数据格式标签 中的第一种格式。

当 x 是 pandas.Series 时,索引将被用作 id。

参数:
  • x (np.array or pd.Series) – 单一时间序列

  • kind (str) – 时间序列的类型

  • rolling_direction (int) – 符号决定是否向后滚动(如果符号为正)或在“时间”上向前滚动

  • max_timeshift (int) – 如果不是 None,则最多移动 max_timeshift 次。如果是 None,则尽可能频繁地移动。

返回:

时间序列容器 df,目标向量 y

返回类型:

(pd.DataFrame, pd.Series)

tsfresh.utilities.dataframe_functions.restrict_input_to_index(df_or_dict, column_id, index)[源代码]

将 df_or_dict 限制为包含在索引中的那些id。

参数:
  • df_or_dict (pandas.DataFrame or dict) – 一个 pandas DataFrame 或一个字典。

  • column_id (basestring) – 它必须存在于 pandas DataFrame 中,或者存在于字典中的所有 DataFrame 中。此列中不允许有 NaN 值。

  • index (Iterable or pandas.Series) – 包含id的索引

返回 df_or_dict_restricted:

受限的 df_or_dict

Rtype df_or_dict_restricted:

字典或 pandas.DataFrame

Raise:

如果 df_or_dict 不是 dict 或 pandas.DataFrame 类型,则抛出 TypeError

tsfresh.utilities.dataframe_functions.roll_time_series(df_or_dict, column_id, column_sort=None, column_kind=None, rolling_direction=1, max_timeshift=None, min_timeshift=0, chunksize=None, n_jobs=5, show_warnings=False, disable_progressbar=False, distributor=None)[源代码]

此方法创建时间序列的子窗口。它将每种类型和每个ID的(已排序的)数据框在“时间”域中分别滚动(时间域由`column_sort`给出的排序列的排序顺序表示)。

对于每个滚动步骤,通过方案 ({id}, {shift}) 创建一个新的id,其中 id 是列的前一个id,shift 是“时间”偏移的量。你可以将其视为一个固定长度的窗口(max_timeshift)在时间序列上每次移动一步。窗口看到的每个剪切部分都是一个带有新标识符的新时间序列。

几点说明:

  • 此方法将创建新的ID!

  • 滚动符号定义了时间滚动的方向,正值意味着我们正在将剪切窗口向前移动。每个新子时间序列的名称由最后一个时间点给出。这意味着,名为 ([id=]4,[timeshift=]5)max_timeshift 为 3 的时间序列包含时间 3、4 和 5 的数据。负滚动方向意味着,您在数据中向负时间方向移动。名为 ([id=]4,[timeshift=]5)max_timeshift 为 3 的时间序列将包含时间 5、6 和 7 的数据。绝对值定义了每一步要移动的时间量。

  • 可以对不同长度的时序数据进行时间偏移,但是:

  • 我们假设时间序列是均匀采样的

  • 更多信息,请参见 滚动/时间序列预测

参数:
  • df_or_dict (pandas.DataFrame or dict) – 一个 pandas DataFrame 或一个字典。对象所需的形状/形式取决于传递的其他参数。

  • column_id (basestring) – 它必须存在于 pandas DataFrame 中,或者存在于字典中的所有 DataFrame 中。此列中不允许有 NaN 值。

  • column_sort (basestring or None) – 如果不是 None,则按此列对行进行排序。此列中不允许有 NaN 值。如果未给出,将填充一个递增的数字,这意味着传递的数据帧的顺序将用作时间序列的“时间”。

  • column_kind (basestring or None) – 它只能在传递 pandas DataFrame 时使用(假设字典已经按种类分组)。它必须存在于 DataFrame 中,并且不允许有 NaN 值。如果未传递种类列,则假定 pandas DataFrame 中的每一列(除了 id 或排序列)都是可能的种类。

  • rolling_direction (int) – 这个符号决定了是否在“时间”上将我们的剪切窗口向前或向后移动。绝对值决定了每一步移动的量。

  • max_timeshift (int) – 如果不是 None,剪切窗口的最大尺寸为 max_timeshift。如果为 None,它将无限增长。

  • min_timeshift (int) – 丢弃所有提取的预测窗口,这些窗口的大小等于或小于此值。必须大于或等于0。

  • n_jobs (int) – 用于并行化的进程数。如果为零,则不使用并行化。

  • chunksize (None or int) – 每个职位应计算多少个班次。

  • show_warnings (bool) – 在特征提取过程中显示警告(用于计算器的调试)。

  • disable_progressbar (bool) – 在计算过程中不要显示进度条。

  • distributor (class) – 高级参数:将其设置为您希望用作分发器的类名。有关更多信息,请参阅 utilities/distribution.py。如果您希望 TSFresh 选择最佳分发器,请保留为 None。

返回:

滚动数据框或数据框字典

返回类型:

the one from df_or_dict

tsfresh.utilities.distribution 模块

此模块包含 Distributor 类,此类对象用于分配特征的计算。本质上,Distributor 组织了特征计算器对数据块的应用。

Nils Braun 设计的这个模块

class tsfresh.utilities.distribution.ApplyDistributor(meta)[源代码]

基类:DistributorBaseClass

map_reduce(map_function, data, function_kwargs=None, chunk_size=None, data_length=None)[源代码]

此方法包含 DistributorBaseClass 类的核心功能。

它将 map_function 映射到数据的每个元素,并将结果缩减以返回一个扁平化的列表。

它需要为每个子类实现。

参数:
  • map_function (callable) – 应用于每个数据项的函数。

  • data (iterable) – 用于计算的数据

  • function_kwargs (dict of string to parameter) – map 函数的参数

  • chunk_size (int) – 如果提供,则根据此大小对数据进行分块。如果未提供,则使用经验值。

  • data_length (int) – 如果数据是一个生成器,你需要在这里设置长度。如果它是无,长度则从数据的长度推导出来。

返回:

计算结果

返回类型:

list

class tsfresh.utilities.distribution.ClusterDaskDistributor(address)[源代码]

基类:IterableDistributorBaseClass

使用dask集群的分发器,意味着计算分布在一个集群上

calculate_best_chunk_size(data_length)[源代码]

使用集群中dask工作者的数量(在执行时间,即开始提取时)来找到最佳的chunk_size。

参数:

data_length (int) – 定义需要进行多少次计算的长度。

close()[源代码]

关闭与 Dask 调度器的连接

distribute(func, partitioned_chunks, kwargs)[源代码]

通过将map命令分发到集群上的dask工作者来并行计算特征

参数:
  • func (callable) – 发送到每个工作者的函数。

  • partitioned_chunks (iterable) – 数据块列表 - 每个元素又是一个块列表 - 应由一个工作者处理。

  • kwargs (dict of string to parameter) – map 函数的参数

返回:

计算结果为列表 - 每个项目应为将 func 应用于单个元素的结果。

class tsfresh.utilities.distribution.DistributorBaseClass[源代码]

基类:object

分发器抽象基类。

DistributorBaseClass 子类的实例的主要目的是对一个数据项列表(称为数据)上的函数(称为 map_function)进行评估。

根据 distribute 函数的实现,这可以通过并行处理或使用节点集群来完成。

map_reduce(map_function, data, function_kwargs=None, chunk_size=None, data_length=None)[源代码]

此方法包含 DistributorBaseClass 类的核心功能。

它将 map_function 映射到数据的每个元素,并将结果缩减以返回一个扁平化的列表。

它需要为每个子类实现。

参数:
  • map_function (callable) – 应用于每个数据项的函数。

  • data (iterable) – 用于计算的数据

  • function_kwargs (dict of string to parameter) – map 函数的参数

  • chunk_size (int) – 如果提供,则根据此大小对数据进行分块。如果未提供,则使用经验值。

  • data_length (int) – 如果数据是一个生成器,你需要在这里设置长度。如果它是无,长度则从数据的长度推导出来。

返回:

计算结果

返回类型:

list

class tsfresh.utilities.distribution.IterableDistributorBaseClass[源代码]

基类:DistributorBaseClass

可以处理所有可迭代项并分别计算每个项的 map_function 的分发器基类。

这是在数据的块上完成的,意味着,DistributorBaseClass 类会将数据分块,分发数据并在各个项目上分别应用 map_function 函数。

根据 distribute 函数的实现,这可以通过并行处理或使用节点集群来完成。

calculate_best_chunk_size(data_length)[源代码]

计算长度为 data_length 的列表的最佳块大小。当前实现的公式或多或少是针对单机多进程情况的经验结果。

参数:

data_length (int) – 定义需要进行多少次计算的长度。

返回:

计算出的块大小

返回类型:

int

TODO: 调查在不同设置下哪种块大小最佳。

close()[源代码]

在使用后清理 DistributorBaseClass 的抽象基函数,例如关闭与 DaskScheduler 的连接

distribute(func, partitioned_chunks, kwargs)[源代码]

这个抽象基函数在工作者之间分配工作,工作者可以是线程或集群中的节点。必须在派生类中实现。

参数:
  • func (callable) – 发送到每个工作者的函数。

  • partitioned_chunks (iterable) – 数据块列表 - 每个元素又是一个块列表 - 应由一个工作者处理。

  • kwargs (dict of string to parameter) – map 函数的参数

返回:

计算结果为列表 - 每个项目应为将 func 应用于单个元素的结果。

map_reduce(map_function, data, function_kwargs=None, chunk_size=None, data_length=None)[源代码]

此方法包含 DistributorBaseClass 类的核心功能。

它将 map_function 映射到数据的每个元素,并将结果缩减以返回一个扁平化的列表。

任务如何计算,由 tsfresh.utilities.distribution.DistributorBaseClass.distribute() 方法决定,该方法可以在多个线程中、跨多个处理单元等分配任务。

为了不单独传输数据的每个元素,数据根据块大小(如果没有给出,则根据经验猜测)被分割成块。通过这种方式,工作进程处理的不是微小的而是适当大小的数据部分。

参数:
  • map_function (callable) – 应用于每个数据项的函数。

  • data (iterable) – 用于计算的数据

  • function_kwargs (dict of string to parameter) – map 函数的参数

  • chunk_size (int) – 如果提供,则根据此大小对数据进行分块。如果未提供,则使用经验值。

  • data_length (int) – 如果数据是一个生成器,你需要在这里设置长度。如果它是无,长度则从数据的长度推导出来。

返回:

计算结果

返回类型:

list

static partition(data, chunk_size)[源代码]

此生成器将一个可迭代对象分割成长度为 chunk_size 的切片。如果块大小不是数据长度的除数,则最后一个切片将较短。

来自 https://stackoverflow.com/questions/1915170/split-a-generator-iterable-every-n-items-in-python-splitevery

这里的重要部分是,可迭代对象只遍历一次,并且一次只生成一个块。这对内存和速度都有好处。

参数:
  • data (Iterable) – 要分区的数据。

  • chunk_size (int) – 块大小。最后一个块可能会更小。

返回:

一个生成数据块的生成器。

返回类型:

Generator[Iterable]

class tsfresh.utilities.distribution.LocalDaskDistributor(n_workers)[源代码]

基类:IterableDistributorBaseClass

使用本地dask集群和进程内通信的分发器。

close()[源代码]

关闭与本地 Dask 调度器的连接

distribute(func, partitioned_chunks, kwargs)[源代码]

通过将map命令分发到本地机器上的dask工作者,以并行方式计算特征

参数:
  • func (callable) – 发送到每个工作者的函数。

  • partitioned_chunks (iterable) – 数据块列表 - 每个元素又是一个块列表 - 应由一个工作者处理。

  • kwargs (dict of string to parameter) – map 函数的参数

返回:

计算结果为列表 - 每个项目应为将 func 应用于单个元素的结果。

class tsfresh.utilities.distribution.MapDistributor(disable_progressbar=False, progressbar_title='Feature Extraction')[源代码]

基类:IterableDistributorBaseClass

使用Python内置的map进行分发,该方法会依次顺序计算每个任务。

calculate_best_chunk_size(data_length)[源代码]

对于按顺序计算特征的 map 命令,将使用 chunk_size 为 1。

参数:

data_length (int) – 定义需要进行多少次计算的长度。

distribute(func, partitioned_chunks, kwargs)[源代码]

通过Python的map命令按顺序计算特征

参数:
  • func (callable) – 发送到每个工作者的函数。

  • partitioned_chunks (iterable) – 数据块列表 - 每个元素又是一个块列表 - 应由一个工作者处理。

  • kwargs (dict of string to parameter) – map 函数的参数

返回:

计算结果为列表 - 每个项目应为将 func 应用于单个元素的结果。

class tsfresh.utilities.distribution.MultiprocessingDistributor(n_workers, disable_progressbar=False, progressbar_title='Feature Extraction', show_warnings=True)[源代码]

基类:IterableDistributorBaseClass

使用多进程池在本地机器上并行计算任务的分发器。

close()[源代码]

收集来自工作线程的结果并关闭线程池。

distribute(func, partitioned_chunks, kwargs)[源代码]

通过将map命令分发到线程池中以并行方式计算特征

参数:
  • func (callable) – 发送到每个工作者的函数。

  • partitioned_chunks (iterable) – 数据块列表 - 每个元素又是一个块列表 - 应由一个工作者处理。

  • kwargs (dict of string to parameter) – map 函数的参数

返回:

计算结果为列表 - 每个项目应为将 func 应用于单个元素的结果。

tsfresh.utilities.distribution.initialize_warnings_in_workers(show_warnings)[源代码]

用于在多进程工作器中初始化警告模块的小助手函数。

在Windows上,Python会生成不继承警告状态的新进程,因此在运行计算之前必须启用/禁用警告。

参数:

show_warnings (bool) – 是否显示警告。

tsfresh.utilities.profiling 模块

包含启动和停止分析器的方法,该分析器用于检查不同特征计算器的运行时间

tsfresh.utilities.profiling.end_profiling(profiler, filename, sorting=None)[源代码]

帮助函数,用于停止分析过程并将分析数据写入给定的文件名。在此之前,根据传递的排序对统计数据进行排序。

参数:
  • profiler (cProfile.Profile) – 一个已经启动的分析器(可能是通过 start_profiling 启动的)。

  • filename (basestring) – 要保存配置文件的输出文件名。

  • sorting (basestring) – 传递给 sort_stats 函数的统计数据的排序。

返回:

返回类型:

None

使用以下命令启动和停止分析器:

>>> profiler = start_profiling()
>>> # Do something you want to profile
>>> end_profiling(profiler, "out.txt", "cumulative")
tsfresh.utilities.profiling.get_n_jobs()[源代码]

获取用于并行处理的任务数量。

返回:

用于并行处理的任务数量。

返回类型:

int

tsfresh.utilities.profiling.set_n_jobs(n_jobs)[源代码]

设置用于并行处理的任务数。

参数:

n_jobs (int) – 用于并行处理的任务数量。

返回:

返回类型:

None

tsfresh.utilities.profiling.start_profiling()[源代码]

启动分析过程并返回分析器(稍后关闭)的辅助函数。

返回:

一个启动的分析器。

返回类型:

cProfile.Profile

使用以下命令启动和停止分析器:

>>> profiler = start_profiling()
>>> # Do something you want to profile
>>> end_profiling(profiler, "cumulative", "out.txt")

tsfresh.utilities.string_manipulation 模块

tsfresh.utilities.string_manipulation.convert_to_output_format(param)[源代码]

帮助函数将参数转换为有效的字符串,该字符串可以用作列名。这与在 from_columns 函数中使用的操作相反。

参数按其名称排序,并以以下形式写出

<param name>_<param value>__<param name>_<param value>__ …

如果 <param_value> 是一个字符串,此方法将用引号 “ 包裹它,因此 “<param_value>”。

参数:

param (dict) – 要写出的参数字典

返回:

解析后的参数字符串

返回类型:

str

tsfresh.utilities.string_manipulation.get_config_from_string(parts)[源代码]

从列名中提取某个函数的配置的辅助函数。列名部分(通过“__”分割)应传递给此函数。它将跳过种类名称和函数名称,仅使用参数部分。这些部分将在“_”上分割为参数名称和参数值。该值被转换为python对象(例如,“(1, 2, 3)”被转换为由整数1、2和3组成的元组)。

如果列名中没有参数,则返回 None。

参数:

parts (list) – 列名在“__”上拆分

返回:

一个包含所有参数的字典,这些参数编码在列名中。

返回类型:

dict

模块内容

这个 utilities 子模块包含几个实用函数。这些函数应仅在 tsfresh 内部使用。