任务管理

介绍

工作流部分介绍了如何以松散耦合的方式运行研究工作流。但是当你使用qrun时,它只能执行一个task。 为了自动生成和执行不同的任务,Task Management提供了一个包括任务生成任务存储任务训练任务收集的完整流程。 通过这个模块,用户可以在不同的时间段、不同的损失函数、甚至不同的模型中自动运行他们的task。任务生成、模型训练以及数据合并和收集的过程如下图所示。

../_images/Task-Gen-Recorder-Collector.svg

整个过程可以用于Online Serving

整个过程的示例显示在这里

任务生成

一个taskModelDatasetRecord或用户添加的任何内容组成。 具体的任务模板可以在 Task Section中查看。 尽管任务模板是固定的,但用户可以自定义他们的TaskGen以通过任务模板生成不同的task

这是TaskGen的基类:

class qlib.workflow.task.gen.TaskGen

生成不同任务的基类

示例 1:

输入:一个特定的任务模板和滚动步骤

输出:任务的滚动版本

示例 2:

输入:一个特定的任务模板和损失列表

输出:一组具有不同损失的任务

abstract generate(task: dict) List[dict]

基于任务模板生成不同的任务

Parameters:

任务 (字典) – 一个任务模板

Returns:

任务列表

Return type:

列表[字典]

Qlib 提供了一个类 RollingGen 来生成不同日期段的数据集的 task 列表。 这个类允许用户在一个实验中验证不同时期的数据对模型的影响。更多信息请参见 这里

任务存储

为了实现更高的效率和集群操作的可能性,Task Manager 会将所有任务存储在 MongoDB 中。 TaskManager 可以自动获取未完成的任务,并管理一组任务的生命周期,包括错误处理。 用户在使用此模块时必须完成 MongoDB 的配置。

用户需要提供MongoDB的URL和数据库名称,以便在初始化中使用TaskManager,或者像这样进行声明。

from qlib.config import C
C["mongo"] = {
    "task_url" : "mongodb://localhost:27017/", # your MongoDB url
    "task_db_name" : "rolling_db" # database name
}
class qlib.workflow.task.manage.TaskManager(task_pool: str)

这是当任务由TaskManager创建时,任务的样子

{
    'def': pickle serialized task definition.  using pickle will make it easier
    'filter': json-like data. This is for filtering the tasks.
    'status': 'waiting' | 'running' | 'done'
    'res': pickle serialized task result,
}

任务管理器假设您只会更新您获取的任务。 Mongo的获取和更新操作将确保数据更新的安全性。

这个类可以作为命令行工具使用。以下是几个示例。 您可以使用以下命令查看管理模块的帮助: python -m qlib.workflow.task.manage -h # 显示管理模块CLI的手册 python -m qlib.workflow.task.manage wait -h # 显示管理模块中wait命令的手册

python -m qlib.workflow.task.manage -t <pool_name> wait
python -m qlib.workflow.task.manage -t <pool_name> task_stat

注意

假设:MongoDB中的数据被编码,而从MongoDB中取出的数据被解码

以下是四种状态:

STATUS_WAITING: 等待训练

STATUS_RUNNING: 训练中

STATUS_PART_DONE: 已完成某些步骤,正在等待下一步

STATUS_DONE: 所有工作已完成

__init__(task_pool: str)

初始化任务管理器,记得先声明MongoDB的URL和数据库名称。 一个TaskManager实例服务于一个特定的任务池。 该模块的静态方法服务于整个MongoDB。

Parameters:

task_pool (str) – MongoDB中集合的名称

static list() list

列出数据库的所有集合(task_pool)。

Returns:

列表

replace_task(task, new_task)

使用新任务替换旧任务

Parameters:
  • task – 旧任务

  • new_task – 新任务

insert_task(task)

插入一个任务。

Parameters:

task – 等待插入的任务

Returns:

pymongo.results.InsertOneResult

insert_task_def(task_def)

插入一个任务到任务池

Parameters:

task_def (dict) – 任务定义

