跳转到内容

将您的工作流程作为服务器运行

workflows 库包含一个 WorkflowServer 类,允许您轻松通过 HTTP API 公开您的工作流。这提供了一种灵活的方式,可以从任何支持 HTTP 的客户端运行和管理工作流。

此外,WorkflowServer 部署了一个静态调试应用程序,允许您可视化、运行和调试工作流。该应用程序会自动挂载在运行服务器的根路径 / 下。

您可以创建一个服务器,添加您的工作流,并以编程方式运行它。当您希望将工作流服务器嵌入到更大的应用程序中时,这非常有用。

首先,创建一个Python文件(例如:my_server.py):

my_server.py
from workflows import Workflow, step
from workflows.context import Context
from workflows.events import Event, StartEvent, StopEvent
from workflows.server import WorkflowServer
class StreamEvent(Event):
sequence: int
# Define a simple workflow
class GreetingWorkflow(Workflow):
@step
async def greet(self, ctx: Context, ev: StartEvent) -> StopEvent:
for i in range(3):
ctx.write_event_to_stream(StreamEvent(sequence=i))
await asyncio.sleep(0.3)
name = ev.get("name", "World")
return StopEvent(result=f"Hello, {name}!")
greet_wf = GreetingWorkflow()
# Create a server instance
server = WorkflowServer()
# Add the workflow to the server
server.add_workflow("greet", greet_wf)
# To run the server programmatically (e.g., from your own script)
# import asyncio
#
# async def main():
# await server.serve(host="0.0.0.0", port=8080)
#
# if __name__ == "__main__":
# asyncio.run(main())

该库还提供了一个便捷的CLI,用于从包含 WorkflowServer 实例的文件中运行服务器。

给定上述示例中的 my_server.py 文件,您可以使用以下命令启动服务器:

Terminal window
python -m workflows.server my_server.py

服务器默认将在 0.0.0.0:8080 上启动。您可以使用 WORKFLOWS_PY_SERVER_HOSTWORKFLOWS_PY_SERVER_PORT 环境变量来配置主机和端口。

WorkflowServer 部署时附带一个静态调试应用程序,允许您可视化、运行和调试工作流。该应用程序会自动挂载在运行服务器的根路径 / 下。

Workflow Debugger UI

工作流调试界面提供几个关键功能:

  • 工作流可视化: 用户界面以可视化形式呈现工作流的静态结构及运行时的状态。您可以根据需要重新排列节点。
  • 自动模式检测: 如果您自定义启动/停止事件或内部事件的模式,用户界面将自动检测并显示适用于该模式的界面。
  • 人工参与循环: 当工作流运行时,您可以向工作流发送任何事件。这对于依赖人工输入才能继续执行的工作流非常有用。请查看事件日志顶部的 Send Event 按钮。
  • 事件日志: 所有流式事件都会在界面中记录,让您可以在右侧面板实时检查工作流的执行情况。
  • 多轮运行: 调试并比较多次运行结果。每次运行工作流时,左侧面板会追踪该次运行。
  • 多个工作流: 该用户界面将允许您运行挂载在 WorkflowServer 内的任何工作流。

有时,工作流会发送/接收在工作流中标注的事件(例如使用 ctx.wait_for_event())。在这些情况下,您仍然可以通过 server.add_workflow(..., additional_events=[...]) API 来告知用户界面这些事件,从而注入这些事件。然后,诸如 Send Event 功能的用户界面元素将会识别这些事件。

WorkflowServer 公开了以下 RESTful 端点:

方法路径描述
GET/health返回一个健康检查响应 ({"status": "healthy"})。
GET/workflows列出所有已注册工作流的名称。
POST/workflows/{name}/run同步运行指定的工作流并返回最终结果。
POST/workflows/{name}/run-nowait异步启动指定的工作流并返回一个 handler_id
GET/handlers/{handler_id}获取异步运行工作流的结果。如果仍在运行则返回 202 Accepted,如果工作流失败则返回 500,如果工作流完成则返回 200
GET/events/{handler_id}从正在运行的工作流中流式传输所有事件,以换行符分隔的JSON格式(如果启用了SSE,则为application/x-ndjsontext/event-stream)。
POST/events/{handler_id}在工作流执行期间向其发送事件(适用于人工介入场景)
GET/handlers获取所有工作流处理器(运行中和已完成)
POST/handlers/{handler_id}/cancel停止并取消工作流的执行。

