在线服务

介绍

../_images/online_serving.png

除了回测,测试模型是否有效的一种方法是在真实市场条件下进行预测,甚至基于这些预测进行实际交易。 Online Serving 是一组使用最新数据的在线模型模块, 其中包括 Online Manager, Online Strategy, Online Tool, Updater

这里有几个示例供参考,展示了Online Serving的不同功能。 如果你有许多模型或任务需要管理,请考虑任务管理。 这些示例基于任务管理中的一些组件,如TrainerRMCollector

注意: 用户应保持其数据源更新以支持在线服务。例如,Qlib 提供了 一批脚本 来帮助用户更新雅虎的每日数据。

目前已知的限制 - 目前,支持对下一个交易日的每日更新预测。但由于公共数据的限制 _,不支持为下一个交易日生成订单。

在线管理器

OnlineManager 可以管理一组 Online Strategy 并动态运行它们。

随着时间的推移,决定性模型也会发生变化。在这个模块中,我们称那些贡献模型为在线模型。 在每个例程中(例如每天或每分钟),在线模型可能会发生变化,并且它们的预测需要更新。 因此,这个模块提供了一系列方法来控制这个过程。

该模块还提供了一种方法来模拟历史中的在线策略。这意味着你可以验证你的策略或找到一个更好的策略。

在不同情况下使用不同的训练器共有4种情况:

情况

描述

在线 + 培训师

当你想进行一个真实的例行程序时,训练器将帮助你训练模型。它将逐任务、逐策略地训练模型。

在线 + 延迟训练器

DelayTrainer 将跳过具体训练,直到所有任务已由不同策略准备完毕。它使用户可以在 routinefirst_train 结束时并行训练所有任务。否则,这些函数将在每个策略准备任务时卡住。

模拟 + 训练器

它的行为将与在线 + 训练器相同。唯一的区别是它用于模拟/回测,而不是在线交易。

模拟 + 延迟训练器

当你的模型没有任何时间依赖性时,你可以使用DelayTrainer来实现多任务处理的能力。这意味着所有例程中的所有任务都可以在模拟结束时进行REAL训练。信号将在不同的时间段准备好(基于是否有任何新模型在线)。

以下是一些伪代码,展示了每种情况的工作流程

For simplicity
  • 策略中仅使用一种策略

  • update_online_pred 仅在在线模式下调用,其他情况下会被忽略

  1. 在线 + 培训师

tasks = first_train()
models = trainer.train(tasks)
trainer.end_train(models)
for day in online_trading_days:
    # OnlineManager.routine
    models = trainer.train(strategy.prepare_tasks())  # for each strategy
    strategy.prepare_online_models(models)  # for each strategy

    trainer.end_train(models)
    prepare_signals()  # prepare trading signals daily

在线 + 延迟训练器: 工作流程与 在线 + 训练器 相同。

  1. 模拟 + 延迟训练器

# simulate
tasks = first_train()
models = trainer.train(tasks)
for day in historical_calendars:
    # OnlineManager.routine
    models = trainer.train(strategy.prepare_tasks())  # for each strategy
    strategy.prepare_online_models(models)  # for each strategy
# delay_prepare()
# FIXME: Currently the delay_prepare is not implemented in a proper way.
trainer.end_train(<for all previous models>)
prepare_signals()

# 我们能否简化当前的工作流程?

  • 可以减少任务的状态数量吗?

    • 对于每个任务,我们有三个阶段(即任务、部分训练任务、最终训练任务)

class qlib.workflow.online.manager.OnlineManager(strategies: OnlineStrategy | List[OnlineStrategy], trainer: Trainer | None = None, begin_time: str | Timestamp | None = None, freq='day')

OnlineManager 可以使用 Online Strategy 管理在线模型。 它还提供了哪些模型在什么时间在线的历史记录。

__init__(strategies: OnlineStrategy | List[OnlineStrategy], trainer: Trainer | None = None, begin_time: str | Timestamp | None = None, freq='day')

初始化在线管理器。 一个在线管理器必须至少有一个在线策略。

