在线服务
介绍
除了回测,测试模型是否有效的一种方法是在真实市场条件下进行预测,甚至基于这些预测进行实际交易。
Online Serving 是一组使用最新数据的在线模型模块,
其中包括 Online Manager, Online Strategy, Online Tool, Updater。
这里有几个示例供参考,展示了Online Serving的不同功能。
如果你有许多模型或任务需要管理,请考虑任务管理。
这些示例基于任务管理中的一些组件,如TrainerRM或Collector。
注意: 用户应保持其数据源更新以支持在线服务。例如,Qlib 提供了 一批脚本 来帮助用户更新雅虎的每日数据。
目前已知的限制
- 目前,支持对下一个交易日的每日更新预测。但由于公共数据的限制
在线管理器
OnlineManager 可以管理一组 Online Strategy 并动态运行它们。
随着时间的推移,决定性模型也会发生变化。在这个模块中,我们称那些贡献模型为在线模型。 在每个例程中(例如每天或每分钟),在线模型可能会发生变化,并且它们的预测需要更新。 因此,这个模块提供了一系列方法来控制这个过程。
该模块还提供了一种方法来模拟历史中的在线策略。这意味着你可以验证你的策略或找到一个更好的策略。
在不同情况下使用不同的训练器共有4种情况:
情况 |
描述 |
|---|---|
在线 + 培训师 |
当你想进行一个真实的例行程序时,训练器将帮助你训练模型。它将逐任务、逐策略地训练模型。 |
在线 + 延迟训练器 |
DelayTrainer 将跳过具体训练,直到所有任务已由不同策略准备完毕。它使用户可以在 routine 或 first_train 结束时并行训练所有任务。否则,这些函数将在每个策略准备任务时卡住。 |
模拟 + 训练器 |
它的行为将与在线 + 训练器相同。唯一的区别是它用于模拟/回测,而不是在线交易。 |
模拟 + 延迟训练器 |
当你的模型没有任何时间依赖性时,你可以使用DelayTrainer来实现多任务处理的能力。这意味着所有例程中的所有任务都可以在模拟结束时进行REAL训练。信号将在不同的时间段准备好(基于是否有任何新模型在线)。 |
以下是一些伪代码,展示了每种情况的工作流程
- For simplicity
策略中仅使用一种策略
update_online_pred 仅在在线模式下调用,其他情况下会被忽略
在线 + 培训师
tasks = first_train()
models = trainer.train(tasks)
trainer.end_train(models)
for day in online_trading_days:
# OnlineManager.routine
models = trainer.train(strategy.prepare_tasks()) # for each strategy
strategy.prepare_online_models(models) # for each strategy
trainer.end_train(models)
prepare_signals() # prepare trading signals daily
在线 + 延迟训练器: 工作流程与 在线 + 训练器 相同。
模拟 + 延迟训练器
# simulate
tasks = first_train()
models = trainer.train(tasks)
for day in historical_calendars:
# OnlineManager.routine
models = trainer.train(strategy.prepare_tasks()) # for each strategy
strategy.prepare_online_models(models) # for each strategy
# delay_prepare()
# FIXME: Currently the delay_prepare is not implemented in a proper way.
trainer.end_train(<for all previous models>)
prepare_signals()
# 我们能否简化当前的工作流程?
可以减少任务的状态数量吗?
对于每个任务,我们有三个阶段(即任务、部分训练任务、最终训练任务)
- class qlib.workflow.online.manager.OnlineManager(strategies: OnlineStrategy | List[OnlineStrategy], trainer: Trainer | None = None, begin_time: str | Timestamp | None = None, freq='day')
OnlineManager 可以使用 Online Strategy 管理在线模型。 它还提供了哪些模型在什么时间在线的历史记录。
- __init__(strategies: OnlineStrategy | List[OnlineStrategy], trainer: Trainer | None = None, begin_time: str | Timestamp | None = None, freq='day')
初始化在线管理器。 一个在线管理器必须至少有一个在线策略。
- Parameters:
策略 (联合[OnlineStrategy, 列表[OnlineStrategy]]) – 一个OnlineStrategy实例或一个OnlineStrategy列表
begin_time (Union[str,pd.Timestamp], optional) – OnlineManager 将从这个时间开始。默认为 None,表示使用最近的日期。
trainer (qlib.model.trainer.Trainer) – 用于训练任务的训练器。如果为None,则使用TrainerR。
freq (str, 可选) – 数据频率。默认为“day”。
- first_train(strategies: List[OnlineStrategy] | None = None, model_kwargs: dict = {})
从每个策略的first_tasks方法中获取任务并训练它们。 如果使用DelayTrainer,它可以在每个策略的first_tasks之后一起完成所有训练。
- Parameters:
策略 (列表[OnlineStrategy]) – 策略列表(添加策略时需要此参数)。None 表示使用默认策略。
model_kwargs (dict) – prepare_online_models的参数
- routine(cur_time: str | Timestamp | None = None, task_kwargs: dict = {}, model_kwargs: dict = {}, signal_kwargs: dict = {})
每个策略的典型更新过程并记录在线历史。
典型的更新过程在例行程序之后,例如每天或每月。 过程是:更新预测 -> 准备任务 -> 准备在线模型 -> 准备信号。
如果使用DelayTrainer,它可以在每个策略的prepare_tasks之后一起完成训练。
- Parameters:
cur_time (Union[str,pd.Timestamp], optional) – 在此时间运行例行方法。默认为 None。
task_kwargs (dict) – prepare_tasks的参数
model_kwargs (dict) – prepare_online_models 的参数
signal_kwargs (dict) – prepare_signals 的参数
- get_collector(**kwargs) MergeCollector
获取Collector的实例,以从每个策略中收集结果。 这个收集器可以作为信号准备的基础。
- Parameters:
**kwargs – get_collector 的参数。
- Returns:
用于合并其他收集器的收集器。
- Return type:
- add_strategy(strategies: OnlineStrategy | List[OnlineStrategy])
向OnlineManager添加一些新策略。
- prepare_signals(prepare_func: ~typing.Callable = <qlib.model.ens.ensemble.AverageEnsemble object>, over_write=False)
在准备好上一次例行程序的数据(箱线图中的一个箱子)后,这意味着例行程序的结束,我们可以为下一个例行程序准备交易信号。
注意:给定一组预测,这些预测结束时间之前的所有信号都将被妥善准备。
即使最新的信号已经存在,最新的计算结果也会被覆盖。
注意
给定某个时间的预测,在此之前的所有信号都将准备就绪。
- Parameters:
prepare_func (Callable, optional) – 在收集后从字典中获取信号。默认为 AverageEnsemble(),由 MergeCollector 收集的结果必须是 {xxx:pred}。
over_write (bool, optional) – 如果为True,新信号将覆盖原有信号。如果为False,新信号将追加到信号末尾。默认为False。
- Returns:
信号。
- Return type:
pd.DataFrame
- get_signals() Series | DataFrame
获取准备好的在线信号。
- Returns:
pd.Series 用于每个日期时间只有一个信号。 pd.DataFrame 用于多个信号,例如,买入和卖出操作使用不同的交易信号。
- Return type:
Union[pd.Series, pd.DataFrame]
- simulate(end_time=None, frequency='day', task_kwargs={}, model_kwargs={}, signal_kwargs={}) Series | DataFrame
从当前时间开始,此方法将模拟OnlineManager中的每个例程,直到结束时间。
考虑到并行训练,模型和信号可以在所有常规模拟之后准备。
延迟训练的方式可以是
DelayTrainer,而延迟准备信号的方式可以是delay_prepare。- Parameters:
end_time – 模拟结束的时间
frequency – 日历频率
task_kwargs (dict) – prepare_tasks的参数
model_kwargs (dict) – prepare_online_models的参数
signal_kwargs (dict) – prepare_signals 的参数
- Returns:
pd.Series 用于每个日期时间只有一个信号。 pd.DataFrame 用于多个信号,例如,买入和卖出操作使用不同的交易信号。
- Return type:
联合[pd.Series, pd.DataFrame]
- delay_prepare(model_kwargs={}, signal_kwargs={})
如果某些内容正在等待准备,请准备所有模型和信号。
- Parameters:
model_kwargs – end_train的参数
signal_kwargs – prepare_signals的参数
在线策略
OnlineStrategy 模块是在线服务的一个组成部分。
- class qlib.workflow.online.strategy.OnlineStrategy(name_id: str)
OnlineStrategy 正在与 Online Manager 合作,响应任务的生成方式、模型的更新以及信号的准备。
- __init__(name_id: str)
初始化在线策略。 此模块必须使用Trainer来完成模型训练。
- Parameters:
name_id (str) – 一个唯一的名称或ID。
训练器 (qlib.model.trainer.Trainer, 可选) – 一个训练器的实例。默认为 None。
- prepare_tasks(cur_time, **kwargs) List[dict]
在例程结束后,检查我们是否需要根据cur_time(None表示最新)准备和训练一些新任务。 返回等待训练的新任务。
您可以通过OnlineTool.online_models找到最新的在线模型。
- prepare_online_models(trained_models, cur_time=None) List[object]
从训练好的模型中选择一些模型并将它们设置为在线模型。 这是一个典型的将所有训练好的模型上线的方法,你可以覆盖它以实现更复杂的方法。 如果你仍然需要它们,你可以通过OnlineTool.online_models找到最后上线的模型。
注意:将所有在线模型重置为训练好的模型。如果没有训练好的模型,则不执行任何操作。
- NOTE:
当前的实现非常朴素。这里有一个更复杂的情况,更接近实际场景。 1. 在test_start前一天(时间戳T)训练新模型 2. 在test_start(通常在时间戳T + 1)切换模型
- Parameters:
模型 (列表) – 一个模型列表。
cur_time (pd.Dataframe) – 来自OnlineManger的当前时间。如果为None,则表示最新时间。
- Returns:
在线模型列表。
- Return type:
列表[对象]
- first_tasks() List[dict]
首先生成一系列任务并返回它们。
- class qlib.workflow.online.strategy.RollingStrategy(name_id: str, task_template: dict | List[dict], rolling_gen: RollingGen)
此示例策略始终使用最新的滚动模型 sas 在线模型。
- __init__(name_id: str, task_template: dict | List[dict], rolling_gen: RollingGen)
初始化滚动策略。
假设:name_id的字符串、实验名称和训练者的实验名称是相同的。
- Parameters:
name_id (str) – 一个唯一的名称或ID。也将是实验的名称。
task_template (Union[dict, List[dict]]) – 一个任务模板列表或单个模板,将用于通过rolling_gen生成多个任务。
rolling_gen (RollingGen) – RollingGen的一个实例
- get_collector(process_list=[<qlib.model.ens.group.RollingGroup object>], rec_key_func=None, rec_filter_func=None, artifacts_key=None)
获取Collector的实例以收集结果。返回的收集器必须区分不同模型的结果。
假设:模型可以根据模型名称和滚动测试段进行区分。 如果您不希望使用此假设,请实现您的方法或使用另一个rec_key_func。
- Parameters:
rec_key_func (Callable) – 一个用于获取记录器键的函数。如果为None,则使用记录器ID。
rec_filter_func (Callable, optional) – 通过返回True或False来过滤记录器。默认为None。
artifacts_key (List[str], optional) – 你想要获取的artifacts键。如果为None,则获取所有artifacts。
- first_tasks() List[dict]
使用rolling_gen基于task_template生成不同的任务。
- Returns:
任务列表
- Return type:
列表[字典]
- prepare_tasks(cur_time) List[dict]
根据cur_time准备新任务(None表示最新的)。
您可以通过OnlineToolR.online_models找到最新的在线模型。
- Returns:
新任务的列表。
- Return type:
列表[字典]
在线工具
OnlineTool 是一个用于设置和取消设置一系列在线模型的模块。 在线模型是在某些时间点上的一些决定性模型,这些模型可以随时间的变化而变化。 这使得我们能够使用高效的子模型来适应市场风格的变化。
- class qlib.workflow.online.utils.OnlineTool
OnlineTool 将管理实验中的 在线 模型,这些实验包括模型记录器。
- __init__()
初始化在线工具。
- set_online_tag(tag, recorder: list | object)
将tag设置为模型以标记是否在线。
- Parameters:
tag (str) – ONLINE_TAG 和 OFFLINE_TAG 中的标签
recorder (Union[list,object]) – 模型的记录器
- get_online_tag(recorder: object) str
给定一个模型记录器并返回其在线标签。
- Parameters:
recorder (Object) – 模型的记录器
- Returns:
在线标签
- Return type:
字符串
- reset_online_tag(recorder: list | object)
将所有模型离线并将记录器设置为“在线”。
- Parameters:
recorder (Union[list,object]) – 您想要重置为“在线”的记录器。
- online_models() list
获取当前的在线模型
- Returns:
一个在线模型的列表。
- Return type:
列表
- update_online_pred(to_date=None)
更新在线模型的预测到to_date。
- Parameters:
to_date (pd.Timestamp) – 在此日期之前的预测将被更新。如果为None,则更新到最新。
- class qlib.workflow.online.utils.OnlineToolR(default_exp_name: str | None = None)
基于(R)ecorder的OnlineTool实现。
- __init__(default_exp_name: str | None = None)
初始化 OnlineToolR。
- Parameters:
default_exp_name (str) – 默认的实验名称。
- set_online_tag(tag, recorder: Recorder | List)
将tag设置为模型的记录器,以标记是否在线。
- Parameters:
tag (str) – ONLINE_TAG, NEXT_ONLINE_TAG, OFFLINE_TAG中的标签
recorder (Union[Recorder, List]) – Recorder的列表或Recorder的实例
- get_online_tag(recorder: Recorder) str
给定一个模型记录器并返回其在线标签。
- Parameters:
recorder (Recorder) – 一个记录器的实例
- Returns:
在线标签
- Return type:
字符串
- reset_online_tag(recorder: Recorder | List, exp_name: str | None = None)
将所有模型离线并将记录器设置为“在线”。
- Parameters:
recorder (Union[Recorder, List]) – 你想要重置为‘在线’的recorder。
exp_name (str) – 实验名称。如果为None,则使用default_exp_name。
- online_models(exp_name: str | None = None) list
获取当前的在线模型
- Parameters:
exp_name (str) – 实验名称。如果为None,则使用default_exp_name。
- Returns:
一个在线模型的列表。
- Return type:
列表
- update_online_pred(to_date=None, from_date=None, exp_name: str | None = None)
更新在线模型的预测至to_date。
- Parameters:
to_date (pd.Timestamp) – 在此日期之前的预测将被更新。如果为None,则更新到日历中的最新时间。
exp_name (str) – 实验名称。如果为None,则使用default_exp_name。
更新器
Updater 是一个模块,用于在股票数据更新时更新诸如预测之类的工件。
- class qlib.workflow.online.update.RMDLoader(rec: Recorder)
记录器模型数据集加载器
- __init__(rec: Recorder)
- get_dataset(start_time, end_time, segments=None, unprepared_dataset: DatasetH | None = None) DatasetH
加载、配置和设置数据集。
此数据集用于推理。
- Parameters:
start_time – 基础数据的开始时间
end_time – 基础数据的结束时间
segments – dict 数据集的分段配置 由于时间序列数据集(TSDatasetH),测试段可能与start_time和end_time不同
unprepared_dataset – Optional[DatasetH] 如果用户不想从记录器加载数据集,请指定用户的数据集
- Returns:
DatasetH的实例
- Return type:
- class qlib.workflow.online.update.RecordUpdater(record: Recorder, *args, **kwargs)
更新特定的记录器
- __init__(record: Recorder, *args, **kwargs)
- abstract update(*args, **kwargs)
更新特定记录器的信息
- class qlib.workflow.online.update.DSBasedUpdater(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)
基于数据集的更新器
为基于Qlib数据集的数据更新提供更新功能
假设
基于Qlib数据集
要更新的数据是一个多级索引的pd.DataFrame。例如标签、预测。
LABEL0 datetime instrument 2021-05-10 SH600000 0.006965 SH600004 0.003407 ... ... 2021-05-28 SZ300498 0.015748 SZ300676 -0.001321
- __init__(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)
初始化 PredUpdater。
在以下情况下的预期行为:
如果 to_date 大于日历中的最大日期,数据将更新到最新日期
如果在from_date之前或to_date之后有数据,只有from_date和to_date之间的数据会受到影响。
- Parameters:
record – 记录器
to_date –
将预测更新到to_date
如果to_date为None:
数据将更新到最新日期。
from_date –
更新将从from_date开始
如果from_date为None:
更新将在历史数据中最新数据后的下一个tick时进行
hist_ref –
int 有时,数据集会有历史依赖。 将问题留给用户设置历史依赖的长度 如果用户没有指定此参数,Updater 将尝试加载数据集以自动确定 hist_ref
注意
start_time 不包括在 hist_ref 中;因此,在大多数情况下,hist_ref 将是 step_len - 1
loader_cls – 类型 用于加载模型和数据集的类
- prepare_data(unprepared_dataset: DatasetH | None = None) DatasetH
加载数据集 - 如果指定了 unprepared_dataset,则直接准备数据集 - 否则,
将此函数分离将使数据集更易于重用
- Returns:
DatasetH的实例
- Return type:
- update(dataset: DatasetH | None = None, write: bool = True, ret_new: bool = False) object | None
- Parameters:
dataset (DatasetH) – DatasetH: DatasetH的实例。如果为None,则重新准备。
write (bool) – 是否执行写入操作
ret_new (bool) – 更新后的数据是否会被返回
- Returns:
更新后的数据集
- Return type:
可选[对象]
- abstract get_update_data(dataset: Dataset) DataFrame
根据给定的数据集返回更新后的数据
get_update_data 和 update 之间的区别 - update_date 只包含一些特定数据的功能 - update 包含一些常规步骤(例如准备数据集、检查)
- class qlib.workflow.online.update.PredUpdater(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)
更新记录器中的预测
- get_update_data(dataset: Dataset) DataFrame
根据给定的数据集返回更新后的数据
get_update_data 和 update 之间的区别 - update_date 只包含一些特定数据的功能 - update 包含一些常规步骤(例如准备数据集、检查)
- class qlib.workflow.online.update.LabelUpdater(record: Recorder, to_date=None, **kwargs)
更新记录器中的标签
假设 - 标签是从record_temp.SignalRecord生成的。
- __init__(record: Recorder, to_date=None, **kwargs)
初始化 PredUpdater。
在以下情况下的预期行为:
如果 to_date 大于日历中的最大日期,数据将更新到最新日期
如果在from_date之前或to_date之后有数据,只有from_date和to_date之间的数据会受到影响。
- Parameters:
record – 记录器
to_date –
将预测更新到to_date
如果to_date为None:
数据将更新到最新日期。
from_date –
更新将从from_date开始
如果from_date为None:
更新将在历史数据中最新数据后的下一个tick时进行
hist_ref –
int 有时,数据集会有历史依赖。 将问题留给用户设置历史依赖的长度 如果用户没有指定此参数,Updater 将尝试加载数据集以自动确定 hist_ref
注意
start_time 不包括在 hist_ref 中;因此,在大多数情况下,hist_ref 将是 step_len - 1
loader_cls – 类型 用于加载模型和数据集的类
- get_update_data(dataset: Dataset) DataFrame
根据给定的数据集返回更新后的数据
get_update_data 和 update 之间的区别 - update_date 只包含一些特定数据的功能 - update 包含一些常规步骤(例如准备数据集、检查)