API参考

在这里你可以找到所有的Qlib接口。

数据

提供者

class qlib.data.data.ProviderBackendMixin

这个辅助类试图使基于存储后端的提供者更加方便 如果提供者不依赖于后端存储,则不需要继承此类

class qlib.data.data.CalendarProvider

日历提供者基类

提供日历数据。

calendar(start_time=None, end_time=None, freq='day', future=False)

获取特定市场在给定时间范围内的日历。

Parameters:
  • start_time (str) – 时间范围的开始。

  • end_time (str) – 时间范围的结束时间。

  • freq (str) – 时间频率,可选:年/季度/月/周/天。

  • future (bool) – 是否包括未来的交易日。

Returns:

日历列表

Return type:

列表

locate_index(start_time: Timestamp | str, end_time: Timestamp | str, freq: str, future: bool = False)

在特定频率的日历中定位开始时间索引和结束时间索引。

Parameters:
  • start_time (pd.Timestamp) – 时间范围的开始。

  • end_time (pd.Timestamp) – 时间范围的结束时间。

  • freq (str) – 时间频率,可选:年/季度/月/周/天。

  • future (bool) – 是否包括未来的交易日。

Returns:

  • pd.Timestamp – 实际的开始时间。

  • pd.Timestamp – 实际的结束时间。

  • int – 开始时间的索引。

  • int – 结束时间的索引。

load_calendar(freq, future)

从文件中加载原始日历时间戳。

Parameters:
  • freq (str) – 读取日历文件的频率。

  • future (bool) –

Returns:

时间戳列表

Return type:

列表

class qlib.data.data.InstrumentProvider

仪器提供者基类

提供仪器数据。

static instruments(market: List | str = 'all', filter_pipe: List | None = None)

获取基础市场的通用配置字典,并添加几个动态过滤器。

Parameters:
  • 市场 (联合[列表, 字符串]) –

    字符串:

    市场/行业/指数的简称,例如 all/sse/szse/sse50/csi300/csi500。

    列表:

    [“ID1”, “ID2”]。股票列表

  • filter_pipe (list) – 动态过滤器的列表。

Returns:

  • dict (如果 isinstance(market, str)) – 股票池配置的字典。

    {market => 基础市场名称, filter_pipe => 过滤器列表}

    示例 :

    {'market': 'csi500',
    'filter_pipe': [{'filter_type': 'ExpressionDFilter',
    'rule_expression': '$open<40',
    'filter_start_time': None,
    'filter_end_time': None,
    'keep': False},
    {'filter_type': 'NameDFilter',
    'name_rule_re': 'SH[0-9]{4}55',
    'filter_start_time': None,
    'filter_end_time': None}]}
    
  • list (如果 isinstance(market, list)) – 直接返回原始列表。 注意:这将使工具适用于更多情况。用户代码将更简单。

abstract list_instruments(instruments, start_time=None, end_time=None, freq='day', as_list=False)

根据某个股票池配置列出工具。

Parameters:
  • instruments (dict) – 股票池配置。

  • start_time (str) – 时间范围的开始。

  • end_time (str) – 时间范围的结束时间。

  • as_list (bool) – 将仪器返回为列表或字典。

Returns:

带有时间跨度的仪器列表或字典

Return type:

字典或列表

class qlib.data.data.FeatureProvider

特征提供者类

提供特征数据。

abstract feature(instrument, field, start_time, end_time, freq)

获取特征数据。

Parameters:
  • instrument (str) – 某种仪器。

  • field (str) – 特征的某个字段。

  • start_time (str) – 时间范围的开始。

  • end_time (str) – 时间范围的结束时间。

  • freq (str) – 时间频率,可选:年/季度/月/周/天。

Returns:

某个特征的数据

Return type:

pd.Series

class qlib.data.data.PITProvider
abstract period_feature(instrument, field, start_index: int, end_index: int, cur_time: Timestamp, period: int | None = None) Series

获取start_indexend_index之间的历史周期数据序列

Parameters:
  • start_index (int) – start_index 是相对于最新周期到 cur_time 的相对索引

  • end_index (int) – end_index 是相对于最新周期到 cur_time 的相对索引 在大多数情况下,start_index 和 end_index 将是非正值 例如,start_index == -3 end_index == 0 且当前周期索引为 cur_idx, 则将检索 [start_index + cur_idx, end_index + cur_idx] 之间的数据。

  • period (int) – 用于查询特定周期。 在Qlib中,周期用整数表示。(例如,202001可能代表2020年第一季度) 注意:period 将覆盖 start_indexend_index

Returns:

索引将是整数,用于指示数据的周期 一个典型的例子将是 TODO

Return type:

pd.Series

Raises:

FileNotFoundError – 如果查询的数据不存在,将引发此异常。

class qlib.data.data.ExpressionProvider

表达式提供者类

提供表达数据。

__init__()
abstract expression(instrument, field, start_time=None, end_time=None, freq='day') Series

获取表达式数据。

expression的职责 - 解析field并加载相应的数据。 - 在加载数据时,应处理数据的时间依赖性。get_expression_instance通常在此方法中使用。

Parameters:
  • instrument (str) – 某种仪器。

  • field (str) – 特征的某个字段。

  • start_time (str) – 时间范围的开始。

  • end_time (str) – 时间范围的结束时间。

  • freq (str) – 时间频率,可选:年/季度/月/周/天。

Returns:

某个表达式的数据

数据有两种格式

  1. 带有日期时间索引的表达式

  2. 带有整数索引的表达式

    • 因为日期时间不如

Return type:

pd.Series

class qlib.data.data.DatasetProvider

数据集提供者类

提供数据集数据。

abstract dataset(instruments, fields, start_time=None, end_time=None, freq='day', inst_processors=[])

获取数据集数据。

Parameters:
  • instruments (listdict) – 仪器列表/字典或股票池配置字典。

  • fields (列表) – 特征实例的列表。

  • start_time (str) – 时间范围的开始。

  • end_time (str) – 时间范围的结束时间。

  • freq (str) – 时间频率。

  • inst_processors (Iterable[Union[dict, InstProcessor]]) – 对每个仪器执行的操作

Returns:

一个带有索引的pandas dataframe。

Return type:

pd.DataFrame

static get_instruments_d(instruments, freq)

解析不同类型的输入工具以输出instruments_d 输入工具格式错误将导致异常。

static get_column_names(fields)

从输入字段获取列名

static dataset_processor(instruments_d, column_names, start_time, end_time, freq, inst_processors=[])

加载并处理数据,返回数据集。 - 默认使用多核方法。

static inst_calculator(inst, start_time, end_time, freq, column_names, spans=None, g_config=None, inst_processors=[])

计算一个工具的表达式,返回一个df结果。 如果表达式之前已经计算过,则从缓存中加载。

返回值:一个带有索引'datetime'和其他数据列的数据框。

class qlib.data.data.LocalCalendarProvider(remote=False, backend={})

本地日历数据提供者类

从本地数据源提供日历数据。

__init__(remote=False, backend={})
load_calendar(freq, future)

从文件中加载原始日历时间戳。

Parameters:
  • freq (str) – 读取日历文件的频率。

  • future (bool) –

Returns:

时间戳列表

Return type:

列表

class qlib.data.data.LocalInstrumentProvider(backend={})

本地仪器数据提供者类

从本地数据源提供仪器数据。

__init__(backend={}) None
list_instruments(instruments, start_time=None, end_time=None, freq='day', as_list=False)

根据某个股票池配置列出工具。

Parameters:
  • instruments (dict) – 股票池配置。

  • start_time (str) – 时间范围的开始。

  • end_time (str) – 时间范围的结束时间。

  • as_list (bool) – 将仪器返回为列表或字典。

Returns:

带有时间跨度的仪器列表或字典

Return type:

字典或列表

class qlib.data.data.LocalFeatureProvider(remote=False, backend={})

本地特征数据提供者类

从本地数据源提供特征数据。

__init__(remote=False, backend={})
feature(instrument, field, start_index, end_index, freq)

获取特征数据。

Parameters:
  • instrument (str) – 某种仪器。

  • field (str) – 特征的某个字段。

  • start_time (str) – 时间范围的开始。

  • end_time (str) – 时间范围的结束时间。

  • freq (str) – 时间频率,可选:年/季度/月/周/天。

Returns:

某个特征的数据

Return type:

pd.Series

class qlib.data.data.LocalPITProvider
period_feature(instrument, field, start_index, end_index, cur_time, period=None)

获取start_indexend_index之间的历史周期数据序列

Parameters:
  • start_index (int) – start_index 是相对于最新周期到 cur_time 的相对索引

  • end_index (int) – end_index 是相对于最新周期到 cur_time 的相对索引 在大多数情况下,start_index 和 end_index 将是非正值 例如,start_index == -3 end_index == 0 且当前周期索引为 cur_idx, 则将检索 [start_index + cur_idx, end_index + cur_idx] 之间的数据。

  • period (int) – 用于查询特定周期。 在Qlib中,周期用整数表示。(例如,202001可能代表2020年第一季度) 注意:period 将覆盖 start_indexend_index

Returns:

索引将是整数,用于指示数据的周期 一个典型的例子将是 TODO

Return type:

pd.Series

Raises:

FileNotFoundError – 如果查询的数据不存在,将引发此异常。

class qlib.data.data.LocalExpressionProvider(time2idx=True)

本地表达式数据提供者类

从本地数据源提供表达数据。

__init__(time2idx=True)
expression(instrument, field, start_time=None, end_time=None, freq='day')

获取表达式数据。

expression的职责 - 解析field并加载相应的数据。 - 在加载数据时,应处理数据的时间依赖性。get_expression_instance通常在此方法中使用。

Parameters:
  • instrument (str) – 某种仪器。

  • field (str) – 特征的某个字段。

  • start_time (str) – 时间范围的开始。

  • end_time (str) – 时间范围的结束时间。

  • freq (str) – 时间频率,可选:年/季度/月/周/天。

Returns:

某个表达式的数据

数据有两种格式

  1. 带有日期时间索引的表达式

  2. 带有整数索引的表达式

    • 因为日期时间不如

Return type:

pd.Series

class qlib.data.data.LocalDatasetProvider(align_time: bool = True)

本地数据集数据提供者类

从本地数据源提供数据集数据。

__init__(align_time: bool = True)
Parameters:

align_time (bool) –

我们是否将时间对齐到日历 在某些数据集中,频率是灵活的,无法对齐。 对于具有固定频率和共享日历的数据,将数据对齐到日历将提供以下好处

  • 将查询对齐到相同的参数,以便可以共享缓存。

dataset(instruments, fields, start_time=None, end_time=None, freq='day', inst_processors=[])

获取数据集数据。

Parameters:
  • instruments (listdict) – 仪器列表/字典或股票池配置字典。

  • fields (列表) – 特征实例的列表。

  • start_time (str) – 时间范围的开始。

  • end_time (str) – 时间范围的结束时间。

  • freq (str) – 时间频率。

  • inst_processors (Iterable[Union[dict, InstProcessor]]) – 对每个仪器执行的操作

Returns:

一个带有索引的pandas dataframe。

Return type:

pd.DataFrame

static multi_cache_walker(instruments, fields, start_time=None, end_time=None, freq='day')

此方法用于为客户准备表达式缓存。 然后客户端将自行从表达式缓存中加载数据。

static cache_walker(inst, start_time, end_time, freq, column_names)

如果一个仪器的表达式之前没有被计算过,计算它并将其写入表达式缓存。

class qlib.data.data.ClientCalendarProvider

客户端日历数据提供者类

通过作为客户端从服务器请求数据来提供日历数据。

__init__()
calendar(start_time=None, end_time=None, freq='day', future=False)

获取特定市场在给定时间范围内的日历。

Parameters:
  • start_time (str) – 时间范围的开始。

  • end_time (str) – 时间范围的结束时间。

  • freq (str) – 时间频率,可选:年/季度/月/周/天。

  • future (bool) – 是否包括未来的交易日。

Returns:

日历列表

Return type:

列表

class qlib.data.data.ClientInstrumentProvider

客户端仪器数据提供者类

通过作为客户端从服务器请求数据来提供仪器数据。

__init__()
list_instruments(instruments, start_time=None, end_time=None, freq='day', as_list=False)

根据某个股票池配置列出工具。

Parameters:
  • instruments (dict) – 股票池配置。

  • start_time (str) – 时间范围的开始。

  • end_time (str) – 时间范围的结束时间。

  • as_list (bool) – 将仪器返回为列表或字典。

Returns:

带有时间跨度的仪器列表或字典

Return type:

字典或列表

class qlib.data.data.ClientDatasetProvider

客户端数据集数据提供者类

通过作为客户端从服务器请求数据来提供数据集数据。

__init__()
dataset(instruments, fields, start_time=None, end_time=None, freq='day', disk_cache=0, return_uri=False, inst_processors=[])

获取数据集数据。

Parameters:
  • instruments (listdict) – 仪器列表/字典或股票池配置字典。

  • fields (列表) – 特征实例的列表。

  • start_time (str) – 时间范围的开始。

  • end_time (str) – 时间范围的结束时间。

  • freq (str) – 时间频率。

  • inst_processors (Iterable[Union[dict, InstProcessor]]) – 对每个仪器执行的操作

Returns:

一个带有索引的pandas dataframe。

Return type:

pd.DataFrame

class qlib.data.data.BaseProvider

本地提供者类 这是一组允许用户访问数据的接口。 由于PITD未公开向用户暴露,因此未包含在接口中。

为了保持与旧版qlib提供者的兼容性。

features(instruments, fields, start_time=None, end_time=None, freq='day', disk_cache=None, inst_processors=[])
Parameters:

disk_cache (int) – 是否跳过(0)/使用(1)/替换(2)磁盘缓存

此函数将尝试使用具有关键字disk_cache的缓存方法,如果由于DatasetD实例是提供者类而引发类型错误,则将使用提供者方法。

class qlib.data.data.LocalProvider
features_uri(instruments, fields, start_time, end_time, freq, disk_cache=1)

返回生成的特征/数据集缓存的URI

Parameters:
  • disk_cache

  • instruments

  • fields

  • start_time

  • end_time

  • freq

class qlib.data.data.ClientProvider

客户端提供者

作为客户端从服务器请求数据。可以提出请求:

  • 日历:直接响应一个日历列表

  • 工具(无过滤器):直接响应工具列表/字典

  • 仪器(带过滤器):响应仪器列表/字典

  • 功能:响应一个缓存URI

一般的工作流程描述如下: 当用户使用客户端提供者提出请求时,客户端提供者将连接服务器并发送请求。客户端将开始等待响应。响应将立即生成,指示缓存是否可用。只有当客户端收到响应说feature_available为真时,等待过程才会终止。 BUG:每次我们请求某些数据时,都需要连接到服务器,等待响应并断开连接。我们无法在一个连接内进行一系列请求。您可以参考https://python-socketio.readthedocs.io/en/latest/client.html获取python-socketIO客户端的文档。

__init__()
qlib.data.data.CalendarProviderWrapper

CalendarProvider 的别名

qlib.data.data.InstrumentProviderWrapper

InstrumentProvider 的别名

qlib.data.data.FeatureProviderWrapper

FeatureProvider 的别名

qlib.data.data.PITProviderWrapper

PITProvider 的别名

qlib.data.data.ExpressionProviderWrapper

ExpressionProvider 的别名

qlib.data.data.DatasetProviderWrapper

DatasetProvider 的别名

qlib.data.data.BaseProviderWrapper

BaseProvider 的别名

qlib.data.data.register_all_wrappers(C)

过滤器

class qlib.data.filter.BaseDFilter

动态仪器过滤器抽象类

用户可以覆盖此类以构建自己的过滤器

重写 __init__ 以输入过滤规则

重写 filter_main 以使用法规来过滤工具

__init__()
static from_config(config)

从配置字典构造实例。

Parameters:

config (dict) – 配置参数的字典。

abstract to_config()

从配置字典构造实例。

Returns:

返回配置参数的字典。

Return type:

字典

class qlib.data.filter.SeriesDFilter(fstart_time=None, fend_time=None, keep=False)

动态仪器过滤器 用于过滤一系列特定特征的抽象类

过滤器应提供参数:

  • 过滤开始时间

  • 过滤结束时间

  • 过滤规则

重写 __init__ 以分配特定规则来过滤系列。

重写 _getFilterSeries 以使用规则过滤系列并获取 {inst => series} 的字典,或重写 filter_main 以获取更高级的系列过滤规则

__init__(fstart_time=None, fend_time=None, keep=False)
Init function for filter base class.

根据由fstart_time和fend_time指定的某个时间段内的特定规则筛选一组工具。

Parameters:
  • fstart_time (str) – 过滤器规则开始过滤工具的时间。

  • fend_time (str) – 过滤规则停止过滤工具的时间。

  • keep (bool) – 是否保留在过滤时间范围内不存在的特征的工具。

filter_main(instruments, start_time=None, end_time=None)

实现此方法以过滤仪器。

Parameters:
  • instruments (dict) – 输入要过滤的仪器。

  • start_time (str) – 时间范围的开始。

  • end_time (str) – 时间范围的结束时间。

Returns:

过滤后的工具,结构与输入工具相同。

Return type:

字典

class qlib.data.filter.NameDFilter(name_rule_re, fstart_time=None, fend_time=None)

名称动态仪器过滤器

根据规定的名称格式筛选仪器。

需要一个名称规则的正则表达式。

__init__(name_rule_re, fstart_time=None, fend_time=None)

名称过滤器类的初始化函数

Parameters:

name_rule_re (str) – 用于名称规则的正则表达式。

static from_config(config)

从配置字典构造实例。

Parameters:

config (dict) – 配置参数的字典。

to_config()

从配置字典构造实例。

Returns:

返回配置参数的字典。

Return type:

字典

class qlib.data.filter.ExpressionDFilter(rule_expression, fstart_time=None, fend_time=None, keep=False)

表达式动态仪器过滤器

根据特定表达式筛选仪器。

表示某个特征字段是必需的表达式规则。

示例

  • 基本特征过滤器 : rule_expression = ‘$close/$open>5’

  • 横截面特征过滤器 : rule_expression = ‘$rank($close)<10’

  • 时间序列特征过滤器 : rule_expression = ‘$Ref($close, 3)>100’

__init__(rule_expression, fstart_time=None, fend_time=None, keep=False)

表达式过滤器类的初始化函数

Parameters:
  • fstart_time (str) – 从该时间开始筛选特征。

  • fend_time (str) – 过滤在此时间结束的特征。

  • rule_expression (str) – 规则的输入表达式。

static from_config(config)

从配置字典构造实例。

Parameters:

config (dict) – 配置参数的字典。

to_config()

从配置字典构造实例。

Returns:

返回配置参数的字典。

Return type:

字典

class qlib.data.base.Expression

表达式基类

表达式设计用于处理以下格式的数据计算 每个工具的数据具有两个维度,

  • 特征

  • 时间:它可以是观察时间或周期时间。

    • period time 是为点时间数据库设计的。例如,period time 可能是 2014Q4,它的值可以被多次观察(由于修正,不同时间可能会观察到不同的值)。

load(instrument, start_index, end_index, *args)

加载特征 此函数负责根据表达式引擎加载特征/表达式。

具体实现将分为两部分:

  1. 缓存数据,处理错误。

    • 这部分由所有表达式共享,并在Expression中实现

  2. 基于特定表达式处理和计算数据。

    • 这部分在每个表达式中都不同,并在每个表达式中实现

表达式引擎由不同的数据共享。 不同的数据将为args提供不同的额外信息。

Parameters:
  • instrument (str) – 仪器代码。

  • start_index (str) – 特征开始索引 [在日历中]。

  • end_index (str) – 特征结束索引 [在日历中]。

  • 信息 (*args 可能包含以下内容) –

  • data (2) 如果在PIT中使用) –

    freq: str

    特征频率。

  • arguments (包含以下内容) –

    freq: str

    特征频率。

  • data

    cur_pit:

    它设计用于点时间数据。

    period: int

    这用于查询特定时期。 在Qlib中,时期用int表示。(例如,202001可能代表2020年第一季度)

  • arguments

    cur_pit:

    它设计用于点时间数据。

    period: int

    这用于查询特定时期。 在Qlib中,时期用int表示。(例如,202001可能代表2020年第一季度)

Returns:

特征系列:该系列的索引是日历索引

Return type:

pd.Series

abstract get_longest_back_rolling()

获取该特征访问的历史数据的最长长度

这是为了首先获取所需的数据范围以计算特定范围内的特征而设计的。然而,像Ref(Ref($close, -1), 1)这样的情况无法正确处理。

因此,这将仅用于检测所需历史数据的长度。

abstract get_extended_window_size()

get_extend_window_size

为了计算这个操作符在范围[start_index, end_index]内 我们必须获取范围[start_index - lft_etd, end_index + rght_etd]内的叶子特征

Returns:

lft_etd, rght_etd

Return type:

(int, int)

class qlib.data.base.Feature(name=None)

静态表达式

这种特性将从提供者加载数据

__init__(name=None)
get_longest_back_rolling()

获取该特征访问的历史数据的最长长度

这是为了首先获取所需的数据范围以计算特定范围内的特征而设计的。然而,像Ref(Ref($close, -1), 1)这样的情况无法正确处理。

因此,这将仅用于检测所需历史数据的长度。

get_extended_window_size()

get_extend_window_size

为了计算这个操作符在范围[start_index, end_index]内 我们必须获取范围[start_index - lft_etd, end_index + rght_etd]内的叶子特征

Returns:

lft_etd, rght_etd

Return type:

(int, int)

class qlib.data.base.PFeature(name=None)
class qlib.data.base.ExpressionOps

操作符表达式

这种特性将使用操作符进行动态特性构建。

操作符

class qlib.data.ops.ElemOperator(feature)

元素级运算符

Parameters:

特征 (Expression) – 特征实例

Returns:

特征操作输出

Return type:

Expression

__init__(feature)
get_longest_back_rolling()

获取该特征访问的历史数据的最长长度

这是为了首先获取所需的数据范围以计算特定范围内的特征而设计的。然而,像Ref(Ref($close, -1), 1)这样的情况无法正确处理。

因此,这将仅用于检测所需历史数据的长度。

get_extended_window_size()

get_extend_window_size

为了计算这个操作符在范围[start_index, end_index]内 我们必须获取范围[start_index - lft_etd, end_index + rght_etd]内的叶子特征

Returns:

lft_etd, rght_etd

Return type:

(int, int)

class qlib.data.ops.ChangeInstrument(instrument, feature)

更改仪器操作符 在某些情况下,人们可能希望在计算时更改为另一个仪器,例如,计算股票相对于市场指数的贝塔值。 这将需要将特征的计算从股票(原始仪器)更改为指数(参考仪器) :param instrument: 例如,SH000300(沪深300指数),或^GPSC(标普500指数)。 :type instrument: 下游操作应在其上执行的新仪器。 :param feature: :type feature: 为新仪器计算的特征。

Returns:

特征操作输出

Return type:

Expression

__init__(instrument, feature)
load(instrument, start_index, end_index, *args)

加载特征 此函数负责根据表达式引擎加载特征/表达式。

具体实现将分为两部分:

  1. 缓存数据,处理错误。

    • 这部分由所有表达式共享,并在Expression中实现

  2. 基于特定表达式处理和计算数据。

    • 这部分在每个表达式中都不同,并在每个表达式中实现

表达式引擎由不同的数据共享。 不同的数据将为args提供不同的额外信息。

