工作流元数据#

可观测性对于工作流非常重要——有时我们不仅希望获得输出,还希望了解内部状态(例如,测量性能或发现瓶颈)。工作流元数据提供了多种统计数据,帮助理解工作流,从基本的运行状态和任务选项到性能和用户指定的元数据。

检索元数据#

工作流元数据可以通过 workflow.get_metadata(workflow_id) 获取。例如:

import ray
from ray import workflow

@ray.remote
def add(left: int, right: int) -> int:
    return left + right

workflow.run(add.bind(10, 20), workflow_id="add_example")

workflow_metadata = workflow.get_metadata("add_example")

assert workflow_metadata["status"] == "SUCCESSFUL"
assert "start_time" in workflow_metadata["stats"]
assert "end_time" in workflow_metadata["stats"]

你也可以通过提供任务名称来检索单个工作流任务的元数据:

workflow.run(
    add.options(
        **workflow.options(task_id="add_task")
    ).bind(10, 20), workflow_id="add_example_2")

task_metadata = workflow.get_metadata("add_example_2", task_id="add_task")

assert "start_time" in workflow_metadata["stats"]
assert "end_time" in workflow_metadata["stats"]

用户定义的元数据#

用户可以为工作流或工作流任务添加自定义元数据,这在您希望将一些额外信息附加到工作流或工作流任务时非常有用。

  • 可以通过 .run(metadata=metadata) 添加工作流级别的元数据

  • 任务级元数据可以通过 .options(**workflow.options(metadata=metadata)) 或装饰器 @workflow.options(metadata=metadata) 添加。

workflow.run(add.options(**workflow.options(task_id="add_task", metadata={"task_k": "task_v"})).bind(10, 20),
    workflow_id="add_example_3", metadata={"workflow_k": "workflow_v"})

assert workflow.get_metadata("add_example_3")["user_metadata"] == {"workflow_k": "workflow_v"}
assert workflow.get_metadata("add_example_3", task_id="add_task")["user_metadata"] == {"task_k": "task_v"}

注意:用户定义的元数据必须是一个值为 JSON 可序列化的 Python 字典。

可用指标#

工作流级别

  • status: 工作流状态,可以是 RUNNING、FAILED、RESUMABLE、CANCELED 或 SUCCESSFUL 之一。

  • user_metadata: 用户通过 workflow.run() 提供的自定义元数据的 Python 字典。

  • stats: 工作流运行统计信息,包括工作流开始时间和结束时间。

任务级别

  • 名称:任务的名称,由用户通过 task.options(**workflow.options(task_id=xxx)) 提供或由系统生成。

  • task_options: 任务的选项,由用户通过 task.options() 提供或由系统默认提供。

  • user_metadata: 一个由用户通过 task.options() 提供的自定义元数据的 Python 字典。

  • stats: 任务运行统计信息,包括任务开始时间和结束时间。

注释#

get_output() 不同,get_metadata() 在调用时会立即返回结果,这也意味着如果相应的元数据不可用,结果中不会包含所有字段(例如,metadata["stats"]["end_time"] 在流程完成之前将不可用)。

import time

@ray.remote
def simple():
    time.sleep(1000)
    return 0

workflow.run_async(simple.bind(), workflow_id="workflow_id")

# make sure workflow task starts running
time.sleep(2)

workflow_metadata = workflow.get_metadata("workflow_id")
assert workflow_metadata["status"] == "RUNNING"
assert "start_time" in workflow_metadata["stats"]
assert "end_time" not in workflow_metadata["stats"]

workflow.cancel("workflow_id")

workflow_metadata = workflow.get_metadata("workflow_id")
assert workflow_metadata["status"] == "CANCELED"
assert "start_time" in workflow_metadata["stats"]
assert "end_time" not in workflow_metadata["stats"]

2. For resumed workflows, the current behavior is that “stats” will be updated whenever a workflow is resumed.

from pathlib import Path

workflow_id = "simple"

error_flag = Path("error")
error_flag.touch()

@ray.remote
def simple():
    if error_flag.exists():
        raise ValueError()
    return 0

try:
    workflow.run(simple.bind(), workflow_id=workflow_id)
except ray.exceptions.RayTaskError:
    pass

workflow_metadata_failed = workflow.get_metadata(workflow_id)
assert workflow_metadata_failed["status"] == "FAILED"

# remove flag to make task success
error_flag.unlink()
ref = workflow.resume_async(workflow_id)
assert ray.get(ref) == 0

workflow_metadata_resumed = workflow.get_metadata(workflow_id)
assert workflow_metadata_resumed["status"] == "SUCCESSFUL"

# make sure resume updated running metrics
assert workflow_metadata_resumed["stats"]["start_time"] > workflow_metadata_failed["stats"]["start_time"]
assert workflow_metadata_resumed["stats"]["end_time"] > workflow_metadata_failed["stats"]["end_time"]