要运行一个工作流并等待其完成,请向 /workflows/{name}/run 发送一个 POST 请求。

请求体:

{
"start_event": {},
"context": {},
"handler_id": "",
}
  • start_eventstart_event:StartEvent 或其子类的序列化表示。建议将其用作工作流输入。
  • contextcontext: 工作流上下文的序列化表示
  • handler_idhandler_id: 工作流处理程序标识符,用于从先前完成的运行继续执行。

成功响应 (200 OK):

{
"result": "The workflow has been successfully run"
}

要启动工作流而不等待其完成,请使用 /run-nowait 端点。

请求体:

{
"start_event": {},
"context": {},
"handler_id": ""
}

请求体参数与 /run 端点相同。

成功响应 (200 OK):

{
"handler_id": "someUniqueId123",
"status": "started"
}

然后你可以使用 handler_id 来检查结果或流式事件。

此端点仅在您之前使用 /run-nowait 异步启动工作流时有效

要将事件流式传输为服务器发送事件(SSE)或多行JSON负载,您可以向/events/{handler_id}端点发送请求,并附带您先前启动的异步工作流运行的处理程序ID。

查询参数

  • sse(设置为 "true" 或 "false",非必需):若为 true 则通过服务器发送事件(text/event-stream)流式传输事件,否则通过多行 JSON 负载(application/x-ndjson)流式传输。默认为 true。
  • acquire_timeoutacquire_timeout (可转换为浮点数的字符串,非必需):获取锁以遍历事件的超时时间
  • include_internalinclude_internal (设置为 "true" 或 "false",非必需):如果设为 true 则流式传输内部工作流事件。默认为 false。
  • include_qualified_nameinclude_qualified_name(设置为“true”或“false”,非必需):在响应体中包含事件的限定名称。默认为true。

示例请求

Terminal window
curl http://localhost:80/events/someUniqueId123?sse=false&acquire_timeout=1&include_internal=false&include_qualified_name=true

成功响应 (200 OK)

单一事件载荷:

{
"value": {"result": 12},
"qualified_name": "__main__.MathEvent",
"type": "__main__.MathEvent",
"types": ["workflows.events.Event", "__main__.MathEvent"],
}

重要注意事项

  • 每个工作流运行只允许一个读取器流式传输事件
  • 一旦事件被流式传输,它们将无法恢复(除非您在客户端实现了某些持久化逻辑)

我们正在努力改进这两个方面,因此服务器行为可能会有变化

此端点仅当您之前使用 /run-nowait 异步启动工作流时有效

要获取先前启动的异步工作流运行结果,您可以使用 /results/{handler_id} 端点并传入该运行的处理器ID。

示例请求

Terminal window
curl http://localhost:80/results/someUniqueId123

Successful response (200 OK)

{
"handler_id": "someUniqueId123",
"workflow_name": "math_workflow",
"run_id": "uniqueRunId456",
"error": null,
"result": {
"sum": 15,
"subtraction": 9,
"multiplication": 36,
"division": 4,
},
"status": "completed",
"started_at": "2024-10-21T14:32:15.123Z",
"updated_at": "2024-10-21T14:45:30.456Z",
"completed_at": "2024-10-21T14:45:30.456Z"
}

已接受的响应 (202 ACCEPTED)

当工作流仍在运行时,状态码 202 会被返回,因此尚未产生结果。

当工作流运行需要外部输入时(例如需要人工介入),您可以向 events/{handler_id} 端点发送 POST 请求,附带要发送的事件数据(以及可选的要发送到的工作流步骤),以提供所述的外部输入。

请求体