Parameters:
  • instrument (str) – 仪器代码。

  • start_index (str) – 特征开始索引 [在日历中]。

  • end_index (str) – 特征结束索引 [在日历中]。

  • 信息 (*args 可能包含以下内容) –

  • data (2) 如果在PIT中使用) –

    freq: str

    特征频率。

  • arguments (包含以下内容) –

    freq: str

    特征频率。

  • data

    cur_pit:

    它设计用于点时间数据。

    period: int

    这用于查询特定时期。 在Qlib中,时期用int表示。(例如,202001可能代表2020年第一季度)

  • arguments

    cur_pit:

    它设计用于点时间数据。

    period: int

    这用于查询特定时期。 在Qlib中,时期用int表示。(例如,202001可能代表2020年第一季度)

Returns:

特征系列:该系列的索引是日历索引

Return type:

pd.Series

class qlib.data.ops.NpElemOperator(feature, func)

Numpy 元素级运算符

Parameters:
  • 特征 (Expression) – 特征实例

  • func (str) – numpy 特征操作方法

Returns:

特征操作输出

Return type:

Expression

__init__(feature, func)
class qlib.data.ops.Abs(feature)

特征绝对值

Parameters:

特征 (Expression) – 特征实例

Returns:

具有绝对输出的特征实例

Return type:

Expression

__init__(feature)
class qlib.data.ops.Sign(feature)

特征符号

Parameters:

特征 (Expression) – 特征实例

Returns:

带有签名的特征实例

Return type:

Expression

__init__(feature)
class qlib.data.ops.Log(feature)

功能日志

Parameters:

特征 (Expression) – 特征实例

Returns:

带有日志的功能实例

Return type:

Expression

__init__(feature)
class qlib.data.ops.Mask(feature, instrument)

特征掩码

Parameters:
  • 特征 (Expression) – 特征实例

  • instrument (str) – 仪器掩码

Returns:

一个带有遮蔽工具的特征实例

Return type:

Expression

__init__(feature, instrument)
class qlib.data.ops.Not(feature)

非运算符

Parameters:

特征 (Expression) – 特征实例

Returns:

特征元素不输出

Return type:

Feature

__init__(feature)
class qlib.data.ops.PairOperator(feature_left, feature_right)

成对操作符

Parameters:
  • feature_left (Expression) – 特征实例或数值

  • feature_right (Expression) – 特征实例或数值

Returns:

两个特征的操作输出

Return type:

Feature

__init__(feature_left, feature_right)
get_longest_back_rolling()

获取该特征访问的历史数据的最长长度

这是为了首先获取所需的数据范围以计算特定范围内的特征而设计的。然而,像Ref(Ref($close, -1), 1)这样的情况无法正确处理。

因此,这将仅用于检测所需历史数据的长度。

get_extended_window_size()

get_extend_window_size

为了计算这个操作符在范围[start_index, end_index]内 我们必须获取范围[start_index - lft_etd, end_index + rght_etd]内的叶子特征

Returns:

lft_etd, rght_etd

Return type:

(int, int)

class qlib.data.ops.NpPairOperator(feature_left, feature_right, func)

Numpy 成对操作符

Parameters:
  • feature_left (Expression) – 特征实例或数值

  • feature_right (Expression) – 特征实例或数值

  • func (str) – 操作函数

Returns:

两个特征的操作输出

Return type:

Feature

__init__(feature_left, feature_right, func)
class qlib.data.ops.Power(feature_left, feature_right)

幂运算符

Parameters:
Returns:

feature_left 中的基数提升到 feature_right 中的指数

Return type:

Feature

__init__(feature_left, feature_right)
class qlib.data.ops.Add(feature_left, feature_right)

添加操作符

Parameters:
Returns:

两个特征的总和

Return type:

Feature

__init__(feature_left, feature_right)
class qlib.data.ops.Sub(feature_left, feature_right)

减法运算符

Parameters:
Returns:

两个特征的减法

Return type:

Feature

__init__(feature_left, feature_right)
class qlib.data.ops.Mul(feature_left, feature_right)

乘法运算符

Parameters:
Returns:

两个特征的乘积

Return type:

Feature

__init__(feature_left, feature_right)
class qlib.data.ops.Div(feature_left, feature_right)

除法运算符

Parameters:
Returns:

两个特征的除法

Return type:

Feature

__init__(feature_left, feature_right)
class qlib.data.ops.Greater(feature_left, feature_right)

大于运算符

Parameters:
Returns:

从输入的两个特征中提取的较大元素

Return type:

Feature

__init__(feature_left, feature_right)
class qlib.data.ops.Less(feature_left, feature_right)

小于运算符

Parameters:
Returns:

从输入的两个特征中提取的较小元素

Return type:

Feature

__init__(feature_left, feature_right)
class qlib.data.ops.Gt(feature_left, feature_right)

大于运算符

Parameters:
Returns:

布尔系列表示 left > right

Return type:

Feature

__init__(feature_left, feature_right)
class qlib.data.ops.Ge(feature_left, feature_right)

大于等于运算符

Parameters:
Returns:

布尔系列表示 left >= right

Return type:

Feature

__init__(feature_left, feature_right)
class qlib.data.ops.Lt(feature_left, feature_right)

小于运算符

Parameters:
Returns:

布尔系列表示 left < right

Return type:

Feature

__init__(feature_left, feature_right)
class qlib.data.ops.Le(feature_left, feature_right)

小于等于运算符

Parameters:
Returns:

布尔系列表示 left <= right

Return type:

Feature

__init__(feature_left, feature_right)
class qlib.data.ops.Eq(feature_left, feature_right)

等于运算符

Parameters:
Returns:

布尔系列表示 left == right

Return type:

Feature

__init__(feature_left, feature_right)
class qlib.data.ops.Ne(feature_left, feature_right)

不等于运算符

Parameters:
Returns:

布尔系列表示 left != right

Return type:

Feature

__init__(feature_left, feature_right)
class qlib.data.ops.And(feature_left, feature_right)

与运算符

Parameters:
Returns:

两个特征逐行输出

Return type:

Feature

__init__(feature_left, feature_right)
class qlib.data.ops.Or(feature_left, feature_right)

或运算符

Parameters:
Returns:

两个特征逐行 | 输出

Return type:

Feature

__init__(feature_left, feature_right)
class qlib.data.ops.If(condition, feature_left, feature_right)

如果运算符

Parameters:
  • condition (Expression) – 具有布尔值的特征实例作为条件

  • feature_left (Expression) – 特征实例

  • feature_right (Expression) – 特征实例

__init__(condition, feature_left, feature_right)
get_longest_back_rolling()

获取该特征访问的历史数据的最长长度

这是为了首先获取所需的数据范围以计算特定范围内的特征而设计的。然而,像Ref(Ref($close, -1), 1)这样的情况无法正确处理。

因此,这将仅用于检测所需历史数据的长度。

get_extended_window_size()

get_extend_window_size

为了计算这个操作符在范围[start_index, end_index]内 我们必须获取范围[start_index - lft_etd, end_index + rght_etd]内的叶子特征

Returns:

lft_etd, rght_etd

Return type:

(int, int)

class qlib.data.ops.Rolling(feature, N, func)

滚动操作符 在pandas中,滚动和扩展的含义是相同的。 当窗口设置为0时,操作符的行为应遵循扩展 否则,它遵循滚动

Parameters:
  • 特征 (Expression) – 特征实例

  • N (int) – 滚动窗口大小

  • func (str) – 滚动方法

Returns:

滚动输出

Return type:

Expression

__init__(feature, N, func)
get_longest_back_rolling()

获取该特征访问的历史数据的最长长度

这是为了首先获取所需的数据范围以计算特定范围内的特征而设计的。然而,像Ref(Ref($close, -1), 1)这样的情况无法正确处理。

因此,这将仅用于检测所需历史数据的长度。

get_extended_window_size()

get_extend_window_size

为了计算这个操作符在范围[start_index, end_index]内 我们必须获取范围[start_index - lft_etd, end_index + rght_etd]内的叶子特征

Returns:

lft_etd, rght_etd

Return type:

(int, int)

class qlib.data.ops.Ref(feature, N)

特征参考

Parameters:
  • 特征 (Expression) – 特征实例

  • N (int) – N = 0,获取第一个数据;N > 0,获取N个周期前的数据;N < 0,获取未来的数据

Returns:

具有目标引用的特征实例

Return type:

Expression

__init__(feature, N)
get_longest_back_rolling()

获取该特征访问的历史数据的最长长度

这是为了首先获取所需的数据范围以计算特定范围内的特征而设计的。然而,像Ref(Ref($close, -1), 1)这样的情况无法正确处理。

因此,这将仅用于检测所需历史数据的长度。

get_extended_window_size()

get_extend_window_size

为了计算这个操作符在范围[start_index, end_index]内 我们必须获取范围[start_index - lft_etd, end_index + rght_etd]内的叶子特征

Returns:

lft_etd, rght_etd

Return type:

(int, int)

class qlib.data.ops.Mean(feature, N)

滚动均值 (MA)

Parameters:
  • 特征 (Expression) – 特征实例

  • N (int) – 滚动窗口大小

Returns:

一个具有滚动平均值的特征实例

Return type:

Expression

__init__(feature, N)
class qlib.data.ops.Sum(feature, N)

滚动求和

Parameters:
  • 特征 (Expression) – 特征实例

  • N (int) – 滚动窗口大小

Returns:

一个具有滚动求和的特征实例

Return type:

Expression

__init__(feature, N)
class qlib.data.ops.Std(feature, N)

滚动标准差

Parameters:
  • 特征 (Expression) – 特征实例

  • N (int) – 滚动窗口大小

Returns:

一个具有滚动标准差的特征实例

Return type:

Expression

__init__(feature, N)
class qlib.data.ops.Var(feature, N)

滚动方差

Parameters:
  • 特征 (Expression) – 特征实例

  • N (int) – 滚动窗口大小

Returns:

一个具有滚动方差特征的实例

Return type:

Expression

__init__(feature, N)
class qlib.data.ops.Skew(feature, N)

滚动偏度

Parameters:
  • 特征 (Expression) – 特征实例

  • N (int) – 滚动窗口大小

Returns:

一个具有滚动偏度的特征实例

Return type:

Expression

__init__(feature, N)
class qlib.data.ops.Kurt(feature, N)

滚动峰度

Parameters:
  • 特征 (Expression) – 特征实例

  • N (int) – 滚动窗口大小

Returns:

一个具有滚动峰度的特征实例

Return type:

Expression

__init__(feature, N)
class qlib.data.ops.Max(feature, N)

滚动最大值

Parameters:
  • 特征 (Expression) – 特征实例

  • N (int) – 滚动窗口大小

Returns:

一个具有滚动最大值的特征实例

Return type:

Expression

__init__(feature, N)
class qlib.data.ops.IdxMax(feature, N)

滚动最大索引

Parameters:
  • 特征 (Expression) – 特征实例

  • N (int) – 滚动窗口大小

Returns:

一个具有滚动最大索引的特征实例

Return type:

Expression

__init__(feature, N)
class qlib.data.ops.Min(feature, N)

滚动最小值

Parameters:
  • 特征 (Expression) – 特征实例

  • N (int) – 滚动窗口大小

Returns:

一个具有滚动最小值的特征实例

Return type:

Expression

__init__(feature, N)
class qlib.data.ops.IdxMin(feature, N)

滚动最小指数

Parameters:
  • 特征 (Expression) – 特征实例

  • N (int) – 滚动窗口大小

Returns:

一个具有滚动最小索引的特征实例

Return type:

Expression

__init__(feature, N)
class qlib.data.ops.Quantile(feature, N, qscore)

滚动分位数

Parameters:
  • 特征 (Expression) – 特征实例

  • N (int) – 滚动窗口大小

Returns:

一个具有滚动分位数特征实例

Return type:

Expression

__init__(feature, N, qscore)
class qlib.data.ops.Med(feature, N)

滚动中位数

Parameters:
  • 特征 (Expression) – 特征实例

  • N (int) – 滚动窗口大小

Returns:

一个具有滚动中位数的特征实例

Return type:

Expression

__init__(feature, N)
class qlib.data.ops.Mad(feature, N)

滚动平均绝对偏差

Parameters:
  • 特征 (Expression) – 特征实例

  • N (int) – 滚动窗口大小

Returns:

一个具有滚动平均绝对偏差的特征实例

Return type:

Expression

__init__(feature, N)
class qlib.data.ops.Rank(feature, N)

滚动排名(百分位数)

Parameters:
  • 特征 (Expression) – 特征实例

  • N (int) – 滚动窗口大小

Returns:

一个具有滚动排名的特征实例

Return type:

Expression

__init__(feature, N)
class qlib.data.ops.Count(feature, N)

滚动计数

Parameters:
  • 特征 (Expression) – 特征实例

  • N (int) – 滚动窗口大小

Returns:

一个具有非NaN元素滚动计数的特征实例

Return type:

Expression

__init__(feature, N)
class qlib.data.ops.Delta(feature, N)

滚动差值

Parameters:
  • 特征 (Expression) – 特征实例

  • N (int) – 滚动窗口大小

Returns:

一个特征实例,在滚动窗口中结束减去开始

Return type:

Expression

__init__(feature, N)
class qlib.data.ops.Slope(feature, N)

滚动斜率 此操作符计算idxfeature之间的斜率。 (例如:[, , ] 和 [1, 2, 3])

使用示例: - “Slope($close, %d)/$close”

# 待办事项: # 一些用户可能希望进行成对滚动,如Slope(A, B, N)

Parameters:
  • 特征 (Expression) – 特征实例

  • N (int) – 滚动窗口大小

Returns:

一个具有给定窗口线性回归斜率的特征实例

Return type:

Expression

__init__(feature, N)
class qlib.data.ops.Rsquare(feature, N)

滚动R值平方

Parameters:
  • 特征 (Expression) – 特征实例

  • N (int) – 滚动窗口大小

Returns:

具有给定窗口的线性回归r值平方的特征实例

Return type:

Expression

__init__(feature, N)
class qlib.data.ops.Resi(feature, N)

滚动回归残差

Parameters:
  • 特征 (Expression) – 特征实例

  • N (int) – 滚动窗口大小

Returns:

具有给定窗口回归残差的特征实例

Return type:

Expression

__init__(feature, N)
class qlib.data.ops.WMA(feature, N)

滚动加权移动平均

Parameters:
  • 特征 (Expression) – 特征实例

  • N (int) – 滚动窗口大小

Returns:

一个具有加权移动平均输出的特征实例

Return type:

Expression

__init__(feature, N)
class qlib.data.ops.EMA(feature, N)

滚动指数平均 (EMA)

Parameters:
  • 特征 (Expression) – 特征实例

  • N (int, float) – 滚动窗口大小

Returns:

具有给定窗口回归r值平方的特征实例

Return type:

Expression

__init__(feature, N)
class qlib.data.ops.PairRolling(feature_left, feature_right, N, func)

配对滚动操作符

Parameters:
  • feature_left (Expression) – 特征实例

  • feature_right (Expression) – 特征实例

  • N (int) – 滚动窗口大小

Returns:

具有两个输入特征的滚动输出的特征实例

Return type:

Expression

__init__(feature_left, feature_right, N, func)
get_longest_back_rolling()

获取该特征访问的历史数据的最长长度

这是为了首先获取所需的数据范围以计算特定范围内的特征而设计的。然而,像Ref(Ref($close, -1), 1)这样的情况无法正确处理。

因此,这将仅用于检测所需历史数据的长度。

get_extended_window_size()

get_extend_window_size

为了计算这个操作符在范围[start_index, end_index]内 我们必须获取范围[start_index - lft_etd, end_index + rght_etd]内的叶子特征

Returns:

lft_etd, rght_etd

Return type:

(int, int)

class qlib.data.ops.Corr(feature_left, feature_right, N)

滚动相关性

Parameters:
  • feature_left (Expression) – 特征实例

  • feature_right (Expression) – 特征实例

  • N (int) – 滚动窗口大小

Returns:

一个具有两个输入特征滚动相关性的特征实例

Return type:

Expression

__init__(feature_left, feature_right, N)
class qlib.data.ops.Cov(feature_left, feature_right, N)

滚动协方差

Parameters:
  • feature_left (Expression) – 特征实例

  • feature_right (Expression) – 特征实例

  • N (int) – 滚动窗口大小

Returns:

一个具有两个输入特征的滚动最大值的特征实例

Return type:

Expression

__init__(feature_left, feature_right, N)
class qlib.data.ops.TResample(feature, freq, func)
__init__(feature, freq, func)

将数据重新采样到目标频率。 使用了pandas的resample函数。

  • 时间戳将在重新采样后的时间跨度开始时。

Parameters:
  • 特征 (Expression) – 用于计算特征的表达式

  • freq (str) – 它将传递给重采样方法,以便根据给定的频率进行重采样

  • func (method) – 获取重采样值的方法 一些表达式是高频使用的

class qlib.data.ops.OpsWrapper

运维包装器

__init__()
register(ops_list: List[Type[ExpressionOps] | dict])

注册操作符

Parameters:

ops_list (List[Union[Type[ExpressionOps], dict]]) –

  • 如果类型(ops_list)是List[Type[ExpressionOps]],ops_list的每个元素代表操作符类,应该是ExpressionOps的子类。

  • 如果类型(ops_list)是List[dict],ops_list的每个元素代表操作符的配置,其格式如下:

    {
        "class": class_name,
        "module_path": path,
    }
    

    注意:class应该是操作符的类名,module_path应该是python模块或文件路径。

qlib.data.ops.register_all_ops(C)

注册所有操作符

缓存

class qlib.data.cache.MemCacheUnit(*args, **kwargs)

内存缓存单元。

__init__(*args, **kwargs)
property limited

内存缓存是否有限

class qlib.data.cache.MemCache(mem_cache_size_limit=None, limit_type='length')

内存缓存。

__init__(mem_cache_size_limit=None, limit_type='length')
Parameters:
  • mem_cache_size_limit – 缓存最大大小。

  • limit_type – 长度或大小;长度(调用函数:len),大小(调用函数:sys.getsizeof)。

class qlib.data.cache.ExpressionCache(provider)

表达式缓存机制的基类。

此类用于包装表达式提供者,并带有自定义的表达式缓存机制。

注意

重写_uri_expression方法以创建您自己的表达式缓存机制。

expression(instrument, field, start_time, end_time, freq)

获取表达式数据。

注意

与表达式提供者中的expression方法相同的接口

update(cache_uri: str | Path, freq: str = 'day')

将表达式缓存更新到最新的日历。

重写此方法以定义如何根据用户自己的缓存机制更新表达式缓存。

Parameters:
  • cache_uri (strPath) – 表达式缓存文件的完整URI(包括目录路径)。

  • freq (str) –

Returns:

0(成功更新)/ 1(无需更新)/ 2(更新失败)。

Return type:

整数

class qlib.data.cache.DatasetCache(provider)

数据集缓存机制的基类。

此类用于使用自定义数据集缓存机制包装数据集提供者。

注意

重写 _uri_dataset 方法以创建您自己的数据集缓存机制。

dataset(instruments, fields, start_time=None, end_time=None, freq='day', disk_cache=1, inst_processors=[])

获取特征数据集。

注意

与数据集提供者中的dataset方法相同的接口

注意

服务器使用redis_lock来确保不会触发读写冲突,但没有考虑客户端读取者。

update(cache_uri: str | Path, freq: str = 'day')

更新数据集缓存到最新的日历。

重写此方法以定义如何根据用户自己的缓存机制更新数据集缓存。

Parameters:
  • cache_uri (strPath) – 数据集缓存文件的完整URI(包括目录路径)。

  • freq (str) –

Returns:

0(成功更新)/ 1(无需更新)/ 2(更新失败)

Return type:

整数

static cache_to_origin_data(data, fields)

将缓存数据转换为原始数据

Parameters:
  • data – pd.DataFrame, 缓存数据。

  • fields – 特征字段。

Returns:

pd.DataFrame.

static normalize_uri_args(instruments, fields, freq)

规范化URI参数

class qlib.data.cache.DiskExpressionCache(provider, **kwargs)

为服务器准备的缓存机制。

__init__(provider, **kwargs)
gen_expression_cache(expression_data, cache_path, instrument, field, freq, last_update)

使用二进制文件保存类似特征数据。

update(sid, cache_uri, freq: str = 'day')

将表达式缓存更新到最新的日历。

重写此方法以定义如何根据用户自己的缓存机制更新表达式缓存。

Parameters:
  • cache_uri (strPath) – 表达式缓存文件的完整URI(包括目录路径)。

  • freq (str) –

Returns:

0(成功更新)/ 1(无需更新)/ 2(更新失败)。

Return type:

整数

class qlib.data.cache.DiskDatasetCache(provider, **kwargs)

为服务器准备的缓存机制。

__init__(provider, **kwargs)
classmethod read_data_from_cache(cache_path: str | Path, start_time, end_time, fields)

read_cache_from

此函数可以从磁盘缓存数据集中读取数据

Parameters:
  • cache_path

  • start_time

  • end_time

  • fields – 数据集缓存的字段顺序已排序。因此,重新排列列以使其一致。

Returns:

class IndexManager(cache_path: str | Path)

锁不在类中考虑。请在代码外部考虑锁。 此类是磁盘数据的代理。

__init__(cache_path: str | Path)
gen_dataset_cache(cache_path: str | Path, instruments, fields, freq, inst_processors=[])

注意

此函数不考虑缓存读写锁。请在此函数外部获取锁。

缓存格式包含3部分(后跟典型文件名)。

  • 索引 : cache/d41366901e25de3ec47297f12e2ba11d.index

    • 文件的内容可能采用以下格式(pandas.Series)

                          start end
      1999-11-10 00:00:00     0   1
      1999-11-11 00:00:00     1   2
      1999-11-12 00:00:00     2   3
      ...
      

      注意

      开始是关闭的。结束是开放的!!!!!

    • 每行包含两个元素 ,并以时间戳作为其索引。

    • 它表示timestamp数据的start_index(包含)和end_index(不包含)。

  • 元数据:cache/d41366901e25de3ec47297f12e2ba11d.meta

  • 数据 : cache/d41366901e25de3ec47297f12e2ba11d

    • 这是一个按日期时间排序的hdf文件

Parameters:
  • cache_path – 存储缓存的路径。

  • instruments – 用于存储缓存的工具。

  • fields – 用于存储缓存的字段。

  • freq – 存储缓存的频率。

  • inst_processors – 仪器处理器。

:返回类型 pd.DataFrame; 返回的DataFrame的字段与函数的参数一致。

update(cache_uri, freq: str = 'day')

更新数据集缓存到最新的日历。

重写此方法以定义如何根据用户自己的缓存机制更新数据集缓存。

Parameters:
  • cache_uri (strPath) – 数据集缓存文件的完整URI(包括目录路径)。

  • freq (str) –

Returns:

0(成功更新)/ 1(无需更新)/ 2(更新失败)

Return type:

整数

存储

class qlib.data.storage.storage.BaseStorage
class qlib.data.storage.storage.CalendarStorage(freq: str, future: bool, **kwargs)

CalendarStorage的方法和List的同名方法的行为保持一致

__init__(freq: str, future: bool, **kwargs)
property data: Iterable[str]

获取所有数据

Raises:

ValueError – 如果数据(存储)不存在,则引发 ValueError

index(value: str) int
Raises:

ValueError – 如果数据(存储)不存在,则引发 ValueError

class qlib.data.storage.storage.InstrumentStorage(market: str, freq: str, **kwargs)
__init__(market: str, freq: str, **kwargs)
property data: Dict[str, List[Tuple[str, str]]]

获取所有数据

Raises:

ValueError – 如果数据(存储)不存在,则引发 ValueError

update([E, ]**F) None.  Update D from mapping/iterable E and F.

注释

如果 E 存在并且具有 .keys() 方法,则执行: for k in E: D[k] = E[k]

如果 E 存在且缺少 .keys() 方法,则执行: for (k, v) in E: D[k] = v