Return type:

pymongo.results.InsertOneResult

create_task(task_def_l, dry_run=False, print_nt=False) List[str]

如果task_def_l中的任务是新的,则将新任务插入到task_pool中,并记录inserted_id。 如果任务不是新的,则只需查询其_id。

Parameters:
  • task_def_l (list) – 任务列表

  • dry_run (bool) – 是否将这些新任务插入任务池

  • print_nt (bool) – 如果打印新任务

Returns:

任务定义列表的_id列表

Return type:

列表[str]

fetch_task(query={}, status='waiting') dict

使用查询来获取任务。

Parameters:
  • query (dict, 可选) – 查询字典。默认为 {}。

  • status (str, optional) – [描述]. 默认为 STATUS_WAITING.

Returns:

解码后的任务(集合中的文档)

Return type:

字典

safe_fetch_task(query={}, status='waiting')

使用带有contextmanager的查询从task_pool中获取任务

Parameters:

query (dict) – 查询的字典

Returns:

字典

Return type:

解码后的任务(集合中的文档)

query(query={}, decode=True)

查询集合中的任务。 如果迭代生成器花费太长时间,此函数可能会引发异常 pymongo.errors.CursorNotFound: cursor id not found

python -m qlib.workflow.task.manage -t query ‘{“_id”: “615498be837d0053acbc5d58”}’

Parameters:
  • query (dict) – 查询的字典

  • decode (bool) –

Returns:

字典

Return type:

解码后的任务(集合中的文档)

re_query(_id) dict

使用 _id 查询任务。

Parameters:

_id (str) – 文档的_id

Returns:

解码后的任务(集合中的文档)

Return type:

字典

commit_task_res(task, res, status='done')

将结果提交到 task['res']。

Parameters:
  • 任务 ([类型]) – [描述]

  • res (object) – 你想要保存的结果

  • status (str, optional) – STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE。默认为 STATUS_DONE。

return_task(task, status='waiting')

将任务返回到状态。通常用于错误处理。

Parameters:
  • 任务 ([类型]) – [描述]

  • status (str, optional) – STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE。默认为 STATUS_WAITING。

remove(query={})

使用查询删除任务

Parameters:

query (dict) – 查询的字典

task_stat(query={}) dict

计算每个状态中的任务数量。

Parameters:

query (dict, 可选) – 查询字典。默认为 {}。

Returns:

字典

reset_waiting(query={})

将所有正在运行的任务重置为等待状态。当某些正在运行的任务意外退出时可以使用。

Parameters:

query (dict, 可选) – 查询字典。默认为 {}。

prioritize(task, priority: int)

设置任务的优先级

Parameters:
  • 任务 (字典) – 来自数据库的任务查询

  • priority (int) – 目标优先级

wait(query={})

在多进程处理时,主进程可能从TaskManager中获取不到任何内容,因为仍然有一些任务在运行。 因此,主进程应等待,直到所有任务都被其他进程或机器训练好。

Parameters:

query (dict, 可选) – 查询字典。默认为 {}。

有关Task Manager的更多信息可以在这里找到。

任务训练

在生成并存储这些task之后,现在是时候运行处于WAITING状态的task了。 Qlib提供了一个名为run_task的方法来运行任务池中的这些task,然而,用户也可以自定义任务的执行方式。 获取task_func的一个简单方法是直接使用qlib.model.trainer.task_train。 它将运行由task定义的整个工作流程,包括ModelDatasetRecord

qlib.workflow.task.manage.run_task(task_func: Callable, task_pool: str, query: dict = {}, force_release: bool = False, before_status: str = 'waiting', after_status: str = 'done', **kwargs)

当任务池不为空(有等待中的任务)时,使用 task_func 从 task_pool 中获取并运行任务

运行此方法后,有以下4种情况(before_status -> after_status):

STATUS_WAITING -> STATUS_DONE: 使用 task["def"] 作为 task_func 参数,这意味着任务尚未开始

