将您的工作流程作为服务器运行
workflows 库包含一个 WorkflowServer 类,允许您轻松通过 HTTP API 公开您的工作流。这提供了一种灵活的方式,可以从任何支持 HTTP 的客户端运行和管理工作流。
此外,WorkflowServer 部署了一个静态调试应用程序,允许您可视化、运行和调试工作流。该应用程序会自动挂载在运行服务器的根路径 / 下。
您可以创建一个服务器,添加您的工作流,并以编程方式运行它。当您希望将工作流服务器嵌入到更大的应用程序中时,这非常有用。
首先,创建一个Python文件(例如:my_server.py):
from workflows import Workflow, stepfrom workflows.context import Contextfrom workflows.events import Event, StartEvent, StopEventfrom workflows.server import WorkflowServer
class StreamEvent(Event): sequence: int
# Define a simple workflowclass GreetingWorkflow(Workflow): @step async def greet(self, ctx: Context, ev: StartEvent) -> StopEvent: for i in range(3): ctx.write_event_to_stream(StreamEvent(sequence=i)) await asyncio.sleep(0.3)
name = ev.get("name", "World") return StopEvent(result=f"Hello, {name}!")
greet_wf = GreetingWorkflow()
# Create a server instanceserver = WorkflowServer()
# Add the workflow to the serverserver.add_workflow("greet", greet_wf)
# To run the server programmatically (e.g., from your own script)# import asyncio## async def main():# await server.serve(host="0.0.0.0", port=8080)## if __name__ == "__main__":# asyncio.run(main())该库还提供了一个便捷的CLI,用于从包含 WorkflowServer 实例的文件中运行服务器。
给定上述示例中的 my_server.py 文件,您可以使用以下命令启动服务器:
python -m workflows.server my_server.py服务器默认将在 0.0.0.0:8080 上启动。您可以使用
WORKFLOWS_PY_SERVER_HOST 和 WORKFLOWS_PY_SERVER_PORT 环境变量来配置主机和端口。
WorkflowServer 部署时附带一个静态调试应用程序,允许您可视化、运行和调试工作流。该应用程序会自动挂载在运行服务器的根路径 / 下。