Parameters:
  • 策略 (联合[OnlineStrategy, 列表[OnlineStrategy]]) – 一个OnlineStrategy实例或一个OnlineStrategy列表

  • begin_time (Union[str,pd.Timestamp], optional) – OnlineManager 将从这个时间开始。默认为 None,表示使用最近的日期。

  • trainer (qlib.model.trainer.Trainer) – 用于训练任务的训练器。如果为None,则使用TrainerR。

  • freq (str, 可选) – 数据频率。默认为“day”。

first_train(strategies: List[OnlineStrategy] | None = None, model_kwargs: dict = {})

从每个策略的first_tasks方法中获取任务并训练它们。 如果使用DelayTrainer,它可以在每个策略的first_tasks之后一起完成所有训练。

Parameters:
  • 策略 (列表[OnlineStrategy]) – 策略列表(添加策略时需要此参数)。None 表示使用默认策略。

  • model_kwargs (dict) – prepare_online_models的参数

routine(cur_time: str | Timestamp | None = None, task_kwargs: dict = {}, model_kwargs: dict = {}, signal_kwargs: dict = {})

每个策略的典型更新过程并记录在线历史。

典型的更新过程在例行程序之后,例如每天或每月。 过程是:更新预测 -> 准备任务 -> 准备在线模型 -> 准备信号。

如果使用DelayTrainer,它可以在每个策略的prepare_tasks之后一起完成训练。

Parameters:
  • cur_time (Union[str,pd.Timestamp], optional) – 在此时间运行例行方法。默认为 None。

  • task_kwargs (dict) – prepare_tasks的参数

  • model_kwargs (dict) – prepare_online_models 的参数

  • signal_kwargs (dict) – prepare_signals 的参数

get_collector(**kwargs) MergeCollector

获取Collector的实例,以从每个策略中收集结果。 这个收集器可以作为信号准备的基础。

Parameters:

**kwargs – get_collector 的参数。

Returns:

用于合并其他收集器的收集器。

Return type:

MergeCollector

add_strategy(strategies: OnlineStrategy | List[OnlineStrategy])

向OnlineManager添加一些新策略。

Parameters:

策略 (联合[在线策略, 列表[在线策略]]) – 一个在线策略的列表

prepare_signals(prepare_func: ~typing.Callable = <qlib.model.ens.ensemble.AverageEnsemble object>, over_write=False)

在准备好上一次例行程序的数据(箱线图中的一个箱子)后,这意味着例行程序的结束,我们可以为下一个例行程序准备交易信号。

注意:给定一组预测,这些预测结束时间之前的所有信号都将被妥善准备。

即使最新的信号已经存在,最新的计算结果也会被覆盖。

注意

给定某个时间的预测,在此之前的所有信号都将准备就绪。

Parameters:
  • prepare_func (Callable, optional) – 在收集后从字典中获取信号。默认为 AverageEnsemble(),由 MergeCollector 收集的结果必须是 {xxx:pred}。

  • over_write (bool, optional) – 如果为True,新信号将覆盖原有信号。如果为False,新信号将追加到信号末尾。默认为False。

Returns:

信号。

Return type:

pd.DataFrame

get_signals() Series | DataFrame

获取准备好的在线信号。

Returns:

pd.Series 用于每个日期时间只有一个信号。 pd.DataFrame 用于多个信号,例如,买入和卖出操作使用不同的交易信号。

Return type:

Union[pd.Series, pd.DataFrame]

simulate(end_time=None, frequency='day', task_kwargs={}, model_kwargs={}, signal_kwargs={}) Series | DataFrame

从当前时间开始,此方法将模拟OnlineManager中的每个例程,直到结束时间。

考虑到并行训练,模型和信号可以在所有常规模拟之后准备。

延迟训练的方式可以是 DelayTrainer,而延迟准备信号的方式可以是 delay_prepare

Parameters:
  • end_time – 模拟结束的时间

  • frequency – 日历频率

  • task_kwargs (dict) – prepare_tasks的参数

  • model_kwargs (dict) – prepare_online_models的参数

  • signal_kwargs (dict) – prepare_signals 的参数

Returns:

