# Multi-agent patterns in LlamaIndex
When a task calls for several specialists working together, LlamaIndex offers a few options, each trading convenience against flexibility. This page walks through the three most common patterns, explains when to choose each, and gives a minimal code example for every approach.
- AgentWorkflow (built-in) – declare a set of agents and let AgentWorkflow manage the handoffs between them.
- Orchestrator pattern (built-in) – an "orchestrator" agent picks which sub-agent to call next; the sub-agents are exposed to it as tools.
- Custom planner (DIY) – you write the LLM prompt yourself (often XML/JSON) to plan the execution sequence, and your code invokes the agents imperatively.
## Pattern 1 – AgentWorkflow (i.e. linear "swarm" pattern)
When to use – you want out-of-the-box multi-agent behavior with no extra code, and you are happy with the default handoff heuristics that AgentWorkflow ships with.
AgentWorkflow itself is a Workflow pre-configured to understand agents, state, and tool calls. You give it an array of one or more agents, nominate the starting agent, and it will:
- Hand the user message to the root agent.
- Execute whatever tools the agent selects.
- Allow the agent to "hand off" control to another agent when it decides to.
- Repeat until an agent returns a final answer.
Note: at any point, the currently active agent can choose to hand control back to the user.
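Stripped of the LlamaIndex machinery, that loop can be sketched in a few lines of plain Python. This is an illustrative stand-in, not the library's API; `Turn` and `run_swarm` are invented names for the sketch:

```python
# Minimal sketch of the handoff loop a "swarm" pattern runs internally.
# These names are illustrative only -- not LlamaIndex classes.
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Turn:
    """What an agent returns: either a final answer or a handoff target."""

    answer: Optional[str] = None
    handoff_to: Optional[str] = None


def run_swarm(
    agents: dict[str, Callable[[str], Turn]], root: str, user_msg: str
) -> str:
    current = root
    while True:
        turn = agents[current](user_msg)  # 1. give the message to the active agent
        if turn.handoff_to:               # 2. the agent chose to hand off control
            current = turn.handoff_to
            continue
        return turn.answer                # 3. the agent produced the final answer


# Toy agents: the researcher hands off, the writer answers.
agents = {
    "ResearchAgent": lambda msg: Turn(handoff_to="WriteAgent"),
    "WriteAgent": lambda msg: Turn(answer="report: " + msg),
}
print(run_swarm(agents, "ResearchAgent", "history of the web"))
# → report: history of the web
```

The real AgentWorkflow adds tool execution, shared state, and streaming on top of this skeleton, but the control flow is the same.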
Below is a condensed version of the multi-agent report-generation example. Three agents collaborate to research, write, and review a report. (The … marks code omitted for brevity.)
```python
from llama_index.core.agent.workflow import AgentWorkflow, FunctionAgent

# --- create our specialist agents ------------------------------------------------
research_agent = FunctionAgent(
    name="ResearchAgent",
    description="Search the web and record notes.",
    system_prompt="You are a researcher… hand off to WriteAgent when ready.",
    llm=llm,
    tools=[search_web, record_notes],
    can_handoff_to=["WriteAgent"],
)

write_agent = FunctionAgent(
    name="WriteAgent",
    description="Writes a markdown report from the notes.",
    system_prompt="You are a writer… ask ReviewAgent for feedback when done.",
    llm=llm,
    tools=[write_report],
    can_handoff_to=["ReviewAgent", "ResearchAgent"],
)

review_agent = FunctionAgent(
    name="ReviewAgent",
    description="Reviews a report and gives feedback.",
    system_prompt="You are a reviewer…",  # etc.
    llm=llm,
    tools=[review_report],
    can_handoff_to=["WriteAgent"],
)

# --- wire them together ----------------------------------------------------------
agent_workflow = AgentWorkflow(
    agents=[research_agent, write_agent, review_agent],
    root_agent=research_agent.name,
    initial_state={
        "research_notes": {},
        "report_content": "Not written yet.",
        "review": "Review required.",
    },
)

resp = await agent_workflow.run(
    user_msg="Write me a report on the history of the web …"
)
print(resp)
```

AgentWorkflow handles all of the orchestration, streaming events as it runs so you can keep the user informed of progress.
## Pattern 2 – Orchestrator agent (sub-agents as tools)
When to use – you need a central node that decides every step, so you can inject custom logic, but you still want the declarative agents-as-tools experience rather than writing a planner yourself.
In this pattern you still build the specialist agents (ResearchAgent, WriteAgent, ReviewAgent), but you do not ask them to hand off to each other. Instead, you expose each agent's run method as a tool and give those tools to a new top-level agent: the orchestrator.
You can see the full example in the agents_as_tools notebook.
```python
import re

from llama_index.core.agent.workflow import FunctionAgent
from llama_index.core.workflow import Context

# assume research_agent / write_agent / review_agent defined as before
# except we really only need the `search_web` tool at a minimum


async def call_research_agent(ctx: Context, prompt: str) -> str:
    """Useful for recording research notes based on a specific prompt."""
    result = await research_agent.run(
        user_msg=f"Write some notes about the following: {prompt}"
    )

    async with ctx.store.edit_state() as ctx_state:
        ctx_state["state"]["research_notes"].append(str(result))

    return str(result)


async def call_write_agent(ctx: Context) -> str:
    """Useful for writing a report based on the research notes or revising
    the report based on feedback."""
    async with ctx.store.edit_state() as ctx_state:
        notes = ctx_state["state"].get("research_notes", None)
        if not notes:
            return "No research notes to write from."

        user_msg = (
            "Write a markdown report from the following notes. Be sure to "
            "output the report in the following format: <report>...</report>:\n\n"
        )

        # Add the feedback to the user message if it exists
        feedback = ctx_state["state"].get("review", None)
        if feedback:
            user_msg += f"<feedback>{feedback}</feedback>\n\n"

        # Add the research notes to the user message
        notes = "\n\n".join(notes)
        user_msg += f"<research_notes>{notes}</research_notes>\n\n"

        # Run the write agent
        result = await write_agent.run(user_msg=user_msg)
        report = re.search(
            r"<report>(.*)</report>", str(result), re.DOTALL
        ).group(1)
        ctx_state["state"]["report_content"] = str(report)

    return str(report)


async def call_review_agent(ctx: Context) -> str:
    """Useful for reviewing the report and providing feedback."""
    async with ctx.store.edit_state() as ctx_state:
        report = ctx_state["state"].get("report_content", None)
        if not report:
            return "No report content to review."

        result = await review_agent.run(
            user_msg=f"Review the following report: {report}"
        )
        ctx_state["state"]["review"] = str(result)

    return str(result)


orchestrator = FunctionAgent(
    system_prompt=(
        "You are an expert in the field of report writing. "
        "You are given a user request and a list of tools that can help with the request. "
        "You are to orchestrate the tools to research, write, and review a report on the given topic. "
        "Once the review is positive, you should notify the user that the report is ready to be accessed."
    ),
    llm=orchestrator_llm,
    tools=[
        call_research_agent,
        call_write_agent,
        call_review_agent,
    ],
    initial_state={
        "research_notes": [],
        "report_content": None,
        "review": None,
    },
)

response = await orchestrator.run(
    user_msg="Write me a report on the history of the web …"
)
print(response)
```

Because the orchestrator is just another FunctionAgent, you get streaming, tool calling, and state management for free, while still fully controlling how the agents are invoked and the overall control flow (tools always return to the orchestrator).
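The control flow that distinguishes this pattern from Pattern 1 can be seen in a stripped-down sketch, with no LlamaIndex involved: sub-agents never talk to each other, every tool call returns to the orchestrator, and the orchestrator decides the next move. The `pick_next_tool` function below is a deterministic stand-in for the orchestrator LLM's tool choice; all names are invented for illustration:

```python
# Plain-Python sketch of the orchestrator loop: tools always return here.
# The dict-based "state" mirrors the initial_state used above.
from typing import Callable, Optional


def research_tool(state: dict) -> None:
    state["research_notes"].append("note about the web")


def write_tool(state: dict) -> None:
    state["report_content"] = "report from: " + "; ".join(state["research_notes"])


def review_tool(state: dict) -> None:
    state["review"] = "positive"


def pick_next_tool(state: dict) -> Optional[Callable[[dict], None]]:
    """Stand-in for the orchestrator LLM deciding which tool to call next."""
    if not state["research_notes"]:
        return research_tool
    if state["report_content"] is None:
        return write_tool
    if state["review"] is None:
        return review_tool
    return None  # review is positive: nothing left to do


state = {"research_notes": [], "report_content": None, "review": None}
while (tool := pick_next_tool(state)) is not None:
    tool(state)  # the tool runs, then control returns to the orchestrator

print(state["report_content"])
# → report from: note about the web
```

In the real pattern the `pick_next_tool` decision is made by the orchestrator LLM from the tool descriptions and the conversation so far, but the loop shape is the same: a single central node, with every step funnelled back through it.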
## Pattern 3 – Custom planner (DIY prompting + parsing)
When to use – maximum flexibility. You need to enforce a specific plan format, integrate with an external scheduler, or capture extra metadata that the previous patterns don't give you out of the box.
The idea here is that you write a prompt instructing the LLM to emit a structured plan (XML / JSON / YAML). Your own Python code parses that plan and executes it imperatively. The subordinate agents can be anything: a FunctionAgent, a RAG pipeline, or an external service.
Below is a minimal sketch of a workflow that plans, executes the plan, and checks whether further steps are needed. You can see the full example in the custom_multi_agent notebook.
```python
import re
import xml.etree.ElementTree as ET
from typing import Any, Optional

from pydantic import BaseModel, Field

from llama_index.core.agent.workflow import FunctionAgent
from llama_index.core.llms import ChatMessage
from llama_index.core.workflow import (
    Context,
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step,
)
from llama_index.llms.openai import OpenAI

# Assume we created helper functions to call the agents

PLANNER_PROMPT = """You are a planner chatbot.

Given a user request and the current state, break the solution into ordered <step> blocks. Each step must specify the agent to call and the message to send, e.g.
<plan>
  <step agent="ResearchAgent">search for …</step>
  <step agent="WriteAgent">draft a report …</step>
  ...
</plan>

<state>
{state}
</state>

<available_agents>
{available_agents}
</available_agents>

The general flow should be:
- Record research notes
- Write a report
- Review the report
- Write the report again if the review is not positive enough

If the user request does not require any steps, you can skip the <plan> block and respond directly.
"""


class InputEvent(StartEvent):
    user_msg: Optional[str] = Field(default=None)
    chat_history: list[ChatMessage]
    state: Optional[dict[str, Any]] = Field(default=None)


class OutputEvent(StopEvent):
    response: str
    chat_history: list[ChatMessage]
    state: dict[str, Any]


class StreamEvent(Event):
    delta: str


class PlanEvent(Event):
    step_info: str


# Modelling the plan
class PlanStep(BaseModel):
    agent_name: str
    agent_input: str


class Plan(BaseModel):
    steps: list[PlanStep]


class ExecuteEvent(Event):
    plan: Plan
    chat_history: list[ChatMessage]


class PlannerWorkflow(Workflow):
    llm: OpenAI = OpenAI(
        model="o3-mini",
        api_key="sk-proj-...",
    )
    agents: dict[str, FunctionAgent] = {
        "ResearchAgent": research_agent,
        "WriteAgent": write_agent,
        "ReviewAgent": review_agent,
    }

    @step
    async def plan(
        self, ctx: Context, ev: InputEvent
    ) -> ExecuteEvent | OutputEvent:
        # Set initial state if it exists
        if ev.state:
            await ctx.store.set("state", ev.state)

        chat_history = ev.chat_history

        if ev.user_msg:
            user_msg = ChatMessage(
                role="user",
                content=ev.user_msg,
            )
            chat_history.append(user_msg)

        # Inject the system prompt with state and available agents
        state = await ctx.store.get("state")
        available_agents_str = "\n".join(
            [
                f'<agent name="{agent.name}">{agent.description}</agent>'
                for agent in self.agents.values()
            ]
        )
        system_prompt = ChatMessage(
            role="system",
            content=PLANNER_PROMPT.format(
                state=str(state),
                available_agents=available_agents_str,
            ),
        )

        # Stream the response from the llm
        response = await self.llm.astream_chat(
            messages=[system_prompt] + chat_history,
        )
        full_response = ""
        async for chunk in response:
            full_response += chunk.delta or ""
            if chunk.delta:
                ctx.write_event_to_stream(
                    StreamEvent(delta=chunk.delta),
                )

        # Parse the response into a plan and decide whether to execute or output
        xml_match = re.search(r"(<plan>.*</plan>)", full_response, re.DOTALL)

        if not xml_match:
            chat_history.append(
                ChatMessage(
                    role="assistant",
                    content=full_response,
                )
            )
            return OutputEvent(
                response=full_response,
                chat_history=chat_history,
                state=state,
            )
        else:
            xml_str = xml_match.group(1)
            root = ET.fromstring(xml_str)
            plan = Plan(steps=[])
            for plan_step in root.findall("step"):
                plan.steps.append(
                    PlanStep(
                        agent_name=plan_step.attrib["agent"],
                        agent_input=plan_step.text.strip() if plan_step.text else "",
                    )
                )

            return ExecuteEvent(plan=plan, chat_history=chat_history)

    @step
    async def execute(self, ctx: Context, ev: ExecuteEvent) -> InputEvent:
        chat_history = ev.chat_history
        plan = ev.plan

        for plan_step in plan.steps:
            agent_input = plan_step.agent_input
            ctx.write_event_to_stream(
                PlanEvent(
                    step_info=f'<step agent="{plan_step.agent_name}">{plan_step.agent_input}</step>'
                ),
            )

            if plan_step.agent_name == "ResearchAgent":
                await call_research_agent(ctx, agent_input)
            elif plan_step.agent_name == "WriteAgent":
                # Note: we aren't passing the input from the plan since
                # we're using the state to drive the write agent
                await call_write_agent(ctx)
            elif plan_step.agent_name == "ReviewAgent":
                await call_review_agent(ctx)

        state = await ctx.store.get("state")
        chat_history.append(
            ChatMessage(
                role="user",
                content=(
                    "I've completed the previous steps, here's the updated state:\n\n"
                    f"<state>\n{state}\n</state>\n\n"
                    "Do you need to continue and plan more steps? "
                    "If not, write a final response."
                ),
            )
        )

        return InputEvent(
            chat_history=chat_history,
        )
```

With this approach you own the orchestration loop, so you can plug in whatever custom logic, caching, or human-in-the-loop checks you need.
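The parsing step inside `plan` can be exercised on its own. This standalone snippet applies the same regex-plus-ElementTree approach to a canned LLM response (the response text is made up for the demo):

```python
import re
import xml.etree.ElementTree as ET

# A canned LLM response containing a <plan> block, as the prompt requests.
full_response = """Sure, here is the plan:
<plan>
  <step agent="ResearchAgent">search for the history of the web</step>
  <step agent="WriteAgent">draft a report</step>
</plan>"""

# Extract the <plan> block, then pull out each step's agent and input.
xml_match = re.search(r"(<plan>.*</plan>)", full_response, re.DOTALL)
assert xml_match is not None

root = ET.fromstring(xml_match.group(1))
steps = [
    (s.attrib["agent"], s.text.strip() if s.text else "")
    for s in root.findall("step")
]
print(steps)
# → [('ResearchAgent', 'search for the history of the web'), ('WriteAgent', 'draft a report')]
```

Because the parser is just a few lines of stdlib code, it is easy to unit-test against malformed plans, which is exactly the kind of control the DIY pattern buys you.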
| Pattern | Lines of code | Flexibility | Built-in streaming / events |
|---|---|---|---|
| AgentWorkflow | ⭐ – fewest | ★★ | Yes |
| Orchestrator agent | ⭐⭐ | ★★★ | Yes (via the orchestrator) |
| Custom planner | ⭐⭐⭐ | ★★★★★ | Yes (via sub-agents); the top level is up to you |
If you are prototyping quickly, start with AgentWorkflow. Move to an orchestrator agent when you need more control over the execution sequence. Reach for a custom planner only when the first two patterns cannot express the flow you need.
Next, you'll learn how to use structured outputs in single- and multi-agent workflows.