任务管理
介绍
工作流部分介绍了如何以松散耦合的方式运行研究工作流。但是当你使用qrun时,它只能执行一个task。
为了自动生成和执行不同的任务,Task Management提供了一个包括任务生成、任务存储、任务训练和任务收集的完整流程。
通过这个模块,用户可以在不同的时间段、不同的损失函数、甚至不同的模型中自动运行他们的task。任务生成、模型训练以及数据合并和收集的过程如下图所示。
整个过程可以用于Online Serving。
整个过程的示例显示在这里。
任务生成
一个task由Model、Dataset、Record或用户添加的任何内容组成。
具体的任务模板可以在
Task Section中查看。
尽管任务模板是固定的,但用户可以自定义他们的TaskGen以通过任务模板生成不同的task。
这是TaskGen的基类:
- class qlib.workflow.task.gen.TaskGen
生成不同任务的基类
示例 1:
输入:一个特定的任务模板和滚动步骤
输出:任务的滚动版本
示例 2:
输入:一个特定的任务模板和损失列表
输出:一组具有不同损失的任务
- abstract generate(task: dict) List[dict]
基于任务模板生成不同的任务
- Parameters:
任务 (字典) – 一个任务模板
- Returns:
任务列表
- Return type:
列表[字典]
Qlib 提供了一个类 RollingGen 来生成不同日期段的数据集的 task 列表。
这个类允许用户在一个实验中验证不同时期的数据对模型的影响。更多信息请参见 这里。
任务存储
为了实现更高的效率和集群操作的可能性,Task Manager 会将所有任务存储在 MongoDB 中。
TaskManager 可以自动获取未完成的任务,并管理一组任务的生命周期,包括错误处理。
用户在使用此模块时必须完成 MongoDB 的配置。
用户需要提供MongoDB的URL和数据库名称,以便在初始化中使用TaskManager,或者像这样进行声明。
from qlib.config import C C["mongo"] = { "task_url" : "mongodb://localhost:27017/", # your MongoDB url "task_db_name" : "rolling_db" # database name }
- class qlib.workflow.task.manage.TaskManager(task_pool: str)
这是当任务由TaskManager创建时,任务的样子
{ 'def': pickle serialized task definition. using pickle will make it easier 'filter': json-like data. This is for filtering the tasks. 'status': 'waiting' | 'running' | 'done' 'res': pickle serialized task result, }
任务管理器假设您只会更新您获取的任务。 Mongo的获取和更新操作将确保数据更新的安全性。
这个类可以作为命令行工具使用。以下是几个示例。 您可以使用以下命令查看管理模块的帮助: python -m qlib.workflow.task.manage -h # 显示管理模块CLI的手册 python -m qlib.workflow.task.manage wait -h # 显示管理模块中wait命令的手册
python -m qlib.workflow.task.manage -t <pool_name> wait python -m qlib.workflow.task.manage -t <pool_name> task_stat
注意
假设:MongoDB中的数据被编码,而从MongoDB中取出的数据被解码
以下是四种状态:
STATUS_WAITING: 等待训练
STATUS_RUNNING: 训练中
STATUS_PART_DONE: 已完成某些步骤,正在等待下一步
STATUS_DONE: 所有工作已完成
- __init__(task_pool: str)
初始化任务管理器,记得先声明MongoDB的URL和数据库名称。 一个TaskManager实例服务于一个特定的任务池。 该模块的静态方法服务于整个MongoDB。
- Parameters:
task_pool (str) – MongoDB中集合的名称
- static list() list
列出数据库的所有集合(task_pool)。
- Returns:
列表
- replace_task(task, new_task)
使用新任务替换旧任务
- Parameters:
task – 旧任务
new_task – 新任务
- insert_task(task)
插入一个任务。
- Parameters:
task – 等待插入的任务
- Returns:
pymongo.results.InsertOneResult
- insert_task_def(task_def)
插入一个任务到任务池
- Parameters:
task_def (dict) – 任务定义
- Return type:
pymongo.results.InsertOneResult
- create_task(task_def_l, dry_run=False, print_nt=False) List[str]
如果task_def_l中的任务是新的,则将新任务插入到task_pool中,并记录inserted_id。 如果任务不是新的,则只需查询其_id。
- Parameters:
task_def_l (list) – 任务列表
dry_run (bool) – 是否将这些新任务插入任务池
print_nt (bool) – 如果打印新任务
- Returns:
任务定义列表的_id列表
- Return type:
列表[str]
- fetch_task(query={}, status='waiting') dict
使用查询来获取任务。
- Parameters:
query (dict, 可选) – 查询字典。默认为 {}。
status (str, optional) – [描述]. 默认为 STATUS_WAITING.
- Returns:
解码后的任务(集合中的文档)
- Return type:
字典
- safe_fetch_task(query={}, status='waiting')
使用带有contextmanager的查询从task_pool中获取任务
- Parameters:
query (dict) – 查询的字典
- Returns:
字典
- Return type:
解码后的任务(集合中的文档)
- query(query={}, decode=True)
查询集合中的任务。 如果迭代生成器花费太长时间,此函数可能会引发异常 pymongo.errors.CursorNotFound: cursor id not found
python -m qlib.workflow.task.manage -t
query ‘{“_id”: “615498be837d0053acbc5d58”}’ - Parameters:
query (dict) – 查询的字典
decode (bool) –
- Returns:
字典
- Return type:
解码后的任务(集合中的文档)
- re_query(_id) dict
使用 _id 查询任务。
- Parameters:
_id (str) – 文档的_id
- Returns:
解码后的任务(集合中的文档)
- Return type:
字典
- commit_task_res(task, res, status='done')
将结果提交到 task['res']。
- Parameters:
任务 ([类型]) – [描述]
res (object) – 你想要保存的结果
status (str, optional) – STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE。默认为 STATUS_DONE。
- return_task(task, status='waiting')
将任务返回到状态。通常用于错误处理。
- Parameters:
任务 ([类型]) – [描述]
status (str, optional) – STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE。默认为 STATUS_WAITING。
- remove(query={})
使用查询删除任务
- Parameters:
query (dict) – 查询的字典
- task_stat(query={}) dict
计算每个状态中的任务数量。
- Parameters:
query (dict, 可选) – 查询字典。默认为 {}。
- Returns:
字典
- reset_waiting(query={})
将所有正在运行的任务重置为等待状态。当某些正在运行的任务意外退出时可以使用。
- Parameters:
query (dict, 可选) – 查询字典。默认为 {}。
- prioritize(task, priority: int)
设置任务的优先级
- Parameters:
任务 (字典) – 来自数据库的任务查询
priority (int) – 目标优先级
- wait(query={})
在多进程处理时,主进程可能从TaskManager中获取不到任何内容,因为仍然有一些任务在运行。 因此,主进程应等待,直到所有任务都被其他进程或机器训练好。
- Parameters:
query (dict, 可选) – 查询字典。默认为 {}。
有关Task Manager的更多信息可以在这里找到。
任务训练
在生成并存储这些task之后,现在是时候运行处于WAITING状态的task了。
Qlib提供了一个名为run_task的方法来运行任务池中的这些task,然而,用户也可以自定义任务的执行方式。
获取task_func的一个简单方法是直接使用qlib.model.trainer.task_train。
它将运行由task定义的整个工作流程,包括Model、Dataset、Record。
- qlib.workflow.task.manage.run_task(task_func: Callable, task_pool: str, query: dict = {}, force_release: bool = False, before_status: str = 'waiting', after_status: str = 'done', **kwargs)
当任务池不为空(有等待中的任务)时,使用 task_func 从 task_pool 中获取并运行任务
运行此方法后,有以下4种情况(before_status -> after_status):
STATUS_WAITING -> STATUS_DONE: 使用 task["def"] 作为 task_func 参数,这意味着任务尚未开始
STATUS_WAITING -> STATUS_PART_DONE: 使用 task["def"] 作为 task_func 参数
STATUS_PART_DONE -> STATUS_PART_DONE: 使用 task[“res”] 作为 task_func 参数,这意味着任务已开始但未完成
STATUS_PART_DONE -> STATUS_DONE: 使用 task[“res”] 作为 task_func 参数
- Parameters:
task_func (Callable) –
def (task_def, **kwargs) ->
运行任务的函数
task_pool (str) – 任务池的名称(MongoDB中的集合)
query (dict) – 在获取任务时将使用此字典来查询任务池
force_release (bool) – 程序是否会强制释放资源
before_status (str:) – 将获取并训练处于before_status状态的任务。可以是STATUS_WAITING, STATUS_PART_DONE。
after_status (str:) – 训练后的任务将变为after_status。可以是STATUS_WAITING, STATUS_PART_DONE。
kwargs – task_func的参数
同时,Qlib 提供了一个名为 Trainer 的模块。
- class qlib.model.trainer.Trainer
训练器可以训练一系列模型。 有Trainer和DelayTrainer,可以通过它们何时完成实际训练来区分。
- __init__()
- train(tasks: list, *args, **kwargs) list
给定一系列任务定义,开始训练,并返回模型。
对于Trainer,它在这个方法中完成实际的训练。 对于DelayTrainer,它在这个方法中只做一些准备工作。
- Parameters:
tasks – 任务列表
- Returns:
模型列表
- Return type:
列表
- end_train(models: list, *args, **kwargs) list
给定一个模型列表,在训练结束时如果需要完成某些操作。 这些模型可能是Recorder、文本文件、数据库等。
对于Trainer,它在这个方法中进行一些收尾工作。 对于DelayTrainer,它在这个方法中完成实际的训练。
- Parameters:
models – 模型列表
- Returns:
模型列表
- Return type:
列表
- is_delay() bool
如果Trainer将延迟完成end_train。
- Returns:
如果 DelayTrainer
- Return type:
布尔
- has_worker() bool
一些训练器有后端工作器来支持并行训练 这个方法可以判断工作器是否启用。
- Returns:
如果工作者已启用
- Return type:
布尔
- worker()
启动工作器
- Raises:
NotImplementedError: – 如果工作器不受支持
Trainer 将训练一系列任务并返回一系列模型记录器。
Qlib 提供了两种 Trainer,TrainerR 是最简单的方式,而 TrainerRM 基于 TaskManager 来自动管理任务的生命周期。
如果您不想使用 Task Manager 来管理任务,那么使用 TrainerR 来训练由 TaskGen 生成的任务列表就足够了。
这里 是关于不同 Trainer 的详细信息。
任务收集
在收集模型训练结果之前,您需要使用 qlib.init 来指定 mlruns 的路径。
为了在训练后收集task的结果,Qlib提供了Collector、Group和Ensemble,以可读、可扩展和松散耦合的方式收集结果。
Collector 可以从任何地方收集对象并对它们进行处理,例如合并、分组、平均等。它有两个步骤的操作,包括 collect(将任何内容收集到一个字典中)和 process_collect(处理收集的字典)。
Group 也有两个步骤,包括 group(可以根据 group_func 将一组对象分组并将其转换为字典)和 reduce(可以根据某些规则将字典变成一个整体)。
例如:{(A,B,C1): object, (A,B,C2): object} —group—> {(A,B): {C1: object, C2: object}} —reduce—> {(A,B): object}
Ensemble 可以合并集合中的对象。
例如:{C1: 对象, C2: 对象} —Ensemble—> 对象。
你可以在 Collector 的 process_list 中设置你想要的集合。
常见的集合包括 AverageEnsemble 和 RollingEnsemble。平均集合用于在同一时间段内集合不同模型的结果。滚动集合用于在同一时间段内集合不同模型的结果。
因此,层次结构是Collector的第二步对应于Group。而Group的第二步对应于Ensemble。