Workflow for a Function Calling Agent¶
This notebook walks through setting up a Workflow to construct a function calling agent from scratch.
Function calling agents work by using an LLM that supports tools/functions in its API (OpenAI, Ollama, Anthropic, etc.) to call functions and use tools.
Our workflow will be stateful with memory, and will be able to call the LLM to select tools and process incoming user messages.
In [ ]:
!pip install -U llama-index
In [ ]:
import os
os.environ["OPENAI_API_KEY"] = "sk-proj-..."
[Optional] Set up observability with Llamatrace¶
Set up tracing to visualize each step in the workflow.
Since workflows are async-first by default, this all runs fine in a notebook. If you were running in your own code, you would want to use asyncio.run() to start an async event loop if one isn't already running:
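For example, hosted tracing through Arize Phoenix / LlamaTrace can be enabled with a global handler. A minimal sketch, assuming you have a LlamaTrace API key (the package names, environment variable, and endpoint follow Phoenix's hosted setup and may change between versions):

# !pip install arize-phoenix "openinference-instrumentation-llama-index"

import os

import llama_index.core

# The OTLP header carries the LlamaTrace API key for the hosted collector
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = "api_key=<YOUR-LLAMATRACE-API-KEY>"

llama_index.core.set_global_handler(
    "arize_phoenix", endpoint="https://llamatrace.com/v1/traces"
)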
async def main():
    <async code>


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
In [ ]:
from llama_index.core.llms import ChatMessage
from llama_index.core.tools import ToolSelection, ToolOutput
from llama_index.core.workflow import Event


class InputEvent(Event):
    input: list[ChatMessage]


class StreamEvent(Event):
    delta: str


class ToolCallEvent(Event):
    tool_calls: list[ToolSelection]


class FunctionOutputEvent(Event):
    output: ToolOutput
In [ ]:
from typing import Any, List

from llama_index.core.llms.function_calling import FunctionCallingLLM
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.core.tools.types import BaseTool
from llama_index.core.workflow import (
    Context,
    Workflow,
    StartEvent,
    StopEvent,
    step,
)
from llama_index.llms.openai import OpenAI


class FunctionCallingAgent(Workflow):
    def __init__(
        self,
        *args: Any,
        llm: FunctionCallingLLM | None = None,
        tools: List[BaseTool] | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.tools = tools or []

        self.llm = llm or OpenAI()
        assert self.llm.metadata.is_function_calling_model

    @step
    async def prepare_chat_history(
        self, ctx: Context, ev: StartEvent
    ) -> InputEvent:
        # clear sources
        await ctx.set("sources", [])

        # check if memory is setup
        memory = await ctx.get("memory", default=None)
        if not memory:
            memory = ChatMemoryBuffer.from_defaults(llm=self.llm)

        # get user input
        user_input = ev.input
        user_msg = ChatMessage(role="user", content=user_input)
        memory.put(user_msg)

        # get chat history
        chat_history = memory.get()

        # update context
        await ctx.set("memory", memory)

        return InputEvent(input=chat_history)

    @step
    async def handle_llm_input(
        self, ctx: Context, ev: InputEvent
    ) -> ToolCallEvent | StopEvent:
        chat_history = ev.input

        # stream the response
        response_stream = await self.llm.astream_chat_with_tools(
            self.tools, chat_history=chat_history
        )
        async for response in response_stream:
            ctx.write_event_to_stream(StreamEvent(delta=response.delta or ""))

        # save the final response, which should have all content
        memory = await ctx.get("memory")
        memory.put(response.message)
        await ctx.set("memory", memory)

        # get tool calls
        tool_calls = self.llm.get_tool_calls_from_response(
            response, error_on_no_tool_call=False
        )

        if not tool_calls:
            sources = await ctx.get("sources", default=[])
            return StopEvent(
                result={"response": response, "sources": [*sources]}
            )
        else:
            return ToolCallEvent(tool_calls=tool_calls)

    @step
    async def handle_tool_calls(
        self, ctx: Context, ev: ToolCallEvent
    ) -> InputEvent:
        tool_calls = ev.tool_calls
        tools_by_name = {tool.metadata.get_name(): tool for tool in self.tools}

        tool_msgs = []
        sources = await ctx.get("sources", default=[])

        # call tools -- safely!
        for tool_call in tool_calls:
            tool = tools_by_name.get(tool_call.tool_name)

            # the tool name comes from the LLM, so it may not exist;
            # check before touching tool.metadata
            if not tool:
                tool_msgs.append(
                    ChatMessage(
                        role="tool",
                        content=f"Tool {tool_call.tool_name} does not exist",
                        additional_kwargs={
                            "tool_call_id": tool_call.tool_id,
                            "name": tool_call.tool_name,
                        },
                    )
                )
                continue

            additional_kwargs = {
                "tool_call_id": tool_call.tool_id,
                "name": tool.metadata.get_name(),
            }

            try:
                tool_output = tool(**tool_call.tool_kwargs)
                sources.append(tool_output)
                tool_msgs.append(
                    ChatMessage(
                        role="tool",
                        content=tool_output.content,
                        additional_kwargs=additional_kwargs,
                    )
                )
            except Exception as e:
                tool_msgs.append(
                    ChatMessage(
                        role="tool",
                        content=f"Encountered error in tool call: {e}",
                        additional_kwargs=additional_kwargs,
                    )
                )

        # update memory
        memory = await ctx.get("memory")
        for msg in tool_msgs:
            memory.put(msg)

        await ctx.set("sources", sources)
        await ctx.set("memory", memory)

        chat_history = memory.get()
        return InputEvent(input=chat_history)
And that's it! Let's take a moment to look over the workflow we wrote.

prepare_chat_history():
This is our main entry point. It handles adding the user message to memory, and uses the memory to get the latest chat history. It returns an InputEvent.

handle_llm_input():
Triggered by an InputEvent, it uses the chat history and tools to prompt the LLM. If tool calls are found, a ToolCallEvent is emitted. Otherwise, we say the workflow is done and emit a StopEvent.

handle_tool_calls():
Triggered by a ToolCallEvent, it calls tools with error handling and returns tool outputs. This event triggers a loop, since it emits an InputEvent, which takes us back to handle_llm_input().
Run the Workflow!¶
NOTE: With loops, we need to be mindful of runtime. Here, we set a timeout of 120 seconds.
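If a run exceeds the timeout, the workflow raises a timeout error that you can catch. A minimal sketch, assuming the agent constructed in the next cell and that llama-index surfaces the timeout as WorkflowTimeoutError from llama_index.core.workflow.errors:

from llama_index.core.workflow.errors import WorkflowTimeoutError

try:
    ret = await agent.run(input="...")
except WorkflowTimeoutError:
    # The loop ran past the configured timeout -- retry or surface the failure
    print("Workflow timed out after 120 seconds")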
In [ ]:
from llama_index.core.tools import FunctionTool
from llama_index.llms.openai import OpenAI
def add(x: int, y: int) -> int:
    """Useful function to add two numbers."""
    return x + y


def multiply(x: int, y: int) -> int:
    """Useful function to multiply two numbers."""
    return x * y


tools = [
    FunctionTool.from_defaults(add),
    FunctionTool.from_defaults(multiply),
]

agent = FunctionCallingAgent(
    llm=OpenAI(model="gpt-4o-mini"), tools=tools, timeout=120, verbose=True
)
ret = await agent.run(input="Hello!")
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event StopEvent
In [ ]:
print(ret["response"])
print(ret["response"])
assistant: Hello! How can I assist you today?
In [ ]:
ret = await agent.run(input="What is (2123 + 2321) * 312?")
ret = await agent.run(input="(2123 + 2321) * 312等于多少?")
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event ToolCallEvent
Running step handle_tool_calls
Step handle_tool_calls produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event ToolCallEvent
Running step handle_tool_calls
Step handle_tool_calls produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event StopEvent
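Besides the final response, the result dictionary also carries the sources list gathered by handle_tool_calls. Each entry is a ToolOutput, so a quick way to inspect which tools ran (a sketch; the attribute names follow llama_index.core.tools.ToolOutput):

print(ret["response"])

for source in ret["sources"]:
    # tool_name, raw_input, and raw_output describe each tool invocation
    print(f"{source.tool_name}({source.raw_input}) -> {source.raw_output}")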
Chat History¶
By default, the workflow creates a fresh Context for each run. This means that the chat history is not preserved between runs. However, we can pass our own Context to the workflow to preserve chat history.
In [ ]:
from llama_index.core.workflow import Context
ctx = Context(agent)
ret = await agent.run(input="Hello! My name is Logan.", ctx=ctx)
print(ret["response"])
ret = await agent.run(input="What is my name?", ctx=ctx)
print(ret["response"])
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event StopEvent
assistant: Hello, Logan! How can I assist you today?
Running step prepare_chat_history
Step prepare_chat_history produced event InputEvent
Running step handle_llm_input
Step handle_llm_input produced event StopEvent
assistant: Your name is Logan.
Streaming¶
Using the handler returned from the .run() method, we can also access the streamed events.
In [ ]:
agent = FunctionCallingAgent(
    llm=OpenAI(model="gpt-4o-mini"), tools=tools, timeout=120, verbose=False
)

handler = agent.run(input="Hello! Write me a short story about a cat.")

async for event in handler.stream_events():
    if isinstance(event, StreamEvent):
        print(event.delta, end="", flush=True)

response = await handler
# print(response["response"])
Once upon a time in a quaint little village, there lived a curious cat named Whiskers. Whiskers was no ordinary cat; he had a beautiful coat of orange and white fur that shimmered in the sunlight, and his emerald green eyes sparkled with mischief. Every day, Whiskers would explore the village, visiting the bakery for a whiff of freshly baked bread and the flower shop to sniff the colorful blooms. The villagers adored him, often leaving out little treats for their favorite feline.

One sunny afternoon, while wandering near the edge of the village, Whiskers stumbled upon a hidden path that led into the woods. His curiosity piqued, he decided to follow the path, which was lined with tall trees and vibrant wildflowers. As he ventured deeper, he heard a soft, melodic sound that seemed to beckon him.

Following the enchanting music, Whiskers soon found himself in a clearing where a group of woodland creatures had gathered. They were having a grand celebration, complete with dancing, singing, and a feast of berries and nuts. The animals welcomed Whiskers with open paws, inviting him to join their festivities.

Whiskers, delighted by the warmth and joy of his new friends, danced and played until the sun began to set. As the sky turned shades of pink and orange, he realized it was time to return home. The woodland creatures gifted him a small, sparkling acorn as a token of their friendship.

From that day on, Whiskers would often visit the clearing, sharing stories of the village and enjoying the company of his woodland friends. He learned that adventure and friendship could be found in the most unexpected places, and he cherished every moment spent in the magical woods.

And so, Whiskers continued to live his life filled with curiosity, laughter, and the warmth of friendship, reminding everyone that sometimes, the best adventures are just a whisker away.