无论哪种情况,接下来都是:for k, v in F.items(): D[k] = v

class qlib.data.storage.storage.FeatureStorage(instrument: str, field: str, freq: str, **kwargs)
__init__(instrument: str, field: str, freq: str, **kwargs)
property data: Series

获取所有数据

注释

如果数据(存储)不存在,返回空的pd.Series:return pd.Series(dtype=np.float32)

property start_index: int | None

获取 FeatureStorage 起始索引

注释

如果数据(存储)不存在,返回 None

property end_index: int | None

获取 FeatureStorage 结束索引

注释

数据范围的右索引(两侧都是闭合的)

下一个数据追加点将是 end_index + 1

如果数据(存储)不存在,返回 None

write(data_array: List | ndarray | Tuple, index: int | None = None)

从索引开始将data_array写入FeatureStorage。

注释

如果索引为None,则将data_array附加到特征。

如果 len(data_array) == 0; 返回

如果 (index - self.end_index) >= 1,self[end_index+1: index] 将会被填充为 np.nan

示例

feature:
    3   4
    4   5
    5   6

>>> self.write([6, 7], index=6)

    feature:
        3   4
        4   5
        5   6
        6   6
        7   7

>>> self.write([8], index=9)

    feature:
        3   4
        4   5
        5   6
        6   6
        7   7
        8   np.nan
        9   8

>>> self.write([1, np.nan], index=3)

    feature:
        3   1
        4   np.nan
        5   6
        6   6
        7   7
        8   np.nan
        9   8
rebase(start_index: int | None = None, end_index: int | None = None)

重新调整FeatureStorage的start_index和end_index。

start_index 和 end_index 是闭区间:[start_index, end_index]

示例

feature:
    3   4
    4   5
    5   6

>>> self.rebase(start_index=4)

    feature:
        4   5
        5   6

>>> self.rebase(start_index=3)

    feature:
        3   np.nan
        4   5
        5   6

>>> self.write([3], index=3)

    feature:
        3   3
        4   5
        5   6

>>> self.rebase(end_index=4)

    feature:
        3   3
        4   5

>>> self.write([6, 7, 8], index=4)

    feature:
        3   3
        4   6
        5   7
        6   8

>>> self.rebase(start_index=4, end_index=5)

    feature:
        4   6
        5   7
rewrite(data: List | ndarray | Tuple, index: int)

用数据覆盖FeatureStorage中的所有数据

Parameters:
  • data (Union[List, np.ndarray, Tuple]) – 数据

  • index (int) – 数据起始索引

class qlib.data.storage.file_storage.FileStorageMixin

FileStorageMixin,适用于FileXXXStorage 子类需要具有provider_uri、freq、storage_name、file_name属性

check()

检查 self.uri

Raises:

ValueError

class qlib.data.storage.file_storage.FileCalendarStorage(freq: str, future: bool, provider_uri: dict | None = None, **kwargs)
__init__(freq: str, future: bool, provider_uri: dict | None = None, **kwargs)
property data: List[str]

获取所有数据

Raises:

ValueError – 如果数据(存储)不存在,则引发 ValueError

index(value: str) int
Raises:

ValueError – 如果数据(存储)不存在,则引发 ValueError

class qlib.data.storage.file_storage.FileInstrumentStorage(market: str, freq: str, provider_uri: dict | None = None, **kwargs)
__init__(market: str, freq: str, provider_uri: dict | None = None, **kwargs)
property data: Dict[str, List[Tuple[str, str]]]

获取所有数据

Raises:

ValueError – 如果数据(存储)不存在,则引发 ValueError

update([E, ]**F) None.  Update D from mapping/iterable E and F.

注释

如果 E 存在并且具有 .keys() 方法,则执行: for k in E: D[k] = E[k]

如果 E 存在且缺少 .keys() 方法,则执行: for (k, v) in E: D[k] = v

无论哪种情况,接下来都是:for k, v in F.items(): D[k] = v

class qlib.data.storage.file_storage.FileFeatureStorage(instrument: str, field: str, freq: str, provider_uri: dict | None = None, **kwargs)
__init__(instrument: str, field: str, freq: str, provider_uri: dict | None = None, **kwargs)
property data: Series

获取所有数据

注释

如果数据(存储)不存在,返回空的pd.Series:return pd.Series(dtype=np.float32)

write(data_array: List | ndarray, index: int | None = None) None

从索引开始将data_array写入FeatureStorage。

注释

如果索引为None,则将data_array附加到特征。

如果 len(data_array) == 0; 返回

如果 (index - self.end_index) >= 1,self[end_index+1: index] 将会被填充为 np.nan

示例

feature:
    3   4
    4   5
    5   6

>>> self.write([6, 7], index=6)

    feature:
        3   4
        4   5
        5   6
        6   6
        7   7

>>> self.write([8], index=9)

    feature:
        3   4
        4   5
        5   6
        6   6
        7   7
        8   np.nan
        9   8

>>> self.write([1, np.nan], index=3)

    feature:
        3   1
        4   np.nan
        5   6
        6   6
        7   7
        8   np.nan
        9   8
property start_index: int | None

获取 FeatureStorage 起始索引

注释

如果数据(存储)不存在,返回 None

property end_index: int | None

获取 FeatureStorage 结束索引

注释

数据范围的右索引(两侧都是闭合的)

下一个数据追加点将是 end_index + 1

如果数据(存储)不存在,返回 None

数据集

数据集类

class qlib.data.dataset.__init__.Dataset(**kwargs)

为模型训练和推理准备数据。

__init__(**kwargs)

init 旨在完成以下步骤:

  • init the sub instance and the state of the dataset(info to prepare the data)
    • 准备数据的基本状态的名称不应以“_”开头,以便在序列化时可以在磁盘上序列化。

  • setup data
    • 与数据相关的属性名称应以“_”开头,以便在序列化时不会保存到磁盘上。

数据可以指定信息以计算准备所需的基本数据

config(**kwargs)

config 用于配置无法从数据中学习的参数

setup_data(**kwargs)

设置数据。

我们为以下情况拆分setup_data函数:

  • 用户有一个带有学习状态的Dataset对象在磁盘上。

  • 用户从磁盘加载数据集对象。

  • 用户调用setup_data来加载新数据。

  • 用户根据之前的状态为模型准备数据。

prepare(**kwargs) object

数据集的类型取决于模型。(可能是pd.DataFrame、pytorch.DataLoader等) 参数应指定准备数据的范围 该方法应: - 处理数据

  • 返回处理后的数据

Returns:

返回对象

Return type:

对象

class qlib.data.dataset.__init__.DatasetH(handler: Dict | DataHandler, segments: Dict[str, Tuple], fetch_kwargs: Dict = {}, **kwargs)

带有数据(H)处理器的数据集

用户应尝试将数据预处理函数放入处理程序中。 只有以下数据处理函数应放置在数据集中:

  • 处理与特定模型相关。

  • 处理与数据分割有关。

__init__(handler: Dict | DataHandler, segments: Dict[str, Tuple], fetch_kwargs: Dict = {}, **kwargs)

设置基础数据。

Parameters:
  • handler (Union[dict, DataHandler]) –

    handler 可以是:

    • DataHandler 的实例

    • DataHandler 的配置。请参考 DataHandler

  • segments (dict) –

    描述数据分段的选项。 以下是一些示例:

    1) 'segments': {
            'train': ("2008-01-01", "2014-12-31"),
            'valid': ("2017-01-01", "2020-08-01",),
            'test': ("2015-01-01", "2016-12-31",),
        }
    2) 'segments': {
            'insample': ("2008-01-01", "2014-12-31"),
            'outsample': ("2017-01-01", "2020-08-01",),
        }
    

config(handler_kwargs: dict | None = None, **kwargs)

初始化数据集H

Parameters:
  • handler_kwargs (dict) –

    DataHandler的配置,可能包含以下参数:

    • DataHandler.conf_data的参数,例如'instruments'、'start_time'和'end_time'。

  • kwargs (dict) –

    DatasetH的配置,例如

    • segmentsdict

      segments的配置,与self.__init__中的‘segments’相同

setup_data(handler_kwargs: dict | None = None, **kwargs)

设置数据

Parameters:

handler_kwargs (dict) –

DataHandler的初始化参数,可能包括以下参数:

  • init_type : 处理程序的初始化类型

  • enable_cache : 是否启用缓存

prepare(segments: List[str] | Tuple[str] | str | slice | Index, col_set='__all', data_key='infer', **kwargs) List[DataFrame] | DataFrame

准备用于学习和推理的数据。

Parameters:
  • segments (Union[List[Text], Tuple[Text], Text, slice]) –

    描述要准备的数据范围 以下是一些示例:

    • ’train’

    • [‘train’, ‘valid’]

  • col_set (str) –

    col_set 将在获取数据时传递给 self.handler。 TODO: 使其自动化:

    • 选择 DK_I 用于测试数据

    • 选择 DK_L 用于训练数据。

  • data_key (str) – 要获取的数据: DK_* 默认是 DK_I,表示获取用于推理的数据。

  • kwargs

    kwargs 可能包含的参数:
    flt_colstr

    它仅存在于 TSDatasetH 中,可用于添加一列数据(True 或 False)以过滤数据。 此参数仅在 TSDatasetH 实例中支持。

Return type:

联合[列表[pd.DataFrame], pd.DataFrame]

Raises:

NotImplementedError:

数据加载器

class qlib.data.dataset.loader.DataLoader

DataLoader 设计用于从原始数据源加载原始数据。

abstract load(instruments, start_time=None, end_time=None) DataFrame

将数据加载为 pd.DataFrame。

数据示例(列的多重索引是可选的):

                        feature                                                             label
                        $close     $volume     Ref($close, 1)  Mean($close, 3)  $high-$low  LABEL0
datetime    instrument
2010-01-04  SH600000    81.807068  17145150.0       83.737389        83.016739    2.741058  0.0032
            SH600004    13.313329  11800983.0       13.313329        13.317701    0.183632  0.0042
            SH600005    37.796539  12231662.0       38.258602        37.919757    0.970325  0.0289
Parameters:
  • instruments (strdict) – 它可以是市场名称或由InstrumentProvider生成的仪器配置文件。 如果instruments的值为None,则表示没有进行过滤。

  • start_time (str) – 时间范围的开始。

  • end_time (str) – 时间范围的结束时间。

Returns:

数据从底层源加载

Return type:

pd.DataFrame

Raises:

KeyError: – 如果不支持仪器过滤器,则引发 KeyError

class qlib.data.dataset.loader.DLWParser(config: list | tuple | dict)

用于特征和名称的(D)ata(L)oader (W)ith (P)arser

提取这个类以便QlibDataLoader和其他数据加载器(如QdbDataLoader)可以共享字段。

__init__(config: list | tuple | dict)
Parameters:

config (Union[list, tuple, dict]) –

Config 将用于描述字段和列名

<config> := {
    "group_name1": <fields_info1>
    "group_name2": <fields_info2>
}

<config> := <fields_info>

<fields_info> := ["expr", ...] | (["expr", ...], ["col_name", ...])
# 注意:列表或元组在解析时将被视为相同的东西

abstract load_group_df(instruments, exprs: list, names: list, start_time: str | Timestamp | None = None, end_time: str | Timestamp | None = None, gp_name: str | None = None) DataFrame

加载特定组的数据框

Parameters:
  • instruments – 仪器。

  • exprs (list) – 描述数据内容的表达式。

  • names (list) – 数据的名称。

Returns:

查询的数据框。

Return type:

pd.DataFrame

load(instruments=None, start_time=None, end_time=None) DataFrame

将数据加载为 pd.DataFrame。

数据示例(列的多重索引是可选的):

                        feature                                                             label
                        $close     $volume     Ref($close, 1)  Mean($close, 3)  $high-$low  LABEL0
datetime    instrument
2010-01-04  SH600000    81.807068  17145150.0       83.737389        83.016739    2.741058  0.0032
            SH600004    13.313329  11800983.0       13.313329        13.317701    0.183632  0.0042
            SH600005    37.796539  12231662.0       38.258602        37.919757    0.970325  0.0289
Parameters:
  • instruments (strdict) – 它可以是市场名称或由InstrumentProvider生成的仪器配置文件。 如果instruments的值为None,则表示没有进行过滤。

  • start_time (str) – 时间范围的开始。

  • end_time (str) – 时间范围的结束时间。

Returns:

数据从底层源加载

Return type:

pd.DataFrame

Raises:

KeyError: – 如果不支持仪器过滤器,则引发 KeyError

class qlib.data.dataset.loader.QlibDataLoader(config: Tuple[list, tuple, dict], filter_pipe: List | None = None, swap_level: bool = True, freq: str | dict = 'day', inst_processors: dict | list | None = None)

与QlibDataLoader相同。字段可以通过配置定义

__init__(config: Tuple[list, tuple, dict], filter_pipe: List | None = None, swap_level: bool = True, freq: str | dict = 'day', inst_processors: dict | list | None = None)
Parameters:
  • config (Tuple[list, tuple, dict]) – 请参考DLWParser的文档

  • filter_pipe – 仪器的过滤管道

  • swap_level – 是否交换MultiIndex的层级

  • freq (dictstr) – 如果 type(config) == dict 且 type(freq) == str,则使用 freq 加载配置数据。 如果 type(config) == dict 且 type(freq) == dict,则使用 freq[] 加载 config[] 数据

  • inst_processors (dict | list) – 如果 inst_processors 不为 None 且 type(config) == dict;使用 inst_processors[] 加载 config[] 数据 如果 inst_processors 是一个列表,那么它将应用于所有组。

load_group_df(instruments, exprs: list, names: list, start_time: str | Timestamp | None = None, end_time: str | Timestamp | None = None, gp_name: str | None = None) DataFrame

加载特定组的数据框

Parameters:
  • instruments – 仪器。

  • exprs (list) – 用于描述数据内容的表达式。

  • names (list) – 数据的名称。

Returns:

查询的数据框。

Return type:

pd.DataFrame

class qlib.data.dataset.loader.StaticDataLoader(config: dict | str | DataFrame, join='outer')

支持从文件或提供的来源加载数据的DataLoader。

__init__(config: dict | str | DataFrame, join='outer')
Parameters:
  • config (dict) – {fields_group: <路径或对象>}

  • join (str) – 如何对齐不同的数据框

load(instruments=None, start_time=None, end_time=None) DataFrame

将数据加载为 pd.DataFrame。

数据示例(列的多重索引是可选的):

                        feature                                                             label
                        $close     $volume     Ref($close, 1)  Mean($close, 3)  $high-$low  LABEL0
datetime    instrument
2010-01-04  SH600000    81.807068  17145150.0       83.737389        83.016739    2.741058  0.0032
            SH600004    13.313329  11800983.0       13.313329        13.317701    0.183632  0.0042
            SH600005    37.796539  12231662.0       38.258602        37.919757    0.970325  0.0289
Parameters:
  • instruments (strdict) – 它可以是市场名称或由InstrumentProvider生成的仪器配置文件。 如果instruments的值为None,则表示没有进行过滤。

  • start_time (str) – 时间范围的开始。

  • end_time (str) – 时间范围的结束时间。

Returns:

数据从底层源加载

Return type:

pd.DataFrame

Raises:

KeyError: – 如果不支持仪器过滤器,则引发 KeyError

class qlib.data.dataset.loader.NestedDataLoader(dataloader_l: List[Dict], join='left')

我们有多个DataLoader,我们可以使用这个类来组合它们。

__init__(dataloader_l: List[Dict], join='left') None
Parameters:
  • dataloader_l (list[dict]) –

    一个数据加载器的列表,例如

    nd = NestedDataLoader(
        dataloader_l=[
            {
                "class": "qlib.contrib.data.loader.Alpha158DL",
            }, {
                "class": "qlib.contrib.data.loader.Alpha360DL",
                "kwargs": {
                    "config": {
                        "label": ( ["Ref($close, -2)/Ref($close, -1) - 1"], ["LABEL0"])
                    }
                }
            }
        ]
    )
    

  • join – 在合并时它将传递给pd.concat。

load(instruments=None, start_time=None, end_time=None) DataFrame

将数据加载为 pd.DataFrame。

数据示例(列的多重索引是可选的):

                        feature                                                             label
                        $close     $volume     Ref($close, 1)  Mean($close, 3)  $high-$low  LABEL0
datetime    instrument
2010-01-04  SH600000    81.807068  17145150.0       83.737389        83.016739    2.741058  0.0032
            SH600004    13.313329  11800983.0       13.313329        13.317701    0.183632  0.0042
            SH600005    37.796539  12231662.0       38.258602        37.919757    0.970325  0.0289
Parameters:
  • instruments (strdict) – 它可以是市场名称或由InstrumentProvider生成的仪器配置文件。 如果instruments的值为None,则表示没有进行过滤。

  • start_time (str) – 时间范围的开始。

  • end_time (str) – 时间范围的结束时间。

Returns:

数据从底层源加载

Return type:

pd.DataFrame

Raises:

KeyError: – 如果不支持仪器过滤器,则引发 KeyError

class qlib.data.dataset.loader.DataLoaderDH(handler_config: dict, fetch_kwargs: dict = {}, is_group=False)

基于(D)ata (H)andler的DataLoader 它设计用于从数据处理器加载多个数据 - 如果您只想从单个数据处理器加载数据,您可以在单个数据处理器中编写它们

待办事项:是什么让这个模块不那么容易使用。

  • 适用于在线场景

    • 底层数据处理器应进行配置。但数据加载器未提供此类接口和钩子。

__init__(handler_config: dict, fetch_kwargs: dict = {}, is_group=False)
Parameters:
  • handler_config (dict) –

    handler_config 将用于描述处理程序

    <handler_config> := {
        "group_name1": <handler>
        "group_name2": <handler>
    }
    
    <handler_config> := <handler>
    <handler> := DataHandler 实例 | DataHandler 配置
    

  • fetch_kwargs (dict) – fetch_kwargs 将用于描述 fetch 方法的不同参数,例如 col_set、squeeze、data_key 等。

  • is_group (bool) – is_group 将用于描述 handler_config 的键是否为组

load(instruments=None, start_time=None, end_time=None) DataFrame

将数据加载为 pd.DataFrame。

数据示例(列的多重索引是可选的):

                        feature                                                             label
                        $close     $volume     Ref($close, 1)  Mean($close, 3)  $high-$low  LABEL0
datetime    instrument
2010-01-04  SH600000    81.807068  17145150.0       83.737389        83.016739    2.741058  0.0032
            SH600004    13.313329  11800983.0       13.313329        13.317701    0.183632  0.0042
            SH600005    37.796539  12231662.0       38.258602        37.919757    0.970325  0.0289
Parameters:
  • instruments (strdict) – 它可以是市场名称或由InstrumentProvider生成的仪器配置文件。 如果instruments的值为None,则表示没有进行过滤。

  • start_time (str) – 时间范围的开始。

  • end_time (str) – 时间范围的结束时间。

Returns:

数据从底层源加载

Return type:

pd.DataFrame

Raises:

KeyError: – 如果不支持仪器过滤器,则引发 KeyError

数据处理器

class qlib.data.dataset.handler.DataHandler(instruments=None, start_time=None, end_time=None, data_loader: dict | str | DataLoader | None = None, init_data=True, fetch_orig=True)

使用处理程序的步骤 1. 初始化数据处理器(通过init调用)。 2. 使用数据。

数据处理器尝试维护一个具有两个级别的处理器。 datetimeinstruments

可以支持索引级别的任何顺序(顺序将在数据中隐含)。 当数据框索引名称缺失时,将使用顺序<datetime, instruments>。

数据示例: 列的多重索引是可选的。

                        feature                                                            label
                        $close     $volume  Ref($close, 1)  Mean($close, 3)  $high-$low  LABEL0
datetime   instrument
2010-01-04 SH600000    81.807068  17145150.0       83.737389        83.016739    2.741058  0.0032
           SH600004    13.313329  11800983.0       13.313329        13.317701    0.183632  0.0042
           SH600005    37.796539  12231662.0       38.258602        37.919757    0.970325  0.0289

提高数据处理器性能的技巧 - 使用col_set=CS_RAW获取数据将返回原始数据,并可能避免在调用loc时pandas复制数据

__init__(instruments=None, start_time=None, end_time=None, data_loader: dict | str | DataLoader | None = None, init_data=True, fetch_orig=True)
Parameters:
  • instruments – 要检索的股票列表。

  • start_time – 原始数据的开始时间。

  • end_time – 原始数据的结束时间。

  • data_loader (Union[dict, str, DataLoader]) – 用于加载数据的数据加载器。

  • init_data – 在构造函数中初始化原始数据。

  • fetch_orig (bool) – 如果可能,返回原始数据而不是副本。

config(**kwargs)

数据的配置。 # 从数据源加载哪些数据

此方法将在从数据集中加载腌制处理程序时使用。 数据将使用不同的时间范围进行初始化。

setup_data(enable_cache: bool = False)

在多次运行初始化的情况下设置数据

它负责维护以下变量 1) self._data

Parameters:

enable_cache (bool) –

默认值为 false:

  • 如果 enable_cache == True:

    处理后的数据将保存在磁盘上,当我们下次调用 init 时,处理程序将直接从磁盘加载缓存的数据

fetch(selector: Timestamp | slice | str | Index = slice(None, None, None), level: str | int = 'datetime', col_set: str | List[str] = '__all', squeeze: bool = False, proc_func: Callable | None = None) DataFrame

从底层数据源获取数据

设计动机: - 为底层数据提供统一的接口。 - 有可能使接口更加友好。 - 用户可以在这一额外层中获取数据时提高性能。

Parameters:
  • selector (Union[pd.Timestamp, slice, str]) –

    描述如何通过索引选择数据 它可以分为以下几类

    • 获取单个索引

    • 获取一系列索引

      • 一个切片范围

      • 特定索引的pd.Index

    可能会出现以下冲突

    • ["20200101", "20210101"] 是选择这个切片还是这两天?

      • 切片具有更高的优先级

  • level (Union[str, int]) – 选择数据的索引级别

  • col_set (Union[str, List[str]]) –

    • 如果 isinstance(col_set, str):

      选择一组有意义的 pd.Index 列。(例如:特征、列)

      • 如果 col_set == CS_RAW:

        将返回原始数据集。

    • 如果 isinstance(col_set, List[str]):

      选择多组有意义的列,返回的数据具有多个层次

  • proc_func (Callable) –

    • 提供一个钩子函数,用于在获取数据之前处理数据

    • 一个例子来解释钩子的必要性:

      • 一个数据集学习了一些处理器来处理与数据分割相关的数据

      • 每次准备数据时都会应用这些处理器

      • 学习到的处理器要求在拟合和应用时数据框保持相同的格式

      • 然而,数据格式会根据参数而变化

      • 因此,处理器应该应用于底层数据

  • squeeze (bool) – 是否压缩列和索引

Return type:

pd.DataFrame.

get_cols(col_set='__all') list

获取列名

Parameters:

col_set (str) – 选择一组有意义的列。(例如:特征、列)

Returns:

列名列表

Return type:

列表

get_range_selector(cur_date: Timestamp | str, periods: int) slice

根据周期数获取范围选择器

Parameters:
  • cur_date (pd.Timestampstr) – 当前日期

  • periods (int) – 周期数

get_range_iterator(periods: int, min_periods: int | None = None, **kwargs) Iterator[Tuple[Timestamp, DataFrame]]

获取具有给定周期的切片数据的迭代器

Parameters:
  • periods (int) – 周期数。

  • min_periods (int) – 切片数据帧的最小周期。

  • kwargs (dict) – 将被传递给 self.fetch