pd.Series 用于每个日期时间只有一个信号。 pd.DataFrame 用于多个信号,例如,买入和卖出操作使用不同的交易信号。

Return type:

联合[pd.Series, pd.DataFrame]

delay_prepare(model_kwargs={}, signal_kwargs={})

如果某些内容正在等待准备,请准备所有模型和信号。

Parameters:
  • model_kwargsend_train的参数

  • signal_kwargsprepare_signals的参数

在线策略

OnlineStrategy 模块是在线服务的一个组成部分。

class qlib.workflow.online.strategy.OnlineStrategy(name_id: str)

OnlineStrategy 正在与 Online Manager 合作,响应任务的生成方式、模型的更新以及信号的准备。

__init__(name_id: str)

初始化在线策略。 此模块必须使用Trainer来完成模型训练。

Parameters:
  • name_id (str) – 一个唯一的名称或ID。

  • 训练器 (qlib.model.trainer.Trainer, 可选) – 一个训练器的实例。默认为 None。

prepare_tasks(cur_time, **kwargs) List[dict]

在例程结束后,检查我们是否需要根据cur_time(None表示最新)准备和训练一些新任务。 返回等待训练的新任务。

您可以通过OnlineTool.online_models找到最新的在线模型。

prepare_online_models(trained_models, cur_time=None) List[object]

从训练好的模型中选择一些模型并将它们设置为在线模型。 这是一个典型的将所有训练好的模型上线的方法,你可以覆盖它以实现更复杂的方法。 如果你仍然需要它们,你可以通过OnlineTool.online_models找到最后上线的模型。

注意:将所有在线模型重置为训练好的模型。如果没有训练好的模型,则不执行任何操作。

NOTE:

当前的实现非常朴素。这里有一个更复杂的情况,更接近实际场景。 1. 在test_start前一天(时间戳T)训练新模型 2. 在test_start(通常在时间戳T + 1)切换模型

Parameters:
  • 模型 (列表) – 一个模型列表。

  • cur_time (pd.Dataframe) – 来自OnlineManger的当前时间。如果为None,则表示最新时间。

Returns:

在线模型列表。

Return type:

列表[对象]

first_tasks() List[dict]

首先生成一系列任务并返回它们。

get_collector() Collector

获取Collector的实例,以收集此策略的不同结果。

For example:
  1. 在Recorder中收集预测

  2. 在txt文件中收集信号

Returns:

Collector

class qlib.workflow.online.strategy.RollingStrategy(name_id: str, task_template: dict | List[dict], rolling_gen: RollingGen)

此示例策略始终使用最新的滚动模型 sas 在线模型。

__init__(name_id: str, task_template: dict | List[dict], rolling_gen: RollingGen)

初始化滚动策略。

假设:name_id的字符串、实验名称和训练者的实验名称是相同的。

Parameters:
  • name_id (str) – 一个唯一的名称或ID。也将是实验的名称。

  • task_template (Union[dict, List[dict]]) – 一个任务模板列表或单个模板,将用于通过rolling_gen生成多个任务。

  • rolling_gen (RollingGen) – RollingGen的一个实例

get_collector(process_list=[<qlib.model.ens.group.RollingGroup object>], rec_key_func=None, rec_filter_func=None, artifacts_key=None)

获取Collector的实例以收集结果。返回的收集器必须区分不同模型的结果。

假设:模型可以根据模型名称和滚动测试段进行区分。 如果您不希望使用此假设,请实现您的方法或使用另一个rec_key_func。

Parameters:
  • rec_key_func (Callable) – 一个用于获取记录器键的函数。如果为None,则使用记录器ID。

  • rec_filter_func (Callable, optional) – 通过返回True或False来过滤记录器。默认为None。

  • artifacts_key (List[str], optional) – 你想要获取的artifacts键。如果为None,则获取所有artifacts。

first_tasks() List[dict]

使用rolling_gen基于task_template生成不同的任务。

Returns:

任务列表

Return type:

列表[字典]

prepare_tasks(cur_time) List[dict]

根据cur_time准备新任务(None表示最新的)。

您可以通过OnlineToolR.online_models找到最新的在线模型。

Returns:

