工作流元数据#
可观测性对于工作流非常重要——有时我们不仅希望获得输出,还希望了解内部状态(例如,测量性能或发现瓶颈)。工作流元数据提供了多种统计数据,帮助理解工作流,从基本的运行状态和任务选项到性能和用户指定的元数据。
检索元数据#
工作流元数据可以通过 workflow.get_metadata(workflow_id) 获取。例如:
import ray
from ray import workflow
@ray.remote
def add(left: int, right: int) -> int:
return left + right
workflow.run(add.bind(10, 20), workflow_id="add_example")
workflow_metadata = workflow.get_metadata("add_example")
assert workflow_metadata["status"] == "SUCCESSFUL"
assert "start_time" in workflow_metadata["stats"]
assert "end_time" in workflow_metadata["stats"]
你也可以通过提供任务名称来检索单个工作流任务的元数据:
workflow.run(
add.options(
**workflow.options(task_id="add_task")
).bind(10, 20), workflow_id="add_example_2")
task_metadata = workflow.get_metadata("add_example_2", task_id="add_task")
assert "start_time" in workflow_metadata["stats"]
assert "end_time" in workflow_metadata["stats"]
用户定义的元数据#
用户可以为工作流或工作流任务添加自定义元数据,这在您希望将一些额外信息附加到工作流或工作流任务时非常有用。
可以通过
.run(metadata=metadata)添加工作流级别的元数据任务级元数据可以通过
.options(**workflow.options(metadata=metadata))或装饰器@workflow.options(metadata=metadata)添加。
workflow.run(add.options(**workflow.options(task_id="add_task", metadata={"task_k": "task_v"})).bind(10, 20),
workflow_id="add_example_3", metadata={"workflow_k": "workflow_v"})
assert workflow.get_metadata("add_example_3")["user_metadata"] == {"workflow_k": "workflow_v"}
assert workflow.get_metadata("add_example_3", task_id="add_task")["user_metadata"] == {"task_k": "task_v"}
注意:用户定义的元数据必须是一个值为 JSON 可序列化的 Python 字典。
可用指标#
工作流级别
status: 工作流状态,可以是 RUNNING、FAILED、RESUMABLE、CANCELED 或 SUCCESSFUL 之一。
user_metadata: 用户通过
workflow.run()提供的自定义元数据的 Python 字典。stats: 工作流运行统计信息,包括工作流开始时间和结束时间。
任务级别
名称:任务的名称,由用户通过
task.options(**workflow.options(task_id=xxx))提供或由系统生成。task_options: 任务的选项,由用户通过
task.options()提供或由系统默认提供。user_metadata: 一个由用户通过
task.options()提供的自定义元数据的 Python 字典。stats: 任务运行统计信息,包括任务开始时间和结束时间。
注释#
与 get_output() 不同,get_metadata() 在调用时会立即返回结果,这也意味着如果相应的元数据不可用,结果中不会包含所有字段(例如,metadata["stats"]["end_time"] 在流程完成之前将不可用)。
import time
@ray.remote
def simple():
time.sleep(1000)
return 0
workflow.run_async(simple.bind(), workflow_id="workflow_id")
# make sure workflow task starts running
time.sleep(2)
workflow_metadata = workflow.get_metadata("workflow_id")
assert workflow_metadata["status"] == "RUNNING"
assert "start_time" in workflow_metadata["stats"]
assert "end_time" not in workflow_metadata["stats"]
workflow.cancel("workflow_id")
workflow_metadata = workflow.get_metadata("workflow_id")
assert workflow_metadata["status"] == "CANCELED"
assert "start_time" in workflow_metadata["stats"]
assert "end_time" not in workflow_metadata["stats"]
2. For resumed workflows, the current behavior is that “stats” will be updated whenever a workflow is resumed.
from pathlib import Path
workflow_id = "simple"
error_flag = Path("error")
error_flag.touch()
@ray.remote
def simple():
if error_flag.exists():
raise ValueError()
return 0
try:
workflow.run(simple.bind(), workflow_id=workflow_id)
except ray.exceptions.RayTaskError:
pass
workflow_metadata_failed = workflow.get_metadata(workflow_id)
assert workflow_metadata_failed["status"] == "FAILED"
# remove flag to make task success
error_flag.unlink()
ref = workflow.resume_async(workflow_id)
assert ray.get(ref) == 0
workflow_metadata_resumed = workflow.get_metadata(workflow_id)
assert workflow_metadata_resumed["status"] == "SUCCESSFUL"
# make sure resume updated running metrics
assert workflow_metadata_resumed["stats"]["start_time"] > workflow_metadata_failed["stats"]["start_time"]
assert workflow_metadata_resumed["stats"]["end_time"] > workflow_metadata_failed["stats"]["end_time"]