class qlib.data.dataset.handler.DataHandlerLP(instruments=None, start_time=None, end_time=None, data_loader: dict | str | DataLoader | None = None, infer_processors: List = [], learn_processors: List = [], shared_processors: List = [], process_type='append', drop_raw=False, **kwargs)

带有(L)earnable (P)rocessor的DataHandler

此处理程序将生成三块数据,格式为pd.DataFrame。

  • DK_R / self._data: 从加载器加载的原始数据

  • DK_I / self._infer: 用于推理处理的数据

  • DK_L / self._learn: 用于学习模型处理的数据。

使用不同处理器工作流进行学习和推理的动机 以下是一些示例。

  • 用于学习和推理的工具集可能不同。

  • 某些样本的处理可能依赖于标签(例如,一些达到限制的样本可能需要额外处理或被丢弃)。

    • 这些处理器仅适用于学习阶段。

数据处理器的提示

  • 为了减少内存成本

    • drop_raw=True: 这将会在原始数据上就地修改数据;

  • 请注意,像self._inferself._learn这样的处理数据与Qlib的Dataset中的“train”和“test”等segments概念不同。

    • 处理过的数据如 self._inferself._learn 是通过不同处理器处理的基础数据

    • segments 在 Qlib 的 Dataset 中,如“train”和“test”,只是查询数据时的时间分段(在时间序列中,“train”通常早于“test”)。

    • 例如,您可以在“训练”时间段查询由infer_processors处理的data._infer

__init__(instruments=None, start_time=None, end_time=None, data_loader: dict | str | DataLoader | None = None, infer_processors: List = [], learn_processors: List = [], shared_processors: List = [], process_type='append', drop_raw=False, **kwargs)
Parameters:
  • infer_processors (list) –

    • 用于生成推理数据的处理器列表

    • 示例:

    1) classname & kwargs:
        {
            "class": "MinMaxNorm",
            "kwargs": {
                "fit_start_time": "20080101",
                "fit_end_time": "20121231"
            }
        }
    2) Only classname:
        "DropnaFeature"
    3) object instance of Processor
    

  • learn_processors (list) – 类似于 infer_processors,但用于生成学习模型的数据

  • process_type (str) –

    PTYPE_I = ‘independent’

    • self._infer 将由 infer_processors 处理

    • self._learn 将由 learn_processors 处理

    PTYPE_A = ‘append’

    • self._infer 将由 infer_processors 处理

    • self._learn 将由 infer_processors + learn_processors 处理

      • (例如,self._infer 由 learn_processors 处理)

  • drop_raw (bool) – 是否删除原始数据

fit()

在不处理数据的情况下拟合数据

fit_process_data()

拟合和处理数据

fit 的输入将是前一个处理器的输出

process_data(with_fit: bool = False)

处理数据。如果需要,请使用processor.fit

符号:(数据) [处理器]

# 当 self.process_type == DataHandlerLP.PTYPE_I 时的数据处理流程

(self._data)-[shared_processors]-(_shared_df)-[learn_processors]-(_learn_df)
                                       \
                                        -[infer_processors]-(_infer_df)

# 当 self.process_type == DataHandlerLP.PTYPE_A 时的数据处理流程

(self._data)-[shared_processors]-(_shared_df)-[infer_processors]-(_infer_df)-[learn_processors]-(_learn_df)
Parameters:

with_fit (bool) – fit 的输入将是前一个处理器的输出

config(processor_kwargs: dict | None = None, **kwargs)

数据的配置。 # 从数据源加载哪些数据

此方法将在从数据集中加载腌制处理程序时使用。 数据将使用不同的时间范围进行初始化。

setup_data(init_type: str = 'fit_seq', **kwargs)

设置数据以防多次运行初始化

Parameters:
  • init_type (str) – 上面列出的类型 IT_*

  • enable_cache (bool) –

    默认值为 false:

    • 如果 enable_cache == True:

      处理后的数据将保存在磁盘上,当我们下次调用 init 时,处理程序将直接从磁盘加载缓存的数据

fetch(selector: Timestamp | slice | str = slice(None, None, None), level: str | int = 'datetime', col_set='__all', data_key: Literal['raw', 'infer', 'learn'] = 'infer', squeeze: bool = False, proc_func: Callable | None = None) DataFrame

从底层数据源获取数据

Parameters:
  • selector (Union[pd.Timestamp, slice, str]) – 描述如何通过索引选择数据。

  • level (Union[str, int]) – 选择数据的索引级别。

  • col_set (str) – 选择一组有意义的列。(例如:特征、列)。

  • data_key (str) – 要获取的数据: DK_*.

  • proc_func (Callable) – 请参考DataHandler.fetch的文档

Return type:

pd.DataFrame

get_cols(col_set='__all', data_key: Literal['raw', 'infer', 'learn'] = 'infer') list

获取列名

Parameters:
  • col_set (str) – 选择一组有意义的列。(例如:特征、列)。

  • data_key (DATA_KEY_TYPE) – 要获取的数据: DK_*.

Returns:

列名列表

Return type:

列表

classmethod cast(handler: DataHandlerLP) DataHandlerLP

Motivation

  • 用户在他的自定义包中创建了一个数据处理器。然后他希望将处理后的处理器分享给其他用户,而不引入包依赖和复杂的数据处理逻辑。

  • 这个类通过将类转换为DataHandlerLP并仅保留处理后的数据来实现这一点

Parameters:

handler (DataHandlerLP) – DataHandlerLP 的一个子类

Returns:

转换后的处理数据

Return type:

DataHandlerLP

classmethod from_df(df: DataFrame) DataHandlerLP

动机: - 当用户想要快速获取数据处理程序时。

创建的数据处理程序将只有一个共享的Dataframe,没有处理器。 创建处理程序后,用户可能经常希望转储处理程序以便重用 这是一个典型的使用案例

from qlib.data.dataset import DataHandlerLP
dh = DataHandlerLP.from_df(df)
dh.to_pickle(fname, dump_all=True)

待办事项: - StaticDataLoader 相当慢。它不必再次复制数据…

处理器

qlib.data.dataset.processor.get_group_columns(df: DataFrame, group: str | None)

从多索引列的DataFrame中获取一组列

Parameters:
  • df (pd.DataFrame) – 包含多列。

  • group (str) – 特征组的名称,即组索引的第一级值。

class qlib.data.dataset.processor.Processor
fit(df: DataFrame | None = None)

学习数据处理参数

Parameters:

df (pd.DataFrame) – 当我们逐个使用处理器拟合和处理数据时。拟合函数依赖于前一个处理器的输出,即 df

is_for_infer() bool

这个处理器是否可用于推理 有些处理器不能用于推理。

Returns:

如果它可用于推理。

Return type:

布尔

readonly() bool

处理器在处理时是否将输入数据视为只读(即不写入输入数据)

了解只读信息有助于Handler避免不必要的复制

config(**kwargs)

配置可序列化对象

Parameters:
  • keys (kwargs 可能包括以下内容) –

    dump_allbool

    对象是否转储所有对象

    excludelist

    哪些属性不会被转储

    includelist

    哪些属性会被转储

  • recursive (bool) – 配置是否递归

class qlib.data.dataset.processor.DropnaProcessor(fields_group=None)
__init__(fields_group=None)
readonly()

处理器在处理时是否将输入数据视为只读(即不写入输入数据)

了解只读信息有助于Handler避免不必要的复制

class qlib.data.dataset.processor.DropnaLabel(fields_group='label')
__init__(fields_group='label')
is_for_infer() bool

样本根据标签被丢弃。因此,它不适用于推理

class qlib.data.dataset.processor.DropCol(col_list=[])
__init__(col_list=[])
readonly()

处理器在处理时是否将输入数据视为只读(即不写入输入数据)

了解只读信息有助于Handler避免不必要的复制

class qlib.data.dataset.processor.FilterCol(fields_group='feature', col_list=[])
__init__(fields_group='feature', col_list=[])
readonly()

处理器在处理时是否将输入数据视为只读(即不写入输入数据)

了解只读信息有助于Handler避免不必要的复制

class qlib.data.dataset.processor.TanhProcess

使用tanh处理噪声数据

class qlib.data.dataset.processor.ProcessInf

处理无限

class qlib.data.dataset.processor.Fillna(fields_group=None, fill_value=0)

处理NaN

__init__(fields_group=None, fill_value=0)
class qlib.data.dataset.processor.MinMaxNorm(fit_start_time, fit_end_time, fields_group=None)
__init__(fit_start_time, fit_end_time, fields_group=None)
fit(df: DataFrame | None = None)

学习数据处理参数

Parameters:

df (pd.DataFrame) – 当我们逐个使用处理器拟合和处理数据时。拟合函数依赖于前一个处理器的输出,即 df

class qlib.data.dataset.processor.ZScoreNorm(fit_start_time, fit_end_time, fields_group=None)

Z分数归一化

__init__(fit_start_time, fit_end_time, fields_group=None)
fit(df: DataFrame | None = None)

学习数据处理参数

Parameters:

df (pd.DataFrame) – 当我们逐个使用处理器拟合和处理数据时。拟合函数依赖于前一个处理器的输出,即 df

class qlib.data.dataset.processor.RobustZScoreNorm(fit_start_time, fit_end_time, fields_group=None, clip_outlier=True)

鲁棒Z分数归一化

Use robust statistics for Z-Score normalization:

mean(x) = median(x) std(x) = MAD(x) * 1.4826

Reference:

https://en.wikipedia.org/wiki/Median_absolute_deviation.

__init__(fit_start_time, fit_end_time, fields_group=None, clip_outlier=True)
fit(df: DataFrame | None = None)

学习数据处理参数

Parameters:

df (pd.DataFrame) – 当我们逐个使用处理器拟合和处理数据时。拟合函数依赖于前一个处理器的输出,即 df

class qlib.data.dataset.processor.CSZScoreNorm(fields_group=None, method='zscore')

横截面Z分数归一化

__init__(fields_group=None, method='zscore')
class qlib.data.dataset.processor.CSRankNorm(fields_group=None)

横截面排名归一化。 “横截面”通常用于描述数据操作。 对不同股票的操作通常称为横截面操作。

例如,CSRankNorm 是一种操作,它按每天对数据进行分组,并在每天对所有股票进行排名。

关于3.46和0.5的解释

import numpy as np
import pandas as pd
x = np.random.random(10000)  # for any variable
x_rank = pd.Series(x).rank(pct=True)  # if it is converted to rank, it will be a uniform distributed
x_rank_norm = (x_rank - x_rank.mean()) / x_rank.std()  # Normally, we will normalize it to make it like normal distribution

x_rank.mean()   # accounts for 0.5
1 / x_rank.std()  # accounts for 3.46
__init__(fields_group=None)
class qlib.data.dataset.processor.CSZFillna(fields_group=None)

横截面填充空值

__init__(fields_group=None)
class qlib.data.dataset.processor.HashStockFormat

处理从df到哈希库存格式的存储

class qlib.data.dataset.processor.TimeRangeFlt(start_time: Timestamp | str | None = None, end_time: Timestamp | str | None = None, freq: str = 'day')

这是一个用于筛选股票的过滤器。 只保留从start_time到end_time存在的数据(中间的存在性未检查)。 警告:可能会导致泄漏!!!

__init__(start_time: Timestamp | str | None = None, end_time: Timestamp | str | None = None, freq: str = 'day')
Parameters:
  • start_time (可选[联合[pd.Timestamp, str]]) – 数据必须早于(或等于)start_time None 表示数据不会基于start_time进行过滤

  • end_time (可选[联合[pd.Timestamp, str]]) – 类似于 start_time

  • freq (str) – 日历的频率

贡献

模型

class qlib.model.base.BaseModel

建模事物

abstract predict(*args, **kwargs) object

在建模后进行预测

class qlib.model.base.Model

可学习模型

fit(dataset: Dataset, reweighter: Reweighter)

从基础模型学习模型

注意

学习模型的属性名称不应以‘_’开头。这样模型才能被转储到磁盘。

以下代码示例展示了如何从dataset中检索x_trainy_trainw_train

# get features and labels
df_train, df_valid = dataset.prepare(
    ["train", "valid"], col_set=["feature", "label"], data_key=DataHandlerLP.DK_L
)
x_train, y_train = df_train["feature"], df_train["label"]
x_valid, y_valid = df_valid["feature"], df_valid["label"]

# get weights
try:
    wdf_train, wdf_valid = dataset.prepare(["train", "valid"], col_set=["weight"],
                                           data_key=DataHandlerLP.DK_L)
    w_train, w_valid = wdf_train["weight"], wdf_valid["weight"]
except KeyError as e:
    w_train = pd.DataFrame(np.ones_like(y_train.values), index=y_train.index)
    w_valid = pd.DataFrame(np.ones_like(y_valid.values), index=y_valid.index)
Parameters:

dataset (Dataset) – 数据集将生成模型训练所需的处理数据。

abstract predict(dataset: Dataset, segment: str | slice = 'test') object

给定数据集进行预测

Parameters:
  • dataset (Dataset) – 数据集将从模型训练中生成处理后的数据集。

  • segment (Textslice) – 数据集将使用此段来准备数据。(默认=test)

Return type:

预测结果具有特定类型,例如pandas.Series

class qlib.model.base.ModelFT

模型 (F)ine(t)unable

abstract finetune(dataset: Dataset)

基于给定数据集的微调模型

使用qlib.workflow.R微调模型的典型用例。

# start exp to train init model
with R.start(experiment_name="init models"):
    model.fit(dataset)
    R.save_objects(init_model=model)
    rid = R.get_recorder().id

# Finetune model based on previous trained model
with R.start(experiment_name="finetune model"):
    recorder = R.get_recorder(recorder_id=rid, experiment_name="init models")
    model = recorder.load_object("init_model")
    model.finetune(dataset, num_boost_round=10)
Parameters:

dataset (Dataset) – 数据集将从模型训练中生成处理后的数据集。

策略

class qlib.contrib.strategy.TopkDropoutStrategy(*, topk, n_drop, method_sell='bottom', method_buy='top', hold_thresh=1, only_tradable=False, forbid_all_trade_at_limit=True, **kwargs)
__init__(*, topk, n_drop, method_sell='bottom', method_buy='top', hold_thresh=1, only_tradable=False, forbid_all_trade_at_limit=True, **kwargs)
Parameters:
  • topk (int) – 投资组合中的股票数量。

  • n_drop (int) – 每个交易日要替换的股票数量。

  • method_sell (str) – 丢弃方法,随机/底部。

  • method_buy (str) – 随机/顶部的dropout method_buy。

  • hold_thresh (int) – 卖出股票前的最小持有天数,将检查 current.get_stock_count(order.stock_id) >= self.hold_thresh。

  • only_tradable (bool) –

    策略在买卖时是否只考虑可交易的股票。

    如果 only_tradable 为真:

    策略将根据股票信息的可交易状态做出决策,并避免买卖它们。

    否则:

    策略将在不检查股票可交易状态的情况下做出买卖决策。

  • forbid_all_trade_at_limit (bool) –

    如果达到涨停或跌停时禁止所有交易。

    如果 forbid_all_trade_at_limit:

    当价格达到涨停/跌停时,策略将不会进行任何交易,即使在现实中允许,也不会在涨停时卖出或在跌停时买入。

    否则:

    策略将在涨停时卖出并在跌停时买入。

generate_trade_decision(execute_result=None)

在每个交易时段生成交易决策

Parameters:

execute_result (List[object], optional) –

交易决策的执行结果,默认为 None

  • 首次调用 generate_trade_decision 时,execute_result 可能为 None

class qlib.contrib.strategy.WeightStrategyBase(*, order_generator_cls_or_obj=<class 'qlib.contrib.strategy.order_generator.OrderGenWOInteract'>, **kwargs)
__init__(*, order_generator_cls_or_obj=<class 'qlib.contrib.strategy.order_generator.OrderGenWOInteract'>, **kwargs)
signal :

描述信号的信息。请参考qlib.backtest.signal.create_signal_from的文档 策略的决策将基于给定的信号

trade_exchangeExchange

提供市场信息的交易所,用于处理订单和生成报告

  • 如果 trade_exchange 为 None,self.trade_exchange 将被设置为 common_infra

  • 它允许在不同的执行中使用不同的trade_exchanges。

  • 例如:

    • 在日常执行中,每日交换和每分钟交换都是可用的,但建议使用每日交换,因为它运行得更快。

    • 在每分钟执行中,每日交换不可用,仅推荐使用每分钟交换。

generate_target_weight_position(score, current, trade_start_time, trade_end_time)

根据该日期的分数和当前位置生成目标位置。现金不包括在位置中。

Parameters:
  • score (pd.Series) – 该交易日的预测分数,索引为stock_id,包含‘score’列。

  • current (Position()) – 当前位置。

  • trade_start_time (pd.Timestamp) –

  • trade_end_time (pd.Timestamp) –

generate_trade_decision(execute_result=None)

在每个交易时段生成交易决策

Parameters:

execute_result (List[object], optional) –

交易决策的执行结果,默认为 None

  • 首次调用 generate_trade_decision 时,execute_result 可能为 None

class qlib.contrib.strategy.EnhancedIndexingStrategy(*, riskmodel_root, market='csi500', turn_limit=None, name_mapping={}, optimizer_kwargs={}, verbose=False, **kwargs)

增强型索引策略

增强型指数投资结合了主动管理和被动管理的艺术,旨在控制风险暴露(即跟踪误差)的同时,在投资组合回报方面超越基准指数(例如,S&P 500)。

用户需要准备如下的风险模型数据:

├── /path/to/riskmodel
├──── 20210101
├────── factor_exp.{csv|pkl|h5}
├────── factor_cov.{csv|pkl|h5}
├────── specific_risk.{csv|pkl|h5}
├────── blacklist.{csv|pkl|h5}  # optional

风险模型数据可以从风险数据提供商处获取。你也可以使用 qlib.model.riskmodel.structured.StructuredCovEstimator 来准备这些数据。

Parameters:
  • riskmodel_path (str) – 风险模型路径

  • name_mapping (dict) – 替代文件名

__init__(*, riskmodel_root, market='csi500', turn_limit=None, name_mapping={}, optimizer_kwargs={}, verbose=False, **kwargs)
signal :

描述信号的信息。请参考qlib.backtest.signal.create_signal_from的文档 策略的决策将基于给定的信号

trade_exchangeExchange

提供市场信息的交易所,用于处理订单和生成报告

  • 如果 trade_exchange 为 None,self.trade_exchange 将被设置为 common_infra

  • 它允许在不同的执行中使用不同的trade_exchanges。

  • 例如:

    • 在日常执行中,每日交换和每分钟交换都是可用的,但建议使用每日交换,因为它运行得更快。

    • 在每分钟执行中,每日交换不可用,仅推荐使用每分钟交换。

generate_target_weight_position(score, current, trade_start_time, trade_end_time)

根据该日期的分数和当前位置生成目标位置。现金不包括在位置中。

Parameters:
  • score (pd.Series) – 该交易日的预测分数,索引为stock_id,包含‘score’列。

  • current (Position()) – 当前位置。

  • trade_start_time (pd.Timestamp) –

  • trade_end_time (pd.Timestamp) –

class qlib.contrib.strategy.TWAPStrategy(outer_trade_decision: BaseTradeDecision = None, level_infra: LevelInfrastructure = None, common_infra: CommonInfrastructure = None, trade_exchange: Exchange = None)

TWAP交易策略

注意

  • 此TWAP策略将在交易时进行四舍五入。这将使TWAP交易策略在总交易单位金额小于交易步长时更早生成订单。

reset(outer_trade_decision: BaseTradeDecision | None = None, **kwargs)
Parameters:

outer_trade_decision (BaseTradeDecision, 可选) –

generate_trade_decision(execute_result=None)

在每个交易时段生成交易决策

Parameters:

execute_result (List[object], optional) –

交易决策的执行结果,默认为 None

  • 首次调用 generate_trade_decision 时,execute_result 可能为 None

class qlib.contrib.strategy.SBBStrategyBase(outer_trade_decision: BaseTradeDecision = None, level_infra: LevelInfrastructure = None, common_infra: CommonInfrastructure = None, trade_exchange: Exchange = None)

在每两个相邻的交易条中选择更好的一个进行卖出或买入。

reset(outer_trade_decision: BaseTradeDecision | None = None, **kwargs)
Parameters:

outer_trade_decision (BaseTradeDecision, optional) –

generate_trade_decision(execute_result=None)

在每个交易时段生成交易决策

Parameters:

execute_result (List[object], optional) –

交易决策的执行结果,默认为 None

  • 首次调用 generate_trade_decision 时,execute_result 可能为 None

class qlib.contrib.strategy.SBBStrategyEMA(outer_trade_decision: BaseTradeDecision | None = None, instruments: List | str = 'csi300', freq: str = 'day', trade_exchange: Exchange | None = None, level_infra: LevelInfrastructure | None = None, common_infra: CommonInfrastructure | None = None, **kwargs)

在每两个相邻的交易(B)ars中选择(S)更好的一个,根据(EMA)信号进行卖出或买入。

__init__(outer_trade_decision: BaseTradeDecision | None = None, instruments: List | str = 'csi300', freq: str = 'day', trade_exchange: Exchange | None = None, level_infra: LevelInfrastructure | None = None, common_infra: CommonInfrastructure | None = None, **kwargs)
Parameters:
  • instruments (Union[List, str], optional) – EMA信号的instruments,默认为“csi300”

  • freq (str, optional) – EMA信号的频率,默认为“day” 注意:freq 可能与 time_per_step 不同

reset_level_infra(level_infra)

重置级别共享基础设施 - 重置交易日历后,信号将会改变

class qlib.contrib.strategy.SoftTopkStrategy(model, dataset, topk, order_generator_cls_or_obj=<class 'qlib.contrib.strategy.order_generator.OrderGenWInteract'>, max_sold_weight=1.0, risk_degree=0.95, buy_method='first_fill', trade_exchange=None, level_infra=None, common_infra=None, **kwargs)
__init__(model, dataset, topk, order_generator_cls_or_obj=<class 'qlib.contrib.strategy.order_generator.OrderGenWInteract'>, max_sold_weight=1.0, risk_degree=0.95, buy_method='first_fill', trade_exchange=None, level_infra=None, common_infra=None, **kwargs)
Parameters:
  • topk (int) – 要购买的 top-N 股票

  • risk_degree (float) –

    占总价值的持仓百分比 buy_method:

    rank_fill: 首先分配权重给排名高的股票(1/topk max) average_fill: 平均分配权重给排名高的股票。

get_risk_degree(trade_step=None)

返回您将用于投资的总价值的比例。 动态的risk_degree将导致市场时机选择

generate_target_weight_position(score, current, trade_start_time, trade_end_time)
Parameters:
  • score – 该交易日的预测分数,pd.Series,索引为stock_id,包含‘score’列

  • current – 当前位置,使用 Position() 类

  • trade_date

    交易日期

    根据该日期的评分和当前持仓生成目标持仓

    持仓中不考虑缓存

评估

qlib.contrib.evaluate.risk_analysis(r, N: int | None = None, freq: str = 'day')

风险分析 注意: 年化收益的计算与年化收益的定义不同。 这是设计上的实现。 Qlib试图通过求和而不是乘积来累积收益,以避免累积曲线呈指数偏斜。 在Qlib中,所有年化收益的计算都遵循这一原则。

待办事项:添加一个参数以启用使用生产积累回报计算指标。

Parameters:
  • r (pandas.Series) – 每日收益序列。

  • N (int) – 用于年化信息比率的缩放因子(日:252,周:50,月:12),至少需要存在Nfreq中的一个

  • freq (str) – 用于计算缩放器的分析频率,至少需要存在Nfreq中的一个

qlib.contrib.evaluate.indicator_analysis(df, method='mean')

分析交易的统计时间序列指标