新任务的列表。

Return type:

列表[字典]

在线工具

OnlineTool 是一个用于设置和取消设置一系列在线模型的模块。 在线模型是在某些时间点上的一些决定性模型,这些模型可以随时间的变化而变化。 这使得我们能够使用高效的子模型来适应市场风格的变化。

class qlib.workflow.online.utils.OnlineTool

OnlineTool 将管理实验中的 在线 模型,这些实验包括模型记录器。

__init__()

初始化在线工具。

set_online_tag(tag, recorder: list | object)

tag设置为模型以标记是否在线。

Parameters:
  • tag (str) – ONLINE_TAGOFFLINE_TAG 中的标签

  • recorder (Union[list,object]) – 模型的记录器

get_online_tag(recorder: object) str

给定一个模型记录器并返回其在线标签。

Parameters:

recorder (Object) – 模型的记录器

Returns:

在线标签

Return type:

字符串

reset_online_tag(recorder: list | object)

将所有模型离线并将记录器设置为“在线”。

Parameters:

recorder (Union[list,object]) – 您想要重置为“在线”的记录器。

online_models() list

获取当前的在线模型

Returns:

一个在线模型的列表。

Return type:

列表

update_online_pred(to_date=None)

更新在线模型的预测到to_date。

Parameters:

to_date (pd.Timestamp) – 在此日期之前的预测将被更新。如果为None,则更新到最新。

class qlib.workflow.online.utils.OnlineToolR(default_exp_name: str | None = None)

基于(R)ecorder的OnlineTool实现。

__init__(default_exp_name: str | None = None)

初始化 OnlineToolR。

Parameters:

default_exp_name (str) – 默认的实验名称。

set_online_tag(tag, recorder: Recorder | List)

tag设置为模型的记录器,以标记是否在线。

Parameters:
  • tag (str) – ONLINE_TAG, NEXT_ONLINE_TAG, OFFLINE_TAG中的标签

  • recorder (Union[Recorder, List]) – Recorder的列表或Recorder的实例

get_online_tag(recorder: Recorder) str

给定一个模型记录器并返回其在线标签。

Parameters:

recorder (Recorder) – 一个记录器的实例

Returns:

在线标签

Return type:

字符串

reset_online_tag(recorder: Recorder | List, exp_name: str | None = None)

将所有模型离线并将记录器设置为“在线”。

Parameters:
  • recorder (Union[Recorder, List]) – 你想要重置为‘在线’的recorder。

  • exp_name (str) – 实验名称。如果为None,则使用default_exp_name。

online_models(exp_name: str | None = None) list

获取当前的在线模型

Parameters:

exp_name (str) – 实验名称。如果为None,则使用default_exp_name。

Returns:

一个在线模型的列表。

Return type:

列表

update_online_pred(to_date=None, from_date=None, exp_name: str | None = None)

更新在线模型的预测至to_date。

Parameters:
  • to_date (pd.Timestamp) – 在此日期之前的预测将被更新。如果为None,则更新到日历中的最新时间。

  • exp_name (str) – 实验名称。如果为None,则使用default_exp_name。

更新器

Updater 是一个模块,用于在股票数据更新时更新诸如预测之类的工件。

class qlib.workflow.online.update.RMDLoader(rec: Recorder)

记录器模型数据集加载器

__init__(rec: Recorder)
get_dataset(start_time, end_time, segments=None, unprepared_dataset: DatasetH | None = None) DatasetH

加载、配置和设置数据集。

此数据集用于推理。

Parameters:
  • start_time – 基础数据的开始时间

  • end_time – 基础数据的结束时间

  • segments – dict 数据集的分段配置 由于时间序列数据集(TSDatasetH),测试段可能与start_time和end_time不同

  • unprepared_dataset – Optional[DatasetH] 如果用户不想从记录器加载数据集,请指定用户的数据集

Returns:

DatasetH的实例

Return type:

DatasetH

class qlib.workflow.online.update.RecordUpdater(record: Recorder, *args, **kwargs)

更新特定的记录器

__init__(record: Recorder, *args, **kwargs)
abstract update(*args, **kwargs)