工作流调试界面提供几个关键功能:
- 工作流可视化: 用户界面以可视化形式呈现工作流的静态结构及运行时的状态。您可以根据需要重新排列节点。
- 自动模式检测: 如果您自定义启动/停止事件或内部事件的模式,用户界面将自动检测并显示适用于该模式的界面。
- 人工参与循环: 当工作流运行时,您可以向工作流发送任何事件。这对于依赖人工输入才能继续执行的工作流非常有用。请查看事件日志顶部的
Send Event按钮。 - 事件日志: 所有流式事件都会在界面中记录,让您可以在右侧面板实时检查工作流的执行情况。
- 多轮运行: 调试并比较多次运行结果。每次运行工作流时,左侧面板会追踪该次运行。
- 多个工作流: 该用户界面将允许您运行挂载在
WorkflowServer内的任何工作流。
有时,工作流会发送/接收在工作流中标注的事件(例如使用 ctx.wait_for_event())。在这些情况下,您仍然可以通过 server.add_workflow(..., additional_events=[...]) API 来告知用户界面这些事件,从而注入这些事件。然后,诸如 Send Event 功能的用户界面元素将会识别这些事件。
WorkflowServer 公开了以下 RESTful 端点:
| 方法 | 路径 | 描述 |
|---|---|---|
GET | /health | 返回一个健康检查响应 ({"status": "healthy"})。 |
GET | /workflows | 列出所有已注册工作流的名称。 |
POST | /workflows/{name}/run | 同步运行指定的工作流并返回最终结果。 |
POST | /workflows/{name}/run-nowait | 异步启动指定的工作流并返回一个 handler_id。 |
GET | /handlers/{handler_id} | 获取异步运行工作流的结果。如果仍在运行则返回 202 Accepted,如果工作流失败则返回 500,如果工作流完成则返回 200。 |
GET | /events/{handler_id} | 从正在运行的工作流中流式传输所有事件,以换行符分隔的JSON格式(如果启用了SSE,则为application/x-ndjson和text/event-stream)。 |
POST | /events/{handler_id} | 在工作流执行期间向其发送事件(适用于人工介入场景) |
GET | /handlers | 获取所有工作流处理器(运行中和已完成) |
POST | /handlers/{handler_id}/cancel | 停止并取消工作流的执行。 |
运行工作流 (/run)
Section titled “Running a Workflow (/run)”要运行一个工作流并等待其完成,请向 /workflows/{name}/run 发送一个 POST 请求。
请求体:
{ "start_event": {}, "context": {}, "handler_id": "",}start_eventstart_event:StartEvent 或其子类的序列化表示。建议将其用作工作流输入。contextcontext: 工作流上下文的序列化表示handler_idhandler_id: 工作流处理程序标识符,用于从先前完成的运行继续执行。
成功响应 (200 OK):
{ "result": "The workflow has been successfully run"}异步运行工作流 (/run-nowait)
Section titled “Running a Workflow Asynchronously (/run-nowait)”要启动工作流而不等待其完成,请使用 /run-nowait 端点。
请求体:
{ "start_event": {}, "context": {}, "handler_id": ""}请求体参数与 /run 端点相同。
成功响应 (200 OK):
{ "handler_id": "someUniqueId123", "status": "started"}然后你可以使用 handler_id 来检查结果或流式事件。
流式事件 (GET /events/{handler_id})
Section titled “Streaming events (GET /events/{handler_id})”此端点仅在您之前使用
/run-nowait异步启动工作流时有效
要将事件流式传输为服务器发送事件(SSE)或多行JSON负载,您可以向/events/{handler_id}端点发送请求,并附带您先前启动的异步工作流运行的处理程序ID。
查询参数
sse(设置为 "true" 或 "false",非必需):若为 true 则通过服务器发送事件(text/event-stream)流式传输事件,否则通过多行 JSON 负载(application/x-ndjson)流式传输。默认为 true。acquire_timeoutacquire_timeout (可转换为浮点数的字符串,非必需):获取锁以遍历事件的超时时间include_internalinclude_internal (设置为 "true" 或 "false",非必需):如果设为 true 则流式传输内部工作流事件。默认为 false。include_qualified_nameinclude_qualified_name(设置为“true”或“false”,非必需):在响应体中包含事件的限定名称。默认为true。
示例请求
curl http://localhost:80/events/someUniqueId123?sse=false&acquire_timeout=1&include_internal=false&include_qualified_name=true成功响应 (200 OK)
单一事件载荷:
{ "value": {"result": 12}, "qualified_name": "__main__.MathEvent", "type": "__main__.MathEvent", "types": ["workflows.events.Event", "__main__.MathEvent"],}重要注意事项
- 每个工作流运行只允许一个读取器流式传输事件
- 一旦事件被流式传输,它们将无法恢复(除非您在客户端实现了某些持久化逻辑)
我们正在努力改进这两个方面,因此服务器行为可能会有变化
从工作流执行中获取结果 (/results/{handler_id})
Section titled “Getting the result from a workflow execution (/results/{handler_id})”此端点仅当您之前使用
/run-nowait异步启动工作流时有效
要获取先前启动的异步工作流运行结果,您可以使用 /results/{handler_id} 端点并传入该运行的处理器ID。
示例请求
curl http://localhost:80/results/someUniqueId123Successful response (200 OK)
{ "handler_id": "someUniqueId123", "workflow_name": "math_workflow", "run_id": "uniqueRunId456", "error": null, "result": { "sum": 15, "subtraction": 9, "multiplication": 36, "division": 4, }, "status": "completed", "started_at": "2024-10-21T14:32:15.123Z", "updated_at": "2024-10-21T14:45:30.456Z", "completed_at": "2024-10-21T14:45:30.456Z"}已接受的响应 (202 ACCEPTED)
当工作流仍在运行时,状态码 202 会被返回,因此尚未产生结果。
发送事件 (POST /events/{handler_id})
Section titled “Sending an event (POST /events/{handler_id})”当工作流运行需要外部输入时(例如需要人工介入),您可以向 events/{handler_id} 端点发送 POST 请求,附带要发送的事件数据(以及可选的要发送到的工作流步骤),以提供所述的外部输入。
请求体
{ "event": {"__is_pydantic": true, "value": {"feedback": "This is great!", "approved": true}, "qualified_name": "__main__.HumanFeedbackEvent"}, "step": "process_human_feedback"}eventevent: 工作流事件的序列化表示。stepstep (可选): 要发送事件的目标步骤名称。
Successful response (200 OK)
{ "status": "sent"}取消工作流运行 (/handlers/{handler_id}/cancel)
Section titled “Canceling a workflow run (/handlers/{handler_id}/cancel)”要停止正在运行的工作流处理器,通过取消其任务,并可选地从持久化存储中移除关联的处理器,您可以使用 /handlers/{handler_id}/cancel。
查询参数
purgepurge(可设置为“true”或“false”,非必需):是否从持久化存储中移除与工作流关联的处理程序。默认为false。
示例请求
curl -X POST http://localhost:80/handlers/someUniqueId123/cancel?purge=trueSuccessful response (200 OK)
{ "status": "deleted", // or canceled if purge is false}使用 WorkflowClient 与服务器交互
Section titled “Using WorkflowClient to Interact with Servers”为了以编程方式与已部署的 WorkflowServer 进行交互,除了上面详述的原始API调用之外,我们还提供了一个 WorkflowClient 类。
WorkflowClient 提供了列出可用工作流和工作流处理器的方法,用于验证服务器健康状态、运行工作流(同步和异步方式)、流式传输事件以及发送事件。
假设您正在运行上述服务器示例,我们可以通过以下方式使用 WorkflowClient:
from workflows.client import WorkflowClient
async def main() client = WorkflowClient(base_url="http://0.0.0.0:8080") workflows = await client.list_workflows() print("===== AVAILABLE WORKFLOWS ====") print(workflows) await client.is_healthy() # will raise an exception if the server is not healthy handler = await client.run_workflow_nowait( "greet", start_event=StartEvent(name="John"), context=None, ) handler_id = handler.handler_id print("==== STARTING THE WORKFLOW ===") print(f"Workflow running with handler ID: {handler_id}") print("=== STREAMING EVENTS ===")
async for event in client.get_workflow_events(handler_id=handler_id): print("Received data:", event) result = await client.get_handler(handler_id)
print(f"Final result: {result.result} (status: {result.status})")
if __name__ == "__main__": import asyncio asyncio.run(main())您也可以使用客户端交互式运行人在回路工作流,例如:
from workflows import Workflow, stepfrom workflows.context import Contextfrom workflows.events import ( StartEvent, StopEvent, InputRequiredEvent, HumanResponseEvent,)from workflows.server import WorkflowServer
class RequestEvent(InputRequiredEvent): prompt: str
class ResponseEvent(HumanResponseEvent): response: str
class OutEvent(StopEvent): output: str
class HumanInTheLoopWorkflow(Workflow): @step async def prompt_human(self, ev: StartEvent, ctx: Context) -> RequestEvent: return RequestEvent(prompt="What is your name?")
@step async def greet_human(self, ev: ResponseEvent) -> OutEvent: return OutEvent(output=f"Hello, {ev.response}")
server = WorkflowServer()server.add_workflow("human", HumanInTheLoopWorkflow(timeout=1000))await server.serve("0.0.0.0", "8080")现在你可以运行工作流,当需要人工交互时,将人工响应发送回来:
from workflow.client import WorkflowClient
client = WorkflowClient(base_url="http://0.0.0.0:8080")handler = await client.run_workflow_nowait("human")handler_id = handler.handler_idprint(handler_id)async for event in client.get_workflow_events(handler_id=handler_id): if "RequestEvent" == event.type print( "Workflow is requiring human input:", event.value.get("prompt", ""), ) name = input("Reply here: ") sent_event = await client.send_event( handler_id=handler_id, event=ResponseEvent(response=name.capitalize().strip()), ) msg = "Event has been sent" if sent_event else "Event failed to send" print(msg)result = await client.get_handler(handler_id)print(f"Workflow complete with status: {result.status})")res = OutEvent.model_validate(result.result)print("Received final message:", res.output)