Parameters:
  • df (pandas.DataFrame) –

    列: 如 [‘pa’, ‘pos’, ‘ffr’, ‘deal_amount’, ‘value’].
    必要字段:
    • ’pa’ 是交易指标中的价格优势

    • ’pos’ 是交易指标中的正利率

    • ’ffr’ 是交易指标中的履行率

    可选字段:
    • ’deal_amount’ 是总交易量,仅在方法为 ‘amount_weighted’ 时必要

    • ’value’ 是总交易价值,仅在方法为 ‘value_weighted’ 时必要

    索引: Index(datetime)

  • method (str, optional) –

    pa/ffr的统计方法,默认为“mean”

    • 如果方法是‘mean’,计算每个交易指标的平均统计值

    • 如果方法是‘amount_weighted’,计算每个交易指标的deal_amount加权平均统计值

    • 如果方法是‘value_weighted’,计算每个交易指标的价值加权平均统计值

    注意:pos的统计方法始终为“mean”

Returns:

每个交易指标的统计值

Return type:

pd.DataFrame

qlib.contrib.evaluate.backtest_daily(start_time: str | Timestamp, end_time: str | Timestamp, strategy: str | dict | BaseStrategy, executor: str | dict | BaseExecutor | None = None, account: float | int | Position = 100000000.0, benchmark: str = 'SH000300', exchange_kwargs: dict | None = None, pos_type: str = 'Position')

初始化策略和执行器,然后执行每日频率的回测

Parameters:
  • start_time (Union[str, pd.Timestamp]) – 回测的关闭开始时间 注意: 这将应用于最外层的执行者的日历。

  • end_time (Union[str, pd.Timestamp]) – 回测的结束时间 注意: 这将应用于最外层的执行器的日历。 例如,Executor[day](Executor[1min]),设置 end_time == 20XX0301 将包括20XX0301的所有分钟

  • strategy (Union[str, dict, BaseStrategy]) –

    用于初始化最外层的投资组合策略。请参阅init_instance_by_config的文档以获取更多信息。

    例如:

    # 字典
    strategy = {
        "class": "TopkDropoutStrategy",
        "module_path": "qlib.contrib.strategy.signal_strategy",
        "kwargs": {
            "signal": (model, dataset),
            "topk": 50,
            "n_drop": 5,
        },
    }
    # BaseStrategy
    pred_score = pd.read_pickle("score.pkl")["score"]
    STRATEGY_CONFIG = {
        "topk": 50,
        "n_drop": 5,
        "signal": pred_score,
    }
    strategy = TopkDropoutStrategy(**STRATEGY_CONFIG)
    # 字符串示例。
    # 1) 指定一个pickle对象
    #     - 路径如 'file:////obj.pkl'
    # 2) 指定一个类名
    #     - "ClassName":  将使用 getattr(module, "ClassName")()。
    # 3) 指定带有类名的模块路径
    #     - "a.b.c.ClassName" 将使用 getattr(, "ClassName")()。
    

  • executor (Union[str, dict, BaseExecutor]) – 用于初始化最外层的执行器。

  • benchmark (str) – 用于报告的基准。

  • account (Union[float, int, Position]) –

    描述如何创建账户的信息

    对于 floatint:

    仅使用初始现金创建账户

    对于 Position:

    使用带有头寸的账户

  • exchange_kwargs (dict) –

    用于初始化Exchange的关键字参数 例如

    exchange_kwargs = {
        "freq": freq,
        "limit_threshold": None, # limit_threshold 为 None,使用 C.limit_threshold
        "deal_price": None, # deal_price 为 None,使用 C.deal_price
        "open_cost": 0.0005,
        "close_cost": 0.0015,
        "min_cost": 5,
    }
    

  • pos_type (str) – 位置的类型。

Returns:

  • report_normal (pd.DataFrame) – 回测报告

  • positions_normal (pd.DataFrame) – 回测持仓

qlib.contrib.evaluate.long_short_backtest(pred, topk=50, deal_price=None, shift=1, open_cost=0, close_cost=0, trade_unit=None, limit_threshold=None, min_cost=5, subscribe_fields=[], extract_codes=False)

一个多空策略的回测

Parameters:
  • pred – 在第T天产生的交易信号。

  • topk – 做空排名前k的证券和做多排名前k的证券。

  • deal_price – 交易的价格。

  • shift – 是否将预测结果向后移动一天。如果shift==1,交易日将为T+1。

  • open_cost – 开启事务的成本。

  • close_cost – 关闭交易成本。

  • trade_unit – 中国A股为100。

  • limit_threshold – 例如,限制移动0.1(10%),多头和空头具有相同的限制。

  • min_cost – 最小交易成本。

  • subscribe_fields – 订阅字段。

  • extract_codes – bool. 我们是否会将从预测中提取的代码传递给交易所。 注意:使用离线qlib会更快。

Returns:

回测的结果,它由一个字典表示。 { “long”: long_returns(excess), “short”: short_returns(excess), “long_short”: long_short_returns}

报告

工作流程

实验管理器

class qlib.workflow.expm.ExpManager(uri: str, default_exp_name: str | None)