更新特定记录器的信息

class qlib.workflow.online.update.DSBasedUpdater(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)

基于数据集的更新器

  • 为基于Qlib数据集的数据更新提供更新功能

假设

  • 基于Qlib数据集

  • 要更新的数据是一个多级索引的pd.DataFrame。例如标签、预测。

                             LABEL0
    datetime   instrument
    2021-05-10 SH600000    0.006965
               SH600004    0.003407
    ...                         ...
    2021-05-28 SZ300498    0.015748
               SZ300676   -0.001321
    
__init__(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)

初始化 PredUpdater。

在以下情况下的预期行为:

  • 如果 to_date 大于日历中的最大日期,数据将更新到最新日期

  • 如果在from_date之前或to_date之后有数据,只有from_dateto_date之间的数据会受到影响。

Parameters:
  • record – 记录器

  • to_date

    将预测更新到to_date

    如果to_date为None:

    数据将更新到最新日期。

  • from_date

    更新将从from_date开始

    如果from_date为None:

    更新将在历史数据中最新数据后的下一个tick时进行

  • hist_ref

    int 有时,数据集会有历史依赖。 将问题留给用户设置历史依赖的长度 如果用户没有指定此参数,Updater 将尝试加载数据集以自动确定 hist_ref

    注意

    start_time 不包括在 hist_ref 中;因此,在大多数情况下,hist_ref 将是 step_len - 1

  • loader_cls – 类型 用于加载模型和数据集的类

prepare_data(unprepared_dataset: DatasetH | None = None) DatasetH

加载数据集 - 如果指定了 unprepared_dataset,则直接准备数据集 - 否则,

将此函数分离将使数据集更易于重用

Returns:

DatasetH的实例

Return type:

DatasetH

update(dataset: DatasetH | None = None, write: bool = True, ret_new: bool = False) object | None
Parameters:
  • dataset (DatasetH) – DatasetH: DatasetH的实例。如果为None,则重新准备。

  • write (bool) – 是否执行写入操作

  • ret_new (bool) – 更新后的数据是否会被返回

Returns:

更新后的数据集

Return type:

可选[对象]

abstract get_update_data(dataset: Dataset) DataFrame

根据给定的数据集返回更新后的数据

get_update_dataupdate 之间的区别 - update_date 只包含一些特定数据的功能 - update 包含一些常规步骤(例如准备数据集、检查)

class qlib.workflow.online.update.PredUpdater(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)

更新记录器中的预测

get_update_data(dataset: Dataset) DataFrame

根据给定的数据集返回更新后的数据

get_update_dataupdate 之间的区别 - update_date 只包含一些特定数据的功能 - update 包含一些常规步骤(例如准备数据集、检查)

class qlib.workflow.online.update.LabelUpdater(record: Recorder, to_date=None, **kwargs)

更新记录器中的标签

假设 - 标签是从record_temp.SignalRecord生成的。

__init__(record: Recorder, to_date=None, **kwargs)

初始化 PredUpdater。

在以下情况下的预期行为:

  • 如果 to_date 大于日历中的最大日期,数据将更新到最新日期

  • 如果在from_date之前或to_date之后有数据,只有from_dateto_date之间的数据会受到影响。

Parameters:
  • record – 记录器

  • to_date

    将预测更新到to_date

    如果to_date为None:

    数据将更新到最新日期。

  • from_date

    更新将从from_date开始

    如果from_date为None:

    更新将在历史数据中最新数据后的下一个tick时进行

  • hist_ref

    int 有时,数据集会有历史依赖。 将问题留给用户设置历史依赖的长度 如果用户没有指定此参数,Updater 将尝试加载数据集以自动确定 hist_ref

    注意

    start_time 不包括在 hist_ref 中;因此,在大多数情况下,hist_ref 将是 step_len - 1

  • loader_cls – 类型 用于加载模型和数据集的类

get_update_data(dataset: Dataset) DataFrame

根据给定的数据集返回更新后的数据

get_update_dataupdate 之间的区别 - update_date 只包含一些特定数据的功能 - update 包含一些常规步骤(例如准备数据集、检查)