{
"event": {"__is_pydantic": true, "value": {"feedback": "This is great!", "approved": true}, "qualified_name": "__main__.HumanFeedbackEvent"},
"step": "process_human_feedback"
}
  • eventevent: 工作流事件的序列化表示。
  • stepstep (可选): 要发送事件的目标步骤名称。

Successful response (200 OK)

{
"status": "sent"
}

取消工作流运行 (/handlers/{handler_id}/cancel)

Section titled “Canceling a workflow run (/handlers/{handler_id}/cancel)”

要停止正在运行的工作流处理器,通过取消其任务,并可选地从持久化存储中移除关联的处理器,您可以使用 /handlers/{handler_id}/cancel

查询参数

  • purgepurge(可设置为“true”或“false”,非必需):是否从持久化存储中移除与工作流关联的处理程序。默认为false。

示例请求

Terminal window
curl -X POST http://localhost:80/handlers/someUniqueId123/cancel?purge=true

Successful response (200 OK)

{
"status": "deleted", // or canceled if purge is false
}

为了以编程方式与已部署的 WorkflowServer 进行交互,除了上面详述的原始API调用之外,我们还提供了一个 WorkflowClient 类。

WorkflowClient 提供了列出可用工作流和工作流处理器的方法,用于验证服务器健康状态、运行工作流(同步和异步方式)、流式传输事件以及发送事件。

假设您正在运行上述服务器示例,我们可以通过以下方式使用 WorkflowClient

from workflows.client import WorkflowClient
async def main()
client = WorkflowClient(base_url="http://0.0.0.0:8080")
workflows = await client.list_workflows()
print("===== AVAILABLE WORKFLOWS ====")
print(workflows)
await client.is_healthy() # will raise an exception if the server is not healthy
handler = await client.run_workflow_nowait(
"greet",
start_event=StartEvent(name="John"),
context=None,
)
handler_id = handler.handler_id
print("==== STARTING THE WORKFLOW ===")
print(f"Workflow running with handler ID: {handler_id}")
print("=== STREAMING EVENTS ===")
async for event in client.get_workflow_events(handler_id=handler_id):
print("Received data:", event)
result = await client.get_handler(handler_id)
print(f"Final result: {result.result} (status: {result.status})")
if __name__ == "__main__":
import asyncio
asyncio.run(main())

您也可以使用客户端交互式运行人在回路工作流,例如:

from workflows import Workflow, step
from workflows.context import Context
from workflows.events import (
StartEvent,
StopEvent,
InputRequiredEvent,
HumanResponseEvent,
)
from workflows.server import WorkflowServer
class RequestEvent(InputRequiredEvent):
prompt: str
class ResponseEvent(HumanResponseEvent):
response: str
class OutEvent(StopEvent):
output: str
class HumanInTheLoopWorkflow(Workflow):
@step
async def prompt_human(self, ev: StartEvent, ctx: Context) -> RequestEvent:
return RequestEvent(prompt="What is your name?")
@step
async def greet_human(self, ev: ResponseEvent) -> OutEvent:
return OutEvent(output=f"Hello, {ev.response}")
server = WorkflowServer()
server.add_workflow("human", HumanInTheLoopWorkflow(timeout=1000))
await server.serve("0.0.0.0", "8080")

现在你可以运行工作流,当需要人工交互时,将人工响应发送回来:

from workflow.client import WorkflowClient
client = WorkflowClient(base_url="http://0.0.0.0:8080")
handler = await client.run_workflow_nowait("human")
handler_id = handler.handler_id
print(handler_id)
async for event in client.get_workflow_events(handler_id=handler_id):
if "RequestEvent" == event.type
print(
"Workflow is requiring human input:",
event.value.get("prompt", ""),
)
name = input("Reply here: ")
sent_event = await client.send_event(
handler_id=handler_id,
event=ResponseEvent(response=name.capitalize().strip()),
)
msg = "Event has been sent" if sent_event else "Event failed to send"
print(msg)
result = await client.get_handler(handler_id)
print(f"Workflow complete with status: {result.status})")
res = OutEvent.model_validate(result.result)
print("Received final message:", res.output)