这是用于管理实验的ExpManager类。API设计类似于mlflow。 (链接:https://mlflow.org/docs/latest/python_api/mlflow.html

ExpManager 预期是一个单例(顺便说一下,我们可以有多个具有不同 uri 的 Experiment。用户可以从不同的 uri 获取不同的实验,然后比较它们的记录)。全局配置(即 `C)也是一个单例。

所以我们尝试将它们对齐。它们共享相同的变量,称为default uri。请参阅ExpManager.default_uri以了解变量共享的详细信息。

当用户开始一个实验时,用户可能希望将uri设置为特定的uri(在此期间它将覆盖默认uri),然后取消设置特定uri并回退到默认uriExpManager._active_exp_uri就是那个特定uri

__init__(uri: str, default_exp_name: str | None)
start_exp(*, experiment_id: str | None = None, experiment_name: str | None = None, recorder_id: str | None = None, recorder_name: str | None = None, uri: str | None = None, resume: bool = False, **kwargs) Experiment

开始一个实验。此方法包括首先获取或创建一个实验,然后将其设置为活动状态。

维护_active_exp_uri包含在start_exp中,剩余的实现应包含在子类的_end_exp中

Parameters:
  • experiment_id (str) – 当前实验的ID。

  • experiment_name (str) – 活动实验的名称。

  • recorder_id (str) – 要启动的记录器的ID。

  • recorder_name (str) – 要启动的记录器的名称。

  • uri (str) – 当前的跟踪URI。

  • resume (boolean) – 是否恢复实验和记录器。

Return type:

一个活跃的实验。

end_exp(recorder_status: str = 'SCHEDULED', **kwargs)

结束一个活跃的实验。

维护_active_exp_uri包含在end_exp中,剩余的实现应包含在子类的_end_exp中

Parameters:
  • experiment_name (str) – 活动实验的名称。

  • recorder_status (str) – 实验活动记录器的状态。

create_exp(experiment_name: str | None = None)

创建一个实验。

Parameters:

experiment_name (str) – 实验名称,必须是唯一的。

Return type:

一个实验对象。

Raises:

ExpAlreadyExistError

search_records(experiment_ids=None, **kwargs)

获取符合实验搜索条件的记录的pandas DataFrame。 输入是用户想要应用的搜索条件。

Returns:

  • 一个pandas.DataFrame的记录,其中每个指标、参数和标签

  • 都被扩展为它们自己的列,分别命名为metrics.*, params.*, 和 tags.*

  • 对于没有特定指标、参数或标签的记录,它们的

  • 值将分别为(NumPy) Nan, None, 或 None。

get_exp(*, experiment_id=None, experiment_name=None, create: bool = True, start: bool = False)

检索一个实验。此方法包括获取一个活动的实验,以及获取或创建一个特定的实验。

当用户指定实验ID和名称时,该方法将尝试返回特定的实验。 当用户未提供记录器ID或名称时,该方法将尝试返回当前活动的实验。 create参数决定了如果实验之前未被创建,该方法是否将根据用户的规格自动创建一个新的实验。

  • 如果 create 为 True:

    • 如果存在active experiment

      • 未指定id或name,返回当前活动的实验。

      • 如果指定了id或name,则返回指定的实验。如果没有找到这样的实验,则使用给定的id或name创建一个新实验。如果start设置为True,则实验设置为活动状态。

    • 如果active experiment不存在:

      • 未指定ID或名称,创建默认实验。

      • 如果指定了id或name,则返回指定的实验。如果没有找到这样的实验,则使用给定的id或name创建一个新实验。如果start设置为True,则实验设置为活动状态。

  • 否则如果 create 为 False:

    • 如果存在active experiment

      • 未指定id或name,返回当前活动的实验。

      • 如果指定了id或name,则返回指定的实验。如果没有找到这样的实验,则引发错误。

    • 如果active experiment不存在:

      • 未指定ID或名称。如果默认实验存在,则返回它,否则引发错误。

      • 如果指定了id或name,则返回指定的实验。如果没有找到这样的实验,则引发错误。

Parameters:
  • experiment_id (str) – 要返回的实验的id。

  • experiment_name (str) – 要返回的实验名称。

  • create (boolean) – 如果之前没有创建过实验,则创建它。

  • 开始 (布尔值) – 如果创建了新实验,则启动它。

Return type:

一个实验对象。

delete_exp(experiment_id=None, experiment_name=None)

删除一个实验。

Parameters:
  • experiment_id (str) – 实验ID。

  • experiment_name (str) – 实验名称。

property default_uri

从qlib.config.C获取默认的跟踪URI

property uri

获取默认的跟踪URI或当前URI。

Return type:

跟踪URI字符串。

list_experiments()

列出所有现有的实验。

Return type:

存储的实验信息的字典(名称 -> 实验)。

实验

class qlib.workflow.exp.Experiment(id, name)

这是每个正在运行的实验的Experiment类。API的设计类似于mlflow。 (链接:https://mlflow.org/docs/latest/python_api/mlflow.html

__init__(id, name)
start(*, recorder_id=None, recorder_name=None, resume=False)

开始实验并将其设置为活动状态。此方法还将启动一个新的记录器。

Parameters:
  • recorder_id (str) – 要创建的记录器的ID。

  • recorder_name (str) – 要创建的记录器的名称。

  • resume (bool) – 是否恢复第一个记录器

Return type:

一个活跃的记录器。

end(recorder_status='SCHEDULED')

结束实验。

Parameters:

recorder_status (str) – 结束时要设置的记录器状态(SCHEDULED, RUNNING, FINISHED, FAILED)。

create_recorder(recorder_name=None)

为每个实验创建一个记录器。

Parameters:

recorder_name (str) – 要创建的记录器的名称。

Return type:

一个记录器对象。

search_records(**kwargs)

获取符合实验搜索条件的记录的pandas DataFrame。 输入是用户想要应用的搜索条件。

Returns:

  • 一个pandas.DataFrame的记录,其中每个指标、参数和标签

  • 都被扩展为它们自己的列,分别命名为metrics.*, params.*, 和 tags.*

  • 对于没有特定指标、参数或标签的记录,它们的

  • 值将分别为(NumPy) Nan, None, 或 None。

delete_recorder(recorder_id)

为每个实验创建一个记录器。

Parameters:

recorder_id (str) – 要删除的记录器的ID。

get_recorder(recorder_id=None, recorder_name=None, create: bool = True, start: bool = False) Recorder

为用户检索记录器。当用户指定记录器ID和名称时,该方法将尝试返回特定的记录器。当用户未提供记录器ID或名称时,该方法将尝试返回当前活动的记录器。create参数决定了如果记录器之前未被创建,该方法是否会自动根据用户的规格创建一个新的记录器。

  • 如果 create 为 True:

    • 如果存在active recorder

      • 未指定id或name,返回活动的记录器。

      • 如果指定了id或name,则返回指定的记录器。如果未找到此类实验,则使用给定的id或name创建一个新的记录器。如果start设置为True,则记录器设置为活动状态。

    • 如果active recorder不存在:

      • 未指定ID或名称,创建一个新的记录器。

      • 如果指定了id或name,则返回指定的实验。如果未找到此类实验,则使用给定的id或name创建一个新的记录器。如果start设置为True,则记录器设置为活动状态。

  • 否则如果 create 为 False:

    • 如果存在active recorder

      • 未指定id或name,返回活动的记录器。

      • 如果指定了id或name,则返回指定的记录器。如果未找到此类exp,则引发错误。

    • 如果active recorder不存在:

      • 未指定ID或名称,引发错误。

      • 如果指定了id或name,则返回指定的记录器。如果未找到此类exp,则引发错误。

Parameters:
  • recorder_id (str) – 要删除的记录器的ID。

  • recorder_name (str) – 要删除的记录器的名称。

  • create (boolean) – 如果之前没有创建过记录器,则创建它。

  • start (boolean) – 如果创建了新的记录器,则启动它。

Return type:

一个记录器对象。

list_recorders(rtype: Literal['dict', 'list'] = 'dict', **flt_kwargs) List[Recorder] | Dict[str, Recorder]

列出此实验的所有现有记录器。在调用此方法之前,请先获取实验实例。 如果用户想使用R.list_recorders()方法,请参考QlibRecorder中的相关API文档。

flt_kwargsdict

根据条件筛选记录器 例如:list_recorders(status=Recorder.STATUS_FI)

Returns:

if rtype == “dict”:

一个存储的记录器信息的字典(id -> recorder)。

elif rtype == “list”:

一个记录器的列表。

Return type:

返回类型取决于 rtype

记录器

class qlib.workflow.recorder.Recorder(experiment_id, name)

这是用于记录实验的Recorder类。API设计类似于mlflow。 (链接:https://mlflow.org/docs/latest/python_api/mlflow.html

记录器的状态可以是SCHEDULED(已计划)、RUNNING(运行中)、FINISHED(已完成)、FAILED(失败)。

__init__(experiment_id, name)
save_objects(local_path=None, artifact_path=None, **kwargs)

将预测文件或模型检查点等对象保存到工件URI。用户可以通过关键字参数(名称:值)保存对象。

请参考qlib.workflow的文档:R.save_objects

Parameters:
  • local_path (str) – 如果提供,则将文件或目录保存到工件URI。

  • artifact_path=None (str) – 工件在URI中存储的相对路径。

load_object(name)

加载对象,例如预测文件或模型检查点。

Parameters:

name (str) – 要加载的文件的名称。

Return type:

保存的对象。

start_run()

开始运行或恢复记录器。返回值可以用作with块中的上下文管理器;否则,您必须调用end_run()来终止当前运行。(请参阅mlflow中的ActiveRun类)

Return type:

一个活动的运行对象(例如 mlflow.ActiveRun 对象)。

end_run()

结束一个活动的记录器。

log_params(**kwargs)

记录当前运行的一批参数。

Parameters:

arguments (keyword) – 要记录为参数的键值对。

log_metrics(step=None, **kwargs)

为当前运行记录多个指标。

Parameters:

arguments (keyword) – 要记录为指标的键值对。

log_artifact(local_path: str, artifact_path: str | None = None)

将本地文件或目录记录为当前活动运行的工件。

Parameters:
  • local_path (str) – 要写入文件的路径。

  • artifact_path (可选[str]) – 如果提供,则为artifact_uri中要写入的目录。

set_tags(**kwargs)

为当前运行记录一批标签。

Parameters:

arguments (keyword) – 要记录为标签的键值对。

delete_tags(*keys)

从运行中删除一些标签。

Parameters:

keys (series of strs of the keys) – 要删除的标签的所有名称。

list_artifacts(artifact_path: str | None = None)

列出记录器的所有工件。

Parameters:

artifact_path (str) – 工件在URI中存储的相对路径。

Return type:

存储的工件信息(名称、路径等)列表。

download_artifact(path: str, dst_path: str | None = None) str

如果适用,从运行中下载一个工件文件或目录到本地目录,并返回其本地路径。

Parameters:
  • path (str) – 所需工件的相对源路径。

  • dst_path (可选[str]) – 本地文件系统目标目录的绝对路径,用于下载指定的工件。此目录必须已经存在。如果未指定,工件将被下载到本地文件系统上一个新的唯一命名的目录中。

Returns:

所需工件的本地路径。

Return type:

字符串

list_metrics()

列出记录器的所有指标。

Return type:

存储的指标字典。

list_params()

列出记录器的所有参数。

Return type:

存储的参数字典。

list_tags()

列出记录器的所有标签。

Return type:

存储的标签字典。

记录模板

class qlib.workflow.record_temp.RecordTemp(recorder)

这是记录模板类,使用户能够以特定格式生成实验结果,如IC和回测。

save(**kwargs)

它的行为与self.recorder.save_objects相同。 但它是一个更简单的接口,因为用户不必关心get_pathartifact_path

__init__(recorder)
generate(**kwargs)

生成某些记录,如IC、回测等,并保存它们。

Parameters:

kwargs

load(name: str, parents: bool = True)

它的行为与self.recorder.load_object相同。 但它是一个更简单的接口,因为用户不必关心get_pathartifact_path

Parameters:
  • name (str) – 要加载的文件的名称。

  • parents (bool) – 每个记录器都有不同的 artifact_path。 因此,parents 会递归地在父类中查找路径。 子类具有更高的优先级

Return type:

存储的记录。

list()

列出支持的工件。 用户不必考虑self.get_path

Return type:

所有支持的工件列表。

check(include_self: bool = False, parents: bool = True)

检查记录是否正确生成并保存。 在以下示例中非常有用

  • 在生成新内容之前检查依赖文件是否完整。

  • 检查最终文件是否已完成

Parameters:
  • include_self (bool) – 文件是否由自身生成

  • parents (bool) – 我们是否检查父级

Raises:

FileNotFoundError – 记录是否存储正确。

class qlib.workflow.record_temp.SignalRecord(model=None, dataset=None, recorder=None)

这是生成信号预测的Signal Record类。该类继承自RecordTemp类。

__init__(model=None, dataset=None, recorder=None)
generate(**kwargs)

生成某些记录,如IC、回测等,并保存它们。

Parameters:

kwargs

list()

列出支持的工件。 用户不必考虑self.get_path

Return type:

所有支持的工件列表。

class qlib.workflow.record_temp.ACRecordTemp(recorder, skip_existing=False)

自动检查记录模板

__init__(recorder, skip_existing=False)
generate(*args, **kwargs)

自动检查文件,然后运行具体的生成任务

class qlib.workflow.record_temp.HFSignalRecord(recorder, **kwargs)

这是信号分析记录类,用于生成如IC和IR等分析结果。此类继承自RecordTemp类。

depend_cls

SignalRecord 的别名

__init__(recorder, **kwargs)
generate()

生成某些记录,如IC、回测等,并保存它们。

Parameters:

kwargs

list()

列出支持的工件。 用户不必考虑self.get_path

Return type:

所有支持的工件列表。

class qlib.workflow.record_temp.SigAnaRecord(recorder, ana_long_short=False, ann_scaler=252, label_col=0, skip_existing=False)

这是信号分析记录类,用于生成诸如IC和IR的分析结果。 此类继承自RecordTemp类。

depend_cls

SignalRecord 的别名

__init__(recorder, ana_long_short=False, ann_scaler=252, label_col=0, skip_existing=False)
list()

列出支持的工件。 用户不必考虑self.get_path

Return type:

所有支持的工件列表。

class qlib.workflow.record_temp.PortAnaRecord(recorder, config=None, risk_analysis_freq: List | str | None = None, indicator_analysis_freq: List | str | None = None, indicator_analysis_method=None, skip_existing=False, **kwargs)

这是生成分析结果(如回测结果)的投资组合分析记录类。该类继承自RecordTemp类。

以下文件将存储在记录器中

  • report_normal.pkl & positions_normal.pkl:

    • 回测的返回报告和详细持仓,由qlib/contrib/evaluate.py:backtest返回

  • port_analysis.pkl : 您的投资组合的风险分析,由qlib/contrib/evaluate.py:risk_analysis返回

depend_cls

SignalRecord 的别名

__init__(recorder, config=None, risk_analysis_freq: List | str | None = None, indicator_analysis_freq: List | str | None = None, indicator_analysis_method=None, skip_existing=False, **kwargs)
config[“strategy”]dict

定义策略类以及kwargs。

config[“executor”]dict

定义执行器类以及kwargs。

config[“backtest”]dict

定义回测的kwargs。

risk_analysis_freqstr|List[str]

风险分析报告频率

indicator_analysis_freqstr|List[str]

指标分析报告频率

indicator_analysis_methodstr, optional, default by None

候选值包括‘mean’, ‘amount_weighted’, ‘value_weighted’

list()

列出支持的工件。 用户不必考虑self.get_path

Return type:

所有支持的工件列表。

class qlib.workflow.record_temp.MultiPassPortAnaRecord(recorder, pass_num=10, shuffle_init_score=True, **kwargs)

这是多次投资组合分析记录类,它多次运行回测并生成诸如回测结果的分析结果。此类继承自PortAnaRecord类。

如果启用了shuffle_init_score,第一个回测日期的预测分数将被随机打乱,因此初始位置将是随机的。 shuffle_init_score仅在信号用作占位符时有效。占位符将被保存在recorder中的pred.pkl替换。

Parameters:
  • recorder (Recorder) – 用于保存回测结果的记录器。

  • pass_num (int) – 回测通过的次数。

  • shuffle_init_score (bool) – 是否打乱第一个回测日期的预测分数。

depend_cls

SignalRecord 的别名

__init__(recorder, pass_num=10, shuffle_init_score=True, **kwargs)
Parameters:
  • recorder (Recorder) – 用于保存回测结果的记录器。

  • pass_num (int) – 回测通过的次数。

  • shuffle_init_score (bool) – 是否打乱第一个回测日期的预测分数。

list()

列出支持的工件。 用户不必考虑self.get_path

Return type:

所有支持的工件列表。

任务管理

任务生成

TaskGenerator 模块可以根据 TaskGen 和一些任务模板生成许多任务。

qlib.workflow.task.gen.task_generator(tasks, generators) list

使用TaskGen列表和任务模板列表来生成不同的任务。

例如:

有3个任务模板a、b、c和2个任务生成器A、B。A将从模板生成2个任务,B将从模板生成3个任务。 task_generator([a, b, c], [A, B])最终将生成3*2*3 = 18个任务。

Parameters:
  • tasks (List[dict] 或 dict) – 任务模板列表或单个任务

  • 生成器 (列表[TaskGen] 或 TaskGen) – 一个TaskGen列表或单个TaskGen

Returns:

任务列表

Return type:

列表

class qlib.workflow.task.gen.TaskGen

生成不同任务的基类

示例 1:

输入:一个特定的任务模板和滚动步骤

输出:任务的滚动版本

示例 2:

输入:一个特定的任务模板和损失列表

输出:一组具有不同损失的任务

abstract generate(task: dict) List[dict]

基于任务模板生成不同的任务

Parameters:

任务 (字典) – 一个任务模板

Returns:

任务列表

Return type:

列表[字典]

qlib.workflow.task.gen.handler_mod(task: dict, rolling_gen)

帮助在使用RollingGen时修改处理程序的结束时间 它尝试处理以下情况

  • Hander的数据end_time早于数据集的test_data的段。

    • 为了处理这个问题,处理程序的数据的结束时间被延长了。

如果处理程序的end_time为None,则无需更改其结束时间。

Parameters:
  • 任务 (字典) – 一个任务模板

  • rg (RollingGen) – RollingGen的一个实例

qlib.workflow.task.gen.trunc_segments(ta: TimeAdjuster, segments: Dict[str, Timestamp], days, test_key='test')

为了避免未来信息的泄露,应根据测试的start_time截断片段

注意

此函数将原地更改段

class qlib.workflow.task.gen.RollingGen(step: int = 40, rtype: str = 'expanding', ds_extra_mod_func: None | ~typing.Callable = <function handler_mod>, test_key='test', train_key='train', trunc_days: int | None = None, task_copy_func: ~typing.Callable = <function deepcopy>)
__init__(step: int = 40, rtype: str = 'expanding', ds_extra_mod_func: None | ~typing.Callable = <function handler_mod>, test_key='test', train_key='train', trunc_days: int | None = None, task_copy_func: ~typing.Callable = <function deepcopy>)

生成滚动任务

Parameters:
  • step (int) – 滚动的步长

  • rtype (str) – 滚动类型(扩展、滑动)

  • ds_extra_mod_func (Callable) – 一个类似的方法:handler_mod(task: dict, rg: RollingGen) 在生成任务后执行一些额外的操作。例如,使用handler_mod来修改数据集处理程序的结束时间。

  • trunc_days (int) – 截断一些数据以避免未来的信息泄露

  • task_copy_func (Callable) – 用于复制整个任务的函数。当用户希望在任务之间共享某些内容时,这非常有用。

gen_following_tasks(task: dict, test_end: Timestamp) List[dict]

task生成以下滚动任务,直到test_end

Parameters:
  • 任务 (字典) – Qlib 任务格式

  • test_end (pd.Timestamp) – 最新的滚动任务包括 test_end

Returns:

以下任务的task`(`task本身除外)

Return type:

列表[字典]

generate(task: dict) List[dict]

将任务转换为滚动任务。

Parameters:

task (dict) –

描述任务的字典。例如。

DEFAULT_TASK = {
    "model": {
        "class": "LGBModel",
        "module_path": "qlib.contrib.model.gbdt",
    },
    "dataset": {
        "class": "DatasetH",
        "module_path": "qlib.data.dataset",
        "kwargs": {
            "handler": {
                "class": "Alpha158",
                "module_path": "qlib.contrib.data.handler",
                "kwargs": {
                    "start_time": "2008-01-01",
                    "end_time": "2020-08-01",
                    "fit_start_time": "2008-01-01",
                    "fit_end_time": "2014-12-31",
                    "instruments": "csi100",
                },
            },
            "segments": {
                "train": ("2008-01-01", "2014-12-31"),
                "valid": ("2015-01-01", "2016-12-20"),  # 请避免将未来的测试数据泄露到验证集中
                "test": ("2017-01-01", "2020-08-01"),
            },
        },
    },
    "record": [
        {
            "class": "SignalRecord",
            "module_path": "qlib.workflow.record_temp",
        },
    ]
}

Returns:

列表[字典]

Return type:

任务列表

class qlib.workflow.task.gen.MultiHorizonGenBase(horizon: List[int] = [5], label_leak_n=2)
__init__(horizon: List[int] = [5], label_leak_n=2)

此任务生成器尝试基于现有任务为不同的时间范围生成任务

Parameters:
  • horizon (List[int]) – 任务可能的时间范围

  • label_leak_n (int) – 预测后需要多少天才能获得完整的标签 例如: - 用户在T天进行预测(在T天获得收盘价后) - 标签是在T + 1天买入股票并在T + 2天卖出股票的回报 - label_leak_n将为2(例如,泄露了两天的信息以利用此样本)

abstract set_horizon(task: dict, hr: int)

此方法旨在就地更改任务

Parameters:
  • 任务 (字典) – Qlib的任务

  • hr (int) – 任务的时间范围

generate(task: dict)

基于任务模板生成不同的任务

Parameters:

任务 (字典) – 一个任务模板

Returns:

任务列表

Return type:

列表[字典]

任务管理器

TaskManager 可以自动获取未使用的任务,并管理一组任务的生命周期,包括错误处理。 这些功能可以并发运行任务,并确保每个任务只被使用一次。 Task Manager 会将所有任务存储在 MongoDB 中。 用户在使用此模块时必须完成 MongoDB 的配置。

TaskManager中的任务由3部分组成 - 任务描述:描述将定义任务 - 任务状态:任务的状态 - 任务结果:用户可以通过任务描述和任务结果获取任务。

class qlib.workflow.task.manage.TaskManager(task_pool: str)

这是当任务由TaskManager创建时,任务的样子

{
    'def': pickle serialized task definition.  using pickle will make it easier
    'filter': json-like data. This is for filtering the tasks.
    'status': 'waiting' | 'running' | 'done'
    'res': pickle serialized task result,
}

任务管理器假设您只会更新您获取的任务。 Mongo的获取和更新操作将确保数据更新的安全性。

这个类可以作为命令行工具使用。以下是几个示例。 您可以使用以下命令查看管理模块的帮助: python -m qlib.workflow.task.manage -h # 显示管理模块CLI的手册 python -m qlib.workflow.task.manage wait -h # 显示管理模块中wait命令的手册

python -m qlib.workflow.task.manage -t <pool_name> wait
python -m qlib.workflow.task.manage -t <pool_name> task_stat

注意

假设:MongoDB中的数据被编码,而从MongoDB中取出的数据被解码

以下是四种状态:

STATUS_WAITING: 等待训练

STATUS_RUNNING: 训练中

STATUS_PART_DONE: 已完成某些步骤,正在等待下一步

STATUS_DONE: 所有工作已完成

__init__(task_pool: str)

初始化任务管理器,记得先声明MongoDB的URL和数据库名称。 一个TaskManager实例服务于一个特定的任务池。 该模块的静态方法服务于整个MongoDB。

Parameters:

task_pool (str) – MongoDB中集合的名称

static list() list

列出数据库的所有集合(task_pool)。

Returns:

列表

replace_task(task, new_task)

使用新任务替换旧任务

Parameters:
  • task – 旧任务

  • new_task – 新任务

insert_task(task)

插入一个任务。

Parameters:

task – 等待插入的任务

Returns:

pymongo.results.InsertOneResult

insert_task_def(task_def)

插入一个任务到任务池

Parameters:

task_def (dict) – 任务定义

Return type:

pymongo.results.InsertOneResult

create_task(task_def_l, dry_run=False, print_nt=False) List[str]

如果task_def_l中的任务是新的,则将新任务插入到task_pool中,并记录inserted_id。 如果任务不是新的,则只需查询其_id。

Parameters:
  • task_def_l (list) – 任务列表

  • dry_run (bool) – 是否将这些新任务插入任务池

  • print_nt (bool) – 如果打印新任务

Returns:

任务定义列表的_id列表

Return type:

列表[str]

fetch_task(query={}, status='waiting') dict

使用查询来获取任务。

Parameters:
  • query (dict, 可选) – 查询字典。默认为 {}。

  • status (str, optional) – [描述]. 默认为 STATUS_WAITING.

Returns:

解码后的任务(集合中的文档)

Return type:

字典

safe_fetch_task(query={}, status='waiting')

使用带有contextmanager的查询从task_pool中获取任务

Parameters:

query (dict) – 查询的字典

Returns:

字典

Return type:

解码后的任务(集合中的文档)

query(query={}, decode=True)

查询集合中的任务。 如果迭代生成器花费太长时间,此函数可能会引发异常 pymongo.errors.CursorNotFound: cursor id not found

python -m qlib.workflow.task.manage -t query ‘{“_id”: “615498be837d0053acbc5d58”}’

Parameters:
  • query (dict) – 查询的字典

  • decode (bool) –

Returns:

字典

Return type:

解码后的任务(集合中的文档)

re_query(_id) dict

使用 _id 查询任务。

Parameters:

_id (str) – 文档的_id

Returns:

解码后的任务(集合中的文档)

Return type:

字典

commit_task_res(task, res, status='done')

将结果提交到 task['res']。

Parameters:
  • 任务 ([类型]) – [描述]

  • res (object) – 你想要保存的结果

  • status (str, optional) – STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE。默认为 STATUS_DONE。

return_task(task, status='waiting')

将任务返回到状态。通常用于错误处理。

Parameters:
  • 任务 ([类型]) – [描述]

  • status (str, optional) – STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE。默认为 STATUS_WAITING。

remove(query={})

使用查询删除任务

Parameters:

query (dict) – 查询的字典

task_stat(query={}) dict

计算每个状态中的任务数量。

Parameters:

query (dict, 可选) – 查询字典。默认为 {}。

Returns:

字典

reset_waiting(query={})

将所有正在运行的任务重置为等待状态。当某些正在运行的任务意外退出时可以使用。

Parameters:

query (dict, 可选) – 查询字典。默认为 {}。

prioritize(task, priority: int)

设置任务的优先级

Parameters:
  • 任务 (字典) – 来自数据库的任务查询

  • priority (int) – 目标优先级

wait(query={})

在多进程处理时,主进程可能从TaskManager中获取不到任何内容,因为仍然有一些任务在运行。 因此,主进程应等待,直到所有任务都被其他进程或机器训练好。

Parameters:

query (dict, 可选) – 查询字典。默认为 {}。

qlib.workflow.task.manage.run_task(task_func: Callable, task_pool: str, query: dict = {}, force_release: bool = False, before_status: str = 'waiting', after_status: str = 'done', **kwargs)

当任务池不为空(有等待中的任务)时,使用 task_func 从 task_pool 中获取并运行任务

运行此方法后,有以下4种情况(before_status -> after_status):

STATUS_WAITING -> STATUS_DONE: 使用 task["def"] 作为 task_func 参数,这意味着任务尚未开始

STATUS_WAITING -> STATUS_PART_DONE: 使用 task["def"] 作为 task_func 参数

STATUS_PART_DONE -> STATUS_PART_DONE: 使用 task[“res”] 作为 task_func 参数,这意味着任务已开始但未完成

STATUS_PART_DONE -> STATUS_DONE: 使用 task[“res”] 作为 task_func 参数

Parameters:
  • task_func (Callable) –

    def (task_def, **kwargs) ->

    运行任务的函数

  • task_pool (str) – 任务池的名称(MongoDB中的集合)

  • query (dict) – 在获取任务时将使用此字典来查询任务池

  • force_release (bool) – 程序是否会强制释放资源

  • before_status (str:) – 将获取并训练处于before_status状态的任务。可以是STATUS_WAITING, STATUS_PART_DONE。

  • after_status (str:) – 训练后的任务将变为after_status。可以是STATUS_WAITING, STATUS_PART_DONE。

  • kwargstask_func的参数

训练器

训练器将训练一系列任务并返回一系列模型记录器。 每个训练器包括两个步骤:train(创建模型记录器)和end_train(修改模型记录器)。

这是一个名为DelayTrainer的概念,它可以用于在线模拟并行训练。 在DelayTrainer中,第一步只是将一些必要的信息保存到模型记录器中,而第二步将在最后完成,可以执行一些并发且耗时的操作,例如模型拟合。

Qlib 提供了两种训练器,TrainerR 是最简单的方式,而 TrainerRM 基于 TaskManager 来自动管理任务的生命周期。

qlib.model.trainer.begin_task_train(task_config: dict, experiment_name: str, recorder_name: str | None = None) Recorder

开始任务训练以启动记录器并保存任务配置。

Parameters:
  • task_config (dict) – 任务的配置

  • experiment_name (str) – 实验的名称

  • recorder_name (str) – 给定的名称将作为记录器名称。如果为None,则使用rid。

Returns:

模型记录器

Return type:

Recorder

qlib.model.trainer.end_task_train(rec: Recorder, experiment_name: str) Recorder

完成带有真实模型拟合和保存的任务训练。

Parameters:
  • rec (Recorder) – 记录器将被恢复

  • experiment_name (str) – 实验的名称

Returns:

模型记录器

Return type:

Recorder

qlib.model.trainer.task_train(task_config: dict, experiment_name: str, recorder_name: str | None = None) Recorder

基于任务的培训将分为两个步骤。

Parameters:
  • task_config (dict) – 任务的配置。

  • experiment_name (str) – 实验的名称

  • recorder_name (str) – 记录器的名称

Returns:

记录器

Return type:

记录器的实例

class qlib.model.trainer.Trainer

训练器可以训练一系列模型。 有Trainer和DelayTrainer,可以通过它们何时完成实际训练来区分。

__init__()
train(tasks: list, *args, **kwargs) list

给定一系列任务定义,开始训练,并返回模型。

对于Trainer,它在这个方法中完成实际的训练。 对于DelayTrainer,它在这个方法中只做一些准备工作。

Parameters:

tasks – 任务列表

Returns:

模型列表

Return type:

列表

end_train(models: list, *args, **kwargs) list

给定一个模型列表,在训练结束时如果需要完成某些操作。 这些模型可能是Recorder、文本文件、数据库等。

对于Trainer,它在这个方法中进行一些收尾工作。 对于DelayTrainer,它在这个方法中完成实际的训练。

Parameters:

models – 模型列表

Returns:

模型列表

Return type:

列表

is_delay() bool

如果Trainer将延迟完成end_train

Returns:

如果 DelayTrainer

Return type:

布尔

has_worker() bool

一些训练器有后端工作器来支持并行训练 这个方法可以判断工作器是否启用。

Returns:

如果工作者已启用

Return type:

布尔

worker()

启动工作器

Raises:

NotImplementedError: – 如果工作器不受支持

class qlib.model.trainer.TrainerR(experiment_name: str | None = None, train_func: ~typing.Callable = <function task_train>, call_in_subproc: bool = False, default_rec_name: str | None = None)

基于(R)ecorder的训练器。 它将线性地训练一系列任务并返回一系列模型记录器。

假设:模型由task定义,结果将保存到Recorder

__init__(experiment_name: str | None = None, train_func: ~typing.Callable = <function task_train>, call_in_subproc: bool = False, default_rec_name: str | None = None)

初始化 TrainerR。

Parameters:
  • experiment_name (str, optional) – 实验的默认名称。

  • train_func (Callable, optional) – 默认的训练方法。默认为 task_train

  • call_in_subproc (bool) – 在子进程中调用该过程以强制释放内存

train(tasks: list, train_func: Callable | None = None, experiment_name: str | None = None, **kwargs) List[Recorder]

给定一个任务列表并返回一个训练好的Recorder列表。顺序可以得到保证。

Parameters:
  • 任务 (列表) – 基于任务字典的定义列表

  • train_func (Callable) – 需要至少包含tasksexperiment_name的训练方法。如果为None,则使用默认的训练方法。

  • experiment_name (str) – 实验名称,None表示使用默认名称。

  • kwargs – train_func的参数。

Returns:

记录器列表

Return type:

列表[Recorder]

end_train(models: list, **kwargs) List[Recorder]

将STATUS_END标签设置到记录器。

Parameters:

models (list) – 训练好的记录器列表。

Returns:

与参数相同的列表。

Return type:

列表[Recorder]

class qlib.model.trainer.DelayTrainerR(experiment_name: str | None = None, train_func=<function begin_task_train>, end_train_func=<function end_task_train>, **kwargs)

基于TrainerR的延迟实现,这意味着train方法可能只做一些准备工作,而end_train方法可以进行实际的模型拟合。

__init__(experiment_name: str | None = None, train_func=<function begin_task_train>, end_train_func=<function end_task_train>, **kwargs)

初始化 TrainerRM。

Parameters:
  • experiment_name (str) – 实验的默认名称。

  • train_func (Callable, optional) – 默认的训练方法。默认为 begin_task_train

  • end_train_func (Callable, optional) – 默认的结束训练方法。默认为 end_task_train

end_train(models, end_train_func=None, experiment_name: str | None = None, **kwargs) List[Recorder]

给定一个Recorder列表并返回一个训练过的Recorder列表。 该类将完成实际数据加载和模型拟合。

Parameters:
  • models (列表) – 一个Recorder的列表,任务已经保存到它们中

  • end_train_func (Callable, optional) – 需要至少包含recordersexperiment_name的end_train方法。默认为None,使用self.end_train_func。

  • experiment_name (str) – 实验名称,None表示使用默认名称。

  • kwargs – end_train_func 的参数。

Returns:

记录器列表

Return type:

列表[Recorder]

class qlib.model.trainer.TrainerRM(experiment_name: str | None = None, task_pool: str | None = None, train_func=<function task_train>, skip_run_task: bool = False, default_rec_name: str | None = None)

基于(R)ecorder和Task(M)anager的训练器。 它可以以多进程的方式训练一系列任务并返回一系列模型记录器。

假设:task 将被保存到 TaskManager 中,并且 task 将从 TaskManager 中获取并进行训练

__init__(experiment_name: str | None = None, task_pool: str | None = None, train_func=<function task_train>, skip_run_task: bool = False, default_rec_name: str | None = None)

初始化 TrainerR。

Parameters:
  • experiment_name (str) – 实验的默认名称。

  • task_pool (str) – TaskManager中的任务池名称。如果为None,则使用与experiment_name相同的名称。

  • train_func (Callable, optional) – 默认的训练方法。默认为 task_train

  • skip_run_task (bool) – 如果 skip_run_task == True: 仅在 worker 中运行 run_task。否则跳过 run_task。

train(tasks: list, train_func: Callable | None = None, experiment_name: str | None = None, before_status: str = 'waiting', after_status: str = 'done', default_rec_name: str | None = None, **kwargs) List[Recorder]

给定一个任务列表并返回一个训练好的Recorder列表。顺序可以得到保证。

此方法默认为单进程,但TaskManager提供了一种并行训练的好方法。 用户可以自定义他们的train_func以实现多进程甚至多机器。

Parameters:
  • 任务 (列表) – 基于任务字典的定义列表

  • train_func (Callable) – 需要至少包含tasksexperiment_name的训练方法。如果为None,则使用默认的训练方法。

  • experiment_name (str) – 实验名称,None表示使用默认名称。

  • before_status (str) – 将获取并训练处于before_status状态的任务。可以是STATUS_WAITING, STATUS_PART_DONE。

  • after_status (str) – 训练后的任务将变为after_status。可以是STATUS_WAITING, STATUS_PART_DONE。

  • kwargs – train_func的参数。

Returns:

记录器列表

Return type:

列表[Recorder]

end_train(recs: list, **kwargs) List[Recorder]

将STATUS_END标签设置到记录器。

Parameters:

recs (list) – 训练好的记录器列表。

Returns:

与参数相同的列表。

Return type:

列表[Recorder]

worker(train_func: Callable | None = None, experiment_name: str | None = None)

train的多进程方法。它可以与train共享同一个task_pool,并且可以在其他进程或其他机器上运行。

Parameters:
  • train_func (Callable) – 需要至少包含tasksexperiment_name的训练方法。如果为None,则使用默认的训练方法。

  • experiment_name (str) – 实验名称,None表示使用默认名称。

has_worker() bool

一些训练器有后端工作器来支持并行训练 这个方法可以判断工作器是否启用。

Returns:

如果工作者已启用

Return type:

布尔

class qlib.model.trainer.DelayTrainerRM(experiment_name: str | None = None, task_pool: str | None = None, train_func=<function begin_task_train>, end_train_func=<function end_task_train>, skip_run_task: bool = False, **kwargs)

基于TrainerRM的延迟实现,这意味着train方法可能只做一些准备工作,而end_train方法可以进行实际的模型拟合。

__init__(experiment_name: str | None = None, task_pool: str | None = None, train_func=<function begin_task_train>, end_train_func=<function end_task_train>, skip_run_task: bool = False, **kwargs)

初始化 DelayTrainerRM。

Parameters:
  • experiment_name (str) – 实验的默认名称。

  • task_pool (str) – TaskManager中的任务池名称。如果为None,则使用与experiment_name相同的名称。

  • train_func (Callable, optional) – 默认的训练方法。默认为 begin_task_train

  • end_train_func (Callable, optional) – 默认的结束训练方法。默认为 end_task_train

  • skip_run_task (bool) – 如果 skip_run_task == True: 仅在 worker 中运行 run_task。否则跳过 run_task。 例如,在 CPU VM 上启动 trainer,然后等待在 GPU VM 上完成任务。

train(tasks: list, train_func=None, experiment_name: str | None = None, **kwargs) List[Recorder]

与TrainerRM的train相同,after_status将为STATUS_PART_DONE。

Parameters:
  • 任务 (列表) – 基于任务字典的定义列表

  • train_func (Callable) – 需要至少包含tasksexperiment_name的训练方法。默认值为None,表示使用self.train_func。

  • experiment_name (str) – 实验名称,None表示使用默认名称。

Returns:

记录器列表

Return type:

列表[Recorder]

end_train(recs, end_train_func=None, experiment_name: str | None = None, **kwargs) List[Recorder]

给定一个Recorder列表并返回一个训练过的Recorder列表。 该类将完成实际数据加载和模型拟合。

Parameters:
  • recs (list) – 一个Recorder列表,任务已经保存到它们中。

  • end_train_func (Callable, optional) – 需要至少包含recordersexperiment_name的end_train方法。默认为None,使用self.end_train_func。

  • experiment_name (str) – 实验名称,None表示使用默认名称。

  • kwargs – end_train_func 的参数。

Returns:

记录器列表

Return type:

列表[Recorder]

worker(end_train_func=None, experiment_name: str | None = None)

end_train的多处理方法。它可以与end_train共享同一个任务池,并且可以在其他进程或其他机器上运行。

Parameters:
  • end_train_func (Callable, optional) – 需要至少包含recordersexperiment_name的end_train方法。默认为None,使用self.end_train_func。

  • experiment_name (str) – 实验名称,None表示使用默认名称。

has_worker() bool

一些训练器有后端工作器来支持并行训练 这个方法可以判断工作器是否启用。

Returns:

如果工作者已启用

Return type:

布尔

收集器

收集器模块可以从各处收集对象并对其进行处理,例如合并、分组、求平均值等。

class qlib.workflow.task.collect.Collector(process_list=[])

收集不同结果的收集器

__init__(process_list=[])

初始化收集器。

Parameters:

process_list (listCallable) – 处理字典的处理器列表或处理器实例。

collect() dict

收集结果并返回一个类似 {key: things} 的字典

Returns:

收集后的字典。

例如:

{“prediction”: pd.Series}

{“IC”: {“Xgboost”: pd.Series, “LSTM”: pd.Series}}

Return type:

字典

static process_collect(collected_dict, process_list=[], *args, **kwargs) dict

对collect返回的字典进行一系列处理,并返回一个类似{key: things}的字典 例如,你可以进行分组和集成。

Parameters:
  • collected_dict (dict) – 由collect返回的字典

  • process_list (listCallable) – 处理字典的处理器列表或处理器实例。 处理器的顺序与列表顺序相同。 例如:[Group1(…, Ensemble1()), Group2(…, Ensemble2())]

Returns:

处理后的字典。

Return type:

字典

class qlib.workflow.task.collect.MergeCollector(collector_dict: Dict[str, Collector], process_list: List[Callable] = [], merge_func=None)

一个收集器,用于收集其他收集器的结果

例如:

我们有两个收集器,分别命名为A和B。 A可以收集{“prediction”: pd.Series},B可以收集{“IC”: {“Xgboost”: pd.Series, “LSTM”: pd.Series}}。 然后在这个类的收集之后,我们可以收集{“A_prediction”: pd.Series, “B_IC”: {“Xgboost”: pd.Series, “LSTM”: pd.Series}}

__init__(collector_dict: Dict[str, Collector], process_list: List[Callable] = [], merge_func=None)

初始化 MergeCollector。

Parameters:
  • collector_dict (Dict[str,Collector]) – 类似 {collector_key, Collector} 的字典

  • process_list (List[Callable]) – 处理字典的处理器列表或处理器实例。

  • merge_func (Callable) – 一个用于生成最外层键的方法。给定的参数是来自collector_dict的collector_key和每个收集器收集后的key。 如果为None,则使用元组连接它们,例如“ABC”+(“a”,”b”) -> (“ABC”, (“a”,”b”))。

collect() dict

收集collector_dict的所有结果,并将最外层的键更改为重组键。

Returns:

收集后的字典。

Return type:

字典

class qlib.workflow.task.collect.RecorderCollector(experiment, process_list=[], rec_key_func=None, rec_filter_func=None, artifacts_path={'pred': 'pred.pkl'}, artifacts_key=None, list_kwargs={}, status: Iterable = {'FINISHED'})
__init__(experiment, process_list=[], rec_key_func=None, rec_filter_func=None, artifacts_path={'pred': 'pred.pkl'}, artifacts_key=None, list_kwargs={}, status: Iterable = {'FINISHED'})

初始化 RecorderCollector。

Parameters:
  • 实验 – (实验或字符串): 一个实验的实例或实验的名称 (可调用): 一个可调用的函数,返回一个实验列表

  • process_list (listCallable) – 处理字典的处理器列表或处理器实例。

  • rec_key_func (Callable) – 一个用于获取记录器键的函数。如果为None,则使用记录器ID。

  • rec_filter_func (Callable, optional) – 通过返回True或False来过滤记录器。默认为None。

  • artifacts_path (dict, optional) – Recorder 中的工件名称及其路径。默认为 {“pred”: “pred.pkl”, “IC”: “sig_analysis/ic.pkl”}。

  • artifacts_key (strList, 可选) – 你想要获取的artifacts key。如果为None,则获取所有artifacts。

  • list_kwargs (str) – list_recorders函数的参数。

  • status (Iterable) – 仅收集具有特定状态的记录器。None 表示收集所有记录器

collect(artifacts_key=None, rec_filter_func=None, only_exist=True) dict

根据过滤后的记录器收集不同的工件。

Parameters:
  • artifacts_key (strList, 可选) – 你想要获取的artifacts key。如果为None,则使用默认值。

  • rec_filter_func (Callable, optional) – 通过返回True或False来过滤记录器。如果为None,则使用默认值。

  • only_exist (bool, optional) – 如果仅在记录器确实存在时收集工件。 如果为True,加载时出现异常的记录器将不会被收集。但如果为False,它将引发异常。

Returns:

收集后的字典类似于 {artifact: {rec_key: object}}

Return type:

字典

get_exp_name() str

获取实验名称

Returns:

实验名称

Return type:

字符串

分组

Group可以根据group_func对一组对象进行分组,并将它们转换为字典。 分组后,我们提供了一种方法来减少它们。

例如:

分组: {(A,B,C1): 对象, (A,B,C2): 对象} -> {(A,B): {C1: 对象, C2: 对象}} 归约: {(A,B): {C1: 对象, C2: 对象}} -> {(A,B): 对象}

class qlib.model.ens.group.Group(group_func=None, ens: Ensemble | None = None)

根据字典对对象进行分组

__init__(group_func=None, ens: Ensemble | None = None)

初始化组。

Parameters:
  • group_func (Callable, optional) –

    给定一个字典并返回组键和其中一个组元素。

    例如:{(A,B,C1): object, (A,B,C2): object} -> {(A,B): {C1: object, C2: object}}

  • 无。 (默认值为) –

  • ens (Ensemble, optional) – 如果不为None,则在分组后对分组值进行集成。

group(*args, **kwargs) dict

将一组对象分组并将它们转换为字典。

例如:{(A,B,C1): object, (A,B,C2): object} -> {(A,B): {C1: object, C2: object}}

Returns:

分组字典

Return type:

字典

reduce(*args, **kwargs) dict

减少分组字典。

例如:{(A,B): {C1: object, C2: object}} -> {(A,B): object}

Returns:

简化字典

Return type:

字典

class qlib.model.ens.group.RollingGroup(ens=<qlib.model.ens.ensemble.RollingEnsemble object>)

对滚动字典进行分组

group(rolling_dict: dict) dict

给定一个滚动字典如 {(A,B,R): things},返回分组后的字典如 {(A,B): {R:things}}

注意:有一个假设,即滚动键位于键元组的末尾,因为滚动结果总是需要首先进行集成。

Parameters:

rolling_dict (dict) – 一个滚动的字典。如果键不是元组,则不执行任何操作。

Returns:

分组字典

Return type:

字典

__init__(ens=<qlib.model.ens.ensemble.RollingEnsemble object>)

初始化组。

Parameters:
  • group_func (Callable, optional) –

    给定一个字典并返回组键和其中一个组元素。

    例如:{(A,B,C1): object, (A,B,C2): object} -> {(A,B): {C1: object, C2: object}}

  • 无。 (默认值为) –

  • ens (Ensemble, optional) – 如果不为None,则在分组后对分组值进行集成。

集成

集成模块可以合并集成中的对象。例如,如果有许多子模型的预测,我们可能需要将它们合并成一个集成预测。

class qlib.model.ens.ensemble.Ensemble

将ensemble_dict合并到一个ensemble对象中。

例如:{Rollinga_b: 对象, Rollingb_c: 对象} -> 对象

当调用这个类时:

Args:

ensemble_dict (dict): 等待合并的集成字典,如 {name: things}

Returns:

对象:集成对象

class qlib.model.ens.ensemble.SingleKeyEnsemble

如果字典中只有一个键和值,则提取该对象。使结果更易读。 {Only key: Only value} -> Only value

如果有超过1个键或少于1个键,则不执行任何操作。 你甚至可以递归地运行这个操作,使字典更易读。

注意:默认情况下递归运行。

当调用这个类时:

Args:

ensemble_dict (dict): 字典。字典的键将被忽略。

Returns:

dict: 可读的字典。

class qlib.model.ens.ensemble.RollingEnsemble

将类似predictionIC的滚动数据框字典合并为一个整体。

注意:字典的值必须是pd.DataFrame,并且具有索引“datetime”。

当调用这个类时:

Args:

ensemble_dict (dict): 一个类似 {“A”: pd.DataFrame, “B”: pd.DataFrame} 的字典。 字典的键将被忽略。

Returns:

pd.DataFrame: 滚动操作的完整结果。

class qlib.model.ens.ensemble.AverageEnsemble

将形状相同的字典数据框(如predictionIC)进行平均和标准化,整合成一个集合。

注意:字典的值必须是pd.DataFrame,并且具有索引“datetime”。如果是嵌套字典,则需要将其展平。

当调用这个类时:

Args:

ensemble_dict (dict): 一个类似 {“A”: pd.DataFrame, “B”: pd.DataFrame} 的字典。 字典的键将被忽略。

Returns:

pd.DataFrame: 平均化和标准化的完整结果。

工具

一些用于任务管理的工具。

qlib.workflow.task.utils.get_mongodb() Database

获取MongoDB中的数据库,这意味着你需要首先声明数据库的地址和名称。

例如:

使用 qlib.init():

mongo_conf = {
    "task_url": task_url,  # your MongoDB url
    "task_db_name": task_db_name,  # database name
}
qlib.init(..., mongo=mongo_conf)

在qlib.init()之后:

C["mongo"] = {
    "task_url" : "mongodb://localhost:27017/",
    "task_db_name" : "rolling_db"
}
Returns:

数据库实例

Return type:

数据库

qlib.workflow.task.utils.list_recorders(experiment, rec_filter_func=None)

列出实验中可以通过过滤器的所有记录器。

Parameters:
  • 实验 (strExperiment) – 实验的名称或实例

  • rec_filter_func (Callable, optional) – 返回True以保留给定的记录器。默认为None。

Returns:

过滤后的字典 {rid: recorder}。

Return type:

字典

class qlib.workflow.task.utils.TimeAdjuster(future=True, end_time=None)

找到合适的日期并调整日期。

__init__(future=True, end_time=None)
set_end_time(end_time=None)

设置结束时间。如果为None,则使用日历的结束时间。

Parameters:

end_time

get(idx: int)

通过索引获取日期时间。

Parameters:

idx (int) – 日历的索引

max() Timestamp

返回最大日历日期时间

align_idx(time_point, tp_type='start') int

对齐日历中时间点的索引。

Parameters:
  • time_point

  • tp_type (str) –

Returns:

索引

Return type:

整数

cal_interval(time_point_A, time_point_B) int

计算交易日间隔(time_point_A - time_point_B)

Parameters:
  • time_point_A – time_point_A

  • time_point_B – time_point_B(是time_point_A的过去)

Returns:

A和B之间的间隔

Return type:

整数

align_time(time_point, tp_type='start') Timestamp

将时间点对齐到日历的交易日期

Parameters:
  • time_point – 时间点

  • tp_type – str 时间点类型 (“start”, “end”)

Returns:

pd.Timestamp

align_seg(segment: dict | tuple) dict | tuple

将给定日期对齐到交易日

例如:

input: {'train': ('2008-01-01', '2014-12-31'), 'valid': ('2015-01-01', '2016-12-31'), 'test': ('2017-01-01', '2020-08-01')}

output: {'train': (Timestamp('2008-01-02 00:00:00'), Timestamp('2014-12-31 00:00:00')),
        'valid': (Timestamp('2015-01-05 00:00:00'), Timestamp('2016-12-30 00:00:00')),
        'test': (Timestamp('2017-01-03 00:00:00'), Timestamp('2020-07-31 00:00:00'))}
Parameters:

segment

Returns:

Union[dict, tuple]

Return type:

开始和结束交易日期(pd.Timestamp)在给定的开始和结束日期之间。

truncate(segment: tuple, test_start, days: int) tuple

根据test_start日期截断段

Parameters:
  • segment (tuple) – 时间段

  • test_start

  • days (int) – 需要截断的交易天数 此段数据可能需要‘days’天的数据 days 是基于 test_start 的。 例如,如果标签包含未来2天的信息,预测范围为1天。 (例如,预测目标是 Ref($close, -2)/Ref($close, -1) - 1) 天数应为 2 + 1 == 3 天。

Returns:

元组

Return type:

新段落

shift(seg: tuple, step: int, rtype='sliding') tuple

调整时间段的时间

如果段中存在None(表示无界索引),此方法将返回None。

Parameters:
  • seg – 日期时间片段

  • step (int) – 滚动步长

  • rtype (str) – 滚动类型(“滑动”或“扩展”)

Returns:

元组

Return type:

新段落

Raises:

KeyError: – 如果索引(包括开始和结束)超出self.cal范围,shift将引发错误

qlib.workflow.task.utils.replace_task_handler_with_cache(task: dict, cache_dir: str | Path = '.') dict

将任务中的处理程序替换为缓存处理程序。 它将自动缓存文件并将其保存在cache_dir中。

>>> import qlib
>>> qlib.auto_init()
>>> import datetime
>>> # it is simplified task
>>> task = {"dataset": {"kwargs":{'handler': {'class': 'Alpha158', 'module_path': 'qlib.contrib.data.handler', 'kwargs': {'start_time': datetime.date(2008, 1, 1), 'end_time': datetime.date(2020, 8, 1), 'fit_start_time': datetime.date(2008, 1, 1), 'fit_end_time': datetime.date(2014, 12, 31), 'instruments': 'CSI300'}}}}}
>>> new_task = replace_task_handler_with_cache(task)
>>> print(new_task)
{'dataset': {'kwargs': {'handler': 'file...Alpha158.3584f5f8b4.pkl'}}}

在线服务

在线管理器

OnlineManager 可以管理一组 Online Strategy 并动态运行它们。

随着时间的推移,决定性模型也会发生变化。在这个模块中,我们称那些贡献模型为在线模型。 在每个例程中(例如每天或每分钟),在线模型可能会发生变化,并且它们的预测需要更新。 因此,这个模块提供了一系列方法来控制这个过程。

该模块还提供了一种方法来模拟历史中的在线策略。这意味着你可以验证你的策略或找到一个更好的策略。

在不同情况下使用不同的训练器共有4种情况:

情况

描述

在线 + 培训师

当你想进行一个真实的例行程序时,训练器将帮助你训练模型。它将逐任务、逐策略地训练模型。

在线 + 延迟训练器

DelayTrainer 将跳过具体训练,直到所有任务已由不同策略准备完毕。它使用户可以在 routinefirst_train 结束时并行训练所有任务。否则,这些函数将在每个策略准备任务时卡住。

模拟 + 训练器

它的行为将与在线 + 训练器相同。唯一的区别是它用于模拟/回测,而不是在线交易。

模拟 + 延迟训练器

当你的模型没有任何时间依赖性时,你可以使用DelayTrainer来实现多任务处理的能力。这意味着所有例程中的所有任务都可以在模拟结束时进行REAL训练。信号将在不同的时间段准备好(基于是否有任何新模型在线)。

以下是一些伪代码,展示了每种情况的工作流程

For simplicity
  • 策略中仅使用一种策略

  • update_online_pred 仅在在线模式下调用,其他情况下会被忽略

  1. 在线 + 培训师

tasks = first_train()
models = trainer.train(tasks)
trainer.end_train(models)
for day in online_trading_days:
    # OnlineManager.routine
    models = trainer.train(strategy.prepare_tasks())  # for each strategy
    strategy.prepare_online_models(models)  # for each strategy

    trainer.end_train(models)
    prepare_signals()  # prepare trading signals daily

在线 + 延迟训练器: 工作流程与 在线 + 训练器 相同。

  1. 模拟 + 延迟训练器

# simulate
tasks = first_train()
models = trainer.train(tasks)
for day in historical_calendars:
    # OnlineManager.routine
    models = trainer.train(strategy.prepare_tasks())  # for each strategy
    strategy.prepare_online_models(models)  # for each strategy
# delay_prepare()
# FIXME: Currently the delay_prepare is not implemented in a proper way.
trainer.end_train(<for all previous models>)
prepare_signals()

# 我们能否简化当前的工作流程?

  • 可以减少任务的状态数量吗?

    • 对于每个任务,我们有三个阶段(即任务、部分训练任务、最终训练任务)

class qlib.workflow.online.manager.OnlineManager(strategies: OnlineStrategy | List[OnlineStrategy], trainer: Trainer | None = None, begin_time: str | Timestamp | None = None, freq='day')

OnlineManager 可以使用 Online Strategy 管理在线模型。 它还提供了哪些模型在什么时间在线的历史记录。

__init__(strategies: OnlineStrategy | List[OnlineStrategy], trainer: Trainer | None = None, begin_time: str | Timestamp | None = None, freq='day')

初始化在线管理器。 一个在线管理器必须至少有一个在线策略。

Parameters:
  • 策略 (联合[OnlineStrategy, 列表[OnlineStrategy]]) – 一个OnlineStrategy实例或一个OnlineStrategy列表

  • begin_time (Union[str,pd.Timestamp], optional) – OnlineManager 将从这个时间开始。默认为 None,表示使用最近的日期。

  • trainer (qlib.model.trainer.Trainer) – 用于训练任务的训练器。如果为None,则使用TrainerR。

  • freq (str, 可选) – 数据频率。默认为“day”。

first_train(strategies: List[OnlineStrategy] | None = None, model_kwargs: dict = {})

从每个策略的first_tasks方法中获取任务并训练它们。 如果使用DelayTrainer,它可以在每个策略的first_tasks之后一起完成所有训练。

Parameters:
  • 策略 (列表[OnlineStrategy]) – 策略列表(添加策略时需要此参数)。如果为None,则使用默认策略。

  • model_kwargs (dict) – prepare_online_models的参数

routine(cur_time: str | Timestamp | None = None, task_kwargs: dict = {}, model_kwargs: dict = {}, signal_kwargs: dict = {})

每个策略的典型更新过程并记录在线历史。

典型的更新过程在例行程序之后,例如每天或每月。 过程是:更新预测 -> 准备任务 -> 准备在线模型 -> 准备信号。

如果使用DelayTrainer,它可以在每个策略的prepare_tasks之后一起完成训练。

Parameters:
  • cur_time (Union[str,pd.Timestamp], optional) – 在此时间运行例行方法。默认为 None。

  • task_kwargs (dict) – prepare_tasks的参数

  • model_kwargs (dict) – prepare_online_models的参数

  • signal_kwargs (dict) – prepare_signals 的参数

get_collector(**kwargs) MergeCollector

获取Collector的实例,以从每个策略中收集结果。 这个收集器可以作为信号准备的基础。

Parameters:

**kwargs – get_collector 的参数。

Returns:

用于合并其他收集器的收集器。

Return type:

MergeCollector

add_strategy(strategies: OnlineStrategy | List[OnlineStrategy])

向OnlineManager添加一些新策略。

Parameters:

策略 (联合[在线策略, 列表[在线策略]]) – 一个在线策略的列表

prepare_signals(prepare_func: ~typing.Callable = <qlib.model.ens.ensemble.AverageEnsemble object>, over_write=False)

在准备好上一次例行程序的数据(箱线图中的一个箱子)后,这意味着例行程序的结束,我们可以为下一个例行程序准备交易信号。

注意:给定一组预测,这些预测结束时间之前的所有信号都将被妥善准备。

即使最新的信号已经存在,最新的计算结果也会被覆盖。

注意

给定某个时间的预测,在此之前的所有信号都将准备就绪。

Parameters:
  • prepare_func (Callable, optional) – 在收集后从字典中获取信号。默认为 AverageEnsemble(),由 MergeCollector 收集的结果必须是 {xxx:pred}。

  • over_write (bool, optional) – 如果为True,新信号将覆盖原有信号。如果为False,新信号将追加到信号末尾。默认为False。

Returns:

信号。

Return type:

pd.DataFrame

get_signals() Series | DataFrame

获取准备好的在线信号。

Returns:

pd.Series 用于每个日期时间只有一个信号。 pd.DataFrame 用于多个信号,例如,买入和卖出操作使用不同的交易信号。

Return type:

联合[pd.Series, pd.DataFrame]

simulate(end_time=None, frequency='day', task_kwargs={}, model_kwargs={}, signal_kwargs={}) Series | DataFrame

从当前时间开始,此方法将模拟OnlineManager中的每个例程,直到结束时间。

考虑到并行训练,模型和信号可以在所有常规模拟之后准备。

延迟训练的方式可以是 DelayTrainer,而延迟准备信号的方式可以是 delay_prepare

Parameters:
  • end_time – 模拟结束的时间

  • frequency – 日历频率

  • task_kwargs (dict) – prepare_tasks的参数

  • model_kwargs (dict) – prepare_online_models的参数

  • signal_kwargs (dict) – prepare_signals 的参数

Returns:

pd.Series 用于每个日期时间只有一个信号。 pd.DataFrame 用于多个信号,例如,买入和卖出操作使用不同的交易信号。

Return type:

联合[pd.Series, pd.DataFrame]

delay_prepare(model_kwargs={}, signal_kwargs={})

如果某些内容正在等待准备,请准备所有模型和信号。

Parameters:
  • model_kwargsend_train的参数

  • signal_kwargsprepare_signals的参数

在线策略

OnlineStrategy 模块是在线服务的一个组成部分。

class qlib.workflow.online.strategy.OnlineStrategy(name_id: str)

OnlineStrategy 正在与 Online Manager 合作,响应任务的生成方式、模型的更新以及信号的准备。

__init__(name_id: str)

初始化在线策略。 此模块必须使用Trainer来完成模型训练。

Parameters:
  • name_id (str) – 一个唯一的名称或ID。

  • trainer (qlib.model.trainer.Trainer, optional) – 一个Trainer的实例。默认为None。

prepare_tasks(cur_time, **kwargs) List[dict]

在例程结束后,检查我们是否需要根据cur_time(None表示最新)准备和训练一些新任务。 返回等待训练的新任务。

您可以通过OnlineTool.online_models找到最新的在线模型。

prepare_online_models(trained_models, cur_time=None) List[object]

从训练好的模型中选择一些模型并将它们设置为在线模型。 这是一个典型的将所有训练好的模型上线的方法,你可以覆盖它以实现更复杂的方法。 如果你仍然需要它们,你可以通过OnlineTool.online_models找到最后上线的模型。

注意:将所有在线模型重置为训练好的模型。如果没有训练好的模型,则不执行任何操作。

NOTE:

当前的实现非常朴素。这里有一个更复杂的情况,更接近实际场景。 1. 在test_start前一天(时间戳T)训练新模型 2. 在test_start(通常在时间戳T + 1)切换模型

Parameters:
  • 模型 (列表) – 一个模型列表。

  • cur_time (pd.Dataframe) – 来自OnlineManger的当前时间。如果为None,则表示最新时间。

Returns:

在线模型列表。

Return type:

列表[对象]

first_tasks() List[dict]

首先生成一系列任务并返回它们。

get_collector() Collector

获取Collector的实例,以收集此策略的不同结果。

For example:
  1. 在Recorder中收集预测

  2. 在txt文件中收集信号

Returns:

Collector

class qlib.workflow.online.strategy.RollingStrategy(name_id: str, task_template: dict | List[dict], rolling_gen: RollingGen)

此示例策略始终使用最新的滚动模型 sas 在线模型。

__init__(name_id: str, task_template: dict | List[dict], rolling_gen: RollingGen)

初始化滚动策略。

假设:name_id的字符串、实验名称和训练者的实验名称是相同的。

Parameters:
  • name_id (str) – 一个唯一的名称或ID。也将是实验的名称。

  • task_template (Union[dict, List[dict]]) – 一个任务模板列表或单个模板,将用于通过rolling_gen生成多个任务。

  • rolling_gen (RollingGen) – RollingGen的一个实例

get_collector(process_list=[<qlib.model.ens.group.RollingGroup object>], rec_key_func=None, rec_filter_func=None, artifacts_key=None)

获取Collector的实例以收集结果。返回的收集器必须区分不同模型的结果。

假设:模型可以根据模型名称和滚动测试段进行区分。 如果您不希望使用此假设,请实现您的方法或使用另一个rec_key_func。

Parameters:
  • rec_key_func (Callable) – 一个用于获取记录器键的函数。如果为None,则使用记录器ID。

  • rec_filter_func (Callable, optional) – 通过返回True或False来过滤记录器。默认为None。

  • artifacts_key (List[str], optional) – 你想要获取的artifacts键。如果为None,则获取所有artifacts。

first_tasks() List[dict]

使用rolling_gen基于task_template生成不同的任务。

Returns:

任务列表

Return type:

列表[字典]

prepare_tasks(cur_time) List[dict]

根据cur_time准备新任务(None表示最新的)。

您可以通过OnlineToolR.online_models找到最新的在线模型。

Returns:

新任务的列表。

Return type:

列表[字典]

在线工具

OnlineTool 是一个用于设置和取消设置一系列在线模型的模块。 在线模型是在某些时间点上的一些决定性模型,这些模型可以随时间的变化而变化。 这使得我们能够使用高效的子模型来适应市场风格的变化。

class qlib.workflow.online.utils.OnlineTool

OnlineTool 将管理实验中的 在线 模型,这些实验包括模型记录器。

__init__()

初始化在线工具。

set_online_tag(tag, recorder: list | object)

tag设置为模型以标记是否在线。

Parameters:
  • tag (str) – ONLINE_TAGOFFLINE_TAG 中的标签

  • recorder (Union[list,object]) – 模型的记录器

get_online_tag(recorder: object) str

给定一个模型记录器并返回其在线标签。

Parameters:

recorder (Object) – 模型的记录器

Returns:

在线标签

Return type:

字符串

reset_online_tag(recorder: list | object)

将所有模型离线并将记录器设置为“在线”。

Parameters:

recorder (Union[list,object]) – 您想要重置为“在线”的记录器。

online_models() list

获取当前的在线模型

Returns:

一个在线模型的列表。

Return type:

列表

update_online_pred(to_date=None)

更新在线模型的预测到to_date。

Parameters:

to_date (pd.Timestamp) – 在此日期之前的预测将被更新。如果为None,则更新到最新。

class qlib.workflow.online.utils.OnlineToolR(default_exp_name: str | None = None)

基于(R)ecorder的OnlineTool实现。

__init__(default_exp_name: str | None = None)

初始化 OnlineToolR。

Parameters:

default_exp_name (str) – 默认的实验名称。

set_online_tag(tag, recorder: Recorder | List)

tag设置为模型的记录器,以标记是否在线。

Parameters:
  • tag (str) – ONLINE_TAG, NEXT_ONLINE_TAG, OFFLINE_TAG中的标签

  • recorder (Union[Recorder, List]) – Recorder的列表或Recorder的实例

get_online_tag(recorder: Recorder) str

给定一个模型记录器并返回其在线标签。

Parameters:

recorder (Recorder) – 一个记录器的实例

Returns:

在线标签

Return type:

字符串

reset_online_tag(recorder: Recorder | List, exp_name: str | None = None)

将所有模型离线并将记录器设置为“在线”。

Parameters:
  • recorder (Union[Recorder, List]) – 你想要重置为“在线”的recorder。

  • exp_name (str) – 实验名称。如果为None,则使用default_exp_name。

online_models(exp_name: str | None = None) list

获取当前的在线模型

Parameters:

exp_name (str) – 实验名称。如果为None,则使用default_exp_name。

Returns:

一个在线模型的列表。

Return type:

列表

update_online_pred(to_date=None, from_date=None, exp_name: str | None = None)

更新在线模型的预测至to_date。

Parameters:
  • to_date (pd.Timestamp) – 在此日期之前的预测将被更新。如果为None,则更新到日历中的最新时间。

  • exp_name (str) – 实验名称。如果为None,则使用default_exp_name。

记录更新器

Updater 是一个模块,用于在股票数据更新时更新诸如预测之类的工件。

class qlib.workflow.online.update.RMDLoader(rec: Recorder)

记录器模型数据集加载器

__init__(rec: Recorder)
get_dataset(start_time, end_time, segments=None, unprepared_dataset: DatasetH | None = None) DatasetH

加载、配置和设置数据集。

此数据集用于推理。

Parameters:
  • start_time – 基础数据的开始时间

  • end_time – 基础数据的结束时间

  • segments – dict 数据集的分段配置 由于时间序列数据集(TSDatasetH),测试段可能与start_time和end_time不同

  • unprepared_dataset – Optional[DatasetH] 如果用户不想从记录器加载数据集,请指定用户的数据集

Returns:

DatasetH的实例

Return type:

DatasetH

class qlib.workflow.online.update.RecordUpdater(record: Recorder, *args, **kwargs)

更新特定的记录器

__init__(record: Recorder, *args, **kwargs)
abstract update(*args, **kwargs)

更新特定记录器的信息

class qlib.workflow.online.update.DSBasedUpdater(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)

基于数据集的更新器

  • 为基于Qlib数据集的数据更新提供更新功能

假设

  • 基于Qlib数据集

  • 要更新的数据是一个多级索引的pd.DataFrame。例如标签、预测。

                             LABEL0
    datetime   instrument
    2021-05-10 SH600000    0.006965
               SH600004    0.003407
    ...                         ...
    2021-05-28 SZ300498    0.015748
               SZ300676   -0.001321
    
__init__(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)

初始化 PredUpdater。

在以下情况下的预期行为:

  • 如果 to_date 大于日历中的最大日期,数据将更新到最新日期

  • 如果在from_date之前或to_date之后有数据,只有from_dateto_date之间的数据会受到影响。

Parameters:
  • record – 记录器

  • to_date

    将预测更新到to_date

    如果to_date为None:

    数据将更新到最新日期。

  • from_date

    更新将从from_date开始

    如果from_date为None:

    更新将在历史数据中最新数据后的下一个tick时进行

  • hist_ref

    int 有时,数据集会有历史依赖。 将问题留给用户设置历史依赖的长度 如果用户没有指定此参数,Updater 将尝试加载数据集以自动确定 hist_ref

    注意

    start_time 不包括在 hist_ref 中;因此,在大多数情况下,hist_ref 将是 step_len - 1

  • loader_cls – 类型 用于加载模型和数据集的类

prepare_data(unprepared_dataset: DatasetH | None = None) DatasetH

加载数据集 - 如果指定了 unprepared_dataset,则直接准备数据集 - 否则,

将此函数分离将使数据集更易于重用

Returns:

DatasetH的实例

Return type:

DatasetH

update(dataset: DatasetH | None = None, write: bool = True, ret_new: bool = False) object | None
Parameters:
  • dataset (DatasetH) – DatasetH: DatasetH的实例。如果为None,则重新准备。

  • write (bool) – 是否执行写入操作

  • ret_new (bool) – 更新后的数据是否会被返回

Returns:

更新后的数据集

Return type:

可选[对象]

abstract get_update_data(dataset: Dataset) DataFrame

根据给定的数据集返回更新后的数据

get_update_dataupdate 之间的区别 - update_date 只包含一些特定数据的功能 - update 包含一些常规步骤(例如准备数据集、检查)

class qlib.workflow.online.update.PredUpdater(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)

更新记录器中的预测

get_update_data(dataset: Dataset) DataFrame

根据给定的数据集返回更新后的数据

get_update_dataupdate 之间的区别 - update_date 只包含一些特定数据的功能 - update 包含一些常规步骤(例如准备数据集、检查)

class qlib.workflow.online.update.LabelUpdater(record: Recorder, to_date=None, **kwargs)

更新记录器中的标签

假设 - 标签是从record_temp.SignalRecord生成的。

__init__(record: Recorder, to_date=None, **kwargs)

初始化 PredUpdater。

在以下情况下的预期行为:

  • 如果 to_date 大于日历中的最大日期,数据将更新到最新日期

  • 如果在from_date之前或to_date之后有数据,只有from_dateto_date之间的数据会受到影响。

Parameters:
  • record – 记录器

  • to_date

    将预测更新到to_date

    如果to_date为None:

    数据将更新到最新日期。

  • from_date

    更新将从from_date开始

    如果from_date为None:

    更新将在历史数据中最新数据后的下一个tick时进行

  • hist_ref

    int 有时,数据集会有历史依赖。 将问题留给用户设置历史依赖的长度 如果用户没有指定此参数,Updater 将尝试加载数据集以自动确定 hist_ref

    注意

    start_time 不包括在 hist_ref 中;因此,在大多数情况下,hist_ref 将是 step_len - 1

  • loader_cls – 类型 用于加载模型和数据集的类

get_update_data(dataset: Dataset) DataFrame

根据给定的数据集返回更新后的数据

get_update_dataupdate 之间的区别 - update_date 只包含一些特定数据的功能 - update 包含一些常规步骤(例如准备数据集、检查)

工具

可序列化

class qlib.utils.serial.Serializable

Serializable 将改变 pickle 的行为。

判断在转储时是否保留或删除属性的规则。 优先级较高的规则位于顶部 - 在配置属性列表中 -> 总是删除 - 在包含属性列表中 -> 总是保留 - 在排除属性列表中 -> 总是删除 - 名称不以_开头 -> 保留 - 名称以_开头 -> 如果dump_all为真则保留,否则删除

它提供了一种语法糖,用于区分用户不希望保存的属性。 - 例如,一个可学习的Datahandler在转储到磁盘时只想保存参数而不保存数据。

__init__()
property dump_all

对象会转储所有对象吗

config(recursive=False, **kwargs)

配置可序列化对象

Parameters:
  • keys (kwargs 可能包括以下内容) –

    dump_allbool

    对象是否转储所有对象

    excludelist

    哪些属性不会被转储

    includelist

    哪些属性会被转储

  • recursive (bool) – 配置是否递归

to_pickle(path: Path | str, **kwargs)

将自身转储到pickle文件中。

path (Union[Path, str]): 要转储的路径

kwargs 可能包含以下键

dump_allbool

对象会转储所有对象吗

excludelist

哪些属性不会被转储

includelist

将转储什么属性

classmethod load(filepath)

从文件路径加载可序列化的类。

Parameters:

filepath (str) – 文件的路径

Raises:

TypeError – 腌制的文件必须是 type(cls)

Returns:

type(cls)的实例

Return type:

type(cls)

classmethod get_backend()

返回一个可序列化类的真实后端。pickle_backend 的值可以是“pickle”或“dill”。

Returns:

基于pickle_backend的pickle或dill模块

Return type:

模块

static general_dump(obj, path: Path | str)

对象的通用转储方法

Parameters:
  • obj (object) – 要转储的对象

  • path (Union[Path, str]) – 数据将被转储的目标路径

强化学习

基础组件

class qlib.rl.Interpreter

解释器是模拟器产生的状态与强化学习策略所需状态之间的媒介。 解释器是双向的:

  1. 从模拟器状态到策略状态(也称为观察),请参见StateInterpreter

  2. 从策略动作到模拟器接受的动作,请参见ActionInterpreter

继承两个子类之一来定义你自己的解释器。 这个超类仅用于isinstance检查。

建议解释器是无状态的,这意味着在解释器中使用self.xxx存储临时信息是反模式。未来,我们可能支持通过调用self.env.register_state()来注册一些与解释器相关的状态,但这不在第一版的计划中。

class qlib.rl.StateInterpreter(*args, **kwds)

状态解释器,将qlib执行器的执行结果解释为rl环境状态

validate(obs: ObsType) None

验证观察是否属于预定义的观察空间。

interpret(simulator_state: StateType) ObsType

解释模拟器的状态。

Parameters:

simulator_state – 通过 simulator.get_state() 获取。

Return type:

策略所需的状态。应符合在observation_space中定义的状态空间。

class qlib.rl.ActionInterpreter(*args, **kwds)

动作解释器,将强化学习代理的动作解释为qlib订单

validate(action: PolicyActType) None

验证一个动作是否属于预定义的动作空间。

interpret(simulator_state: StateType, action: PolicyActType) ActType

将策略动作转换为模拟器动作。

Parameters:
  • simulator_state – 通过 simulator.get_state() 获取。

  • action – 策略给出的原始动作。

Return type:

模拟器所需的操作,

class qlib.rl.Reward(*args, **kwds)

奖励计算组件,接受一个参数:模拟器的状态。返回一个实数:奖励。

子类应实现 reward(simulator_state) 以实现自己的奖励计算方案。

reward(simulator_state: SimulatorState) float

实现此方法以获得您自己的奖励。

class qlib.rl.RewardCombination(rewards: Dict[str, Tuple[Reward, float]])

多重奖励的组合。

__init__(rewards: Dict[str, Tuple[Reward, float]]) None
reward(simulator_state: Any) float

实现此方法以获得您自己的奖励。

class qlib.rl.Simulator(initial: InitialStateType, **kwargs: Any)

模拟器通过__init__重置,并通过step(action)进行转换。

为了使数据流清晰,我们对Simulator做出以下限制:

  1. 修改模拟器内部状态的唯一方法是使用step(action)

  2. 外部模块可以通过使用simulator.get_state()读取模拟器的状态,并通过调用simulator.done()来检查模拟器是否处于结束状态。

模拟器被定义为与三种类型绑定:

  • InitialStateType 是用于创建模拟器的数据类型。

  • StateType 是模拟器状态(state)的类型。

  • ActTypeaction 的类型,这是在每个步骤中接收到的输入。

不同的模拟器可能共享相同的StateType。例如,当它们处理相同的任务时,但使用不同的模拟实现。使用相同的类型,它们可以安全地共享MDP中的其他组件。

模拟器是短暂的。模拟器的生命周期从一个初始状态开始,到轨迹结束为止。 换句话说,当轨迹结束时,模拟器会被回收。 如果模拟器想要在之间共享上下文(例如,为了加速目的), 这可以通过访问环境包装器的弱引用来实现。

env

env-wrapper的参考,在某些特殊情况下可能有用。 不鼓励模拟器使用此功能,因为它容易引发错误。

Type:

可选的[EnvWrapper]

__init__(initial: InitialStateType, **kwargs: Any) None
step(action: ActType) None

接收一个ActType类型的动作。

模拟器应更新其内部状态,并返回None。 更新后的状态可以通过simulator.get_state()获取。

done() bool

检查模拟器是否处于“完成”状态。 当模拟器处于“完成”状态时, 它不应再接收任何step请求。 由于模拟器是短暂的,要重置模拟器, 应销毁旧的模拟器并创建一个新的模拟器。

策略

class qlib.rl.strategy.SingleOrderStrategy(order: Order, trade_range: TradeRange | None = None)

用于生成具有恰好一个订单的交易决策的策略。

__init__(order: Order, trade_range: TradeRange | None = None) None
Parameters:
  • outer_trade_decision (BaseTradeDecision, optional) –

    外部策略的交易决策,该策略依赖于此决策,并且将在 [start_time, end_time] 期间进行交易,默认为 None

    • 如果该策略用于拆分交易决策,将会使用此决策

    • 如果该策略用于投资组合管理,可以忽略此决策

  • level_infra (LevelInfrastructure, 可选) – 用于回测的级别共享基础设施,包括交易日历

  • common_infra (CommonInfrastructure, optional) – 用于回测的通用基础设施,包括交易账户、交易交易所等。

  • trade_exchange (Exchange) –

    提供市场信息的交易所,用于处理订单和生成报告

    • 如果 trade_exchange 为 None,self.trade_exchange 将被设置为 common_infra

    • 它允许在不同的执行中使用不同的 trade_exchanges。

    • 例如:

      • 在每日执行中,每日交易所和每分钟交易所都可以使用,但推荐使用每日交易所,因为它运行更快。

      • 在每分钟执行中,每日交易所不可用,仅推荐使用每分钟交易所。

generate_trade_decision(execute_result: list | None = None) TradeDecisionWO

在每个交易时段生成交易决策

Parameters:

execute_result (List[object], optional) –

交易决策的执行结果,默认为 None

  • 首次调用 generate_trade_decision 时,execute_result 可能为 None

训练器

训练、测试、推理工具。

class qlib.rl.trainer.Trainer(*, max_iters: int | None = None, val_every_n_iters: int | None = None, loggers: LogWriter | List[LogWriter] | None = None, callbacks: List[Callback] | None = None, finite_env_type: FiniteEnvType = 'subproc', concurrency: int = 2, fast_dev_run: int | None = None)

用于在特定任务上训练策略的工具。

与传统深度学习训练器不同,此训练器的迭代是“收集”,而不是“epoch”或“mini-batch”。 在每次收集中,Collector收集一定数量的策略-环境交互,并将它们累积到一个回放缓冲区中。此缓冲区用作训练策略的“数据”。 在每次收集结束时,策略会被更新多次。

该API与PyTorch Lightning有一些相似之处, 但本质上不同,因为这个训练器是为RL应用构建的,因此大多数配置都在RL上下文中。 我们仍在寻找整合现有训练器库的方法,因为构建一个与这些库一样强大的训练器看起来需要很大的努力,而且,这不是我们的主要目标。

它与 tianshou的内置训练器 有本质上的不同,因为它远比那些复杂得多。

Parameters:
  • max_iters – 停止前的最大迭代次数。

  • val_every_n_iters – 每n次迭代执行一次验证(即训练收集)。

  • logger – 用于记录回测结果的日志记录器。必须存在日志记录器,因为没有日志记录器,所有信息都将丢失。

  • finite_env_type – 有限环境实现的类型。

  • 并发 – 并行工作线程。

  • fast_dev_run – 创建一个用于调试的子集。 具体实现方式取决于训练容器的实现。 对于TrainingVessel,如果大于零, 将使用大小为fast_dev_run的随机子集 代替train_initial_statesval_initial_states

should_stop: bool

设置为停止训练。

metrics: dict

训练/验证/测试中产生的数值指标。 在训练/验证过程中,指标将显示最新一集的结果。 当每次训练/验证迭代完成时,指标将是该迭代中所有遇到的结果的汇总。

在每次新的训练迭代中清除。

在拟合过程中,验证指标将以前缀 val/ 显示。

current_iter: int

当前训练迭代(收集)。

__init__(*, max_iters: int | None = None, val_every_n_iters: int | None = None, loggers: LogWriter | List[LogWriter] | None = None, callbacks: List[Callback] | None = None, finite_env_type: FiniteEnvType = 'subproc', concurrency: int = 2, fast_dev_run: int | None = None)
loggers: List[LogWriter]

日志写入器的列表。

initialize()

初始化整个训练过程。

这里的状态应与state_dict同步。

initialize_iter()

初始化一次迭代/收集。

state_dict() dict

尽可能将当前训练的每个状态放入一个字典中。

它不会尝试处理在一次训练收集中间所有可能的状态。 对于大多数情况,在每次迭代结束时,事情通常应该是正确的。

请注意,收集器中的重放缓冲区数据将会丢失,这也是预期的行为。

load_state_dict(state_dict: dict) None

将所有状态加载到当前训练器中。

named_callbacks() Dict[str, Callback]

检索一组回调函数,每个回调函数都有一个名称。 在保存检查点时非常有用。

named_loggers() Dict[str, LogWriter]

检索一组具有名称的记录器。 在保存检查点时非常有用。

fit(vessel: TrainingVesselBase, ckpt_path: Path | None = None) None

在定义的模拟器上训练RL策略。

Parameters:
  • vessel – 训练中使用的所有元素的集合。

  • ckpt_path – 加载一个预训练/暂停的训练检查点。

test(vessel: TrainingVesselBase) None

测试RL策略与模拟器的对比。

模拟器将使用在test_seed_iterator中生成的数据进行输入。

Parameters:

vessel – 所有相关元素的集合。

venv_from_iterator(iterator: Iterable[InitialStateType]) FiniteVectorEnv

从迭代器和训练容器创建一个向量化环境。

class qlib.rl.trainer.TrainingVessel(*, simulator_fn: Callable[[InitialStateType], Simulator[InitialStateType, StateType, ActType]], state_interpreter: StateInterpreter[StateType, ObsType], action_interpreter: ActionInterpreter[StateType, PolicyActType, ActType], policy: BasePolicy, reward: Reward, train_initial_states: Sequence[InitialStateType] | None = None, val_initial_states: Sequence[InitialStateType] | None = None, test_initial_states: Sequence[InitialStateType] | None = None, buffer_size: int = 20000, episode_per_iter: int = 1000, update_kwargs: Dict[str, Any] = None)

训练容器的默认实现。

__init__ 接受一系列初始状态,以便可以创建迭代器。 train, validate, test 每个都执行一次收集(并且在训练中也会更新)。 默认情况下,训练初始状态将在训练期间无限重复, 并且收集器将控制每次迭代的回合数。 在验证和测试中,验证/测试初始状态将仅使用一次。

额外的超参数(仅在训练中使用)包括:

  • buffer_size: 回放缓冲区的大小。

  • episode_per_iter: 每次训练收集的回合数。可以通过快速开发运行进行覆盖。

  • update_kwargs: 出现在 policy.update 中的关键字参数。 例如,dict(repeat=10, batch_size=64)

__init__(*, simulator_fn: Callable[[InitialStateType], Simulator[InitialStateType, StateType, ActType]], state_interpreter: StateInterpreter[StateType, ObsType], action_interpreter: ActionInterpreter[StateType, PolicyActType, ActType], policy: BasePolicy, reward: Reward, train_initial_states: Sequence[InitialStateType] | None = None, val_initial_states: Sequence[InitialStateType] | None = None, test_initial_states: Sequence[InitialStateType] | None = None, buffer_size: int = 20000, episode_per_iter: int = 1000, update_kwargs: Dict[str, Any] = None)
train_seed_iterator() ContextManager[Iterable[InitialStateType]] | Iterable[InitialStateType]

重写此方法以创建用于训练的种子迭代器。 如果可迭代对象是上下文管理器,则整个训练将在with块中调用, 并且迭代器将在训练完成后自动关闭。

val_seed_iterator() ContextManager[Iterable[InitialStateType]] | Iterable[InitialStateType]

重写此方法以创建用于验证的种子迭代器。

test_seed_iterator() ContextManager[Iterable[InitialStateType]] | Iterable[InitialStateType]

重写此方法以创建用于测试的种子迭代器。

train(vector_env: FiniteVectorEnv) Dict[str, Any]

创建一个收集器并收集episode_per_iter个片段。 在收集的回放缓冲区上更新策略。

validate(vector_env: FiniteVectorEnv) Dict[str, Any]

实现此功能以一次性验证策略。

test(vector_env: FiniteVectorEnv) Dict[str, Any]

实现此功能以在测试环境中评估策略一次。

class qlib.rl.trainer.TrainingVesselBase(*args, **kwds)

一艘包含模拟器、解释器和策略的船将被发送到训练器。 这个类控制训练中与算法相关的部分,而训练器负责运行时的部分。

该船还定义了核心训练部分的最重要逻辑,并且(可选地)定义了一些回调,以在特定事件中插入自定义逻辑。

train_seed_iterator() ContextManager[Iterable[InitialStateType]] | Iterable[InitialStateType]

重写此方法以创建用于训练的种子迭代器。 如果可迭代对象是上下文管理器,则整个训练将在with块中调用, 并且迭代器将在训练完成后自动关闭。

val_seed_iterator() ContextManager[Iterable[InitialStateType]] | Iterable[InitialStateType]

重写此方法以创建用于验证的种子迭代器。

test_seed_iterator() ContextManager[Iterable[InitialStateType]] | Iterable[InitialStateType]

重写此方法以创建用于测试的种子迭代器。

train(vector_env: BaseVectorEnv) Dict[str, Any]

实现此功能以训练一次迭代。在强化学习中,一次迭代通常指一次收集。

validate(vector_env: FiniteVectorEnv) Dict[str, Any]

实现此功能以一次性验证策略。

test(vector_env: FiniteVectorEnv) Dict[str, Any]

实现此功能以在测试环境中评估策略一次。

state_dict() Dict

返回当前船只状态的检查点。

load_state_dict(state_dict: Dict) None

从之前保存的状态字典中恢复检查点。

class qlib.rl.trainer.Checkpoint(dirpath: Path, filename: str = '{iter:03d}.pth', save_latest: Literal['link', 'copy'] | None = 'link', every_n_iters: int | None = None, time_interval: int | None = None, save_on_fit_end: bool = True)

定期保存检查点以实现持久化和恢复。

参考:https://github.com/PyTorchLightning/pytorch-lightning/blob/bfa8b7be/pytorch_lightning/callbacks/model_checkpoint.py

Parameters:
  • dirpath – 保存检查点文件的目录。

  • filename

    检查点文件名。可以包含自动填充的命名格式化选项。 例如:{iter:03d}-{reward:.2f}.pth。 支持的参数名称有:

    • iter (int)

    • trainer.metrics 中的指标

    • 时间字符串,格式为 %Y%m%d%H%M%S

  • save_latest – 将最新的检查点保存在 latest.pth 中。 如果使用 linklatest.pth 将创建为软链接。 如果使用 copylatest.pth 将存储为单独的副本。 设置为 none 以禁用此功能。

  • every_n_iters – 检查点会在每n次训练迭代结束时保存,如果适用的话,在验证之后。

  • time_interval – 检查点再次保存前的最大时间(秒)。

  • save_on_fit_end – 在训练结束时保存最后一个检查点。 如果已经保存了检查点,则不执行任何操作。

__init__(dirpath: Path, filename: str = '{iter:03d}.pth', save_latest: Literal['link', 'copy'] | None = 'link', every_n_iters: int | None = None, time_interval: int | None = None, save_on_fit_end: bool = True)
on_fit_end(trainer: Trainer, vessel: TrainingVesselBase) None

在整个拟合过程结束后调用。

on_iter_end(trainer: Trainer, vessel: TrainingVesselBase) None

在每次迭代结束时调用。 这是在current_iter增加之后调用的,当之前的迭代被认为完成时。

class qlib.rl.trainer.EarlyStopping(monitor: str = 'reward', min_delta: float = 0.0, patience: int = 0, mode: Literal['min', 'max'] = 'max', baseline: float |