STATUS_WAITING -> STATUS_PART_DONE: 使用 task["def"] 作为 task_func 参数

STATUS_PART_DONE -> STATUS_PART_DONE: 使用 task[“res”] 作为 task_func 参数,这意味着任务已开始但未完成

STATUS_PART_DONE -> STATUS_DONE: 使用 task[“res”] 作为 task_func 参数

Parameters:
  • task_func (Callable) –

    def (task_def, **kwargs) ->

    运行任务的函数

  • task_pool (str) – 任务池的名称(MongoDB中的集合)

  • query (dict) – 在获取任务时将使用此字典来查询任务池

  • force_release (bool) – 程序是否会强制释放资源

  • before_status (str:) – 将获取并训练处于before_status状态的任务。可以是STATUS_WAITING, STATUS_PART_DONE。

  • after_status (str:) – 训练后的任务将变为after_status。可以是STATUS_WAITING, STATUS_PART_DONE。

  • kwargstask_func的参数

同时,Qlib 提供了一个名为 Trainer 的模块。

class qlib.model.trainer.Trainer

训练器可以训练一系列模型。 有Trainer和DelayTrainer,可以通过它们何时完成实际训练来区分。

__init__()
train(tasks: list, *args, **kwargs) list

给定一系列任务定义,开始训练,并返回模型。

对于Trainer,它在这个方法中完成实际的训练。 对于DelayTrainer,它在这个方法中只做一些准备工作。

Parameters:

tasks – 任务列表

Returns:

模型列表

Return type:

列表

end_train(models: list, *args, **kwargs) list

给定一个模型列表,在训练结束时如果需要完成某些操作。 这些模型可能是Recorder、文本文件、数据库等。

对于Trainer,它在这个方法中进行一些收尾工作。 对于DelayTrainer,它在这个方法中完成实际的训练。

Parameters:

models – 模型列表

Returns:

模型列表

Return type:

列表

is_delay() bool

如果Trainer将延迟完成end_train

Returns:

如果 DelayTrainer

Return type:

布尔

has_worker() bool

一些训练器有后端工作器来支持并行训练 这个方法可以判断工作器是否启用。

Returns:

如果工作者已启用

Return type:

布尔

worker()

启动工作器

Raises:

NotImplementedError: – 如果工作器不受支持

Trainer 将训练一系列任务并返回一系列模型记录器。 Qlib 提供了两种 Trainer,TrainerR 是最简单的方式,而 TrainerRM 基于 TaskManager 来自动管理任务的生命周期。 如果您不想使用 Task Manager 来管理任务,那么使用 TrainerR 来训练由 TaskGen 生成的任务列表就足够了。 这里 是关于不同 Trainer 的详细信息。

任务收集

在收集模型训练结果之前,您需要使用 qlib.init 来指定 mlruns 的路径。

为了在训练后收集task的结果,Qlib提供了CollectorGroupEnsemble,以可读、可扩展和松散耦合的方式收集结果。

Collector 可以从任何地方收集对象并对它们进行处理,例如合并、分组、平均等。它有两个步骤的操作,包括 collect(将任何内容收集到一个字典中)和 process_collect(处理收集的字典)。

Group 也有两个步骤,包括 group(可以根据 group_func 将一组对象分组并将其转换为字典)和 reduce(可以根据某些规则将字典变成一个整体)。 例如:{(A,B,C1): object, (A,B,C2): object} —group—> {(A,B): {C1: object, C2: object}} —reduce—> {(A,B): object}

Ensemble 可以合并集合中的对象。 例如:{C1: 对象, C2: 对象} —Ensemble—> 对象。 你可以在 Collector 的 process_list 中设置你想要的集合。 常见的集合包括 AverageEnsembleRollingEnsemble。平均集合用于在同一时间段内集合不同模型的结果。滚动集合用于在同一时间段内集合不同模型的结果。

因此,层次结构是Collector的第二步对应于Group。而Group的第二步对应于Ensemble

欲了解更多信息,请参阅CollectorGroupEnsemble,或查看示例