ray.workflow.run#

ray.workflow.run(dag: DAGNode, *args, workflow_id: str | None = None, metadata: Dict[str, Any] | None = None, **kwargs) Any[源代码]#

运行一个工作流程。

如果具有给定ID的工作流已经存在,它将被恢复。

示例

import ray
from ray import workflow

@ray.remote
def book_flight(origin: str, dest: str):
   return f"Flight: {origin}->{dest}"

@ray.remote
def book_hotel(location: str):
   return f"Hotel: {location}"

@ray.remote
def finalize_trip(bookings: List[Any]):
   return ' | '.join(ray.get(bookings))

flight1 = book_flight.bind("OAK", "SAN")
flight2 = book_flight.bind("SAN", "OAK")
hotel = book_hotel.bind("SAN")
trip = finalize_trip.bind([flight1, flight2, hotel])
print(workflow.run(trip))
Flight: OAK->SAN | Flight: SAN->OAK | Hotel: SAN
参数:
  • workflow_id – 一个可用于恢复工作流的唯一标识符。如果未指定,将生成一个随机ID。

  • metadata – 要添加到工作流中的元数据。它必须能够序列化为json。

返回:

运行结果。

PublicAPI (alpha): 此API处于alpha阶段,可能在稳定之前发生变化。