API参考
在这里你可以找到所有的Qlib
接口。
数据
提供者
- class qlib.data.data.ProviderBackendMixin
这个辅助类试图使基于存储后端的提供者更加方便 如果提供者不依赖于后端存储,则不需要继承此类
- class qlib.data.data.CalendarProvider
日历提供者基类
提供日历数据。
- calendar(start_time=None, end_time=None, freq='day', future=False)
获取特定市场在给定时间范围内的日历。
- Parameters:
start_time (str) – 时间范围的开始。
end_time (str) – 时间范围的结束时间。
freq (str) – 时间频率,可选:年/季度/月/周/天。
future (bool) – 是否包括未来的交易日。
- Returns:
日历列表
- Return type:
列表
- locate_index(start_time: Timestamp | str, end_time: Timestamp | str, freq: str, future: bool = False)
在特定频率的日历中定位开始时间索引和结束时间索引。
- Parameters:
start_time (pd.Timestamp) – 时间范围的开始。
end_time (pd.Timestamp) – 时间范围的结束时间。
freq (str) – 时间频率,可选:年/季度/月/周/天。
future (bool) – 是否包括未来的交易日。
- Returns:
pd.Timestamp – 实际的开始时间。
pd.Timestamp – 实际的结束时间。
int – 开始时间的索引。
int – 结束时间的索引。
- load_calendar(freq, future)
从文件中加载原始日历时间戳。
- Parameters:
freq (str) – 读取日历文件的频率。
future (bool) –
- Returns:
时间戳列表
- Return type:
列表
- class qlib.data.data.InstrumentProvider
仪器提供者基类
提供仪器数据。
- static instruments(market: List | str = 'all', filter_pipe: List | None = None)
获取基础市场的通用配置字典,并添加几个动态过滤器。
- Parameters:
市场 (联合[列表, 字符串]) –
- 字符串:
市场/行业/指数的简称,例如 all/sse/szse/sse50/csi300/csi500。
- 列表:
[“ID1”, “ID2”]。股票列表
filter_pipe (list) – 动态过滤器的列表。
- Returns:
dict (如果 isinstance(market, str)) – 股票池配置的字典。
{market => 基础市场名称, filter_pipe => 过滤器列表}
示例 :
{'market': 'csi500', 'filter_pipe': [{'filter_type': 'ExpressionDFilter', 'rule_expression': '$open<40', 'filter_start_time': None, 'filter_end_time': None, 'keep': False}, {'filter_type': 'NameDFilter', 'name_rule_re': 'SH[0-9]{4}55', 'filter_start_time': None, 'filter_end_time': None}]}
list (如果 isinstance(market, list)) – 直接返回原始列表。 注意:这将使工具适用于更多情况。用户代码将更简单。
- abstract list_instruments(instruments, start_time=None, end_time=None, freq='day', as_list=False)
根据某个股票池配置列出工具。
- Parameters:
instruments (dict) – 股票池配置。
start_time (str) – 时间范围的开始。
end_time (str) – 时间范围的结束时间。
as_list (bool) – 将仪器返回为列表或字典。
- Returns:
带有时间跨度的仪器列表或字典
- Return type:
字典或列表
- class qlib.data.data.FeatureProvider
特征提供者类
提供特征数据。
- abstract feature(instrument, field, start_time, end_time, freq)
获取特征数据。
- Parameters:
instrument (str) – 某种仪器。
field (str) – 特征的某个字段。
start_time (str) – 时间范围的开始。
end_time (str) – 时间范围的结束时间。
freq (str) – 时间频率,可选:年/季度/月/周/天。
- Returns:
某个特征的数据
- Return type:
pd.Series
- class qlib.data.data.PITProvider
- abstract period_feature(instrument, field, start_index: int, end_index: int, cur_time: Timestamp, period: int | None = None) Series
获取start_index和end_index之间的历史周期数据序列
- Parameters:
start_index (int) – start_index 是相对于最新周期到 cur_time 的相对索引
end_index (int) – end_index 是相对于最新周期到 cur_time 的相对索引 在大多数情况下,start_index 和 end_index 将是非正值 例如,start_index == -3 end_index == 0 且当前周期索引为 cur_idx, 则将检索 [start_index + cur_idx, end_index + cur_idx] 之间的数据。
period (int) – 用于查询特定周期。 在Qlib中,周期用整数表示。(例如,202001可能代表2020年第一季度) 注意:period 将覆盖 start_index 和 end_index
- Returns:
索引将是整数,用于指示数据的周期 一个典型的例子将是 TODO
- Return type:
pd.Series
- Raises:
FileNotFoundError – 如果查询的数据不存在,将引发此异常。
- class qlib.data.data.ExpressionProvider
表达式提供者类
提供表达数据。
- __init__()
- abstract expression(instrument, field, start_time=None, end_time=None, freq='day') Series
获取表达式数据。
expression的职责 - 解析field并加载相应的数据。 - 在加载数据时,应处理数据的时间依赖性。get_expression_instance通常在此方法中使用。
- Parameters:
instrument (str) – 某种仪器。
field (str) – 特征的某个字段。
start_time (str) – 时间范围的开始。
end_time (str) – 时间范围的结束时间。
freq (str) – 时间频率,可选:年/季度/月/周/天。
- Returns:
某个表达式的数据
数据有两种格式
带有日期时间索引的表达式
带有整数索引的表达式
因为日期时间不如
- Return type:
pd.Series
- class qlib.data.data.DatasetProvider
数据集提供者类
提供数据集数据。
- abstract dataset(instruments, fields, start_time=None, end_time=None, freq='day', inst_processors=[])
获取数据集数据。
- Parameters:
instruments (list 或 dict) – 仪器列表/字典或股票池配置字典。
fields (列表) – 特征实例的列表。
start_time (str) – 时间范围的开始。
end_time (str) – 时间范围的结束时间。
freq (str) – 时间频率。
inst_processors (Iterable[Union[dict, InstProcessor]]) – 对每个仪器执行的操作
- Returns:
一个带有
索引的pandas dataframe。 - Return type:
pd.DataFrame
- static get_instruments_d(instruments, freq)
解析不同类型的输入工具以输出instruments_d 输入工具格式错误将导致异常。
- static get_column_names(fields)
从输入字段获取列名
- static dataset_processor(instruments_d, column_names, start_time, end_time, freq, inst_processors=[])
加载并处理数据,返回数据集。 - 默认使用多核方法。
- static inst_calculator(inst, start_time, end_time, freq, column_names, spans=None, g_config=None, inst_processors=[])
计算一个工具的表达式,返回一个df结果。 如果表达式之前已经计算过,则从缓存中加载。
返回值:一个带有索引'datetime'和其他数据列的数据框。
- class qlib.data.data.LocalCalendarProvider(remote=False, backend={})
本地日历数据提供者类
从本地数据源提供日历数据。
- __init__(remote=False, backend={})
- load_calendar(freq, future)
从文件中加载原始日历时间戳。
- Parameters:
freq (str) – 读取日历文件的频率。
future (bool) –
- Returns:
时间戳列表
- Return type:
列表
- class qlib.data.data.LocalInstrumentProvider(backend={})
本地仪器数据提供者类
从本地数据源提供仪器数据。
- __init__(backend={}) None
- list_instruments(instruments, start_time=None, end_time=None, freq='day', as_list=False)
根据某个股票池配置列出工具。
- Parameters:
instruments (dict) – 股票池配置。
start_time (str) – 时间范围的开始。
end_time (str) – 时间范围的结束时间。
as_list (bool) – 将仪器返回为列表或字典。
- Returns:
带有时间跨度的仪器列表或字典
- Return type:
字典或列表
- class qlib.data.data.LocalFeatureProvider(remote=False, backend={})
本地特征数据提供者类
从本地数据源提供特征数据。
- __init__(remote=False, backend={})
- feature(instrument, field, start_index, end_index, freq)
获取特征数据。
- Parameters:
instrument (str) – 某种仪器。
field (str) – 特征的某个字段。
start_time (str) – 时间范围的开始。
end_time (str) – 时间范围的结束时间。
freq (str) – 时间频率,可选:年/季度/月/周/天。
- Returns:
某个特征的数据
- Return type:
pd.Series
- class qlib.data.data.LocalPITProvider
- period_feature(instrument, field, start_index, end_index, cur_time, period=None)
获取start_index和end_index之间的历史周期数据序列
- Parameters:
start_index (int) – start_index 是相对于最新周期到 cur_time 的相对索引
end_index (int) – end_index 是相对于最新周期到 cur_time 的相对索引 在大多数情况下,start_index 和 end_index 将是非正值 例如,start_index == -3 end_index == 0 且当前周期索引为 cur_idx, 则将检索 [start_index + cur_idx, end_index + cur_idx] 之间的数据。
period (int) – 用于查询特定周期。 在Qlib中,周期用整数表示。(例如,202001可能代表2020年第一季度) 注意:period 将覆盖 start_index 和 end_index
- Returns:
索引将是整数,用于指示数据的周期 一个典型的例子将是 TODO
- Return type:
pd.Series
- Raises:
FileNotFoundError – 如果查询的数据不存在,将引发此异常。
- class qlib.data.data.LocalExpressionProvider(time2idx=True)
本地表达式数据提供者类
从本地数据源提供表达数据。
- __init__(time2idx=True)
- expression(instrument, field, start_time=None, end_time=None, freq='day')
获取表达式数据。
expression的职责 - 解析field并加载相应的数据。 - 在加载数据时,应处理数据的时间依赖性。get_expression_instance通常在此方法中使用。
- Parameters:
instrument (str) – 某种仪器。
field (str) – 特征的某个字段。
start_time (str) – 时间范围的开始。
end_time (str) – 时间范围的结束时间。
freq (str) – 时间频率,可选:年/季度/月/周/天。
- Returns:
某个表达式的数据
数据有两种格式
带有日期时间索引的表达式
带有整数索引的表达式
因为日期时间不如
- Return type:
pd.Series
- class qlib.data.data.LocalDatasetProvider(align_time: bool = True)
本地数据集数据提供者类
从本地数据源提供数据集数据。
- __init__(align_time: bool = True)
- Parameters:
align_time (bool) –
我们是否将时间对齐到日历 在某些数据集中,频率是灵活的,无法对齐。 对于具有固定频率和共享日历的数据,将数据对齐到日历将提供以下好处
将查询对齐到相同的参数,以便可以共享缓存。
- dataset(instruments, fields, start_time=None, end_time=None, freq='day', inst_processors=[])
获取数据集数据。
- Parameters:
instruments (list 或 dict) – 仪器列表/字典或股票池配置字典。
fields (列表) – 特征实例的列表。
start_time (str) – 时间范围的开始。
end_time (str) – 时间范围的结束时间。
freq (str) – 时间频率。
inst_processors (Iterable[Union[dict, InstProcessor]]) – 对每个仪器执行的操作
- Returns:
一个带有索引的pandas dataframe。
- Return type:
pd.DataFrame
- static multi_cache_walker(instruments, fields, start_time=None, end_time=None, freq='day')
此方法用于为客户准备表达式缓存。 然后客户端将自行从表达式缓存中加载数据。
- static cache_walker(inst, start_time, end_time, freq, column_names)
如果一个仪器的表达式之前没有被计算过,计算它并将其写入表达式缓存。
- class qlib.data.data.ClientCalendarProvider
客户端日历数据提供者类
通过作为客户端从服务器请求数据来提供日历数据。
- __init__()
- calendar(start_time=None, end_time=None, freq='day', future=False)
获取特定市场在给定时间范围内的日历。
- Parameters:
start_time (str) – 时间范围的开始。
end_time (str) – 时间范围的结束时间。
freq (str) – 时间频率,可选:年/季度/月/周/天。
future (bool) – 是否包括未来的交易日。
- Returns:
日历列表
- Return type:
列表
- class qlib.data.data.ClientInstrumentProvider
客户端仪器数据提供者类
通过作为客户端从服务器请求数据来提供仪器数据。
- __init__()
- list_instruments(instruments, start_time=None, end_time=None, freq='day', as_list=False)
根据某个股票池配置列出工具。
- Parameters:
instruments (dict) – 股票池配置。
start_time (str) – 时间范围的开始。
end_time (str) – 时间范围的结束时间。
as_list (bool) – 将仪器返回为列表或字典。
- Returns:
带有时间跨度的仪器列表或字典
- Return type:
字典或列表
- class qlib.data.data.ClientDatasetProvider
客户端数据集数据提供者类
通过作为客户端从服务器请求数据来提供数据集数据。
- __init__()
- dataset(instruments, fields, start_time=None, end_time=None, freq='day', disk_cache=0, return_uri=False, inst_processors=[])
获取数据集数据。
- Parameters:
instruments (list 或 dict) – 仪器列表/字典或股票池配置字典。
fields (列表) – 特征实例的列表。
start_time (str) – 时间范围的开始。
end_time (str) – 时间范围的结束时间。
freq (str) – 时间频率。
inst_processors (Iterable[Union[dict, InstProcessor]]) – 对每个仪器执行的操作
- Returns:
一个带有索引的pandas dataframe。
- Return type:
pd.DataFrame
- class qlib.data.data.BaseProvider
本地提供者类 这是一组允许用户访问数据的接口。 由于PITD未公开向用户暴露,因此未包含在接口中。
为了保持与旧版qlib提供者的兼容性。
- features(instruments, fields, start_time=None, end_time=None, freq='day', disk_cache=None, inst_processors=[])
- Parameters:
disk_cache (int) – 是否跳过(0)/使用(1)/替换(2)磁盘缓存
此函数将尝试使用具有关键字disk_cache的缓存方法,如果由于DatasetD实例是提供者类而引发类型错误,则将使用提供者方法。
- class qlib.data.data.LocalProvider
- features_uri(instruments, fields, start_time, end_time, freq, disk_cache=1)
返回生成的特征/数据集缓存的URI
- Parameters:
disk_cache –
instruments –
fields –
start_time –
end_time –
freq –
- class qlib.data.data.ClientProvider
客户端提供者
作为客户端从服务器请求数据。可以提出请求:
日历:直接响应一个日历列表
工具(无过滤器):直接响应工具列表/字典
仪器(带过滤器):响应仪器列表/字典
功能:响应一个缓存URI
一般的工作流程描述如下: 当用户使用客户端提供者提出请求时,客户端提供者将连接服务器并发送请求。客户端将开始等待响应。响应将立即生成,指示缓存是否可用。只有当客户端收到响应说feature_available为真时,等待过程才会终止。 BUG:每次我们请求某些数据时,都需要连接到服务器,等待响应并断开连接。我们无法在一个连接内进行一系列请求。您可以参考https://python-socketio.readthedocs.io/en/latest/client.html获取python-socketIO客户端的文档。
- __init__()
- qlib.data.data.CalendarProviderWrapper
CalendarProvider
的别名
- qlib.data.data.InstrumentProviderWrapper
- qlib.data.data.FeatureProviderWrapper
FeatureProvider
的别名
- qlib.data.data.PITProviderWrapper
PITProvider
的别名
- qlib.data.data.ExpressionProviderWrapper
- qlib.data.data.DatasetProviderWrapper
DatasetProvider
的别名
- qlib.data.data.BaseProviderWrapper
BaseProvider
的别名
- qlib.data.data.register_all_wrappers(C)
过滤器
- class qlib.data.filter.BaseDFilter
动态仪器过滤器抽象类
用户可以覆盖此类以构建自己的过滤器
重写 __init__ 以输入过滤规则
重写 filter_main 以使用法规来过滤工具
- __init__()
- static from_config(config)
从配置字典构造实例。
- Parameters:
config (dict) – 配置参数的字典。
- abstract to_config()
从配置字典构造实例。
- Returns:
返回配置参数的字典。
- Return type:
字典
- class qlib.data.filter.SeriesDFilter(fstart_time=None, fend_time=None, keep=False)
动态仪器过滤器 用于过滤一系列特定特征的抽象类
过滤器应提供参数:
过滤开始时间
过滤结束时间
过滤规则
重写 __init__ 以分配特定规则来过滤系列。
重写 _getFilterSeries 以使用规则过滤系列并获取 {inst => series} 的字典,或重写 filter_main 以获取更高级的系列过滤规则
- __init__(fstart_time=None, fend_time=None, keep=False)
- Init function for filter base class.
根据由fstart_time和fend_time指定的某个时间段内的特定规则筛选一组工具。
- Parameters:
fstart_time (str) – 过滤器规则开始过滤工具的时间。
fend_time (str) – 过滤规则停止过滤工具的时间。
keep (bool) – 是否保留在过滤时间范围内不存在的特征的工具。
- filter_main(instruments, start_time=None, end_time=None)
实现此方法以过滤仪器。
- Parameters:
instruments (dict) – 输入要过滤的仪器。
start_time (str) – 时间范围的开始。
end_time (str) – 时间范围的结束时间。
- Returns:
过滤后的工具,结构与输入工具相同。
- Return type:
字典
- class qlib.data.filter.NameDFilter(name_rule_re, fstart_time=None, fend_time=None)
名称动态仪器过滤器
根据规定的名称格式筛选仪器。
需要一个名称规则的正则表达式。
- __init__(name_rule_re, fstart_time=None, fend_time=None)
名称过滤器类的初始化函数
- Parameters:
name_rule_re (str) – 用于名称规则的正则表达式。
- static from_config(config)
从配置字典构造实例。
- Parameters:
config (dict) – 配置参数的字典。
- to_config()
从配置字典构造实例。
- Returns:
返回配置参数的字典。
- Return type:
字典
- class qlib.data.filter.ExpressionDFilter(rule_expression, fstart_time=None, fend_time=None, keep=False)
表达式动态仪器过滤器
根据特定表达式筛选仪器。
表示某个特征字段是必需的表达式规则。
示例
基本特征过滤器 : rule_expression = ‘$close/$open>5’
横截面特征过滤器 : rule_expression = ‘$rank($close)<10’
时间序列特征过滤器 : rule_expression = ‘$Ref($close, 3)>100’
- __init__(rule_expression, fstart_time=None, fend_time=None, keep=False)
表达式过滤器类的初始化函数
- Parameters:
fstart_time (str) – 从该时间开始筛选特征。
fend_time (str) – 过滤在此时间结束的特征。
rule_expression (str) – 规则的输入表达式。
- static from_config(config)
从配置字典构造实例。
- Parameters:
config (dict) – 配置参数的字典。
- to_config()
从配置字典构造实例。
- Returns:
返回配置参数的字典。
- Return type:
字典
类
- class qlib.data.base.Expression
表达式基类
表达式设计用于处理以下格式的数据计算 每个工具的数据具有两个维度,
特征
时间:它可以是观察时间或周期时间。
period time 是为点时间数据库设计的。例如,period time 可能是 2014Q4,它的值可以被多次观察(由于修正,不同时间可能会观察到不同的值)。
- load(instrument, start_index, end_index, *args)
加载特征 此函数负责根据表达式引擎加载特征/表达式。
具体实现将分为两部分:
缓存数据,处理错误。
这部分由所有表达式共享,并在Expression中实现
基于特定表达式处理和计算数据。
这部分在每个表达式中都不同,并在每个表达式中实现
表达式引擎由不同的数据共享。 不同的数据将为args提供不同的额外信息。
- Parameters:
instrument (str) – 仪器代码。
start_index (str) – 特征开始索引 [在日历中]。
end_index (str) – 特征结束索引 [在日历中]。
信息 (*args 可能包含以下内容) –
data (2) 如果在PIT中使用) –
- freq: str
特征频率。
arguments (包含以下内容) –
- freq: str
特征频率。
data –
- cur_pit:
它设计用于点时间数据。
- period: int
这用于查询特定时期。 在Qlib中,时期用int表示。(例如,202001可能代表2020年第一季度)
arguments –
- cur_pit:
它设计用于点时间数据。
- period: int
这用于查询特定时期。 在Qlib中,时期用int表示。(例如,202001可能代表2020年第一季度)
- Returns:
特征系列:该系列的索引是日历索引
- Return type:
pd.Series
- abstract get_longest_back_rolling()
获取该特征访问的历史数据的最长长度
这是为了首先获取所需的数据范围以计算特定范围内的特征而设计的。然而,像Ref(Ref($close, -1), 1)这样的情况无法正确处理。
因此,这将仅用于检测所需历史数据的长度。
- abstract get_extended_window_size()
get_extend_window_size
为了计算这个操作符在范围[start_index, end_index]内 我们必须获取范围[start_index - lft_etd, end_index + rght_etd]内的叶子特征。
- Returns:
lft_etd, rght_etd
- Return type:
(int, int)
- class qlib.data.base.Feature(name=None)
静态表达式
这种特性将从提供者加载数据
- __init__(name=None)
- get_longest_back_rolling()
获取该特征访问的历史数据的最长长度
这是为了首先获取所需的数据范围以计算特定范围内的特征而设计的。然而,像Ref(Ref($close, -1), 1)这样的情况无法正确处理。
因此,这将仅用于检测所需历史数据的长度。
- get_extended_window_size()
get_extend_window_size
为了计算这个操作符在范围[start_index, end_index]内 我们必须获取范围[start_index - lft_etd, end_index + rght_etd]内的叶子特征。
- Returns:
lft_etd, rght_etd
- Return type:
(int, int)
- class qlib.data.base.PFeature(name=None)
- class qlib.data.base.ExpressionOps
操作符表达式
这种特性将使用操作符进行动态特性构建。
操作符
- class qlib.data.ops.ElemOperator(feature)
元素级运算符
- Parameters:
特征 (Expression) – 特征实例
- Returns:
特征操作输出
- Return type:
- __init__(feature)
- get_longest_back_rolling()
获取该特征访问的历史数据的最长长度
这是为了首先获取所需的数据范围以计算特定范围内的特征而设计的。然而,像Ref(Ref($close, -1), 1)这样的情况无法正确处理。
因此,这将仅用于检测所需历史数据的长度。
- get_extended_window_size()
get_extend_window_size
为了计算这个操作符在范围[start_index, end_index]内 我们必须获取范围[start_index - lft_etd, end_index + rght_etd]内的叶子特征。
- Returns:
lft_etd, rght_etd
- Return type:
(int, int)
- class qlib.data.ops.ChangeInstrument(instrument, feature)
更改仪器操作符 在某些情况下,人们可能希望在计算时更改为另一个仪器,例如,计算股票相对于市场指数的贝塔值。 这将需要将特征的计算从股票(原始仪器)更改为指数(参考仪器) :param instrument: 例如,SH000300(沪深300指数),或^GPSC(标普500指数)。 :type instrument: 下游操作应在其上执行的新仪器。 :param feature: :type feature: 为新仪器计算的特征。
- Returns:
特征操作输出
- Return type:
- __init__(instrument, feature)
- load(instrument, start_index, end_index, *args)
加载特征 此函数负责根据表达式引擎加载特征/表达式。
具体实现将分为两部分:
缓存数据,处理错误。
这部分由所有表达式共享,并在Expression中实现
基于特定表达式处理和计算数据。
这部分在每个表达式中都不同,并在每个表达式中实现
表达式引擎由不同的数据共享。 不同的数据将为args提供不同的额外信息。
- Parameters:
instrument (str) – 仪器代码。
start_index (str) – 特征开始索引 [在日历中]。
end_index (str) – 特征结束索引 [在日历中]。
信息 (*args 可能包含以下内容) –
data (2) 如果在PIT中使用) –
- freq: str
特征频率。
arguments (包含以下内容) –
- freq: str
特征频率。
data –
- cur_pit:
它设计用于点时间数据。
- period: int
这用于查询特定时期。 在Qlib中,时期用int表示。(例如,202001可能代表2020年第一季度)
arguments –
- cur_pit:
它设计用于点时间数据。
- period: int
这用于查询特定时期。 在Qlib中,时期用int表示。(例如,202001可能代表2020年第一季度)
- Returns:
特征系列:该系列的索引是日历索引
- Return type:
pd.Series
- class qlib.data.ops.NpElemOperator(feature, func)
Numpy 元素级运算符
- Parameters:
特征 (Expression) – 特征实例
func (str) – numpy 特征操作方法
- Returns:
特征操作输出
- Return type:
- __init__(feature, func)
- class qlib.data.ops.Abs(feature)
特征绝对值
- Parameters:
特征 (Expression) – 特征实例
- Returns:
具有绝对输出的特征实例
- Return type:
- __init__(feature)
- class qlib.data.ops.Sign(feature)
特征符号
- Parameters:
特征 (Expression) – 特征实例
- Returns:
带有签名的特征实例
- Return type:
- __init__(feature)
- class qlib.data.ops.Log(feature)
功能日志
- Parameters:
特征 (Expression) – 特征实例
- Returns:
带有日志的功能实例
- Return type:
- __init__(feature)
- class qlib.data.ops.Mask(feature, instrument)
特征掩码
- Parameters:
特征 (Expression) – 特征实例
instrument (str) – 仪器掩码
- Returns:
一个带有遮蔽工具的特征实例
- Return type:
- __init__(feature, instrument)
- class qlib.data.ops.Not(feature)
非运算符
- Parameters:
特征 (Expression) – 特征实例
- Returns:
特征元素不输出
- Return type:
- __init__(feature)
- class qlib.data.ops.PairOperator(feature_left, feature_right)
成对操作符
- Parameters:
feature_left (Expression) – 特征实例或数值
feature_right (Expression) – 特征实例或数值
- Returns:
两个特征的操作输出
- Return type:
- __init__(feature_left, feature_right)
- get_longest_back_rolling()
获取该特征访问的历史数据的最长长度
这是为了首先获取所需的数据范围以计算特定范围内的特征而设计的。然而,像Ref(Ref($close, -1), 1)这样的情况无法正确处理。
因此,这将仅用于检测所需历史数据的长度。
- get_extended_window_size()
get_extend_window_size
为了计算这个操作符在范围[start_index, end_index]内 我们必须获取范围[start_index - lft_etd, end_index + rght_etd]内的叶子特征。
- Returns:
lft_etd, rght_etd
- Return type:
(int, int)
- class qlib.data.ops.NpPairOperator(feature_left, feature_right, func)
Numpy 成对操作符
- Parameters:
feature_left (Expression) – 特征实例或数值
feature_right (Expression) – 特征实例或数值
func (str) – 操作函数
- Returns:
两个特征的操作输出
- Return type:
- __init__(feature_left, feature_right, func)
- class qlib.data.ops.Power(feature_left, feature_right)
幂运算符
- Parameters:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- Returns:
feature_left 中的基数提升到 feature_right 中的指数
- Return type:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Add(feature_left, feature_right)
添加操作符
- Parameters:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- Returns:
两个特征的总和
- Return type:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Sub(feature_left, feature_right)
减法运算符
- Parameters:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- Returns:
两个特征的减法
- Return type:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Mul(feature_left, feature_right)
乘法运算符
- Parameters:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- Returns:
两个特征的乘积
- Return type:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Div(feature_left, feature_right)
除法运算符
- Parameters:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- Returns:
两个特征的除法
- Return type:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Greater(feature_left, feature_right)
大于运算符
- Parameters:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- Returns:
从输入的两个特征中提取的较大元素
- Return type:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Less(feature_left, feature_right)
小于运算符
- Parameters:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- Returns:
从输入的两个特征中提取的较小元素
- Return type:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Gt(feature_left, feature_right)
大于运算符
- Parameters:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- Returns:
布尔系列表示 left > right
- Return type:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Ge(feature_left, feature_right)
大于等于运算符
- Parameters:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- Returns:
布尔系列表示 left >= right
- Return type:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Lt(feature_left, feature_right)
小于运算符
- Parameters:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- Returns:
布尔系列表示 left < right
- Return type:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Le(feature_left, feature_right)
小于等于运算符
- Parameters:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- Returns:
布尔系列表示 left <= right
- Return type:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Eq(feature_left, feature_right)
等于运算符
- Parameters:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- Returns:
布尔系列表示 left == right
- Return type:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Ne(feature_left, feature_right)
不等于运算符
- Parameters:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- Returns:
布尔系列表示 left != right
- Return type:
- __init__(feature_left, feature_right)
- class qlib.data.ops.And(feature_left, feature_right)
与运算符
- Parameters:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- Returns:
两个特征逐行输出
- Return type:
- __init__(feature_left, feature_right)
- class qlib.data.ops.Or(feature_left, feature_right)
或运算符
- Parameters:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- Returns:
两个特征逐行 | 输出
- Return type:
- __init__(feature_left, feature_right)
- class qlib.data.ops.If(condition, feature_left, feature_right)
如果运算符
- Parameters:
condition (Expression) – 具有布尔值的特征实例作为条件
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
- __init__(condition, feature_left, feature_right)
- get_longest_back_rolling()
获取该特征访问的历史数据的最长长度
这是为了首先获取所需的数据范围以计算特定范围内的特征而设计的。然而,像Ref(Ref($close, -1), 1)这样的情况无法正确处理。
因此,这将仅用于检测所需历史数据的长度。
- get_extended_window_size()
get_extend_window_size
为了计算这个操作符在范围[start_index, end_index]内 我们必须获取范围[start_index - lft_etd, end_index + rght_etd]内的叶子特征。
- Returns:
lft_etd, rght_etd
- Return type:
(int, int)
- class qlib.data.ops.Rolling(feature, N, func)
滚动操作符 在pandas中,滚动和扩展的含义是相同的。 当窗口设置为0时,操作符的行为应遵循扩展 否则,它遵循滚动
- Parameters:
特征 (Expression) – 特征实例
N (int) – 滚动窗口大小
func (str) – 滚动方法
- Returns:
滚动输出
- Return type:
- __init__(feature, N, func)
- get_longest_back_rolling()
获取该特征访问的历史数据的最长长度
这是为了首先获取所需的数据范围以计算特定范围内的特征而设计的。然而,像Ref(Ref($close, -1), 1)这样的情况无法正确处理。
因此,这将仅用于检测所需历史数据的长度。
- get_extended_window_size()
get_extend_window_size
为了计算这个操作符在范围[start_index, end_index]内 我们必须获取范围[start_index - lft_etd, end_index + rght_etd]内的叶子特征。
- Returns:
lft_etd, rght_etd
- Return type:
(int, int)
- class qlib.data.ops.Ref(feature, N)
特征参考
- Parameters:
特征 (Expression) – 特征实例
N (int) – N = 0,获取第一个数据;N > 0,获取N个周期前的数据;N < 0,获取未来的数据
- Returns:
具有目标引用的特征实例
- Return type:
- __init__(feature, N)
- get_longest_back_rolling()
获取该特征访问的历史数据的最长长度
这是为了首先获取所需的数据范围以计算特定范围内的特征而设计的。然而,像Ref(Ref($close, -1), 1)这样的情况无法正确处理。
因此,这将仅用于检测所需历史数据的长度。
- get_extended_window_size()
get_extend_window_size
为了计算这个操作符在范围[start_index, end_index]内 我们必须获取范围[start_index - lft_etd, end_index + rght_etd]内的叶子特征。
- Returns:
lft_etd, rght_etd
- Return type:
(int, int)
- class qlib.data.ops.Mean(feature, N)
滚动均值 (MA)
- Parameters:
特征 (Expression) – 特征实例
N (int) – 滚动窗口大小
- Returns:
一个具有滚动平均值的特征实例
- Return type:
- __init__(feature, N)
- class qlib.data.ops.Sum(feature, N)
滚动求和
- Parameters:
特征 (Expression) – 特征实例
N (int) – 滚动窗口大小
- Returns:
一个具有滚动求和的特征实例
- Return type:
- __init__(feature, N)
- class qlib.data.ops.Std(feature, N)
滚动标准差
- Parameters:
特征 (Expression) – 特征实例
N (int) – 滚动窗口大小
- Returns:
一个具有滚动标准差的特征实例
- Return type:
- __init__(feature, N)
- class qlib.data.ops.Var(feature, N)
滚动方差
- Parameters:
特征 (Expression) – 特征实例
N (int) – 滚动窗口大小
- Returns:
一个具有滚动方差特征的实例
- Return type:
- __init__(feature, N)
- class qlib.data.ops.Skew(feature, N)
滚动偏度
- Parameters:
特征 (Expression) – 特征实例
N (int) – 滚动窗口大小
- Returns:
一个具有滚动偏度的特征实例
- Return type:
- __init__(feature, N)
- class qlib.data.ops.Kurt(feature, N)
滚动峰度
- Parameters:
特征 (Expression) – 特征实例
N (int) – 滚动窗口大小
- Returns:
一个具有滚动峰度的特征实例
- Return type:
- __init__(feature, N)
- class qlib.data.ops.Max(feature, N)
滚动最大值
- Parameters:
特征 (Expression) – 特征实例
N (int) – 滚动窗口大小
- Returns:
一个具有滚动最大值的特征实例
- Return type:
- __init__(feature, N)
- class qlib.data.ops.IdxMax(feature, N)
滚动最大索引
- Parameters:
特征 (Expression) – 特征实例
N (int) – 滚动窗口大小
- Returns:
一个具有滚动最大索引的特征实例
- Return type:
- __init__(feature, N)
- class qlib.data.ops.Min(feature, N)
滚动最小值
- Parameters:
特征 (Expression) – 特征实例
N (int) – 滚动窗口大小
- Returns:
一个具有滚动最小值的特征实例
- Return type:
- __init__(feature, N)
- class qlib.data.ops.IdxMin(feature, N)
滚动最小指数
- Parameters:
特征 (Expression) – 特征实例
N (int) – 滚动窗口大小
- Returns:
一个具有滚动最小索引的特征实例
- Return type:
- __init__(feature, N)
- class qlib.data.ops.Quantile(feature, N, qscore)
滚动分位数
- Parameters:
特征 (Expression) – 特征实例
N (int) – 滚动窗口大小
- Returns:
一个具有滚动分位数特征实例
- Return type:
- __init__(feature, N, qscore)
- class qlib.data.ops.Med(feature, N)
滚动中位数
- Parameters:
特征 (Expression) – 特征实例
N (int) – 滚动窗口大小
- Returns:
一个具有滚动中位数的特征实例
- Return type:
- __init__(feature, N)
- class qlib.data.ops.Mad(feature, N)
滚动平均绝对偏差
- Parameters:
特征 (Expression) – 特征实例
N (int) – 滚动窗口大小
- Returns:
一个具有滚动平均绝对偏差的特征实例
- Return type:
- __init__(feature, N)
- class qlib.data.ops.Rank(feature, N)
滚动排名(百分位数)
- Parameters:
特征 (Expression) – 特征实例
N (int) – 滚动窗口大小
- Returns:
一个具有滚动排名的特征实例
- Return type:
- __init__(feature, N)
- class qlib.data.ops.Count(feature, N)
滚动计数
- Parameters:
特征 (Expression) – 特征实例
N (int) – 滚动窗口大小
- Returns:
一个具有非NaN元素滚动计数的特征实例
- Return type:
- __init__(feature, N)
- class qlib.data.ops.Delta(feature, N)
滚动差值
- Parameters:
特征 (Expression) – 特征实例
N (int) – 滚动窗口大小
- Returns:
一个特征实例,在滚动窗口中结束减去开始
- Return type:
- __init__(feature, N)
- class qlib.data.ops.Slope(feature, N)
滚动斜率 此操作符计算idx和feature之间的斜率。 (例如:[
, , ] 和 [1, 2, 3]) 使用示例: - “Slope($close, %d)/$close”
# 待办事项: # 一些用户可能希望进行成对滚动,如Slope(A, B, N)
- Parameters:
特征 (Expression) – 特征实例
N (int) – 滚动窗口大小
- Returns:
一个具有给定窗口线性回归斜率的特征实例
- Return type:
- __init__(feature, N)
- class qlib.data.ops.Rsquare(feature, N)
滚动R值平方
- Parameters:
特征 (Expression) – 特征实例
N (int) – 滚动窗口大小
- Returns:
具有给定窗口的线性回归r值平方的特征实例
- Return type:
- __init__(feature, N)
- class qlib.data.ops.Resi(feature, N)
滚动回归残差
- Parameters:
特征 (Expression) – 特征实例
N (int) – 滚动窗口大小
- Returns:
具有给定窗口回归残差的特征实例
- Return type:
- __init__(feature, N)
- class qlib.data.ops.WMA(feature, N)
滚动加权移动平均
- Parameters:
特征 (Expression) – 特征实例
N (int) – 滚动窗口大小
- Returns:
一个具有加权移动平均输出的特征实例
- Return type:
- __init__(feature, N)
- class qlib.data.ops.EMA(feature, N)
滚动指数平均 (EMA)
- Parameters:
特征 (Expression) – 特征实例
N (int, float) – 滚动窗口大小
- Returns:
具有给定窗口回归r值平方的特征实例
- Return type:
- __init__(feature, N)
- class qlib.data.ops.PairRolling(feature_left, feature_right, N, func)
配对滚动操作符
- Parameters:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
N (int) – 滚动窗口大小
- Returns:
具有两个输入特征的滚动输出的特征实例
- Return type:
- __init__(feature_left, feature_right, N, func)
- get_longest_back_rolling()
获取该特征访问的历史数据的最长长度
这是为了首先获取所需的数据范围以计算特定范围内的特征而设计的。然而,像Ref(Ref($close, -1), 1)这样的情况无法正确处理。
因此,这将仅用于检测所需历史数据的长度。
- get_extended_window_size()
get_extend_window_size
为了计算这个操作符在范围[start_index, end_index]内 我们必须获取范围[start_index - lft_etd, end_index + rght_etd]内的叶子特征。
- Returns:
lft_etd, rght_etd
- Return type:
(int, int)
- class qlib.data.ops.Corr(feature_left, feature_right, N)
滚动相关性
- Parameters:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
N (int) – 滚动窗口大小
- Returns:
一个具有两个输入特征滚动相关性的特征实例
- Return type:
- __init__(feature_left, feature_right, N)
- class qlib.data.ops.Cov(feature_left, feature_right, N)
滚动协方差
- Parameters:
feature_left (Expression) – 特征实例
feature_right (Expression) – 特征实例
N (int) – 滚动窗口大小
- Returns:
一个具有两个输入特征的滚动最大值的特征实例
- Return type:
- __init__(feature_left, feature_right, N)
- class qlib.data.ops.TResample(feature, freq, func)
- __init__(feature, freq, func)
将数据重新采样到目标频率。 使用了pandas的resample函数。
时间戳将在重新采样后的时间跨度开始时。
- Parameters:
特征 (Expression) – 用于计算特征的表达式
freq (str) – 它将传递给重采样方法,以便根据给定的频率进行重采样
func (method) – 获取重采样值的方法 一些表达式是高频使用的
- class qlib.data.ops.OpsWrapper
运维包装器
- __init__()
- register(ops_list: List[Type[ExpressionOps] | dict])
注册操作符
- Parameters:
ops_list (List[Union[Type[ExpressionOps], dict]]) –
如果类型(ops_list)是List[Type[ExpressionOps]],ops_list的每个元素代表操作符类,应该是ExpressionOps的子类。
如果类型(ops_list)是List[dict],ops_list的每个元素代表操作符的配置,其格式如下:
{ "class": class_name, "module_path": path, }
注意:class应该是操作符的类名,module_path应该是python模块或文件路径。
- qlib.data.ops.register_all_ops(C)
注册所有操作符
缓存
- class qlib.data.cache.MemCacheUnit(*args, **kwargs)
内存缓存单元。
- __init__(*args, **kwargs)
- property limited
内存缓存是否有限
- class qlib.data.cache.MemCache(mem_cache_size_limit=None, limit_type='length')
内存缓存。
- __init__(mem_cache_size_limit=None, limit_type='length')
- Parameters:
mem_cache_size_limit – 缓存最大大小。
limit_type – 长度或大小;长度(调用函数:len),大小(调用函数:sys.getsizeof)。
- class qlib.data.cache.ExpressionCache(provider)
表达式缓存机制的基类。
此类用于包装表达式提供者,并带有自定义的表达式缓存机制。
注意
重写_uri和_expression方法以创建您自己的表达式缓存机制。
- expression(instrument, field, start_time, end_time, freq)
获取表达式数据。
注意
与表达式提供者中的expression方法相同的接口
- update(cache_uri: str | Path, freq: str = 'day')
将表达式缓存更新到最新的日历。
重写此方法以定义如何根据用户自己的缓存机制更新表达式缓存。
- Parameters:
cache_uri (str 或 Path) – 表达式缓存文件的完整URI(包括目录路径)。
freq (str) –
- Returns:
0(成功更新)/ 1(无需更新)/ 2(更新失败)。
- Return type:
整数
- class qlib.data.cache.DatasetCache(provider)
数据集缓存机制的基类。
此类用于使用自定义数据集缓存机制包装数据集提供者。
注意
重写 _uri 和 _dataset 方法以创建您自己的数据集缓存机制。
- dataset(instruments, fields, start_time=None, end_time=None, freq='day', disk_cache=1, inst_processors=[])
获取特征数据集。
注意
与数据集提供者中的dataset方法相同的接口
注意
服务器使用redis_lock来确保不会触发读写冲突,但没有考虑客户端读取者。
- update(cache_uri: str | Path, freq: str = 'day')
更新数据集缓存到最新的日历。
重写此方法以定义如何根据用户自己的缓存机制更新数据集缓存。
- Parameters:
cache_uri (str 或 Path) – 数据集缓存文件的完整URI(包括目录路径)。
freq (str) –
- Returns:
0(成功更新)/ 1(无需更新)/ 2(更新失败)
- Return type:
整数
- static cache_to_origin_data(data, fields)
将缓存数据转换为原始数据
- Parameters:
data – pd.DataFrame, 缓存数据。
fields – 特征字段。
- Returns:
pd.DataFrame.
- static normalize_uri_args(instruments, fields, freq)
规范化URI参数
- class qlib.data.cache.DiskExpressionCache(provider, **kwargs)
为服务器准备的缓存机制。
- __init__(provider, **kwargs)
- gen_expression_cache(expression_data, cache_path, instrument, field, freq, last_update)
使用二进制文件保存类似特征数据。
- update(sid, cache_uri, freq: str = 'day')
将表达式缓存更新到最新的日历。
重写此方法以定义如何根据用户自己的缓存机制更新表达式缓存。
- Parameters:
cache_uri (str 或 Path) – 表达式缓存文件的完整URI(包括目录路径)。
freq (str) –
- Returns:
0(成功更新)/ 1(无需更新)/ 2(更新失败)。
- Return type:
整数
- class qlib.data.cache.DiskDatasetCache(provider, **kwargs)
为服务器准备的缓存机制。
- __init__(provider, **kwargs)
- classmethod read_data_from_cache(cache_path: str | Path, start_time, end_time, fields)
read_cache_from
此函数可以从磁盘缓存数据集中读取数据
- Parameters:
cache_path –
start_time –
end_time –
fields – 数据集缓存的字段顺序已排序。因此,重新排列列以使其一致。
- Returns:
- class IndexManager(cache_path: str | Path)
锁不在类中考虑。请在代码外部考虑锁。 此类是磁盘数据的代理。
- __init__(cache_path: str | Path)
- gen_dataset_cache(cache_path: str | Path, instruments, fields, freq, inst_processors=[])
注意
此函数不考虑缓存读写锁。请在此函数外部获取锁。
缓存格式包含3部分(后跟典型文件名)。
索引 : cache/d41366901e25de3ec47297f12e2ba11d.index
文件的内容可能采用以下格式(pandas.Series)
start end 1999-11-10 00:00:00 0 1 1999-11-11 00:00:00 1 2 1999-11-12 00:00:00 2 3 ...
注意
开始是关闭的。结束是开放的!!!!!
每行包含两个元素
,并以时间戳作为其索引。 它表示timestamp数据的start_index(包含)和end_index(不包含)。
元数据:cache/d41366901e25de3ec47297f12e2ba11d.meta
数据 : cache/d41366901e25de3ec47297f12e2ba11d
这是一个按日期时间排序的hdf文件
- Parameters:
cache_path – 存储缓存的路径。
instruments – 用于存储缓存的工具。
fields – 用于存储缓存的字段。
freq – 存储缓存的频率。
inst_processors – 仪器处理器。
:返回类型 pd.DataFrame; 返回的DataFrame的字段与函数的参数一致。
- update(cache_uri, freq: str = 'day')
更新数据集缓存到最新的日历。
重写此方法以定义如何根据用户自己的缓存机制更新数据集缓存。
- Parameters:
cache_uri (str 或 Path) – 数据集缓存文件的完整URI(包括目录路径)。
freq (str) –
- Returns:
0(成功更新)/ 1(无需更新)/ 2(更新失败)
- Return type:
整数
存储
- class qlib.data.storage.storage.BaseStorage
- class qlib.data.storage.storage.CalendarStorage(freq: str, future: bool, **kwargs)
CalendarStorage的方法和List的同名方法的行为保持一致
- __init__(freq: str, future: bool, **kwargs)
- property data: Iterable[str]
获取所有数据
- Raises:
ValueError – 如果数据(存储)不存在,则引发 ValueError
- index(value: str) int
- Raises:
ValueError – 如果数据(存储)不存在,则引发 ValueError
- class qlib.data.storage.storage.InstrumentStorage(market: str, freq: str, **kwargs)
- __init__(market: str, freq: str, **kwargs)
- property data: Dict[str, List[Tuple[str, str]]]
获取所有数据
- Raises:
ValueError – 如果数据(存储)不存在,则引发 ValueError
- update([E, ]**F) None. Update D from mapping/iterable E and F.
注释
如果 E 存在并且具有 .keys() 方法,则执行: for k in E: D[k] = E[k]
如果 E 存在且缺少 .keys() 方法,则执行: for (k, v) in E: D[k] = v
无论哪种情况,接下来都是:for k, v in F.items(): D[k] = v
- class qlib.data.storage.storage.FeatureStorage(instrument: str, field: str, freq: str, **kwargs)
- __init__(instrument: str, field: str, freq: str, **kwargs)
- property data: Series
获取所有数据
注释
如果数据(存储)不存在,返回空的pd.Series:return pd.Series(dtype=np.float32)
- property start_index: int | None
获取 FeatureStorage 起始索引
注释
如果数据(存储)不存在,返回 None
- property end_index: int | None
获取 FeatureStorage 结束索引
注释
数据范围的右索引(两侧都是闭合的)
下一个数据追加点将是 end_index + 1
如果数据(存储)不存在,返回 None
- write(data_array: List | ndarray | Tuple, index: int | None = None)
从索引开始将data_array写入FeatureStorage。
注释
如果索引为None,则将data_array附加到特征。
如果 len(data_array) == 0; 返回
如果 (index - self.end_index) >= 1,self[end_index+1: index] 将会被填充为 np.nan
示例
feature: 3 4 4 5 5 6 >>> self.write([6, 7], index=6) feature: 3 4 4 5 5 6 6 6 7 7 >>> self.write([8], index=9) feature: 3 4 4 5 5 6 6 6 7 7 8 np.nan 9 8 >>> self.write([1, np.nan], index=3) feature: 3 1 4 np.nan 5 6 6 6 7 7 8 np.nan 9 8
- rebase(start_index: int | None = None, end_index: int | None = None)
重新调整FeatureStorage的start_index和end_index。
start_index 和 end_index 是闭区间:[start_index, end_index]
示例
feature: 3 4 4 5 5 6 >>> self.rebase(start_index=4) feature: 4 5 5 6 >>> self.rebase(start_index=3) feature: 3 np.nan 4 5 5 6 >>> self.write([3], index=3) feature: 3 3 4 5 5 6 >>> self.rebase(end_index=4) feature: 3 3 4 5 >>> self.write([6, 7, 8], index=4) feature: 3 3 4 6 5 7 6 8 >>> self.rebase(start_index=4, end_index=5) feature: 4 6 5 7
- rewrite(data: List | ndarray | Tuple, index: int)
用数据覆盖FeatureStorage中的所有数据
- Parameters:
data (Union[List, np.ndarray, Tuple]) – 数据
index (int) – 数据起始索引
- class qlib.data.storage.file_storage.FileStorageMixin
FileStorageMixin,适用于FileXXXStorage 子类需要具有provider_uri、freq、storage_name、file_name属性
- check()
检查 self.uri
- Raises:
ValueError –
- class qlib.data.storage.file_storage.FileCalendarStorage(freq: str, future: bool, provider_uri: dict | None = None, **kwargs)
- __init__(freq: str, future: bool, provider_uri: dict | None = None, **kwargs)
- property data: List[str]
获取所有数据
- Raises:
ValueError – 如果数据(存储)不存在,则引发 ValueError
- index(value: str) int
- Raises:
ValueError – 如果数据(存储)不存在,则引发 ValueError
- class qlib.data.storage.file_storage.FileInstrumentStorage(market: str, freq: str, provider_uri: dict | None = None, **kwargs)
- __init__(market: str, freq: str, provider_uri: dict | None = None, **kwargs)
- property data: Dict[str, List[Tuple[str, str]]]
获取所有数据
- Raises:
ValueError – 如果数据(存储)不存在,则引发 ValueError
- update([E, ]**F) None. Update D from mapping/iterable E and F.
注释
如果 E 存在并且具有 .keys() 方法,则执行: for k in E: D[k] = E[k]
如果 E 存在且缺少 .keys() 方法,则执行: for (k, v) in E: D[k] = v
无论哪种情况,接下来都是:for k, v in F.items(): D[k] = v
- class qlib.data.storage.file_storage.FileFeatureStorage(instrument: str, field: str, freq: str, provider_uri: dict | None = None, **kwargs)
- __init__(instrument: str, field: str, freq: str, provider_uri: dict | None = None, **kwargs)
- property data: Series
获取所有数据
注释
如果数据(存储)不存在,返回空的pd.Series:return pd.Series(dtype=np.float32)
- write(data_array: List | ndarray, index: int | None = None) None
从索引开始将data_array写入FeatureStorage。
注释
如果索引为None,则将data_array附加到特征。
如果 len(data_array) == 0; 返回
如果 (index - self.end_index) >= 1,self[end_index+1: index] 将会被填充为 np.nan
示例
feature: 3 4 4 5 5 6 >>> self.write([6, 7], index=6) feature: 3 4 4 5 5 6 6 6 7 7 >>> self.write([8], index=9) feature: 3 4 4 5 5 6 6 6 7 7 8 np.nan 9 8 >>> self.write([1, np.nan], index=3) feature: 3 1 4 np.nan 5 6 6 6 7 7 8 np.nan 9 8
- property start_index: int | None
获取 FeatureStorage 起始索引
注释
如果数据(存储)不存在,返回 None
- property end_index: int | None
获取 FeatureStorage 结束索引
注释
数据范围的右索引(两侧都是闭合的)
下一个数据追加点将是 end_index + 1
如果数据(存储)不存在,返回 None
数据集
数据集类
- class qlib.data.dataset.__init__.Dataset(**kwargs)
为模型训练和推理准备数据。
- __init__(**kwargs)
init 旨在完成以下步骤:
- init the sub instance and the state of the dataset(info to prepare the data)
准备数据的基本状态的名称不应以“_”开头,以便在序列化时可以在磁盘上序列化。
- setup data
与数据相关的属性名称应以“_”开头,以便在序列化时不会保存到磁盘上。
数据可以指定信息以计算准备所需的基本数据
- config(**kwargs)
config 用于配置无法从数据中学习的参数
- setup_data(**kwargs)
设置数据。
我们为以下情况拆分setup_data函数:
用户有一个带有学习状态的Dataset对象在磁盘上。
用户从磁盘加载数据集对象。
用户调用setup_data来加载新数据。
用户根据之前的状态为模型准备数据。
- prepare(**kwargs) object
数据集的类型取决于模型。(可能是pd.DataFrame、pytorch.DataLoader等) 参数应指定准备数据的范围 该方法应: - 处理数据
返回处理后的数据
- Returns:
返回对象
- Return type:
对象
- class qlib.data.dataset.__init__.DatasetH(handler: Dict | DataHandler, segments: Dict[str, Tuple], fetch_kwargs: Dict = {}, **kwargs)
带有数据(H)处理器的数据集
用户应尝试将数据预处理函数放入处理程序中。 只有以下数据处理函数应放置在数据集中:
处理与特定模型相关。
处理与数据分割有关。
- __init__(handler: Dict | DataHandler, segments: Dict[str, Tuple], fetch_kwargs: Dict = {}, **kwargs)
设置基础数据。
- Parameters:
handler (Union[dict, DataHandler]) –
handler 可以是:
DataHandler 的实例
DataHandler 的配置。请参考 DataHandler
segments (dict) –
描述数据分段的选项。 以下是一些示例:
1) 'segments': { 'train': ("2008-01-01", "2014-12-31"), 'valid': ("2017-01-01", "2020-08-01",), 'test': ("2015-01-01", "2016-12-31",), } 2) 'segments': { 'insample': ("2008-01-01", "2014-12-31"), 'outsample': ("2017-01-01", "2020-08-01",), }
- config(handler_kwargs: dict | None = None, **kwargs)
初始化数据集H
- Parameters:
handler_kwargs (dict) –
DataHandler的配置,可能包含以下参数:
DataHandler.conf_data的参数,例如'instruments'、'start_time'和'end_time'。
kwargs (dict) –
DatasetH的配置,例如
- segmentsdict
segments的配置,与self.__init__中的‘segments’相同
- setup_data(handler_kwargs: dict | None = None, **kwargs)
设置数据
- Parameters:
handler_kwargs (dict) –
DataHandler的初始化参数,可能包括以下参数:
init_type : 处理程序的初始化类型
enable_cache : 是否启用缓存
- prepare(segments: List[str] | Tuple[str] | str | slice | Index, col_set='__all', data_key='infer', **kwargs) List[DataFrame] | DataFrame
准备用于学习和推理的数据。
- Parameters:
segments (Union[List[Text], Tuple[Text], Text, slice]) –
描述要准备的数据范围 以下是一些示例:
’train’
[‘train’, ‘valid’]
col_set (str) –
col_set 将在获取数据时传递给 self.handler。 TODO: 使其自动化:
选择 DK_I 用于测试数据
选择 DK_L 用于训练数据。
data_key (str) – 要获取的数据: DK_* 默认是 DK_I,表示获取用于推理的数据。
kwargs –
- kwargs 可能包含的参数:
- flt_colstr
它仅存在于 TSDatasetH 中,可用于添加一列数据(True 或 False)以过滤数据。 此参数仅在 TSDatasetH 实例中支持。
- Return type:
联合[列表[pd.DataFrame], pd.DataFrame]
- Raises:
NotImplementedError: –
数据加载器
- class qlib.data.dataset.loader.DataLoader
DataLoader 设计用于从原始数据源加载原始数据。
- abstract load(instruments, start_time=None, end_time=None) DataFrame
将数据加载为 pd.DataFrame。
数据示例(列的多重索引是可选的):
feature label $close $volume Ref($close, 1) Mean($close, 3) $high-$low LABEL0 datetime instrument 2010-01-04 SH600000 81.807068 17145150.0 83.737389 83.016739 2.741058 0.0032 SH600004 13.313329 11800983.0 13.313329 13.317701 0.183632 0.0042 SH600005 37.796539 12231662.0 38.258602 37.919757 0.970325 0.0289
- Parameters:
instruments (str 或 dict) – 它可以是市场名称或由InstrumentProvider生成的仪器配置文件。 如果instruments的值为None,则表示没有进行过滤。
start_time (str) – 时间范围的开始。
end_time (str) – 时间范围的结束时间。
- Returns:
数据从底层源加载
- Return type:
pd.DataFrame
- Raises:
KeyError: – 如果不支持仪器过滤器,则引发 KeyError
- class qlib.data.dataset.loader.DLWParser(config: list | tuple | dict)
用于特征和名称的(D)ata(L)oader (W)ith (P)arser
提取这个类以便QlibDataLoader和其他数据加载器(如QdbDataLoader)可以共享字段。
- __init__(config: list | tuple | dict)
- Parameters:
config (Union[list, tuple, dict]) –
Config 将用于描述字段和列名
<config> := { "group_name1": <fields_info1> "group_name2": <fields_info2> } 或 <config> := <fields_info> <fields_info> := ["expr", ...] | (["expr", ...], ["col_name", ...]) # 注意:列表或元组在解析时将被视为相同的东西
- abstract load_group_df(instruments, exprs: list, names: list, start_time: str | Timestamp | None = None, end_time: str | Timestamp | None = None, gp_name: str | None = None) DataFrame
加载特定组的数据框
- Parameters:
instruments – 仪器。
exprs (list) – 描述数据内容的表达式。
names (list) – 数据的名称。
- Returns:
查询的数据框。
- Return type:
pd.DataFrame
- load(instruments=None, start_time=None, end_time=None) DataFrame
将数据加载为 pd.DataFrame。
数据示例(列的多重索引是可选的):
feature label $close $volume Ref($close, 1) Mean($close, 3) $high-$low LABEL0 datetime instrument 2010-01-04 SH600000 81.807068 17145150.0 83.737389 83.016739 2.741058 0.0032 SH600004 13.313329 11800983.0 13.313329 13.317701 0.183632 0.0042 SH600005 37.796539 12231662.0 38.258602 37.919757 0.970325 0.0289
- Parameters:
instruments (str 或 dict) – 它可以是市场名称或由InstrumentProvider生成的仪器配置文件。 如果instruments的值为None,则表示没有进行过滤。
start_time (str) – 时间范围的开始。
end_time (str) – 时间范围的结束时间。
- Returns:
数据从底层源加载
- Return type:
pd.DataFrame
- Raises:
KeyError: – 如果不支持仪器过滤器,则引发 KeyError
- class qlib.data.dataset.loader.QlibDataLoader(config: Tuple[list, tuple, dict], filter_pipe: List | None = None, swap_level: bool = True, freq: str | dict = 'day', inst_processors: dict | list | None = None)
与QlibDataLoader相同。字段可以通过配置定义
- __init__(config: Tuple[list, tuple, dict], filter_pipe: List | None = None, swap_level: bool = True, freq: str | dict = 'day', inst_processors: dict | list | None = None)
- Parameters:
config (Tuple[list, tuple, dict]) – 请参考DLWParser的文档
filter_pipe – 仪器的过滤管道
swap_level – 是否交换MultiIndex的层级
freq (dict 或 str) – 如果 type(config) == dict 且 type(freq) == str,则使用 freq 加载配置数据。 如果 type(config) == dict 且 type(freq) == dict,则使用 freq[
] 加载 config[ ] 数据 inst_processors (dict | list) – 如果 inst_processors 不为 None 且 type(config) == dict;使用 inst_processors[
] 加载 config[ ] 数据 如果 inst_processors 是一个列表,那么它将应用于所有组。
- load_group_df(instruments, exprs: list, names: list, start_time: str | Timestamp | None = None, end_time: str | Timestamp | None = None, gp_name: str | None = None) DataFrame
加载特定组的数据框
- Parameters:
instruments – 仪器。
exprs (list) – 用于描述数据内容的表达式。
names (list) – 数据的名称。
- Returns:
查询的数据框。
- Return type:
pd.DataFrame
- class qlib.data.dataset.loader.StaticDataLoader(config: dict | str | DataFrame, join='outer')
支持从文件或提供的来源加载数据的DataLoader。
- __init__(config: dict | str | DataFrame, join='outer')
- Parameters:
config (dict) – {fields_group: <路径或对象>}
join (str) – 如何对齐不同的数据框
- load(instruments=None, start_time=None, end_time=None) DataFrame
将数据加载为 pd.DataFrame。
数据示例(列的多重索引是可选的):
feature label $close $volume Ref($close, 1) Mean($close, 3) $high-$low LABEL0 datetime instrument 2010-01-04 SH600000 81.807068 17145150.0 83.737389 83.016739 2.741058 0.0032 SH600004 13.313329 11800983.0 13.313329 13.317701 0.183632 0.0042 SH600005 37.796539 12231662.0 38.258602 37.919757 0.970325 0.0289
- Parameters:
instruments (str 或 dict) – 它可以是市场名称或由InstrumentProvider生成的仪器配置文件。 如果instruments的值为None,则表示没有进行过滤。
start_time (str) – 时间范围的开始。
end_time (str) – 时间范围的结束时间。
- Returns:
数据从底层源加载
- Return type:
pd.DataFrame
- Raises:
KeyError: – 如果不支持仪器过滤器,则引发 KeyError
- class qlib.data.dataset.loader.NestedDataLoader(dataloader_l: List[Dict], join='left')
我们有多个DataLoader,我们可以使用这个类来组合它们。
- __init__(dataloader_l: List[Dict], join='left') None
- Parameters:
dataloader_l (list[dict]) –
一个数据加载器的列表,例如
nd = NestedDataLoader( dataloader_l=[ { "class": "qlib.contrib.data.loader.Alpha158DL", }, { "class": "qlib.contrib.data.loader.Alpha360DL", "kwargs": { "config": { "label": ( ["Ref($close, -2)/Ref($close, -1) - 1"], ["LABEL0"]) } } } ] )
join – 在合并时它将传递给pd.concat。
- load(instruments=None, start_time=None, end_time=None) DataFrame
将数据加载为 pd.DataFrame。
数据示例(列的多重索引是可选的):
feature label $close $volume Ref($close, 1) Mean($close, 3) $high-$low LABEL0 datetime instrument 2010-01-04 SH600000 81.807068 17145150.0 83.737389 83.016739 2.741058 0.0032 SH600004 13.313329 11800983.0 13.313329 13.317701 0.183632 0.0042 SH600005 37.796539 12231662.0 38.258602 37.919757 0.970325 0.0289
- Parameters:
instruments (str 或 dict) – 它可以是市场名称或由InstrumentProvider生成的仪器配置文件。 如果instruments的值为None,则表示没有进行过滤。
start_time (str) – 时间范围的开始。
end_time (str) – 时间范围的结束时间。
- Returns:
数据从底层源加载
- Return type:
pd.DataFrame
- Raises:
KeyError: – 如果不支持仪器过滤器,则引发 KeyError
- class qlib.data.dataset.loader.DataLoaderDH(handler_config: dict, fetch_kwargs: dict = {}, is_group=False)
基于(D)ata (H)andler的DataLoader 它设计用于从数据处理器加载多个数据 - 如果您只想从单个数据处理器加载数据,您可以在单个数据处理器中编写它们
待办事项:是什么让这个模块不那么容易使用。
适用于在线场景
底层数据处理器应进行配置。但数据加载器未提供此类接口和钩子。
- __init__(handler_config: dict, fetch_kwargs: dict = {}, is_group=False)
- Parameters:
handler_config (dict) –
handler_config 将用于描述处理程序
<handler_config> := { "group_name1": <handler> "group_name2": <handler> } 或 <handler_config> := <handler> <handler> := DataHandler 实例 | DataHandler 配置
fetch_kwargs (dict) – fetch_kwargs 将用于描述 fetch 方法的不同参数,例如 col_set、squeeze、data_key 等。
is_group (bool) – is_group 将用于描述 handler_config 的键是否为组
- load(instruments=None, start_time=None, end_time=None) DataFrame
将数据加载为 pd.DataFrame。
数据示例(列的多重索引是可选的):
feature label $close $volume Ref($close, 1) Mean($close, 3) $high-$low LABEL0 datetime instrument 2010-01-04 SH600000 81.807068 17145150.0 83.737389 83.016739 2.741058 0.0032 SH600004 13.313329 11800983.0 13.313329 13.317701 0.183632 0.0042 SH600005 37.796539 12231662.0 38.258602 37.919757 0.970325 0.0289
- Parameters:
instruments (str 或 dict) – 它可以是市场名称或由InstrumentProvider生成的仪器配置文件。 如果instruments的值为None,则表示没有进行过滤。
start_time (str) – 时间范围的开始。
end_time (str) – 时间范围的结束时间。
- Returns:
数据从底层源加载
- Return type:
pd.DataFrame
- Raises:
KeyError: – 如果不支持仪器过滤器,则引发 KeyError
数据处理器
- class qlib.data.dataset.handler.DataHandler(instruments=None, start_time=None, end_time=None, data_loader: dict | str | DataLoader | None = None, init_data=True, fetch_orig=True)
使用处理程序的步骤 1. 初始化数据处理器(通过init调用)。 2. 使用数据。
数据处理器尝试维护一个具有两个级别的处理器。 datetime 和 instruments。
可以支持索引级别的任何顺序(顺序将在数据中隐含)。 当数据框索引名称缺失时,将使用顺序<datetime, instruments>。
数据示例: 列的多重索引是可选的。
feature label $close $volume Ref($close, 1) Mean($close, 3) $high-$low LABEL0 datetime instrument 2010-01-04 SH600000 81.807068 17145150.0 83.737389 83.016739 2.741058 0.0032 SH600004 13.313329 11800983.0 13.313329 13.317701 0.183632 0.0042 SH600005 37.796539 12231662.0 38.258602 37.919757 0.970325 0.0289
提高数据处理器性能的技巧 - 使用col_set=CS_RAW获取数据将返回原始数据,并可能避免在调用loc时pandas复制数据
- __init__(instruments=None, start_time=None, end_time=None, data_loader: dict | str | DataLoader | None = None, init_data=True, fetch_orig=True)
- Parameters:
instruments – 要检索的股票列表。
start_time – 原始数据的开始时间。
end_time – 原始数据的结束时间。
data_loader (Union[dict, str, DataLoader]) – 用于加载数据的数据加载器。
init_data – 在构造函数中初始化原始数据。
fetch_orig (bool) – 如果可能,返回原始数据而不是副本。
- config(**kwargs)
数据的配置。 # 从数据源加载哪些数据
此方法将在从数据集中加载腌制处理程序时使用。 数据将使用不同的时间范围进行初始化。
- setup_data(enable_cache: bool = False)
在多次运行初始化的情况下设置数据
它负责维护以下变量 1) self._data
- Parameters:
enable_cache (bool) –
默认值为 false:
如果 enable_cache == True:
处理后的数据将保存在磁盘上,当我们下次调用 init 时,处理程序将直接从磁盘加载缓存的数据
- fetch(selector: Timestamp | slice | str | Index = slice(None, None, None), level: str | int = 'datetime', col_set: str | List[str] = '__all', squeeze: bool = False, proc_func: Callable | None = None) DataFrame
从底层数据源获取数据
设计动机: - 为底层数据提供统一的接口。 - 有可能使接口更加友好。 - 用户可以在这一额外层中获取数据时提高性能。
- Parameters:
selector (Union[pd.Timestamp, slice, str]) –
描述如何通过索引选择数据 它可以分为以下几类
获取单个索引
获取一系列索引
一个切片范围
特定索引的pd.Index
可能会出现以下冲突
["20200101", "20210101"] 是选择这个切片还是这两天?
切片具有更高的优先级
level (Union[str, int]) – 选择数据的索引级别
col_set (Union[str, List[str]]) –
如果 isinstance(col_set, str):
选择一组有意义的 pd.Index 列。(例如:特征、列)
如果 col_set == CS_RAW:
将返回原始数据集。
如果 isinstance(col_set, List[str]):
选择多组有意义的列,返回的数据具有多个层次
proc_func (Callable) –
提供一个钩子函数,用于在获取数据之前处理数据
一个例子来解释钩子的必要性:
一个数据集学习了一些处理器来处理与数据分割相关的数据
每次准备数据时都会应用这些处理器
学习到的处理器要求在拟合和应用时数据框保持相同的格式
然而,数据格式会根据参数而变化
因此,处理器应该应用于底层数据
squeeze (bool) – 是否压缩列和索引
- Return type:
pd.DataFrame.
- get_cols(col_set='__all') list
获取列名
- Parameters:
col_set (str) – 选择一组有意义的列。(例如:特征、列)
- Returns:
列名列表
- Return type:
列表
- get_range_selector(cur_date: Timestamp | str, periods: int) slice
根据周期数获取范围选择器
- Parameters:
cur_date (pd.Timestamp 或 str) – 当前日期
periods (int) – 周期数
- get_range_iterator(periods: int, min_periods: int | None = None, **kwargs) Iterator[Tuple[Timestamp, DataFrame]]
获取具有给定周期的切片数据的迭代器
- Parameters:
periods (int) – 周期数。
min_periods (int) – 切片数据帧的最小周期。
kwargs (dict) – 将被传递给 self.fetch。
- class qlib.data.dataset.handler.DataHandlerLP(instruments=None, start_time=None, end_time=None, data_loader: dict | str | DataLoader | None = None, infer_processors: List = [], learn_processors: List = [], shared_processors: List = [], process_type='append', drop_raw=False, **kwargs)
带有(L)earnable (P)rocessor的DataHandler
此处理程序将生成三块数据,格式为pd.DataFrame。
DK_R / self._data: 从加载器加载的原始数据
DK_I / self._infer: 用于推理处理的数据
DK_L / self._learn: 用于学习模型处理的数据。
使用不同处理器工作流进行学习和推理的动机 以下是一些示例。
用于学习和推理的工具集可能不同。
某些样本的处理可能依赖于标签(例如,一些达到限制的样本可能需要额外处理或被丢弃)。
这些处理器仅适用于学习阶段。
数据处理器的提示
为了减少内存成本
drop_raw=True: 这将会在原始数据上就地修改数据;
请注意,像self._infer或self._learn这样的处理数据与Qlib的Dataset中的“train”和“test”等segments概念不同。
处理过的数据如 self._infer 或 self._learn 是通过不同处理器处理的基础数据
segments 在 Qlib 的 Dataset 中,如“train”和“test”,只是查询数据时的时间分段(在时间序列中,“train”通常早于“test”)。
例如,您可以在“训练”时间段查询由infer_processors处理的data._infer。
- __init__(instruments=None, start_time=None, end_time=None, data_loader: dict | str | DataLoader | None = None, infer_processors: List = [], learn_processors: List = [], shared_processors: List = [], process_type='append', drop_raw=False, **kwargs)
- Parameters:
infer_processors (list) –
用于生成推理数据的处理器列表
示例:
1) classname & kwargs: { "class": "MinMaxNorm", "kwargs": { "fit_start_time": "20080101", "fit_end_time": "20121231" } } 2) Only classname: "DropnaFeature" 3) object instance of Processor
learn_processors (list) – 类似于 infer_processors,但用于生成学习模型的数据
process_type (str) –
PTYPE_I = ‘independent’
self._infer 将由 infer_processors 处理
self._learn 将由 learn_processors 处理
PTYPE_A = ‘append’
self._infer 将由 infer_processors 处理
self._learn 将由 infer_processors + learn_processors 处理
(例如,self._infer 由 learn_processors 处理)
drop_raw (bool) – 是否删除原始数据
- fit()
在不处理数据的情况下拟合数据
- fit_process_data()
拟合和处理数据
fit 的输入将是前一个处理器的输出
- process_data(with_fit: bool = False)
处理数据。如果需要,请使用processor.fit
符号:(数据) [处理器]
# 当 self.process_type == DataHandlerLP.PTYPE_I 时的数据处理流程
(self._data)-[shared_processors]-(_shared_df)-[learn_processors]-(_learn_df) \ -[infer_processors]-(_infer_df)
# 当 self.process_type == DataHandlerLP.PTYPE_A 时的数据处理流程
(self._data)-[shared_processors]-(_shared_df)-[infer_processors]-(_infer_df)-[learn_processors]-(_learn_df)
- Parameters:
with_fit (bool) – fit 的输入将是前一个处理器的输出
- config(processor_kwargs: dict | None = None, **kwargs)
数据的配置。 # 从数据源加载哪些数据
此方法将在从数据集中加载腌制处理程序时使用。 数据将使用不同的时间范围进行初始化。
- setup_data(init_type: str = 'fit_seq', **kwargs)
设置数据以防多次运行初始化
- Parameters:
init_type (str) – 上面列出的类型 IT_*。
enable_cache (bool) –
默认值为 false:
如果 enable_cache == True:
处理后的数据将保存在磁盘上,当我们下次调用 init 时,处理程序将直接从磁盘加载缓存的数据
- fetch(selector: Timestamp | slice | str = slice(None, None, None), level: str | int = 'datetime', col_set='__all', data_key: Literal['raw', 'infer', 'learn'] = 'infer', squeeze: bool = False, proc_func: Callable | None = None) DataFrame
从底层数据源获取数据
- Parameters:
selector (Union[pd.Timestamp, slice, str]) – 描述如何通过索引选择数据。
level (Union[str, int]) – 选择数据的索引级别。
col_set (str) – 选择一组有意义的列。(例如:特征、列)。
data_key (str) – 要获取的数据: DK_*.
proc_func (Callable) – 请参考DataHandler.fetch的文档
- Return type:
pd.DataFrame
- get_cols(col_set='__all', data_key: Literal['raw', 'infer', 'learn'] = 'infer') list
获取列名
- Parameters:
col_set (str) – 选择一组有意义的列。(例如:特征、列)。
data_key (DATA_KEY_TYPE) – 要获取的数据: DK_*.
- Returns:
列名列表
- Return type:
列表
- classmethod cast(handler: DataHandlerLP) DataHandlerLP
Motivation
用户在他的自定义包中创建了一个数据处理器。然后他希望将处理后的处理器分享给其他用户,而不引入包依赖和复杂的数据处理逻辑。
这个类通过将类转换为DataHandlerLP并仅保留处理后的数据来实现这一点
- Parameters:
handler (DataHandlerLP) – DataHandlerLP 的一个子类
- Returns:
转换后的处理数据
- Return type:
- classmethod from_df(df: DataFrame) DataHandlerLP
动机: - 当用户想要快速获取数据处理程序时。
创建的数据处理程序将只有一个共享的Dataframe,没有处理器。 创建处理程序后,用户可能经常希望转储处理程序以便重用 这是一个典型的使用案例
from qlib.data.dataset import DataHandlerLP dh = DataHandlerLP.from_df(df) dh.to_pickle(fname, dump_all=True)
待办事项: - StaticDataLoader 相当慢。它不必再次复制数据…
处理器
- qlib.data.dataset.processor.get_group_columns(df: DataFrame, group: str | None)
从多索引列的DataFrame中获取一组列
- Parameters:
df (pd.DataFrame) – 包含多列。
group (str) – 特征组的名称,即组索引的第一级值。
- class qlib.data.dataset.processor.Processor
- fit(df: DataFrame | None = None)
学习数据处理参数
- Parameters:
df (pd.DataFrame) – 当我们逐个使用处理器拟合和处理数据时。拟合函数依赖于前一个处理器的输出,即 df。
- is_for_infer() bool
这个处理器是否可用于推理 有些处理器不能用于推理。
- Returns:
如果它可用于推理。
- Return type:
布尔
- readonly() bool
处理器在处理时是否将输入数据视为只读(即不写入输入数据)
了解只读信息有助于Handler避免不必要的复制
- config(**kwargs)
配置可序列化对象
- Parameters:
keys (kwargs 可能包括以下内容) –
- dump_allbool
对象是否转储所有对象
- excludelist
哪些属性不会被转储
- includelist
哪些属性会被转储
recursive (bool) – 配置是否递归
- class qlib.data.dataset.processor.DropnaProcessor(fields_group=None)
- __init__(fields_group=None)
- readonly()
处理器在处理时是否将输入数据视为只读(即不写入输入数据)
了解只读信息有助于Handler避免不必要的复制
- class qlib.data.dataset.processor.DropnaLabel(fields_group='label')
- __init__(fields_group='label')
- is_for_infer() bool
样本根据标签被丢弃。因此,它不适用于推理
- class qlib.data.dataset.processor.DropCol(col_list=[])
- __init__(col_list=[])
- readonly()
处理器在处理时是否将输入数据视为只读(即不写入输入数据)
了解只读信息有助于Handler避免不必要的复制
- class qlib.data.dataset.processor.FilterCol(fields_group='feature', col_list=[])
- __init__(fields_group='feature', col_list=[])
- readonly()
处理器在处理时是否将输入数据视为只读(即不写入输入数据)
了解只读信息有助于Handler避免不必要的复制
- class qlib.data.dataset.processor.TanhProcess
使用tanh处理噪声数据
- class qlib.data.dataset.processor.ProcessInf
处理无限
- class qlib.data.dataset.processor.Fillna(fields_group=None, fill_value=0)
处理NaN
- __init__(fields_group=None, fill_value=0)
- class qlib.data.dataset.processor.MinMaxNorm(fit_start_time, fit_end_time, fields_group=None)
- __init__(fit_start_time, fit_end_time, fields_group=None)
- fit(df: DataFrame | None = None)
学习数据处理参数
- Parameters:
df (pd.DataFrame) – 当我们逐个使用处理器拟合和处理数据时。拟合函数依赖于前一个处理器的输出,即 df。
- class qlib.data.dataset.processor.ZScoreNorm(fit_start_time, fit_end_time, fields_group=None)
Z分数归一化
- __init__(fit_start_time, fit_end_time, fields_group=None)
- fit(df: DataFrame | None = None)
学习数据处理参数
- Parameters:
df (pd.DataFrame) – 当我们逐个使用处理器拟合和处理数据时。拟合函数依赖于前一个处理器的输出,即 df。
- class qlib.data.dataset.processor.RobustZScoreNorm(fit_start_time, fit_end_time, fields_group=None, clip_outlier=True)
鲁棒Z分数归一化
- Use robust statistics for Z-Score normalization:
mean(x) = median(x) std(x) = MAD(x) * 1.4826
- Reference:
- __init__(fit_start_time, fit_end_time, fields_group=None, clip_outlier=True)
- fit(df: DataFrame | None = None)
学习数据处理参数
- Parameters:
df (pd.DataFrame) – 当我们逐个使用处理器拟合和处理数据时。拟合函数依赖于前一个处理器的输出,即 df。
- class qlib.data.dataset.processor.CSZScoreNorm(fields_group=None, method='zscore')
横截面Z分数归一化
- __init__(fields_group=None, method='zscore')
- class qlib.data.dataset.processor.CSRankNorm(fields_group=None)
横截面排名归一化。 “横截面”通常用于描述数据操作。 对不同股票的操作通常称为横截面操作。
例如,CSRankNorm 是一种操作,它按每天对数据进行分组,并在每天对所有股票进行排名。
关于3.46和0.5的解释
import numpy as np import pandas as pd x = np.random.random(10000) # for any variable x_rank = pd.Series(x).rank(pct=True) # if it is converted to rank, it will be a uniform distributed x_rank_norm = (x_rank - x_rank.mean()) / x_rank.std() # Normally, we will normalize it to make it like normal distribution x_rank.mean() # accounts for 0.5 1 / x_rank.std() # accounts for 3.46
- __init__(fields_group=None)
- class qlib.data.dataset.processor.CSZFillna(fields_group=None)
横截面填充空值
- __init__(fields_group=None)
- class qlib.data.dataset.processor.HashStockFormat
处理从df到哈希库存格式的存储
- class qlib.data.dataset.processor.TimeRangeFlt(start_time: Timestamp | str | None = None, end_time: Timestamp | str | None = None, freq: str = 'day')
这是一个用于筛选股票的过滤器。 只保留从start_time到end_time存在的数据(中间的存在性未检查)。 警告:可能会导致泄漏!!!
- __init__(start_time: Timestamp | str | None = None, end_time: Timestamp | str | None = None, freq: str = 'day')
- Parameters:
start_time (可选[联合[pd.Timestamp, str]]) – 数据必须早于(或等于)start_time None 表示数据不会基于start_time进行过滤
end_time (可选[联合[pd.Timestamp, str]]) – 类似于 start_time
freq (str) – 日历的频率
贡献
模型
- class qlib.model.base.Model
可学习模型
- fit(dataset: Dataset, reweighter: Reweighter)
从基础模型学习模型
注意
学习模型的属性名称不应以‘_’开头。这样模型才能被转储到磁盘。
以下代码示例展示了如何从dataset中检索x_train、y_train和w_train:
# get features and labels df_train, df_valid = dataset.prepare( ["train", "valid"], col_set=["feature", "label"], data_key=DataHandlerLP.DK_L ) x_train, y_train = df_train["feature"], df_train["label"] x_valid, y_valid = df_valid["feature"], df_valid["label"] # get weights try: wdf_train, wdf_valid = dataset.prepare(["train", "valid"], col_set=["weight"], data_key=DataHandlerLP.DK_L) w_train, w_valid = wdf_train["weight"], wdf_valid["weight"] except KeyError as e: w_train = pd.DataFrame(np.ones_like(y_train.values), index=y_train.index) w_valid = pd.DataFrame(np.ones_like(y_valid.values), index=y_valid.index)
- Parameters:
dataset (Dataset) – 数据集将生成模型训练所需的处理数据。
- class qlib.model.base.ModelFT
模型 (F)ine(t)unable
- abstract finetune(dataset: Dataset)
基于给定数据集的微调模型
使用qlib.workflow.R微调模型的典型用例。
# start exp to train init model with R.start(experiment_name="init models"): model.fit(dataset) R.save_objects(init_model=model) rid = R.get_recorder().id # Finetune model based on previous trained model with R.start(experiment_name="finetune model"): recorder = R.get_recorder(recorder_id=rid, experiment_name="init models") model = recorder.load_object("init_model") model.finetune(dataset, num_boost_round=10)
- Parameters:
dataset (Dataset) – 数据集将从模型训练中生成处理后的数据集。
策略
- class qlib.contrib.strategy.TopkDropoutStrategy(*, topk, n_drop, method_sell='bottom', method_buy='top', hold_thresh=1, only_tradable=False, forbid_all_trade_at_limit=True, **kwargs)
- __init__(*, topk, n_drop, method_sell='bottom', method_buy='top', hold_thresh=1, only_tradable=False, forbid_all_trade_at_limit=True, **kwargs)
- Parameters:
topk (int) – 投资组合中的股票数量。
n_drop (int) – 每个交易日要替换的股票数量。
method_sell (str) – 丢弃方法,随机/底部。
method_buy (str) – 随机/顶部的dropout method_buy。
hold_thresh (int) – 卖出股票前的最小持有天数,将检查 current.get_stock_count(order.stock_id) >= self.hold_thresh。
only_tradable (bool) –
策略在买卖时是否只考虑可交易的股票。
如果 only_tradable 为真:
策略将根据股票信息的可交易状态做出决策,并避免买卖它们。
否则:
策略将在不检查股票可交易状态的情况下做出买卖决策。
forbid_all_trade_at_limit (bool) –
如果达到涨停或跌停时禁止所有交易。
如果 forbid_all_trade_at_limit:
当价格达到涨停/跌停时,策略将不会进行任何交易,即使在现实中允许,也不会在涨停时卖出或在跌停时买入。
否则:
策略将在涨停时卖出并在跌停时买入。
- generate_trade_decision(execute_result=None)
在每个交易时段生成交易决策
- Parameters:
execute_result (List[object], optional) –
交易决策的执行结果,默认为 None
首次调用 generate_trade_decision 时,execute_result 可能为 None
- class qlib.contrib.strategy.WeightStrategyBase(*, order_generator_cls_or_obj=<class 'qlib.contrib.strategy.order_generator.OrderGenWOInteract'>, **kwargs)
- __init__(*, order_generator_cls_or_obj=<class 'qlib.contrib.strategy.order_generator.OrderGenWOInteract'>, **kwargs)
- signal :
描述信号的信息。请参考qlib.backtest.signal.create_signal_from的文档 策略的决策将基于给定的信号
- trade_exchangeExchange
提供市场信息的交易所,用于处理订单和生成报告
如果 trade_exchange 为 None,self.trade_exchange 将被设置为 common_infra
它允许在不同的执行中使用不同的trade_exchanges。
例如:
在日常执行中,每日交换和每分钟交换都是可用的,但建议使用每日交换,因为它运行得更快。
在每分钟执行中,每日交换不可用,仅推荐使用每分钟交换。
- generate_target_weight_position(score, current, trade_start_time, trade_end_time)
根据该日期的分数和当前位置生成目标位置。现金不包括在位置中。
- Parameters:
score (pd.Series) – 该交易日的预测分数,索引为stock_id,包含‘score’列。
current (Position()) – 当前位置。
trade_start_time (pd.Timestamp) –
trade_end_time (pd.Timestamp) –
- generate_trade_decision(execute_result=None)
在每个交易时段生成交易决策
- Parameters:
execute_result (List[object], optional) –
交易决策的执行结果,默认为 None
首次调用 generate_trade_decision 时,execute_result 可能为 None
- class qlib.contrib.strategy.EnhancedIndexingStrategy(*, riskmodel_root, market='csi500', turn_limit=None, name_mapping={}, optimizer_kwargs={}, verbose=False, **kwargs)
增强型索引策略
增强型指数投资结合了主动管理和被动管理的艺术,旨在控制风险暴露(即跟踪误差)的同时,在投资组合回报方面超越基准指数(例如,S&P 500)。
用户需要准备如下的风险模型数据:
├── /path/to/riskmodel ├──── 20210101 ├────── factor_exp.{csv|pkl|h5} ├────── factor_cov.{csv|pkl|h5} ├────── specific_risk.{csv|pkl|h5} ├────── blacklist.{csv|pkl|h5} # optional
风险模型数据可以从风险数据提供商处获取。你也可以使用 qlib.model.riskmodel.structured.StructuredCovEstimator 来准备这些数据。
- Parameters:
riskmodel_path (str) – 风险模型路径
name_mapping (dict) – 替代文件名
- __init__(*, riskmodel_root, market='csi500', turn_limit=None, name_mapping={}, optimizer_kwargs={}, verbose=False, **kwargs)
- signal :
描述信号的信息。请参考qlib.backtest.signal.create_signal_from的文档 策略的决策将基于给定的信号
- trade_exchangeExchange
提供市场信息的交易所,用于处理订单和生成报告
如果 trade_exchange 为 None,self.trade_exchange 将被设置为 common_infra
它允许在不同的执行中使用不同的trade_exchanges。
例如:
在日常执行中,每日交换和每分钟交换都是可用的,但建议使用每日交换,因为它运行得更快。
在每分钟执行中,每日交换不可用,仅推荐使用每分钟交换。
- generate_target_weight_position(score, current, trade_start_time, trade_end_time)
根据该日期的分数和当前位置生成目标位置。现金不包括在位置中。
- Parameters:
score (pd.Series) – 该交易日的预测分数,索引为stock_id,包含‘score’列。
current (Position()) – 当前位置。
trade_start_time (pd.Timestamp) –
trade_end_time (pd.Timestamp) –
- class qlib.contrib.strategy.TWAPStrategy(outer_trade_decision: BaseTradeDecision = None, level_infra: LevelInfrastructure = None, common_infra: CommonInfrastructure = None, trade_exchange: Exchange = None)
TWAP交易策略
注意
此TWAP策略将在交易时进行四舍五入。这将使TWAP交易策略在总交易单位金额小于交易步长时更早生成订单。
- reset(outer_trade_decision: BaseTradeDecision | None = None, **kwargs)
- Parameters:
outer_trade_decision (BaseTradeDecision, 可选) –
- generate_trade_decision(execute_result=None)
在每个交易时段生成交易决策
- Parameters:
execute_result (List[object], optional) –
交易决策的执行结果,默认为 None
首次调用 generate_trade_decision 时,execute_result 可能为 None
- class qlib.contrib.strategy.SBBStrategyBase(outer_trade_decision: BaseTradeDecision = None, level_infra: LevelInfrastructure = None, common_infra: CommonInfrastructure = None, trade_exchange: Exchange = None)
在每两个相邻的交易条中选择更好的一个进行卖出或买入。
- reset(outer_trade_decision: BaseTradeDecision | None = None, **kwargs)
- Parameters:
outer_trade_decision (BaseTradeDecision, optional) –
- generate_trade_decision(execute_result=None)
在每个交易时段生成交易决策
- Parameters:
execute_result (List[object], optional) –
交易决策的执行结果,默认为 None
首次调用 generate_trade_decision 时,execute_result 可能为 None
- class qlib.contrib.strategy.SBBStrategyEMA(outer_trade_decision: BaseTradeDecision | None = None, instruments: List | str = 'csi300', freq: str = 'day', trade_exchange: Exchange | None = None, level_infra: LevelInfrastructure | None = None, common_infra: CommonInfrastructure | None = None, **kwargs)
在每两个相邻的交易(B)ars中选择(S)更好的一个,根据(EMA)信号进行卖出或买入。
- __init__(outer_trade_decision: BaseTradeDecision | None = None, instruments: List | str = 'csi300', freq: str = 'day', trade_exchange: Exchange | None = None, level_infra: LevelInfrastructure | None = None, common_infra: CommonInfrastructure | None = None, **kwargs)
- Parameters:
instruments (Union[List, str], optional) – EMA信号的instruments,默认为“csi300”
freq (str, optional) – EMA信号的频率,默认为“day” 注意:freq 可能与 time_per_step 不同
- reset_level_infra(level_infra)
重置级别共享基础设施 - 重置交易日历后,信号将会改变
- class qlib.contrib.strategy.SoftTopkStrategy(model, dataset, topk, order_generator_cls_or_obj=<class 'qlib.contrib.strategy.order_generator.OrderGenWInteract'>, max_sold_weight=1.0, risk_degree=0.95, buy_method='first_fill', trade_exchange=None, level_infra=None, common_infra=None, **kwargs)
- __init__(model, dataset, topk, order_generator_cls_or_obj=<class 'qlib.contrib.strategy.order_generator.OrderGenWInteract'>, max_sold_weight=1.0, risk_degree=0.95, buy_method='first_fill', trade_exchange=None, level_infra=None, common_infra=None, **kwargs)
- Parameters:
topk (int) – 要购买的 top-N 股票
risk_degree (float) –
占总价值的持仓百分比 buy_method:
rank_fill: 首先分配权重给排名高的股票(1/topk max) average_fill: 平均分配权重给排名高的股票。
- get_risk_degree(trade_step=None)
返回您将用于投资的总价值的比例。 动态的risk_degree将导致市场时机选择
- generate_target_weight_position(score, current, trade_start_time, trade_end_time)
- Parameters:
score – 该交易日的预测分数,pd.Series,索引为stock_id,包含‘score’列
current – 当前位置,使用 Position() 类
trade_date –
交易日期
根据该日期的评分和当前持仓生成目标持仓
持仓中不考虑缓存
评估
- qlib.contrib.evaluate.risk_analysis(r, N: int | None = None, freq: str = 'day')
风险分析 注意: 年化收益的计算与年化收益的定义不同。 这是设计上的实现。 Qlib试图通过求和而不是乘积来累积收益,以避免累积曲线呈指数偏斜。 在Qlib中,所有年化收益的计算都遵循这一原则。
待办事项:添加一个参数以启用使用生产积累回报计算指标。
- Parameters:
r (pandas.Series) – 每日收益序列。
N (int) – 用于年化信息比率的缩放因子(日:252,周:50,月:12),至少需要存在N和freq中的一个
freq (str) – 用于计算缩放器的分析频率,至少需要存在N和freq中的一个
- qlib.contrib.evaluate.indicator_analysis(df, method='mean')
分析交易的统计时间序列指标
- Parameters:
df (pandas.DataFrame) –
- 列: 如 [‘pa’, ‘pos’, ‘ffr’, ‘deal_amount’, ‘value’].
- 必要字段:
’pa’ 是交易指标中的价格优势
’pos’ 是交易指标中的正利率
’ffr’ 是交易指标中的履行率
- 可选字段:
’deal_amount’ 是总交易量,仅在方法为 ‘amount_weighted’ 时必要
’value’ 是总交易价值,仅在方法为 ‘value_weighted’ 时必要
索引: Index(datetime)
method (str, optional) –
pa/ffr的统计方法,默认为“mean”
如果方法是‘mean’,计算每个交易指标的平均统计值
如果方法是‘amount_weighted’,计算每个交易指标的deal_amount加权平均统计值
如果方法是‘value_weighted’,计算每个交易指标的价值加权平均统计值
注意:pos的统计方法始终为“mean”
- Returns:
每个交易指标的统计值
- Return type:
pd.DataFrame
- qlib.contrib.evaluate.backtest_daily(start_time: str | Timestamp, end_time: str | Timestamp, strategy: str | dict | BaseStrategy, executor: str | dict | BaseExecutor | None = None, account: float | int | Position = 100000000.0, benchmark: str = 'SH000300', exchange_kwargs: dict | None = None, pos_type: str = 'Position')
初始化策略和执行器,然后执行每日频率的回测
- Parameters:
start_time (Union[str, pd.Timestamp]) – 回测的关闭开始时间 注意: 这将应用于最外层的执行者的日历。
end_time (Union[str, pd.Timestamp]) – 回测的结束时间 注意: 这将应用于最外层的执行器的日历。 例如,Executor[day](Executor[1min]),设置 end_time == 20XX0301 将包括20XX0301的所有分钟
strategy (Union[str, dict, BaseStrategy]) –
用于初始化最外层的投资组合策略。请参阅init_instance_by_config的文档以获取更多信息。
例如:
# 字典 strategy = { "class": "TopkDropoutStrategy", "module_path": "qlib.contrib.strategy.signal_strategy", "kwargs": { "signal": (model, dataset), "topk": 50, "n_drop": 5, }, } # BaseStrategy pred_score = pd.read_pickle("score.pkl")["score"] STRATEGY_CONFIG = { "topk": 50, "n_drop": 5, "signal": pred_score, } strategy = TopkDropoutStrategy(**STRATEGY_CONFIG) # 字符串示例。 # 1) 指定一个pickle对象 # - 路径如 'file:///
/obj.pkl' # 2) 指定一个类名 # - "ClassName": 将使用 getattr(module, "ClassName")()。 # 3) 指定带有类名的模块路径 # - "a.b.c.ClassName" 将使用 getattr(, "ClassName")()。 executor (Union[str, dict, BaseExecutor]) – 用于初始化最外层的执行器。
benchmark (str) – 用于报告的基准。
account (Union[float, int, Position]) –
描述如何创建账户的信息
对于 float 或 int:
仅使用初始现金创建账户
对于 Position:
使用带有头寸的账户
exchange_kwargs (dict) –
用于初始化Exchange的关键字参数 例如
exchange_kwargs = { "freq": freq, "limit_threshold": None, # limit_threshold 为 None,使用 C.limit_threshold "deal_price": None, # deal_price 为 None,使用 C.deal_price "open_cost": 0.0005, "close_cost": 0.0015, "min_cost": 5, }
pos_type (str) – 位置的类型。
- Returns:
report_normal (pd.DataFrame) – 回测报告
positions_normal (pd.DataFrame) – 回测持仓
- qlib.contrib.evaluate.long_short_backtest(pred, topk=50, deal_price=None, shift=1, open_cost=0, close_cost=0, trade_unit=None, limit_threshold=None, min_cost=5, subscribe_fields=[], extract_codes=False)
一个多空策略的回测
- Parameters:
pred – 在第T天产生的交易信号。
topk – 做空排名前k的证券和做多排名前k的证券。
deal_price – 交易的价格。
shift – 是否将预测结果向后移动一天。如果shift==1,交易日将为T+1。
open_cost – 开启事务的成本。
close_cost – 关闭交易成本。
trade_unit – 中国A股为100。
limit_threshold – 例如,限制移动0.1(10%),多头和空头具有相同的限制。
min_cost – 最小交易成本。
subscribe_fields – 订阅字段。
extract_codes – bool. 我们是否会将从预测中提取的代码传递给交易所。 注意:使用离线qlib会更快。
- Returns:
回测的结果,它由一个字典表示。 { “long”: long_returns(excess), “short”: short_returns(excess), “long_short”: long_short_returns}
报告
工作流程
实验管理器
- class qlib.workflow.expm.ExpManager(uri: str, default_exp_name: str | None)
这是用于管理实验的ExpManager类。API设计类似于mlflow。 (链接:https://mlflow.org/docs/latest/python_api/mlflow.html)
ExpManager 预期是一个单例(顺便说一下,我们可以有多个具有不同 uri 的 Experiment。用户可以从不同的 uri 获取不同的实验,然后比较它们的记录)。全局配置(即 `C)也是一个单例。
所以我们尝试将它们对齐。它们共享相同的变量,称为default uri。请参阅ExpManager.default_uri以了解变量共享的详细信息。
当用户开始一个实验时,用户可能希望将uri设置为特定的uri(在此期间它将覆盖默认uri),然后取消设置特定uri并回退到默认uri。ExpManager._active_exp_uri就是那个特定uri。
- __init__(uri: str, default_exp_name: str | None)
- start_exp(*, experiment_id: str | None = None, experiment_name: str | None = None, recorder_id: str | None = None, recorder_name: str | None = None, uri: str | None = None, resume: bool = False, **kwargs) Experiment
开始一个实验。此方法包括首先获取或创建一个实验,然后将其设置为活动状态。
维护_active_exp_uri包含在start_exp中,剩余的实现应包含在子类的_end_exp中
- Parameters:
experiment_id (str) – 当前实验的ID。
experiment_name (str) – 活动实验的名称。
recorder_id (str) – 要启动的记录器的ID。
recorder_name (str) – 要启动的记录器的名称。
uri (str) – 当前的跟踪URI。
resume (boolean) – 是否恢复实验和记录器。
- Return type:
一个活跃的实验。
- end_exp(recorder_status: str = 'SCHEDULED', **kwargs)
结束一个活跃的实验。
维护_active_exp_uri包含在end_exp中,剩余的实现应包含在子类的_end_exp中
- Parameters:
experiment_name (str) – 活动实验的名称。
recorder_status (str) – 实验活动记录器的状态。
- create_exp(experiment_name: str | None = None)
创建一个实验。
- Parameters:
experiment_name (str) – 实验名称,必须是唯一的。
- Return type:
一个实验对象。
- Raises:
ExpAlreadyExistError –
- search_records(experiment_ids=None, **kwargs)
获取符合实验搜索条件的记录的pandas DataFrame。 输入是用户想要应用的搜索条件。
- Returns:
一个pandas.DataFrame的记录,其中每个指标、参数和标签
都被扩展为它们自己的列,分别命名为metrics.*, params.*, 和 tags.*
对于没有特定指标、参数或标签的记录,它们的
值将分别为(NumPy) Nan, None, 或 None。
- get_exp(*, experiment_id=None, experiment_name=None, create: bool = True, start: bool = False)
检索一个实验。此方法包括获取一个活动的实验,以及获取或创建一个特定的实验。
当用户指定实验ID和名称时,该方法将尝试返回特定的实验。 当用户未提供记录器ID或名称时,该方法将尝试返回当前活动的实验。 create参数决定了如果实验之前未被创建,该方法是否将根据用户的规格自动创建一个新的实验。
如果 create 为 True:
如果存在active experiment:
未指定id或name,返回当前活动的实验。
如果指定了id或name,则返回指定的实验。如果没有找到这样的实验,则使用给定的id或name创建一个新实验。如果start设置为True,则实验设置为活动状态。
如果active experiment不存在:
未指定ID或名称,创建默认实验。
如果指定了id或name,则返回指定的实验。如果没有找到这样的实验,则使用给定的id或name创建一个新实验。如果start设置为True,则实验设置为活动状态。
否则如果 create 为 False:
如果存在active experiment:
未指定id或name,返回当前活动的实验。
如果指定了id或name,则返回指定的实验。如果没有找到这样的实验,则引发错误。
如果active experiment不存在:
未指定ID或名称。如果默认实验存在,则返回它,否则引发错误。
如果指定了id或name,则返回指定的实验。如果没有找到这样的实验,则引发错误。
- Parameters:
experiment_id (str) – 要返回的实验的id。
experiment_name (str) – 要返回的实验名称。
create (boolean) – 如果之前没有创建过实验,则创建它。
开始 (布尔值) – 如果创建了新实验,则启动它。
- Return type:
一个实验对象。
- delete_exp(experiment_id=None, experiment_name=None)
删除一个实验。
- Parameters:
experiment_id (str) – 实验ID。
experiment_name (str) – 实验名称。
- property default_uri
从qlib.config.C获取默认的跟踪URI
- property uri
获取默认的跟踪URI或当前URI。
- Return type:
跟踪URI字符串。
- list_experiments()
列出所有现有的实验。
- Return type:
存储的实验信息的字典(名称 -> 实验)。
实验
- class qlib.workflow.exp.Experiment(id, name)
这是每个正在运行的实验的Experiment类。API的设计类似于mlflow。 (链接:https://mlflow.org/docs/latest/python_api/mlflow.html)
- __init__(id, name)
- start(*, recorder_id=None, recorder_name=None, resume=False)
开始实验并将其设置为活动状态。此方法还将启动一个新的记录器。
- Parameters:
recorder_id (str) – 要创建的记录器的ID。
recorder_name (str) – 要创建的记录器的名称。
resume (bool) – 是否恢复第一个记录器
- Return type:
一个活跃的记录器。
- end(recorder_status='SCHEDULED')
结束实验。
- Parameters:
recorder_status (str) – 结束时要设置的记录器状态(SCHEDULED, RUNNING, FINISHED, FAILED)。
- create_recorder(recorder_name=None)
为每个实验创建一个记录器。
- Parameters:
recorder_name (str) – 要创建的记录器的名称。
- Return type:
一个记录器对象。
- search_records(**kwargs)
获取符合实验搜索条件的记录的pandas DataFrame。 输入是用户想要应用的搜索条件。
- Returns:
一个pandas.DataFrame的记录,其中每个指标、参数和标签
都被扩展为它们自己的列,分别命名为metrics.*, params.*, 和 tags.*
对于没有特定指标、参数或标签的记录,它们的
值将分别为(NumPy) Nan, None, 或 None。
- delete_recorder(recorder_id)
为每个实验创建一个记录器。
- Parameters:
recorder_id (str) – 要删除的记录器的ID。
- get_recorder(recorder_id=None, recorder_name=None, create: bool = True, start: bool = False) Recorder
为用户检索记录器。当用户指定记录器ID和名称时,该方法将尝试返回特定的记录器。当用户未提供记录器ID或名称时,该方法将尝试返回当前活动的记录器。create参数决定了如果记录器之前未被创建,该方法是否会自动根据用户的规格创建一个新的记录器。
如果 create 为 True:
如果存在active recorder:
未指定id或name,返回活动的记录器。
如果指定了id或name,则返回指定的记录器。如果未找到此类实验,则使用给定的id或name创建一个新的记录器。如果start设置为True,则记录器设置为活动状态。
如果active recorder不存在:
未指定ID或名称,创建一个新的记录器。
如果指定了id或name,则返回指定的实验。如果未找到此类实验,则使用给定的id或name创建一个新的记录器。如果start设置为True,则记录器设置为活动状态。
否则如果 create 为 False:
如果存在active recorder:
未指定id或name,返回活动的记录器。
如果指定了id或name,则返回指定的记录器。如果未找到此类exp,则引发错误。
如果active recorder不存在:
未指定ID或名称,引发错误。
如果指定了id或name,则返回指定的记录器。如果未找到此类exp,则引发错误。
- Parameters:
recorder_id (str) – 要删除的记录器的ID。
recorder_name (str) – 要删除的记录器的名称。
create (boolean) – 如果之前没有创建过记录器,则创建它。
start (boolean) – 如果创建了新的记录器,则启动它。
- Return type:
一个记录器对象。
- list_recorders(rtype: Literal['dict', 'list'] = 'dict', **flt_kwargs) List[Recorder] | Dict[str, Recorder]
列出此实验的所有现有记录器。在调用此方法之前,请先获取实验实例。 如果用户想使用R.list_recorders()方法,请参考QlibRecorder中的相关API文档。
- flt_kwargsdict
根据条件筛选记录器 例如:list_recorders(status=Recorder.STATUS_FI)
- Returns:
- if rtype == “dict”:
一个存储的记录器信息的字典(id -> recorder)。
- elif rtype == “list”:
一个记录器的列表。
- Return type:
返回类型取决于 rtype
记录器
- class qlib.workflow.recorder.Recorder(experiment_id, name)
这是用于记录实验的Recorder类。API设计类似于mlflow。 (链接:https://mlflow.org/docs/latest/python_api/mlflow.html)
记录器的状态可以是SCHEDULED(已计划)、RUNNING(运行中)、FINISHED(已完成)、FAILED(失败)。
- __init__(experiment_id, name)
- save_objects(local_path=None, artifact_path=None, **kwargs)
将预测文件或模型检查点等对象保存到工件URI。用户可以通过关键字参数(名称:值)保存对象。
请参考qlib.workflow的文档:R.save_objects
- Parameters:
local_path (str) – 如果提供,则将文件或目录保存到工件URI。
artifact_path=None (str) – 工件在URI中存储的相对路径。
- load_object(name)
加载对象,例如预测文件或模型检查点。
- Parameters:
name (str) – 要加载的文件的名称。
- Return type:
保存的对象。
- start_run()
开始运行或恢复记录器。返回值可以用作with块中的上下文管理器;否则,您必须调用end_run()来终止当前运行。(请参阅mlflow中的ActiveRun类)
- Return type:
一个活动的运行对象(例如 mlflow.ActiveRun 对象)。
- end_run()
结束一个活动的记录器。
- log_params(**kwargs)
记录当前运行的一批参数。
- Parameters:
arguments (keyword) – 要记录为参数的键值对。
- log_metrics(step=None, **kwargs)
为当前运行记录多个指标。
- Parameters:
arguments (keyword) – 要记录为指标的键值对。
- log_artifact(local_path: str, artifact_path: str | None = None)
将本地文件或目录记录为当前活动运行的工件。
- Parameters:
local_path (str) – 要写入文件的路径。
artifact_path (可选[str]) – 如果提供,则为
artifact_uri
中要写入的目录。
- set_tags(**kwargs)
为当前运行记录一批标签。
- Parameters:
arguments (keyword) – 要记录为标签的键值对。
- delete_tags(*keys)
从运行中删除一些标签。
- Parameters:
keys (series of strs of the keys) – 要删除的标签的所有名称。
- list_artifacts(artifact_path: str | None = None)
列出记录器的所有工件。
- Parameters:
artifact_path (str) – 工件在URI中存储的相对路径。
- Return type:
存储的工件信息(名称、路径等)列表。
- download_artifact(path: str, dst_path: str | None = None) str
如果适用,从运行中下载一个工件文件或目录到本地目录,并返回其本地路径。
- Parameters:
path (str) – 所需工件的相对源路径。
dst_path (可选[str]) – 本地文件系统目标目录的绝对路径,用于下载指定的工件。此目录必须已经存在。如果未指定,工件将被下载到本地文件系统上一个新的唯一命名的目录中。
- Returns:
所需工件的本地路径。
- Return type:
字符串
- list_metrics()
列出记录器的所有指标。
- Return type:
存储的指标字典。
- list_params()
列出记录器的所有参数。
- Return type:
存储的参数字典。
- list_tags()
列出记录器的所有标签。
- Return type:
存储的标签字典。
记录模板
- class qlib.workflow.record_temp.RecordTemp(recorder)
这是记录模板类,使用户能够以特定格式生成实验结果,如IC和回测。
- save(**kwargs)
它的行为与self.recorder.save_objects相同。 但它是一个更简单的接口,因为用户不必关心get_path和artifact_path
- __init__(recorder)
- generate(**kwargs)
生成某些记录,如IC、回测等,并保存它们。
- Parameters:
kwargs –
- load(name: str, parents: bool = True)
它的行为与self.recorder.load_object相同。 但它是一个更简单的接口,因为用户不必关心get_path和artifact_path
- Parameters:
name (str) – 要加载的文件的名称。
parents (bool) – 每个记录器都有不同的 artifact_path。 因此,parents 会递归地在父类中查找路径。 子类具有更高的优先级
- Return type:
存储的记录。
- list()
列出支持的工件。 用户不必考虑self.get_path
- Return type:
所有支持的工件列表。
- check(include_self: bool = False, parents: bool = True)
检查记录是否正确生成并保存。 在以下示例中非常有用
在生成新内容之前检查依赖文件是否完整。
检查最终文件是否已完成
- Parameters:
include_self (bool) – 文件是否由自身生成
parents (bool) – 我们是否检查父级
- Raises:
FileNotFoundError – 记录是否存储正确。
- class qlib.workflow.record_temp.SignalRecord(model=None, dataset=None, recorder=None)
这是生成信号预测的Signal Record类。该类继承自
RecordTemp
类。- __init__(model=None, dataset=None, recorder=None)
- generate(**kwargs)
生成某些记录,如IC、回测等,并保存它们。
- Parameters:
kwargs –
- list()
列出支持的工件。 用户不必考虑self.get_path
- Return type:
所有支持的工件列表。
- class qlib.workflow.record_temp.ACRecordTemp(recorder, skip_existing=False)
自动检查记录模板
- __init__(recorder, skip_existing=False)
- generate(*args, **kwargs)
自动检查文件,然后运行具体的生成任务
- class qlib.workflow.record_temp.HFSignalRecord(recorder, **kwargs)
这是信号分析记录类,用于生成如IC和IR等分析结果。此类继承自
RecordTemp
类。- depend_cls
SignalRecord
的别名
- __init__(recorder, **kwargs)
- generate()
生成某些记录,如IC、回测等,并保存它们。
- Parameters:
kwargs –
- list()
列出支持的工件。 用户不必考虑self.get_path
- Return type:
所有支持的工件列表。
- class qlib.workflow.record_temp.SigAnaRecord(recorder, ana_long_short=False, ann_scaler=252, label_col=0, skip_existing=False)
这是信号分析记录类,用于生成诸如IC和IR的分析结果。 此类继承自
RecordTemp
类。- depend_cls
SignalRecord
的别名
- __init__(recorder, ana_long_short=False, ann_scaler=252, label_col=0, skip_existing=False)
- list()
列出支持的工件。 用户不必考虑self.get_path
- Return type:
所有支持的工件列表。
- class qlib.workflow.record_temp.PortAnaRecord(recorder, config=None, risk_analysis_freq: List | str | None = None, indicator_analysis_freq: List | str | None = None, indicator_analysis_method=None, skip_existing=False, **kwargs)
这是生成分析结果(如回测结果)的投资组合分析记录类。该类继承自
RecordTemp
类。以下文件将存储在记录器中
report_normal.pkl & positions_normal.pkl:
回测的返回报告和详细持仓,由qlib/contrib/evaluate.py:backtest返回
port_analysis.pkl : 您的投资组合的风险分析,由qlib/contrib/evaluate.py:risk_analysis返回
- depend_cls
SignalRecord
的别名
- __init__(recorder, config=None, risk_analysis_freq: List | str | None = None, indicator_analysis_freq: List | str | None = None, indicator_analysis_method=None, skip_existing=False, **kwargs)
- config[“strategy”]dict
定义策略类以及kwargs。
- config[“executor”]dict
定义执行器类以及kwargs。
- config[“backtest”]dict
定义回测的kwargs。
- risk_analysis_freqstr|List[str]
风险分析报告频率
- indicator_analysis_freqstr|List[str]
指标分析报告频率
- indicator_analysis_methodstr, optional, default by None
候选值包括‘mean’, ‘amount_weighted’, ‘value_weighted’
- list()
列出支持的工件。 用户不必考虑self.get_path
- Return type:
所有支持的工件列表。
- class qlib.workflow.record_temp.MultiPassPortAnaRecord(recorder, pass_num=10, shuffle_init_score=True, **kwargs)
这是多次投资组合分析记录类,它多次运行回测并生成诸如回测结果的分析结果。此类继承自
PortAnaRecord
类。如果启用了shuffle_init_score,第一个回测日期的预测分数将被随机打乱,因此初始位置将是随机的。 shuffle_init_score仅在信号用作
占位符时有效。占位符将被保存在recorder中的pred.pkl替换。 - Parameters:
recorder (Recorder) – 用于保存回测结果的记录器。
pass_num (int) – 回测通过的次数。
shuffle_init_score (bool) – 是否打乱第一个回测日期的预测分数。
- depend_cls
SignalRecord
的别名
- __init__(recorder, pass_num=10, shuffle_init_score=True, **kwargs)
- Parameters:
recorder (Recorder) – 用于保存回测结果的记录器。
pass_num (int) – 回测通过的次数。
shuffle_init_score (bool) – 是否打乱第一个回测日期的预测分数。
- list()
列出支持的工件。 用户不必考虑self.get_path
- Return type:
所有支持的工件列表。
任务管理
任务生成
TaskGenerator 模块可以根据 TaskGen 和一些任务模板生成许多任务。
- qlib.workflow.task.gen.task_generator(tasks, generators) list
使用TaskGen列表和任务模板列表来生成不同的任务。
例如:
有3个任务模板a、b、c和2个任务生成器A、B。A将从模板生成2个任务,B将从模板生成3个任务。 task_generator([a, b, c], [A, B])最终将生成3*2*3 = 18个任务。
- class qlib.workflow.task.gen.TaskGen
生成不同任务的基类
示例 1:
输入:一个特定的任务模板和滚动步骤
输出:任务的滚动版本
示例 2:
输入:一个特定的任务模板和损失列表
输出:一组具有不同损失的任务
- abstract generate(task: dict) List[dict]
基于任务模板生成不同的任务
- Parameters:
任务 (字典) – 一个任务模板
- Returns:
任务列表
- Return type:
列表[字典]
- qlib.workflow.task.gen.handler_mod(task: dict, rolling_gen)
帮助在使用RollingGen时修改处理程序的结束时间 它尝试处理以下情况
Hander的数据end_time早于数据集的test_data的段。
为了处理这个问题,处理程序的数据的结束时间被延长了。
如果处理程序的end_time为None,则无需更改其结束时间。
- Parameters:
任务 (字典) – 一个任务模板
rg (RollingGen) – RollingGen的一个实例
- qlib.workflow.task.gen.trunc_segments(ta: TimeAdjuster, segments: Dict[str, Timestamp], days, test_key='test')
为了避免未来信息的泄露,应根据测试的start_time截断片段
注意
此函数将原地更改段
- class qlib.workflow.task.gen.RollingGen(step: int = 40, rtype: str = 'expanding', ds_extra_mod_func: None | ~typing.Callable = <function handler_mod>, test_key='test', train_key='train', trunc_days: int | None = None, task_copy_func: ~typing.Callable = <function deepcopy>)
- __init__(step: int = 40, rtype: str = 'expanding', ds_extra_mod_func: None | ~typing.Callable = <function handler_mod>, test_key='test', train_key='train', trunc_days: int | None = None, task_copy_func: ~typing.Callable = <function deepcopy>)
生成滚动任务
- Parameters:
step (int) – 滚动的步长
rtype (str) – 滚动类型(扩展、滑动)
ds_extra_mod_func (Callable) – 一个类似的方法:handler_mod(task: dict, rg: RollingGen) 在生成任务后执行一些额外的操作。例如,使用
handler_mod
来修改数据集处理程序的结束时间。trunc_days (int) – 截断一些数据以避免未来的信息泄露
task_copy_func (Callable) – 用于复制整个任务的函数。当用户希望在任务之间共享某些内容时,这非常有用。
- gen_following_tasks(task: dict, test_end: Timestamp) List[dict]
为task生成以下滚动任务,直到test_end
- Parameters:
任务 (字典) – Qlib 任务格式
test_end (pd.Timestamp) – 最新的滚动任务包括 test_end
- Returns:
以下任务的task`(`task本身除外)
- Return type:
列表[字典]
- generate(task: dict) List[dict]
将任务转换为滚动任务。
- Parameters:
task (dict) –
描述任务的字典。例如。
DEFAULT_TASK = { "model": { "class": "LGBModel", "module_path": "qlib.contrib.model.gbdt", }, "dataset": { "class": "DatasetH", "module_path": "qlib.data.dataset", "kwargs": { "handler": { "class": "Alpha158", "module_path": "qlib.contrib.data.handler", "kwargs": { "start_time": "2008-01-01", "end_time": "2020-08-01", "fit_start_time": "2008-01-01", "fit_end_time": "2014-12-31", "instruments": "csi100", }, }, "segments": { "train": ("2008-01-01", "2014-12-31"), "valid": ("2015-01-01", "2016-12-20"), # 请避免将未来的测试数据泄露到验证集中 "test": ("2017-01-01", "2020-08-01"), }, }, }, "record": [ { "class": "SignalRecord", "module_path": "qlib.workflow.record_temp", }, ] }
- Returns:
列表[字典]
- Return type:
任务列表
- class qlib.workflow.task.gen.MultiHorizonGenBase(horizon: List[int] = [5], label_leak_n=2)
- __init__(horizon: List[int] = [5], label_leak_n=2)
此任务生成器尝试基于现有任务为不同的时间范围生成任务
- Parameters:
horizon (List[int]) – 任务可能的时间范围
label_leak_n (int) – 预测后需要多少天才能获得完整的标签 例如: - 用户在T天进行预测(在T天获得收盘价后) - 标签是在T + 1天买入股票并在T + 2天卖出股票的回报 - label_leak_n将为2(例如,泄露了两天的信息以利用此样本)
- abstract set_horizon(task: dict, hr: int)
此方法旨在就地更改任务
- Parameters:
任务 (字典) – Qlib的任务
hr (int) – 任务的时间范围
- generate(task: dict)
基于任务模板生成不同的任务
- Parameters:
任务 (字典) – 一个任务模板
- Returns:
任务列表
- Return type:
列表[字典]
任务管理器
TaskManager 可以自动获取未使用的任务,并管理一组任务的生命周期,包括错误处理。 这些功能可以并发运行任务,并确保每个任务只被使用一次。 Task Manager 会将所有任务存储在 MongoDB 中。 用户在使用此模块时必须完成 MongoDB 的配置。
TaskManager中的任务由3部分组成 - 任务描述:描述将定义任务 - 任务状态:任务的状态 - 任务结果:用户可以通过任务描述和任务结果获取任务。
- class qlib.workflow.task.manage.TaskManager(task_pool: str)
这是当任务由TaskManager创建时,任务的样子
{ 'def': pickle serialized task definition. using pickle will make it easier 'filter': json-like data. This is for filtering the tasks. 'status': 'waiting' | 'running' | 'done' 'res': pickle serialized task result, }
任务管理器假设您只会更新您获取的任务。 Mongo的获取和更新操作将确保数据更新的安全性。
这个类可以作为命令行工具使用。以下是几个示例。 您可以使用以下命令查看管理模块的帮助: python -m qlib.workflow.task.manage -h # 显示管理模块CLI的手册 python -m qlib.workflow.task.manage wait -h # 显示管理模块中wait命令的手册
python -m qlib.workflow.task.manage -t <pool_name> wait python -m qlib.workflow.task.manage -t <pool_name> task_stat
注意
假设:MongoDB中的数据被编码,而从MongoDB中取出的数据被解码
以下是四种状态:
STATUS_WAITING: 等待训练
STATUS_RUNNING: 训练中
STATUS_PART_DONE: 已完成某些步骤,正在等待下一步
STATUS_DONE: 所有工作已完成
- __init__(task_pool: str)
初始化任务管理器,记得先声明MongoDB的URL和数据库名称。 一个TaskManager实例服务于一个特定的任务池。 该模块的静态方法服务于整个MongoDB。
- Parameters:
task_pool (str) – MongoDB中集合的名称
- static list() list
列出数据库的所有集合(task_pool)。
- Returns:
列表
- replace_task(task, new_task)
使用新任务替换旧任务
- Parameters:
task – 旧任务
new_task – 新任务
- insert_task(task)
插入一个任务。
- Parameters:
task – 等待插入的任务
- Returns:
pymongo.results.InsertOneResult
- insert_task_def(task_def)
插入一个任务到任务池
- Parameters:
task_def (dict) – 任务定义
- Return type:
pymongo.results.InsertOneResult
- create_task(task_def_l, dry_run=False, print_nt=False) List[str]
如果task_def_l中的任务是新的,则将新任务插入到task_pool中,并记录inserted_id。 如果任务不是新的,则只需查询其_id。
- Parameters:
task_def_l (list) – 任务列表
dry_run (bool) – 是否将这些新任务插入任务池
print_nt (bool) – 如果打印新任务
- Returns:
任务定义列表的_id列表
- Return type:
列表[str]
- fetch_task(query={}, status='waiting') dict
使用查询来获取任务。
- Parameters:
query (dict, 可选) – 查询字典。默认为 {}。
status (str, optional) – [描述]. 默认为 STATUS_WAITING.
- Returns:
解码后的任务(集合中的文档)
- Return type:
字典
- safe_fetch_task(query={}, status='waiting')
使用带有contextmanager的查询从task_pool中获取任务
- Parameters:
query (dict) – 查询的字典
- Returns:
字典
- Return type:
解码后的任务(集合中的文档)
- query(query={}, decode=True)
查询集合中的任务。 如果迭代生成器花费太长时间,此函数可能会引发异常 pymongo.errors.CursorNotFound: cursor id not found
python -m qlib.workflow.task.manage -t query ‘{“_id”: “615498be837d0053acbc5d58”}’
- Parameters:
query (dict) – 查询的字典
decode (bool) –
- Returns:
字典
- Return type:
解码后的任务(集合中的文档)
- re_query(_id) dict
使用 _id 查询任务。
- Parameters:
_id (str) – 文档的_id
- Returns:
解码后的任务(集合中的文档)
- Return type:
字典
- commit_task_res(task, res, status='done')
将结果提交到 task['res']。
- Parameters:
任务 ([类型]) – [描述]
res (object) – 你想要保存的结果
status (str, optional) – STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE。默认为 STATUS_DONE。
- return_task(task, status='waiting')
将任务返回到状态。通常用于错误处理。
- Parameters:
任务 ([类型]) – [描述]
status (str, optional) – STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE。默认为 STATUS_WAITING。
- remove(query={})
使用查询删除任务
- Parameters:
query (dict) – 查询的字典
- task_stat(query={}) dict
计算每个状态中的任务数量。
- Parameters:
query (dict, 可选) – 查询字典。默认为 {}。
- Returns:
字典
- reset_waiting(query={})
将所有正在运行的任务重置为等待状态。当某些正在运行的任务意外退出时可以使用。
- Parameters:
query (dict, 可选) – 查询字典。默认为 {}。
- prioritize(task, priority: int)
设置任务的优先级
- Parameters:
任务 (字典) – 来自数据库的任务查询
priority (int) – 目标优先级
- wait(query={})
在多进程处理时,主进程可能从TaskManager中获取不到任何内容,因为仍然有一些任务在运行。 因此,主进程应等待,直到所有任务都被其他进程或机器训练好。
- Parameters:
query (dict, 可选) – 查询字典。默认为 {}。
- qlib.workflow.task.manage.run_task(task_func: Callable, task_pool: str, query: dict = {}, force_release: bool = False, before_status: str = 'waiting', after_status: str = 'done', **kwargs)
当任务池不为空(有等待中的任务)时,使用 task_func 从 task_pool 中获取并运行任务
运行此方法后,有以下4种情况(before_status -> after_status):
STATUS_WAITING -> STATUS_DONE: 使用 task["def"] 作为 task_func 参数,这意味着任务尚未开始
STATUS_WAITING -> STATUS_PART_DONE: 使用 task["def"] 作为 task_func 参数
STATUS_PART_DONE -> STATUS_PART_DONE: 使用 task[“res”] 作为 task_func 参数,这意味着任务已开始但未完成
STATUS_PART_DONE -> STATUS_DONE: 使用 task[“res”] 作为 task_func 参数
- Parameters:
task_func (Callable) –
def (task_def, **kwargs) ->
运行任务的函数
task_pool (str) – 任务池的名称(MongoDB中的集合)
query (dict) – 在获取任务时将使用此字典来查询任务池
force_release (bool) – 程序是否会强制释放资源
before_status (str:) – 将获取并训练处于before_status状态的任务。可以是STATUS_WAITING, STATUS_PART_DONE。
after_status (str:) – 训练后的任务将变为after_status。可以是STATUS_WAITING, STATUS_PART_DONE。
kwargs – task_func的参数
训练器
训练器将训练一系列任务并返回一系列模型记录器。
每个训练器包括两个步骤:train
(创建模型记录器)和end_train
(修改模型记录器)。
这是一个名为DelayTrainer
的概念,它可以用于在线模拟并行训练。
在DelayTrainer
中,第一步只是将一些必要的信息保存到模型记录器中,而第二步将在最后完成,可以执行一些并发且耗时的操作,例如模型拟合。
Qlib
提供了两种训练器,TrainerR
是最简单的方式,而 TrainerRM
基于 TaskManager 来自动管理任务的生命周期。
- qlib.model.trainer.begin_task_train(task_config: dict, experiment_name: str, recorder_name: str | None = None) Recorder
开始任务训练以启动记录器并保存任务配置。
- Parameters:
task_config (dict) – 任务的配置
experiment_name (str) – 实验的名称
recorder_name (str) – 给定的名称将作为记录器名称。如果为None,则使用rid。
- Returns:
模型记录器
- Return type:
- qlib.model.trainer.end_task_train(rec: Recorder, experiment_name: str) Recorder
完成带有真实模型拟合和保存的任务训练。
- qlib.model.trainer.task_train(task_config: dict, experiment_name: str, recorder_name: str | None = None) Recorder
基于任务的培训将分为两个步骤。
- Parameters:
task_config (dict) – 任务的配置。
experiment_name (str) – 实验的名称
recorder_name (str) – 记录器的名称
- Returns:
记录器
- Return type:
记录器的实例
- class qlib.model.trainer.Trainer
训练器可以训练一系列模型。 有Trainer和DelayTrainer,可以通过它们何时完成实际训练来区分。
- __init__()
- train(tasks: list, *args, **kwargs) list
给定一系列任务定义,开始训练,并返回模型。
对于Trainer,它在这个方法中完成实际的训练。 对于DelayTrainer,它在这个方法中只做一些准备工作。
- Parameters:
tasks – 任务列表
- Returns:
模型列表
- Return type:
列表
- end_train(models: list, *args, **kwargs) list
给定一个模型列表,在训练结束时如果需要完成某些操作。 这些模型可能是Recorder、文本文件、数据库等。
对于Trainer,它在这个方法中进行一些收尾工作。 对于DelayTrainer,它在这个方法中完成实际的训练。
- Parameters:
models – 模型列表
- Returns:
模型列表
- Return type:
列表
- is_delay() bool
如果Trainer将延迟完成end_train。
- Returns:
如果 DelayTrainer
- Return type:
布尔
- has_worker() bool
一些训练器有后端工作器来支持并行训练 这个方法可以判断工作器是否启用。
- Returns:
如果工作者已启用
- Return type:
布尔
- worker()
启动工作器
- Raises:
NotImplementedError: – 如果工作器不受支持
- class qlib.model.trainer.TrainerR(experiment_name: str | None = None, train_func: ~typing.Callable = <function task_train>, call_in_subproc: bool = False, default_rec_name: str | None = None)
基于(R)ecorder的训练器。 它将线性地训练一系列任务并返回一系列模型记录器。
假设:模型由task定义,结果将保存到Recorder。
- __init__(experiment_name: str | None = None, train_func: ~typing.Callable = <function task_train>, call_in_subproc: bool = False, default_rec_name: str | None = None)
初始化 TrainerR。
- Parameters:
experiment_name (str, optional) – 实验的默认名称。
train_func (Callable, optional) – 默认的训练方法。默认为 task_train。
call_in_subproc (bool) – 在子进程中调用该过程以强制释放内存
- train(tasks: list, train_func: Callable | None = None, experiment_name: str | None = None, **kwargs) List[Recorder]
给定一个任务列表并返回一个训练好的Recorder列表。顺序可以得到保证。
- Parameters:
任务 (列表) – 基于任务字典的定义列表
train_func (Callable) – 需要至少包含tasks和experiment_name的训练方法。如果为None,则使用默认的训练方法。
experiment_name (str) – 实验名称,None表示使用默认名称。
kwargs – train_func的参数。
- Returns:
记录器列表
- Return type:
列表[Recorder]
- class qlib.model.trainer.DelayTrainerR(experiment_name: str | None = None, train_func=<function begin_task_train>, end_train_func=<function end_task_train>, **kwargs)
基于TrainerR的延迟实现,这意味着train方法可能只做一些准备工作,而end_train方法可以进行实际的模型拟合。
- __init__(experiment_name: str | None = None, train_func=<function begin_task_train>, end_train_func=<function end_task_train>, **kwargs)
初始化 TrainerRM。
- Parameters:
experiment_name (str) – 实验的默认名称。
train_func (Callable, optional) – 默认的训练方法。默认为 begin_task_train。
end_train_func (Callable, optional) – 默认的结束训练方法。默认为 end_task_train。
- end_train(models, end_train_func=None, experiment_name: str | None = None, **kwargs) List[Recorder]
给定一个Recorder列表并返回一个训练过的Recorder列表。 该类将完成实际数据加载和模型拟合。
- Parameters:
models (列表) – 一个Recorder的列表,任务已经保存到它们中
end_train_func (Callable, optional) – 需要至少包含recorders和experiment_name的end_train方法。默认为None,使用self.end_train_func。
experiment_name (str) – 实验名称,None表示使用默认名称。
kwargs – end_train_func 的参数。
- Returns:
记录器列表
- Return type:
列表[Recorder]
- class qlib.model.trainer.TrainerRM(experiment_name: str | None = None, task_pool: str | None = None, train_func=<function task_train>, skip_run_task: bool = False, default_rec_name: str | None = None)
基于(R)ecorder和Task(M)anager的训练器。 它可以以多进程的方式训练一系列任务并返回一系列模型记录器。
假设:task 将被保存到 TaskManager 中,并且 task 将从 TaskManager 中获取并进行训练
- __init__(experiment_name: str | None = None, task_pool: str | None = None, train_func=<function task_train>, skip_run_task: bool = False, default_rec_name: str | None = None)
初始化 TrainerR。
- Parameters:
experiment_name (str) – 实验的默认名称。
task_pool (str) – TaskManager中的任务池名称。如果为None,则使用与experiment_name相同的名称。
train_func (Callable, optional) – 默认的训练方法。默认为 task_train。
skip_run_task (bool) – 如果 skip_run_task == True: 仅在 worker 中运行 run_task。否则跳过 run_task。
- train(tasks: list, train_func: Callable | None = None, experiment_name: str | None = None, before_status: str = 'waiting', after_status: str = 'done', default_rec_name: str | None = None, **kwargs) List[Recorder]
给定一个任务列表并返回一个训练好的Recorder列表。顺序可以得到保证。
此方法默认为单进程,但TaskManager提供了一种并行训练的好方法。 用户可以自定义他们的train_func以实现多进程甚至多机器。
- Parameters:
任务 (列表) – 基于任务字典的定义列表
train_func (Callable) – 需要至少包含tasks和experiment_name的训练方法。如果为None,则使用默认的训练方法。
experiment_name (str) – 实验名称,None表示使用默认名称。
before_status (str) – 将获取并训练处于before_status状态的任务。可以是STATUS_WAITING, STATUS_PART_DONE。
after_status (str) – 训练后的任务将变为after_status。可以是STATUS_WAITING, STATUS_PART_DONE。
kwargs – train_func的参数。
- Returns:
记录器列表
- Return type:
列表[Recorder]
- end_train(recs: list, **kwargs) List[Recorder]
将STATUS_END标签设置到记录器。
- Parameters:
recs (list) – 训练好的记录器列表。
- Returns:
与参数相同的列表。
- Return type:
列表[Recorder]
- worker(train_func: Callable | None = None, experiment_name: str | None = None)
train的多进程方法。它可以与train共享同一个task_pool,并且可以在其他进程或其他机器上运行。
- Parameters:
train_func (Callable) – 需要至少包含tasks和experiment_name的训练方法。如果为None,则使用默认的训练方法。
experiment_name (str) – 实验名称,None表示使用默认名称。
- has_worker() bool
一些训练器有后端工作器来支持并行训练 这个方法可以判断工作器是否启用。
- Returns:
如果工作者已启用
- Return type:
布尔
- class qlib.model.trainer.DelayTrainerRM(experiment_name: str | None = None, task_pool: str | None = None, train_func=<function begin_task_train>, end_train_func=<function end_task_train>, skip_run_task: bool = False, **kwargs)
基于TrainerRM的延迟实现,这意味着train方法可能只做一些准备工作,而end_train方法可以进行实际的模型拟合。
- __init__(experiment_name: str | None = None, task_pool: str | None = None, train_func=<function begin_task_train>, end_train_func=<function end_task_train>, skip_run_task: bool = False, **kwargs)
初始化 DelayTrainerRM。
- Parameters:
experiment_name (str) – 实验的默认名称。
task_pool (str) – TaskManager中的任务池名称。如果为None,则使用与experiment_name相同的名称。
train_func (Callable, optional) – 默认的训练方法。默认为 begin_task_train。
end_train_func (Callable, optional) – 默认的结束训练方法。默认为 end_task_train。
skip_run_task (bool) – 如果 skip_run_task == True: 仅在 worker 中运行 run_task。否则跳过 run_task。 例如,在 CPU VM 上启动 trainer,然后等待在 GPU VM 上完成任务。
- train(tasks: list, train_func=None, experiment_name: str | None = None, **kwargs) List[Recorder]
与TrainerRM的train相同,after_status将为STATUS_PART_DONE。
- Parameters:
任务 (列表) – 基于任务字典的定义列表
train_func (Callable) – 需要至少包含tasks和experiment_name的训练方法。默认值为None,表示使用self.train_func。
experiment_name (str) – 实验名称,None表示使用默认名称。
- Returns:
记录器列表
- Return type:
列表[Recorder]
- end_train(recs, end_train_func=None, experiment_name: str | None = None, **kwargs) List[Recorder]
给定一个Recorder列表并返回一个训练过的Recorder列表。 该类将完成实际数据加载和模型拟合。
- Parameters:
recs (list) – 一个Recorder列表,任务已经保存到它们中。
end_train_func (Callable, optional) – 需要至少包含recorders和experiment_name的end_train方法。默认为None,使用self.end_train_func。
experiment_name (str) – 实验名称,None表示使用默认名称。
kwargs – end_train_func 的参数。
- Returns:
记录器列表
- Return type:
列表[Recorder]
- worker(end_train_func=None, experiment_name: str | None = None)
end_train的多处理方法。它可以与end_train共享同一个任务池,并且可以在其他进程或其他机器上运行。
- Parameters:
end_train_func (Callable, optional) – 需要至少包含recorders和experiment_name的end_train方法。默认为None,使用self.end_train_func。
experiment_name (str) – 实验名称,None表示使用默认名称。
- has_worker() bool
一些训练器有后端工作器来支持并行训练 这个方法可以判断工作器是否启用。
- Returns:
如果工作者已启用
- Return type:
布尔
收集器
收集器模块可以从各处收集对象并对其进行处理,例如合并、分组、求平均值等。
- class qlib.workflow.task.collect.Collector(process_list=[])
收集不同结果的收集器
- __init__(process_list=[])
初始化收集器。
- Parameters:
process_list (list 或 Callable) – 处理字典的处理器列表或处理器实例。
- collect() dict
收集结果并返回一个类似 {key: things} 的字典
- Returns:
收集后的字典。
例如:
{“prediction”: pd.Series}
{“IC”: {“Xgboost”: pd.Series, “LSTM”: pd.Series}}
…
- Return type:
字典
- static process_collect(collected_dict, process_list=[], *args, **kwargs) dict
对collect返回的字典进行一系列处理,并返回一个类似{key: things}的字典 例如,你可以进行分组和集成。
- Parameters:
collected_dict (dict) – 由collect返回的字典
process_list (list 或 Callable) – 处理字典的处理器列表或处理器实例。 处理器的顺序与列表顺序相同。 例如:[Group1(…, Ensemble1()), Group2(…, Ensemble2())]
- Returns:
处理后的字典。
- Return type:
字典
- class qlib.workflow.task.collect.MergeCollector(collector_dict: Dict[str, Collector], process_list: List[Callable] = [], merge_func=None)
一个收集器,用于收集其他收集器的结果
例如:
我们有两个收集器,分别命名为A和B。 A可以收集{“prediction”: pd.Series},B可以收集{“IC”: {“Xgboost”: pd.Series, “LSTM”: pd.Series}}。 然后在这个类的收集之后,我们可以收集{“A_prediction”: pd.Series, “B_IC”: {“Xgboost”: pd.Series, “LSTM”: pd.Series}}
…
- __init__(collector_dict: Dict[str, Collector], process_list: List[Callable] = [], merge_func=None)
初始化 MergeCollector。
- Parameters:
collector_dict (Dict[str,Collector]) – 类似 {collector_key, Collector} 的字典
process_list (List[Callable]) – 处理字典的处理器列表或处理器实例。
merge_func (Callable) – 一个用于生成最外层键的方法。给定的参数是来自collector_dict的
collector_key
和每个收集器收集后的key
。 如果为None,则使用元组连接它们,例如“ABC”+(“a”,”b”) -> (“ABC”, (“a”,”b”))。
- collect() dict
收集collector_dict的所有结果,并将最外层的键更改为重组键。
- Returns:
收集后的字典。
- Return type:
字典
- class qlib.workflow.task.collect.RecorderCollector(experiment, process_list=[], rec_key_func=None, rec_filter_func=None, artifacts_path={'pred': 'pred.pkl'}, artifacts_key=None, list_kwargs={}, status: Iterable = {'FINISHED'})
- __init__(experiment, process_list=[], rec_key_func=None, rec_filter_func=None, artifacts_path={'pred': 'pred.pkl'}, artifacts_key=None, list_kwargs={}, status: Iterable = {'FINISHED'})
初始化 RecorderCollector。
- Parameters:
实验 – (实验或字符串): 一个实验的实例或实验的名称 (可调用): 一个可调用的函数,返回一个实验列表
process_list (list 或 Callable) – 处理字典的处理器列表或处理器实例。
rec_key_func (Callable) – 一个用于获取记录器键的函数。如果为None,则使用记录器ID。
rec_filter_func (Callable, optional) – 通过返回True或False来过滤记录器。默认为None。
artifacts_path (dict, optional) – Recorder 中的工件名称及其路径。默认为 {“pred”: “pred.pkl”, “IC”: “sig_analysis/ic.pkl”}。
artifacts_key (str 或 List, 可选) – 你想要获取的artifacts key。如果为None,则获取所有artifacts。
list_kwargs (str) – list_recorders函数的参数。
status (Iterable) – 仅收集具有特定状态的记录器。None 表示收集所有记录器
- collect(artifacts_key=None, rec_filter_func=None, only_exist=True) dict
根据过滤后的记录器收集不同的工件。
- Parameters:
artifacts_key (str 或 List, 可选) – 你想要获取的artifacts key。如果为None,则使用默认值。
rec_filter_func (Callable, optional) – 通过返回True或False来过滤记录器。如果为None,则使用默认值。
only_exist (bool, optional) – 如果仅在记录器确实存在时收集工件。 如果为True,加载时出现异常的记录器将不会被收集。但如果为False,它将引发异常。
- Returns:
收集后的字典类似于 {artifact: {rec_key: object}}
- Return type:
字典
- get_exp_name() str
获取实验名称
- Returns:
实验名称
- Return type:
字符串
分组
Group可以根据group_func对一组对象进行分组,并将它们转换为字典。 分组后,我们提供了一种方法来减少它们。
例如:
分组: {(A,B,C1): 对象, (A,B,C2): 对象} -> {(A,B): {C1: 对象, C2: 对象}} 归约: {(A,B): {C1: 对象, C2: 对象}} -> {(A,B): 对象}
- class qlib.model.ens.group.Group(group_func=None, ens: Ensemble | None = None)
根据字典对对象进行分组
- __init__(group_func=None, ens: Ensemble | None = None)
初始化组。
- Parameters:
group_func (Callable, optional) –
给定一个字典并返回组键和其中一个组元素。
例如:{(A,B,C1): object, (A,B,C2): object} -> {(A,B): {C1: object, C2: object}}
无。 (默认值为) –
ens (Ensemble, optional) – 如果不为None,则在分组后对分组值进行集成。
- group(*args, **kwargs) dict
将一组对象分组并将它们转换为字典。
例如:{(A,B,C1): object, (A,B,C2): object} -> {(A,B): {C1: object, C2: object}}
- Returns:
分组字典
- Return type:
字典
- reduce(*args, **kwargs) dict
减少分组字典。
例如:{(A,B): {C1: object, C2: object}} -> {(A,B): object}
- Returns:
简化字典
- Return type:
字典
- class qlib.model.ens.group.RollingGroup(ens=<qlib.model.ens.ensemble.RollingEnsemble object>)
对滚动字典进行分组
- group(rolling_dict: dict) dict
给定一个滚动字典如 {(A,B,R): things},返回分组后的字典如 {(A,B): {R:things}}
注意:有一个假设,即滚动键位于键元组的末尾,因为滚动结果总是需要首先进行集成。
- Parameters:
rolling_dict (dict) – 一个滚动的字典。如果键不是元组,则不执行任何操作。
- Returns:
分组字典
- Return type:
字典
集成
集成模块可以合并集成中的对象。例如,如果有许多子模型的预测,我们可能需要将它们合并成一个集成预测。
- class qlib.model.ens.ensemble.Ensemble
将ensemble_dict合并到一个ensemble对象中。
例如:{Rollinga_b: 对象, Rollingb_c: 对象} -> 对象
当调用这个类时:
- Args:
ensemble_dict (dict): 等待合并的集成字典,如 {name: things}
- Returns:
对象:集成对象
- class qlib.model.ens.ensemble.SingleKeyEnsemble
如果字典中只有一个键和值,则提取该对象。使结果更易读。 {Only key: Only value} -> Only value
如果有超过1个键或少于1个键,则不执行任何操作。 你甚至可以递归地运行这个操作,使字典更易读。
注意:默认情况下递归运行。
当调用这个类时:
- Args:
ensemble_dict (dict): 字典。字典的键将被忽略。
- Returns:
dict: 可读的字典。
- class qlib.model.ens.ensemble.RollingEnsemble
将类似prediction或IC的滚动数据框字典合并为一个整体。
注意:字典的值必须是pd.DataFrame,并且具有索引“datetime”。
当调用这个类时:
- Args:
ensemble_dict (dict): 一个类似 {“A”: pd.DataFrame, “B”: pd.DataFrame} 的字典。 字典的键将被忽略。
- Returns:
pd.DataFrame: 滚动操作的完整结果。
- class qlib.model.ens.ensemble.AverageEnsemble
将形状相同的字典数据框(如prediction或IC)进行平均和标准化,整合成一个集合。
注意:字典的值必须是pd.DataFrame,并且具有索引“datetime”。如果是嵌套字典,则需要将其展平。
当调用这个类时:
- Args:
ensemble_dict (dict): 一个类似 {“A”: pd.DataFrame, “B”: pd.DataFrame} 的字典。 字典的键将被忽略。
- Returns:
pd.DataFrame: 平均化和标准化的完整结果。
工具
一些用于任务管理的工具。
- qlib.workflow.task.utils.get_mongodb() Database
获取MongoDB中的数据库,这意味着你需要首先声明数据库的地址和名称。
例如:
使用 qlib.init():
mongo_conf = { "task_url": task_url, # your MongoDB url "task_db_name": task_db_name, # database name } qlib.init(..., mongo=mongo_conf)
在qlib.init()之后:
C["mongo"] = { "task_url" : "mongodb://localhost:27017/", "task_db_name" : "rolling_db" }
- Returns:
数据库实例
- Return type:
数据库
- qlib.workflow.task.utils.list_recorders(experiment, rec_filter_func=None)
列出实验中可以通过过滤器的所有记录器。
- Parameters:
实验 (str 或 Experiment) – 实验的名称或实例
rec_filter_func (Callable, optional) – 返回True以保留给定的记录器。默认为None。
- Returns:
过滤后的字典 {rid: recorder}。
- Return type:
字典
- class qlib.workflow.task.utils.TimeAdjuster(future=True, end_time=None)
找到合适的日期并调整日期。
- __init__(future=True, end_time=None)
- set_end_time(end_time=None)
设置结束时间。如果为None,则使用日历的结束时间。
- Parameters:
end_time –
- get(idx: int)
通过索引获取日期时间。
- Parameters:
idx (int) – 日历的索引
- max() Timestamp
返回最大日历日期时间
- align_idx(time_point, tp_type='start') int
对齐日历中时间点的索引。
- Parameters:
time_point –
tp_type (str) –
- Returns:
索引
- Return type:
整数
- cal_interval(time_point_A, time_point_B) int
计算交易日间隔(time_point_A - time_point_B)
- Parameters:
time_point_A – time_point_A
time_point_B – time_point_B(是time_point_A的过去)
- Returns:
A和B之间的间隔
- Return type:
整数
- align_time(time_point, tp_type='start') Timestamp
将时间点对齐到日历的交易日期
- Parameters:
time_point – 时间点
tp_type – str 时间点类型 (“start”, “end”)
- Returns:
pd.Timestamp
- align_seg(segment: dict | tuple) dict | tuple
将给定日期对齐到交易日
例如:
input: {'train': ('2008-01-01', '2014-12-31'), 'valid': ('2015-01-01', '2016-12-31'), 'test': ('2017-01-01', '2020-08-01')} output: {'train': (Timestamp('2008-01-02 00:00:00'), Timestamp('2014-12-31 00:00:00')), 'valid': (Timestamp('2015-01-05 00:00:00'), Timestamp('2016-12-30 00:00:00')), 'test': (Timestamp('2017-01-03 00:00:00'), Timestamp('2020-07-31 00:00:00'))}
- Parameters:
segment –
- Returns:
Union[dict, tuple]
- Return type:
开始和结束交易日期(pd.Timestamp)在给定的开始和结束日期之间。
- truncate(segment: tuple, test_start, days: int) tuple
根据test_start日期截断段
- Parameters:
segment (tuple) – 时间段
test_start –
days (int) – 需要截断的交易天数 此段数据可能需要‘days’天的数据 days 是基于 test_start 的。 例如,如果标签包含未来2天的信息,预测范围为1天。 (例如,预测目标是 Ref($close, -2)/Ref($close, -1) - 1) 天数应为 2 + 1 == 3 天。
- Returns:
元组
- Return type:
新段落
- shift(seg: tuple, step: int, rtype='sliding') tuple
调整时间段的时间
如果段中存在None(表示无界索引),此方法将返回None。
- Parameters:
seg – 日期时间片段
step (int) – 滚动步长
rtype (str) – 滚动类型(“滑动”或“扩展”)
- Returns:
元组
- Return type:
新段落
- Raises:
KeyError: – 如果索引(包括开始和结束)超出self.cal范围,shift将引发错误
- qlib.workflow.task.utils.replace_task_handler_with_cache(task: dict, cache_dir: str | Path = '.') dict
将任务中的处理程序替换为缓存处理程序。 它将自动缓存文件并将其保存在cache_dir中。
>>> import qlib >>> qlib.auto_init() >>> import datetime >>> # it is simplified task >>> task = {"dataset": {"kwargs":{'handler': {'class': 'Alpha158', 'module_path': 'qlib.contrib.data.handler', 'kwargs': {'start_time': datetime.date(2008, 1, 1), 'end_time': datetime.date(2020, 8, 1), 'fit_start_time': datetime.date(2008, 1, 1), 'fit_end_time': datetime.date(2014, 12, 31), 'instruments': 'CSI300'}}}}} >>> new_task = replace_task_handler_with_cache(task) >>> print(new_task) {'dataset': {'kwargs': {'handler': 'file...Alpha158.3584f5f8b4.pkl'}}}
在线服务
在线管理器
OnlineManager 可以管理一组 Online Strategy 并动态运行它们。
随着时间的推移,决定性模型也会发生变化。在这个模块中,我们称那些贡献模型为在线模型。 在每个例程中(例如每天或每分钟),在线模型可能会发生变化,并且它们的预测需要更新。 因此,这个模块提供了一系列方法来控制这个过程。
该模块还提供了一种方法来模拟历史中的在线策略。这意味着你可以验证你的策略或找到一个更好的策略。
在不同情况下使用不同的训练器共有4种情况:
情况 |
描述 |
---|---|
在线 + 培训师 |
当你想进行一个真实的例行程序时,训练器将帮助你训练模型。它将逐任务、逐策略地训练模型。 |
在线 + 延迟训练器 |
DelayTrainer 将跳过具体训练,直到所有任务已由不同策略准备完毕。它使用户可以在 routine 或 first_train 结束时并行训练所有任务。否则,这些函数将在每个策略准备任务时卡住。 |
模拟 + 训练器 |
它的行为将与在线 + 训练器相同。唯一的区别是它用于模拟/回测,而不是在线交易。 |
模拟 + 延迟训练器 |
当你的模型没有任何时间依赖性时,你可以使用DelayTrainer来实现多任务处理的能力。这意味着所有例程中的所有任务都可以在模拟结束时进行REAL训练。信号将在不同的时间段准备好(基于是否有任何新模型在线)。 |
以下是一些伪代码,展示了每种情况的工作流程
- For simplicity
策略中仅使用一种策略
update_online_pred 仅在在线模式下调用,其他情况下会被忽略
在线 + 培训师
tasks = first_train()
models = trainer.train(tasks)
trainer.end_train(models)
for day in online_trading_days:
# OnlineManager.routine
models = trainer.train(strategy.prepare_tasks()) # for each strategy
strategy.prepare_online_models(models) # for each strategy
trainer.end_train(models)
prepare_signals() # prepare trading signals daily
在线 + 延迟训练器: 工作流程与 在线 + 训练器 相同。
模拟 + 延迟训练器
# simulate
tasks = first_train()
models = trainer.train(tasks)
for day in historical_calendars:
# OnlineManager.routine
models = trainer.train(strategy.prepare_tasks()) # for each strategy
strategy.prepare_online_models(models) # for each strategy
# delay_prepare()
# FIXME: Currently the delay_prepare is not implemented in a proper way.
trainer.end_train(<for all previous models>)
prepare_signals()
# 我们能否简化当前的工作流程?
可以减少任务的状态数量吗?
对于每个任务,我们有三个阶段(即任务、部分训练任务、最终训练任务)
- class qlib.workflow.online.manager.OnlineManager(strategies: OnlineStrategy | List[OnlineStrategy], trainer: Trainer | None = None, begin_time: str | Timestamp | None = None, freq='day')
OnlineManager 可以使用 Online Strategy 管理在线模型。 它还提供了哪些模型在什么时间在线的历史记录。
- __init__(strategies: OnlineStrategy | List[OnlineStrategy], trainer: Trainer | None = None, begin_time: str | Timestamp | None = None, freq='day')
初始化在线管理器。 一个在线管理器必须至少有一个在线策略。
- Parameters:
策略 (联合[OnlineStrategy, 列表[OnlineStrategy]]) – 一个OnlineStrategy实例或一个OnlineStrategy列表
begin_time (Union[str,pd.Timestamp], optional) – OnlineManager 将从这个时间开始。默认为 None,表示使用最近的日期。
trainer (qlib.model.trainer.Trainer) – 用于训练任务的训练器。如果为None,则使用TrainerR。
freq (str, 可选) – 数据频率。默认为“day”。
- first_train(strategies: List[OnlineStrategy] | None = None, model_kwargs: dict = {})
从每个策略的first_tasks方法中获取任务并训练它们。 如果使用DelayTrainer,它可以在每个策略的first_tasks之后一起完成所有训练。
- Parameters:
策略 (列表[OnlineStrategy]) – 策略列表(添加策略时需要此参数)。如果为None,则使用默认策略。
model_kwargs (dict) – prepare_online_models的参数
- routine(cur_time: str | Timestamp | None = None, task_kwargs: dict = {}, model_kwargs: dict = {}, signal_kwargs: dict = {})
每个策略的典型更新过程并记录在线历史。
典型的更新过程在例行程序之后,例如每天或每月。 过程是:更新预测 -> 准备任务 -> 准备在线模型 -> 准备信号。
如果使用DelayTrainer,它可以在每个策略的prepare_tasks之后一起完成训练。
- Parameters:
cur_time (Union[str,pd.Timestamp], optional) – 在此时间运行例行方法。默认为 None。
task_kwargs (dict) – prepare_tasks的参数
model_kwargs (dict) – prepare_online_models的参数
signal_kwargs (dict) – prepare_signals 的参数
- get_collector(**kwargs) MergeCollector
获取Collector的实例,以从每个策略中收集结果。 这个收集器可以作为信号准备的基础。
- Parameters:
**kwargs – get_collector 的参数。
- Returns:
用于合并其他收集器的收集器。
- Return type:
- add_strategy(strategies: OnlineStrategy | List[OnlineStrategy])
向OnlineManager添加一些新策略。
- prepare_signals(prepare_func: ~typing.Callable = <qlib.model.ens.ensemble.AverageEnsemble object>, over_write=False)
在准备好上一次例行程序的数据(箱线图中的一个箱子)后,这意味着例行程序的结束,我们可以为下一个例行程序准备交易信号。
注意:给定一组预测,这些预测结束时间之前的所有信号都将被妥善准备。
即使最新的信号已经存在,最新的计算结果也会被覆盖。
注意
给定某个时间的预测,在此之前的所有信号都将准备就绪。
- Parameters:
prepare_func (Callable, optional) – 在收集后从字典中获取信号。默认为 AverageEnsemble(),由 MergeCollector 收集的结果必须是 {xxx:pred}。
over_write (bool, optional) – 如果为True,新信号将覆盖原有信号。如果为False,新信号将追加到信号末尾。默认为False。
- Returns:
信号。
- Return type:
pd.DataFrame
- get_signals() Series | DataFrame
获取准备好的在线信号。
- Returns:
pd.Series 用于每个日期时间只有一个信号。 pd.DataFrame 用于多个信号,例如,买入和卖出操作使用不同的交易信号。
- Return type:
联合[pd.Series, pd.DataFrame]
- simulate(end_time=None, frequency='day', task_kwargs={}, model_kwargs={}, signal_kwargs={}) Series | DataFrame
从当前时间开始,此方法将模拟OnlineManager中的每个例程,直到结束时间。
考虑到并行训练,模型和信号可以在所有常规模拟之后准备。
延迟训练的方式可以是
DelayTrainer
,而延迟准备信号的方式可以是delay_prepare
。- Parameters:
end_time – 模拟结束的时间
frequency – 日历频率
task_kwargs (dict) – prepare_tasks的参数
model_kwargs (dict) – prepare_online_models的参数
signal_kwargs (dict) – prepare_signals 的参数
- Returns:
pd.Series 用于每个日期时间只有一个信号。 pd.DataFrame 用于多个信号,例如,买入和卖出操作使用不同的交易信号。
- Return type:
联合[pd.Series, pd.DataFrame]
- delay_prepare(model_kwargs={}, signal_kwargs={})
如果某些内容正在等待准备,请准备所有模型和信号。
- Parameters:
model_kwargs – end_train的参数
signal_kwargs – prepare_signals的参数
在线策略
OnlineStrategy 模块是在线服务的一个组成部分。
- class qlib.workflow.online.strategy.OnlineStrategy(name_id: str)
OnlineStrategy 正在与 Online Manager 合作,响应任务的生成方式、模型的更新以及信号的准备。
- __init__(name_id: str)
初始化在线策略。 此模块必须使用Trainer来完成模型训练。
- Parameters:
name_id (str) – 一个唯一的名称或ID。
trainer (qlib.model.trainer.Trainer, optional) – 一个Trainer的实例。默认为None。
- prepare_tasks(cur_time, **kwargs) List[dict]
在例程结束后,检查我们是否需要根据cur_time(None表示最新)准备和训练一些新任务。 返回等待训练的新任务。
您可以通过OnlineTool.online_models找到最新的在线模型。
- prepare_online_models(trained_models, cur_time=None) List[object]
从训练好的模型中选择一些模型并将它们设置为在线模型。 这是一个典型的将所有训练好的模型上线的方法,你可以覆盖它以实现更复杂的方法。 如果你仍然需要它们,你可以通过OnlineTool.online_models找到最后上线的模型。
注意:将所有在线模型重置为训练好的模型。如果没有训练好的模型,则不执行任何操作。
- NOTE:
当前的实现非常朴素。这里有一个更复杂的情况,更接近实际场景。 1. 在test_start前一天(时间戳T)训练新模型 2. 在test_start(通常在时间戳T + 1)切换模型
- Parameters:
模型 (列表) – 一个模型列表。
cur_time (pd.Dataframe) – 来自OnlineManger的当前时间。如果为None,则表示最新时间。
- Returns:
在线模型列表。
- Return type:
列表[对象]
- first_tasks() List[dict]
首先生成一系列任务并返回它们。
- class qlib.workflow.online.strategy.RollingStrategy(name_id: str, task_template: dict | List[dict], rolling_gen: RollingGen)
此示例策略始终使用最新的滚动模型 sas 在线模型。
- __init__(name_id: str, task_template: dict | List[dict], rolling_gen: RollingGen)
初始化滚动策略。
假设:name_id的字符串、实验名称和训练者的实验名称是相同的。
- Parameters:
name_id (str) – 一个唯一的名称或ID。也将是实验的名称。
task_template (Union[dict, List[dict]]) – 一个任务模板列表或单个模板,将用于通过rolling_gen生成多个任务。
rolling_gen (RollingGen) – RollingGen的一个实例
- get_collector(process_list=[<qlib.model.ens.group.RollingGroup object>], rec_key_func=None, rec_filter_func=None, artifacts_key=None)
获取Collector的实例以收集结果。返回的收集器必须区分不同模型的结果。
假设:模型可以根据模型名称和滚动测试段进行区分。 如果您不希望使用此假设,请实现您的方法或使用另一个rec_key_func。
- Parameters:
rec_key_func (Callable) – 一个用于获取记录器键的函数。如果为None,则使用记录器ID。
rec_filter_func (Callable, optional) – 通过返回True或False来过滤记录器。默认为None。
artifacts_key (List[str], optional) – 你想要获取的artifacts键。如果为None,则获取所有artifacts。
- first_tasks() List[dict]
使用rolling_gen基于task_template生成不同的任务。
- Returns:
任务列表
- Return type:
列表[字典]
- prepare_tasks(cur_time) List[dict]
根据cur_time准备新任务(None表示最新的)。
您可以通过OnlineToolR.online_models找到最新的在线模型。
- Returns:
新任务的列表。
- Return type:
列表[字典]
在线工具
OnlineTool 是一个用于设置和取消设置一系列在线模型的模块。 在线模型是在某些时间点上的一些决定性模型,这些模型可以随时间的变化而变化。 这使得我们能够使用高效的子模型来适应市场风格的变化。
- class qlib.workflow.online.utils.OnlineTool
OnlineTool 将管理实验中的 在线 模型,这些实验包括模型记录器。
- __init__()
初始化在线工具。
- set_online_tag(tag, recorder: list | object)
将tag设置为模型以标记是否在线。
- Parameters:
tag (str) – ONLINE_TAG 和 OFFLINE_TAG 中的标签
recorder (Union[list,object]) – 模型的记录器
- get_online_tag(recorder: object) str
给定一个模型记录器并返回其在线标签。
- Parameters:
recorder (Object) – 模型的记录器
- Returns:
在线标签
- Return type:
字符串
- reset_online_tag(recorder: list | object)
将所有模型离线并将记录器设置为“在线”。
- Parameters:
recorder (Union[list,object]) – 您想要重置为“在线”的记录器。
- online_models() list
获取当前的在线模型
- Returns:
一个在线模型的列表。
- Return type:
列表
- update_online_pred(to_date=None)
更新在线模型的预测到to_date。
- Parameters:
to_date (pd.Timestamp) – 在此日期之前的预测将被更新。如果为None,则更新到最新。
- class qlib.workflow.online.utils.OnlineToolR(default_exp_name: str | None = None)
基于(R)ecorder的OnlineTool实现。
- __init__(default_exp_name: str | None = None)
初始化 OnlineToolR。
- Parameters:
default_exp_name (str) – 默认的实验名称。
- set_online_tag(tag, recorder: Recorder | List)
将tag设置为模型的记录器,以标记是否在线。
- Parameters:
tag (str) – ONLINE_TAG, NEXT_ONLINE_TAG, OFFLINE_TAG中的标签
recorder (Union[Recorder, List]) – Recorder的列表或Recorder的实例
- get_online_tag(recorder: Recorder) str
给定一个模型记录器并返回其在线标签。
- Parameters:
recorder (Recorder) – 一个记录器的实例
- Returns:
在线标签
- Return type:
字符串
- reset_online_tag(recorder: Recorder | List, exp_name: str | None = None)
将所有模型离线并将记录器设置为“在线”。
- Parameters:
recorder (Union[Recorder, List]) – 你想要重置为“在线”的recorder。
exp_name (str) – 实验名称。如果为None,则使用default_exp_name。
- online_models(exp_name: str | None = None) list
获取当前的在线模型
- Parameters:
exp_name (str) – 实验名称。如果为None,则使用default_exp_name。
- Returns:
一个在线模型的列表。
- Return type:
列表
- update_online_pred(to_date=None, from_date=None, exp_name: str | None = None)
更新在线模型的预测至to_date。
- Parameters:
to_date (pd.Timestamp) – 在此日期之前的预测将被更新。如果为None,则更新到日历中的最新时间。
exp_name (str) – 实验名称。如果为None,则使用default_exp_name。
记录更新器
Updater 是一个模块,用于在股票数据更新时更新诸如预测之类的工件。
- class qlib.workflow.online.update.RMDLoader(rec: Recorder)
记录器模型数据集加载器
- get_dataset(start_time, end_time, segments=None, unprepared_dataset: DatasetH | None = None) DatasetH
加载、配置和设置数据集。
此数据集用于推理。
- Parameters:
start_time – 基础数据的开始时间
end_time – 基础数据的结束时间
segments – dict 数据集的分段配置 由于时间序列数据集(TSDatasetH),测试段可能与start_time和end_time不同
unprepared_dataset – Optional[DatasetH] 如果用户不想从记录器加载数据集,请指定用户的数据集
- Returns:
DatasetH的实例
- Return type:
- class qlib.workflow.online.update.RecordUpdater(record: Recorder, *args, **kwargs)
更新特定的记录器
- abstract update(*args, **kwargs)
更新特定记录器的信息
- class qlib.workflow.online.update.DSBasedUpdater(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)
基于数据集的更新器
为基于Qlib数据集的数据更新提供更新功能
假设
基于Qlib数据集
要更新的数据是一个多级索引的pd.DataFrame。例如标签、预测。
LABEL0 datetime instrument 2021-05-10 SH600000 0.006965 SH600004 0.003407 ... ... 2021-05-28 SZ300498 0.015748 SZ300676 -0.001321
- __init__(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)
初始化 PredUpdater。
在以下情况下的预期行为:
如果 to_date 大于日历中的最大日期,数据将更新到最新日期
如果在from_date之前或to_date之后有数据,只有from_date和to_date之间的数据会受到影响。
- Parameters:
record – 记录器
to_date –
将预测更新到to_date
如果to_date为None:
数据将更新到最新日期。
from_date –
更新将从from_date开始
如果from_date为None:
更新将在历史数据中最新数据后的下一个tick时进行
hist_ref –
int 有时,数据集会有历史依赖。 将问题留给用户设置历史依赖的长度 如果用户没有指定此参数,Updater 将尝试加载数据集以自动确定 hist_ref
注意
start_time 不包括在 hist_ref 中;因此,在大多数情况下,hist_ref 将是 step_len - 1
loader_cls – 类型 用于加载模型和数据集的类
- prepare_data(unprepared_dataset: DatasetH | None = None) DatasetH
加载数据集 - 如果指定了 unprepared_dataset,则直接准备数据集 - 否则,
将此函数分离将使数据集更易于重用
- Returns:
DatasetH的实例
- Return type:
- update(dataset: DatasetH | None = None, write: bool = True, ret_new: bool = False) object | None
- Parameters:
dataset (DatasetH) – DatasetH: DatasetH的实例。如果为None,则重新准备。
write (bool) – 是否执行写入操作
ret_new (bool) – 更新后的数据是否会被返回
- Returns:
更新后的数据集
- Return type:
可选[对象]
- abstract get_update_data(dataset: Dataset) DataFrame
根据给定的数据集返回更新后的数据
get_update_data 和 update 之间的区别 - update_date 只包含一些特定数据的功能 - update 包含一些常规步骤(例如准备数据集、检查)
- class qlib.workflow.online.update.PredUpdater(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)
更新记录器中的预测
- get_update_data(dataset: Dataset) DataFrame
根据给定的数据集返回更新后的数据
get_update_data 和 update 之间的区别 - update_date 只包含一些特定数据的功能 - update 包含一些常规步骤(例如准备数据集、检查)
- class qlib.workflow.online.update.LabelUpdater(record: Recorder, to_date=None, **kwargs)
更新记录器中的标签
假设 - 标签是从record_temp.SignalRecord生成的。
- __init__(record: Recorder, to_date=None, **kwargs)
初始化 PredUpdater。
在以下情况下的预期行为:
如果 to_date 大于日历中的最大日期,数据将更新到最新日期
如果在from_date之前或to_date之后有数据,只有from_date和to_date之间的数据会受到影响。
- Parameters:
record – 记录器
to_date –
将预测更新到to_date
如果to_date为None:
数据将更新到最新日期。
from_date –
更新将从from_date开始
如果from_date为None:
更新将在历史数据中最新数据后的下一个tick时进行
hist_ref –
int 有时,数据集会有历史依赖。 将问题留给用户设置历史依赖的长度 如果用户没有指定此参数,Updater 将尝试加载数据集以自动确定 hist_ref
注意
start_time 不包括在 hist_ref 中;因此,在大多数情况下,hist_ref 将是 step_len - 1
loader_cls – 类型 用于加载模型和数据集的类
- get_update_data(dataset: Dataset) DataFrame
根据给定的数据集返回更新后的数据
get_update_data 和 update 之间的区别 - update_date 只包含一些特定数据的功能 - update 包含一些常规步骤(例如准备数据集、检查)
工具
可序列化
- class qlib.utils.serial.Serializable
Serializable 将改变 pickle 的行为。
判断在转储时是否保留或删除属性的规则。 优先级较高的规则位于顶部 - 在配置属性列表中 -> 总是删除 - 在包含属性列表中 -> 总是保留 - 在排除属性列表中 -> 总是删除 - 名称不以_开头 -> 保留 - 名称以_开头 -> 如果dump_all为真则保留,否则删除
它提供了一种语法糖,用于区分用户不希望保存的属性。 - 例如,一个可学习的Datahandler在转储到磁盘时只想保存参数而不保存数据。
- __init__()
- property dump_all
对象会转储所有对象吗
- config(recursive=False, **kwargs)
配置可序列化对象
- Parameters:
keys (kwargs 可能包括以下内容) –
- dump_allbool
对象是否转储所有对象
- excludelist
哪些属性不会被转储
- includelist
哪些属性会被转储
recursive (bool) – 配置是否递归
- to_pickle(path: Path | str, **kwargs)
将自身转储到pickle文件中。
path (Union[Path, str]): 要转储的路径
kwargs 可能包含以下键
- dump_allbool
对象会转储所有对象吗
- excludelist
哪些属性不会被转储
- includelist
将转储什么属性
- classmethod load(filepath)
从文件路径加载可序列化的类。
- Parameters:
filepath (str) – 文件的路径
- Raises:
TypeError – 腌制的文件必须是 type(cls)
- Returns:
type(cls)的实例
- Return type:
type(cls)
- classmethod get_backend()
返回一个可序列化类的真实后端。pickle_backend 的值可以是“pickle”或“dill”。
- Returns:
基于pickle_backend的pickle或dill模块
- Return type:
模块
- static general_dump(obj, path: Path | str)
对象的通用转储方法
- Parameters:
obj (object) – 要转储的对象
path (Union[Path, str]) – 数据将被转储的目标路径
强化学习
基础组件
- class qlib.rl.Interpreter
解释器是模拟器产生的状态与强化学习策略所需状态之间的媒介。 解释器是双向的:
从模拟器状态到策略状态(也称为观察),请参见
StateInterpreter
。从策略动作到模拟器接受的动作,请参见
ActionInterpreter
。
继承两个子类之一来定义你自己的解释器。 这个超类仅用于isinstance检查。
建议解释器是无状态的,这意味着在解释器中使用
self.xxx
存储临时信息是反模式。未来,我们可能支持通过调用self.env.register_state()
来注册一些与解释器相关的状态,但这不在第一版的计划中。
- class qlib.rl.StateInterpreter(*args, **kwds)
状态解释器,将qlib执行器的执行结果解释为rl环境状态
- validate(obs: ObsType) None
验证观察是否属于预定义的观察空间。
- interpret(simulator_state: StateType) ObsType
解释模拟器的状态。
- Parameters:
simulator_state – 通过
simulator.get_state()
获取。- Return type:
策略所需的状态。应符合在
observation_space
中定义的状态空间。
- class qlib.rl.ActionInterpreter(*args, **kwds)
动作解释器,将强化学习代理的动作解释为qlib订单
- validate(action: PolicyActType) None
验证一个动作是否属于预定义的动作空间。
- interpret(simulator_state: StateType, action: PolicyActType) ActType
将策略动作转换为模拟器动作。
- Parameters:
simulator_state – 通过
simulator.get_state()
获取。action – 策略给出的原始动作。
- Return type:
模拟器所需的操作,
- class qlib.rl.Reward(*args, **kwds)
奖励计算组件,接受一个参数:模拟器的状态。返回一个实数:奖励。
子类应实现
reward(simulator_state)
以实现自己的奖励计算方案。- reward(simulator_state: SimulatorState) float
实现此方法以获得您自己的奖励。
- class qlib.rl.RewardCombination(rewards: Dict[str, Tuple[Reward, float]])
多重奖励的组合。
- reward(simulator_state: Any) float
实现此方法以获得您自己的奖励。
- class qlib.rl.Simulator(initial: InitialStateType, **kwargs: Any)
模拟器通过
__init__
重置,并通过step(action)
进行转换。为了使数据流清晰,我们对Simulator做出以下限制:
修改模拟器内部状态的唯一方法是使用
step(action)
。外部模块可以通过使用
simulator.get_state()
来读取模拟器的状态,并通过调用simulator.done()
来检查模拟器是否处于结束状态。
模拟器被定义为与三种类型绑定:
InitialStateType 是用于创建模拟器的数据类型。
StateType 是模拟器状态(state)的类型。
ActType 是 action 的类型,这是在每个步骤中接收到的输入。
不同的模拟器可能共享相同的StateType。例如,当它们处理相同的任务时,但使用不同的模拟实现。使用相同的类型,它们可以安全地共享MDP中的其他组件。
模拟器是短暂的。模拟器的生命周期从一个初始状态开始,到轨迹结束为止。 换句话说,当轨迹结束时,模拟器会被回收。 如果模拟器想要在之间共享上下文(例如,为了加速目的), 这可以通过访问环境包装器的弱引用来实现。
- env
env-wrapper的参考,在某些特殊情况下可能有用。 不鼓励模拟器使用此功能,因为它容易引发错误。
- Type:
可选的[EnvWrapper]
- __init__(initial: InitialStateType, **kwargs: Any) None
- step(action: ActType) None
接收一个ActType类型的动作。
模拟器应更新其内部状态,并返回None。 更新后的状态可以通过
simulator.get_state()
获取。
- done() bool
检查模拟器是否处于“完成”状态。 当模拟器处于“完成”状态时, 它不应再接收任何
step
请求。 由于模拟器是短暂的,要重置模拟器, 应销毁旧的模拟器并创建一个新的模拟器。
策略
- class qlib.rl.strategy.SingleOrderStrategy(order: Order, trade_range: TradeRange | None = None)
用于生成具有恰好一个订单的交易决策的策略。
- __init__(order: Order, trade_range: TradeRange | None = None) None
- Parameters:
outer_trade_decision (BaseTradeDecision, optional) –
外部策略的交易决策,该策略依赖于此决策,并且将在 [start_time, end_time] 期间进行交易,默认为 None
如果该策略用于拆分交易决策,将会使用此决策
如果该策略用于投资组合管理,可以忽略此决策
level_infra (LevelInfrastructure, 可选) – 用于回测的级别共享基础设施,包括交易日历
common_infra (CommonInfrastructure, optional) – 用于回测的通用基础设施,包括交易账户、交易交易所等。
trade_exchange (Exchange) –
提供市场信息的交易所,用于处理订单和生成报告
如果 trade_exchange 为 None,self.trade_exchange 将被设置为 common_infra
它允许在不同的执行中使用不同的 trade_exchanges。
例如:
在每日执行中,每日交易所和每分钟交易所都可以使用,但推荐使用每日交易所,因为它运行更快。
在每分钟执行中,每日交易所不可用,仅推荐使用每分钟交易所。
- generate_trade_decision(execute_result: list | None = None) TradeDecisionWO
在每个交易时段生成交易决策
- Parameters:
execute_result (List[object], optional) –
交易决策的执行结果,默认为 None
首次调用 generate_trade_decision 时,execute_result 可能为 None
训练器
训练、测试、推理工具。
- class qlib.rl.trainer.Trainer(*, max_iters: int | None = None, val_every_n_iters: int | None = None, loggers: LogWriter | List[LogWriter] | None = None, callbacks: List[Callback] | None = None, finite_env_type: FiniteEnvType = 'subproc', concurrency: int = 2, fast_dev_run: int | None = None)
用于在特定任务上训练策略的工具。
与传统深度学习训练器不同,此训练器的迭代是“收集”,而不是“epoch”或“mini-batch”。 在每次收集中,
Collector
收集一定数量的策略-环境交互,并将它们累积到一个回放缓冲区中。此缓冲区用作训练策略的“数据”。 在每次收集结束时,策略会被更新多次。该API与PyTorch Lightning有一些相似之处, 但本质上不同,因为这个训练器是为RL应用构建的,因此大多数配置都在RL上下文中。 我们仍在寻找整合现有训练器库的方法,因为构建一个与这些库一样强大的训练器看起来需要很大的努力,而且,这不是我们的主要目标。
它与 tianshou的内置训练器 有本质上的不同,因为它远比那些复杂得多。
- Parameters:
max_iters – 停止前的最大迭代次数。
val_every_n_iters – 每n次迭代执行一次验证(即训练收集)。
logger – 用于记录回测结果的日志记录器。必须存在日志记录器,因为没有日志记录器,所有信息都将丢失。
finite_env_type – 有限环境实现的类型。
并发 – 并行工作线程。
fast_dev_run – 创建一个用于调试的子集。 具体实现方式取决于训练容器的实现。 对于
TrainingVessel
,如果大于零, 将使用大小为fast_dev_run
的随机子集 代替train_initial_states
和val_initial_states
。
- should_stop: bool
设置为停止训练。
- metrics: dict
训练/验证/测试中产生的数值指标。 在训练/验证过程中,指标将显示最新一集的结果。 当每次训练/验证迭代完成时,指标将是该迭代中所有遇到的结果的汇总。
在每次新的训练迭代中清除。
在拟合过程中,验证指标将以前缀
val/
显示。
- current_iter: int
当前训练迭代(收集)。
- __init__(*, max_iters: int | None = None, val_every_n_iters: int | None = None, loggers: LogWriter | List[LogWriter] | None = None, callbacks: List[Callback] | None = None, finite_env_type: FiniteEnvType = 'subproc', concurrency: int = 2, fast_dev_run: int | None = None)
- initialize()
初始化整个训练过程。
这里的状态应与state_dict同步。
- initialize_iter()
初始化一次迭代/收集。
- state_dict() dict
尽可能将当前训练的每个状态放入一个字典中。
它不会尝试处理在一次训练收集中间所有可能的状态。 对于大多数情况,在每次迭代结束时,事情通常应该是正确的。
请注意,收集器中的重放缓冲区数据将会丢失,这也是预期的行为。
- load_state_dict(state_dict: dict) None
将所有状态加载到当前训练器中。
- named_callbacks() Dict[str, Callback]
检索一组回调函数,每个回调函数都有一个名称。 在保存检查点时非常有用。
- fit(vessel: TrainingVesselBase, ckpt_path: Path | None = None) None
在定义的模拟器上训练RL策略。
- Parameters:
vessel – 训练中使用的所有元素的集合。
ckpt_path – 加载一个预训练/暂停的训练检查点。
- test(vessel: TrainingVesselBase) None
测试RL策略与模拟器的对比。
模拟器将使用在
test_seed_iterator
中生成的数据进行输入。- Parameters:
vessel – 所有相关元素的集合。
- venv_from_iterator(iterator: Iterable[InitialStateType]) FiniteVectorEnv
从迭代器和训练容器创建一个向量化环境。
- class qlib.rl.trainer.TrainingVessel(*, simulator_fn: Callable[[InitialStateType], Simulator[InitialStateType, StateType, ActType]], state_interpreter: StateInterpreter[StateType, ObsType], action_interpreter: ActionInterpreter[StateType, PolicyActType, ActType], policy: BasePolicy, reward: Reward, train_initial_states: Sequence[InitialStateType] | None = None, val_initial_states: Sequence[InitialStateType] | None = None, test_initial_states: Sequence[InitialStateType] | None = None, buffer_size: int = 20000, episode_per_iter: int = 1000, update_kwargs: Dict[str, Any] = None)
训练容器的默认实现。
__init__
接受一系列初始状态,以便可以创建迭代器。train
,validate
,test
每个都执行一次收集(并且在训练中也会更新)。 默认情况下,训练初始状态将在训练期间无限重复, 并且收集器将控制每次迭代的回合数。 在验证和测试中,验证/测试初始状态将仅使用一次。额外的超参数(仅在训练中使用)包括:
buffer_size
: 回放缓冲区的大小。episode_per_iter
: 每次训练收集的回合数。可以通过快速开发运行进行覆盖。update_kwargs
: 出现在policy.update
中的关键字参数。 例如,dict(repeat=10, batch_size=64)
。
- __init__(*, simulator_fn: Callable[[InitialStateType], Simulator[InitialStateType, StateType, ActType]], state_interpreter: StateInterpreter[StateType, ObsType], action_interpreter: ActionInterpreter[StateType, PolicyActType, ActType], policy: BasePolicy, reward: Reward, train_initial_states: Sequence[InitialStateType] | None = None, val_initial_states: Sequence[InitialStateType] | None = None, test_initial_states: Sequence[InitialStateType] | None = None, buffer_size: int = 20000, episode_per_iter: int = 1000, update_kwargs: Dict[str, Any] = None)
- train_seed_iterator() ContextManager[Iterable[InitialStateType]] | Iterable[InitialStateType]
重写此方法以创建用于训练的种子迭代器。 如果可迭代对象是上下文管理器,则整个训练将在with块中调用, 并且迭代器将在训练完成后自动关闭。
- val_seed_iterator() ContextManager[Iterable[InitialStateType]] | Iterable[InitialStateType]
重写此方法以创建用于验证的种子迭代器。
- test_seed_iterator() ContextManager[Iterable[InitialStateType]] | Iterable[InitialStateType]
重写此方法以创建用于测试的种子迭代器。
- train(vector_env: FiniteVectorEnv) Dict[str, Any]
创建一个收集器并收集
episode_per_iter
个片段。 在收集的回放缓冲区上更新策略。
- validate(vector_env: FiniteVectorEnv) Dict[str, Any]
实现此功能以一次性验证策略。
- test(vector_env: FiniteVectorEnv) Dict[str, Any]
实现此功能以在测试环境中评估策略一次。
- class qlib.rl.trainer.TrainingVesselBase(*args, **kwds)
一艘包含模拟器、解释器和策略的船将被发送到训练器。 这个类控制训练中与算法相关的部分,而训练器负责运行时的部分。
该船还定义了核心训练部分的最重要逻辑,并且(可选地)定义了一些回调,以在特定事件中插入自定义逻辑。
- train_seed_iterator() ContextManager[Iterable[InitialStateType]] | Iterable[InitialStateType]
重写此方法以创建用于训练的种子迭代器。 如果可迭代对象是上下文管理器,则整个训练将在with块中调用, 并且迭代器将在训练完成后自动关闭。
- val_seed_iterator() ContextManager[Iterable[InitialStateType]] | Iterable[InitialStateType]
重写此方法以创建用于验证的种子迭代器。
- test_seed_iterator() ContextManager[Iterable[InitialStateType]] | Iterable[InitialStateType]
重写此方法以创建用于测试的种子迭代器。
- train(vector_env: BaseVectorEnv) Dict[str, Any]
实现此功能以训练一次迭代。在强化学习中,一次迭代通常指一次收集。
- validate(vector_env: FiniteVectorEnv) Dict[str, Any]
实现此功能以一次性验证策略。
- test(vector_env: FiniteVectorEnv) Dict[str, Any]
实现此功能以在测试环境中评估策略一次。
- state_dict() Dict
返回当前船只状态的检查点。
- load_state_dict(state_dict: Dict) None
从之前保存的状态字典中恢复检查点。
- class qlib.rl.trainer.Checkpoint(dirpath: Path, filename: str = '{iter:03d}.pth', save_latest: Literal['link', 'copy'] | None = 'link', every_n_iters: int | None = None, time_interval: int | None = None, save_on_fit_end: bool = True)
定期保存检查点以实现持久化和恢复。
- Parameters:
dirpath – 保存检查点文件的目录。
filename –
检查点文件名。可以包含自动填充的命名格式化选项。 例如:
{iter:03d}-{reward:.2f}.pth
。 支持的参数名称有:iter (int)
trainer.metrics
中的指标时间字符串,格式为
%Y%m%d%H%M%S
save_latest – 将最新的检查点保存在
latest.pth
中。 如果使用link
,latest.pth
将创建为软链接。 如果使用copy
,latest.pth
将存储为单独的副本。 设置为 none 以禁用此功能。every_n_iters – 检查点会在每n次训练迭代结束时保存,如果适用的话,在验证之后。
time_interval – 检查点再次保存前的最大时间(秒)。
save_on_fit_end – 在训练结束时保存最后一个检查点。 如果已经保存了检查点,则不执行任何操作。
- __init__(dirpath: Path, filename: str = '{iter:03d}.pth', save_latest: Literal['link', 'copy'] | None = 'link', every_n_iters: int | None = None, time_interval: int | None = None, save_on_fit_end: bool = True)
- on_fit_end(trainer: Trainer, vessel: TrainingVesselBase) None
在整个拟合过程结束后调用。
- on_iter_end(trainer: Trainer, vessel: TrainingVesselBase) None
在每次迭代结束时调用。 这是在
current_iter
增加之后调用的,当之前的迭代被认为完成时。
- class qlib.rl.trainer.EarlyStopping(monitor: str = 'reward', min_delta: float = 0.0, patience: int = 0, mode: Literal['min', 'max'] = 'max', baseline: float |