Human-in-the-Loop#

In the previous section on Teams, we saw how to create, observe, and control a team of agents. This section focuses on how to interact with the team from your application and provide human feedback to it.

There are two main ways to interact with the team from your application:

  1. During a team's run, while run() or run_stream() is executing, provide feedback through a UserProxyAgent.

  2. Once the run terminates, provide feedback through input to the next call to run() or run_stream().

We will cover both approaches in this section.

To jump straight to code examples of integrating with web and UI frameworks, see the AgentChat FastAPI and ChainLit samples referenced later in this section.

Providing Feedback During a Run#

UserProxyAgent is a special built-in agent that acts as a proxy for the user to provide feedback to the team.

To use UserProxyAgent, create an instance of it and include it in the team before running the team. The team decides when to call the UserProxyAgent to ask for feedback from the user.

For example, in a RoundRobinGroupChat team, the UserProxyAgent is called in the order in which it was passed to the team, while in a SelectorGroupChat team, the selector prompt or selector function determines when the UserProxyAgent is called; a sketch of the latter follows below.
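For instance, a SelectorGroupChat could route the conversation to the user proxy only right after the assistant has spoken. A minimal sketch, assuming an assistant, user_proxy, model_client, and termination defined as in the example later in this section, and using SelectorGroupChat's selector_func parameter:

from autogen_agentchat.teams import SelectorGroupChat

# Ask the user for feedback whenever the assistant has just spoken;
# otherwise, let the assistant take the turn.
def selector_func(messages):
    if messages[-1].source == "assistant":
        return "user_proxy"
    return "assistant"

team = SelectorGroupChat(
    [assistant, user_proxy],
    model_client=model_client,  # Only consulted when selector_func returns None.
    selector_func=selector_func,
    termination_condition=termination,
)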

The diagram below illustrates how to use UserProxyAgent to get feedback from the user during a team's run:

[Figure: human-in-the-loop-user-proxy]

The bold arrows indicate the flow of control during a team's run: when the team calls the UserProxyAgent, control transfers to the application/user, which the team waits on for feedback; once the feedback is provided, control transfers back to the team and the team continues its execution.

Note

When UserProxyAgent is called during a run, it blocks the execution of the team until the user provides feedback or an error occurs. This will hold up the team's progress and put the team in an unstable state that cannot be saved or resumed.

Due to the blocking nature of this approach, it is recommended only for short interactions that require immediate feedback from the user, such as asking for approval or disapproval with a button click, or an alert requiring immediate attention without which the task would fail.

Here is an example of how to use UserProxyAgent with RoundRobinGroupChat for a poetry-generation task:

from autogen_agentchat.agents import AssistantAgent, UserProxyAgent
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Create the agents.
model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")
assistant = AssistantAgent("assistant", model_client=model_client)
user_proxy = UserProxyAgent("user_proxy", input_func=input)  # Use input() to get user input from console.

# Create the termination condition which will end the conversation when the user says "APPROVE".
termination = TextMentionTermination("APPROVE")

# Create the team.
team = RoundRobinGroupChat([assistant, user_proxy], termination_condition=termination)

# Run the conversation and stream to the console.
stream = team.run_stream(task="Write a 4-line poem about the ocean.")
# Use asyncio.run(...) when running in a script.
await Console(stream)
---------- user ----------
Write a 4-line poem about the ocean.
---------- assistant ----------
In endless blue where whispers play,  
The ocean's waves dance night and day.  
A world of depths, both calm and wild,  
Nature's heart, forever beguiled.  
TERMINATE
---------- user_proxy ----------
APPROVE
TaskResult(messages=[TextMessage(source='user', models_usage=None, metadata={}, content='Write a 4-line poem about the ocean.', type='TextMessage'), TextMessage(source='assistant', models_usage=RequestUsage(prompt_tokens=46, completion_tokens=43), metadata={}, content="In endless blue where whispers play,  \nThe ocean's waves dance night and day.  \nA world of depths, both calm and wild,  \nNature's heart, forever beguiled.  \nTERMINATE", type='TextMessage'), UserInputRequestedEvent(source='user_proxy', models_usage=None, metadata={}, request_id='2622a0aa-b776-4e54-9e8f-4ecbdf14b78d', content='', type='UserInputRequestedEvent'), TextMessage(source='user_proxy', models_usage=None, metadata={}, content='APPROVE', type='TextMessage')], stop_reason="Text 'APPROVE' mentioned")

From the console output, you can see the team solicited feedback from the user through user_proxy to approve the generated poem.

You can provide your own input function to the UserProxyAgent to customize the feedback process. For example, when the team is running as a web service, you can use a custom input function to wait for messages from a web socket connection. The following code snippet shows an example of a custom input function when using the FastAPI web framework:

from fastapi import FastAPI, WebSocket

from autogen_agentchat.agents import UserProxyAgent
from autogen_agentchat.messages import TextMessage
from autogen_core import CancellationToken

app = FastAPI()


@app.websocket("/ws/chat")
async def chat(websocket: WebSocket):
    await websocket.accept()

    async def _user_input(prompt: str, cancellation_token: CancellationToken | None) -> str:
        data = await websocket.receive_json()  # Wait for the user message from the websocket.
        message = TextMessage.model_validate(data)  # Assume the user message is a TextMessage.
        return message.content

    # Create the user proxy with the custom input function.
    user_proxy = UserProxyAgent("user_proxy", input_func=_user_input)
    # Run the team with the user proxy
    # ...

See the AgentChat FastAPI sample for a complete example.

For integration of UserProxyAgent with ChainLit, see the AgentChat ChainLit sample.

Providing Feedback to the Next Run#

Often, an application or a user interacts with the team of agents in an interactive loop: the team runs until termination, the application or user provides feedback, and the team runs again with the feedback.

This approach is useful in a persisted session with asynchronous communication between the team and the application/user: once a team finishes a run, the application saves the state of the team, puts it in persistent storage, and resumes the team when the feedback arrives.

Note

For how to save and load the state of a team, please refer to Managing State. This section will focus on the feedback mechanisms.
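For illustration, here is a minimal sketch of such a loop; save_to_storage() and load_from_storage() are hypothetical helpers your application would provide, while save_state()/load_state() are described in Managing State:

# After a run terminates, serialize the team and persist it.
state = await team.save_state()
await save_to_storage(session_id, state)  # Hypothetical helper: database, file, etc.

# Later, when the user's feedback arrives, restore the team and run again.
state = await load_from_storage(session_id)  # Hypothetical helper.
await team.load_state(state)
result = await team.run(task=user_feedback)  # The feedback becomes the next task.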

The following diagram illustrates the flow of control in this approach:

[Figure: human-in-the-loop-termination]

There are two ways to implement this approach:

  • Set the maximum number of turns so that the team always stops after the specified number of turns.

  • Use termination conditions such as TextMentionTermination and HandoffTermination to allow the team to decide when to stop and give back control, based on its internal state.

You can use both methods together to achieve your desired behavior.

Using Max Turns#

This method allows you to pause the team for user input by setting a maximum number of turns. For instance, you can configure the team to stop after the first agent responds by setting max_turns to 1. This is particularly useful in scenarios where continuous user engagement is required, such as in a chatbot.

To implement this, set the max_turns parameter in the RoundRobinGroupChat() constructor.

team = RoundRobinGroupChat([...], max_turns=1)

Once the team stops, the turn count will be reset. When you resume the team, it will start from 0 again. However, the team's internal state will be preserved; for example, the RoundRobinGroupChat will resume from the next agent in the list with the same conversation history.

Note

max_turns is specific to the team class and is currently supported only by RoundRobinGroupChat, SelectorGroupChat, and Swarm. When used with termination conditions, the team will stop when either condition is met; a sketch of combining the two follows below.
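For example, combining max_turns with a text-mention termination condition might look like the following sketch (reusing the assistant agent from the example below); the run ends either when "APPROVE" is mentioned or after one turn, whichever comes first:

from autogen_agentchat.conditions import TextMentionTermination

# Stop when "APPROVE" is mentioned or after 1 turn, whichever comes first.
termination = TextMentionTermination("APPROVE")
team = RoundRobinGroupChat([assistant], termination_condition=termination, max_turns=1)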

Here is an example of how to use max_turns in a RoundRobinGroupChat for a poetry-generation task with a maximum of 1 turn:

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Create the agents.
model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")
assistant = AssistantAgent("assistant", model_client=model_client)

# Create the team setting a maximum number of turns to 1.
team = RoundRobinGroupChat([assistant], max_turns=1)

task = "Write a 4-line poem about the ocean."
while True:
    # Run the conversation and stream to the console.
    stream = team.run_stream(task=task)
    # Use asyncio.run(...) when running in a script.
    await Console(stream)
    # Get the user response.
    task = input("Enter your feedback (type 'exit' to leave): ")
    if task.lower().strip() == "exit":
        break
---------- user ----------
Write a 4-line poem about the ocean.
---------- assistant ----------
Endless waves in a dance with the shore,  
Whispers of secrets in tales from the roar,  
Beneath the vast sky, where horizons blend,  
The ocean’s embrace is a timeless friend.  
TERMINATE
[Prompt tokens: 46, Completion tokens: 48]
---------- Summary ----------
Number of messages: 2
Finish reason: Maximum number of turns 1 reached.
Total prompt tokens: 46
Total completion tokens: 48
Duration: 1.63 seconds
---------- user ----------
Can you make it about a person and its relationship with the ocean
---------- assistant ----------
She walks along the tide, where dreams intertwine,  
With every crashing wave, her heart feels aligned,  
In the ocean's embrace, her worries dissolve,  
A symphony of solace, where her spirit evolves.  
TERMINATE
[Prompt tokens: 117, Completion tokens: 49]
---------- Summary ----------
Number of messages: 2
Finish reason: Maximum number of turns 1 reached.
Total prompt tokens: 117
Total completion tokens: 49
Duration: 1.21 seconds

You can see that the team stopped immediately after one agent responded.

Using Termination Conditions#

We have already seen several examples of termination conditions in the previous sections. In this section, we focus on HandoffTermination, which stops the team when an agent sends a HandoffMessage.

Let's create a team with a single AssistantAgent with a handoff setting, and run the team with a task that requires additional input from the user because the agent doesn't have relevant tools to continue processing the task.

Note

The model used with AssistantAgent must support tool calls to use the handoff feature.

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.base import Handoff
from autogen_agentchat.conditions import HandoffTermination, TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Create an OpenAI model client.
model_client = OpenAIChatCompletionClient(
    model="gpt-4o",
    # api_key="sk-...", # Optional if you have an OPENAI_API_KEY env variable set.
)

# Create a lazy assistant agent that always hands off to the user.
lazy_agent = AssistantAgent(
    "lazy_assistant",
    model_client=model_client,
    handoffs=[Handoff(target="user", message="Transfer to user.")],
    system_message="If you cannot complete the task, transfer to user. Otherwise, when finished, respond with 'TERMINATE'.",
)

# Define a termination condition that checks for handoff messages.
handoff_termination = HandoffTermination(target="user")
# Define a termination condition that checks for a specific text mention.
text_termination = TextMentionTermination("TERMINATE")

# Create a single-agent team with the lazy assistant and both termination conditions.
lazy_agent_team = RoundRobinGroupChat([lazy_agent], termination_condition=handoff_termination | text_termination)

# Run the team and stream to the console.
task = "What is the weather in New York?"
await Console(lazy_agent_team.run_stream(task=task), output_stats=True)
---------- user ----------
What is the weather in New York?
---------- lazy_assistant ----------
[FunctionCall(id='call_EAcMgrLGHdLw0e7iJGoMgxuu', arguments='{}', name='transfer_to_user')]
[Prompt tokens: 69, Completion tokens: 12]
---------- lazy_assistant ----------
[FunctionExecutionResult(content='Transfer to user.', call_id='call_EAcMgrLGHdLw0e7iJGoMgxuu')]
---------- lazy_assistant ----------
Transfer to user.
---------- Summary ----------
Number of messages: 4
Finish reason: Handoff to user from lazy_assistant detected.
Total prompt tokens: 69
Total completion tokens: 12
Duration: 0.69 seconds
TaskResult(messages=[TextMessage(source='user', models_usage=None, content='What is the weather in New York?', type='TextMessage'), ToolCallRequestEvent(source='lazy_assistant', models_usage=RequestUsage(prompt_tokens=69, completion_tokens=12), content=[FunctionCall(id='call_EAcMgrLGHdLw0e7iJGoMgxuu', arguments='{}', name='transfer_to_user')], type='ToolCallRequestEvent'), ToolCallExecutionEvent(source='lazy_assistant', models_usage=None, content=[FunctionExecutionResult(content='Transfer to user.', call_id='call_EAcMgrLGHdLw0e7iJGoMgxuu')], type='ToolCallExecutionEvent'), HandoffMessage(source='lazy_assistant', models_usage=None, target='user', content='Transfer to user.', context=[], type='HandoffMessage')], stop_reason='Handoff to user from lazy_assistant detected.')

You can see the team stopped due to the handoff message being detected. Let's continue the team by providing the information the agent needs.

await Console(lazy_agent_team.run_stream(task="The weather in New York is sunny."))
---------- user ----------
The weather in New York is sunny.
---------- lazy_assistant ----------
Great! Enjoy the sunny weather in New York! Is there anything else you'd like to know?
---------- lazy_assistant ----------
TERMINATE
TaskResult(messages=[TextMessage(source='user', models_usage=None, content='The weather in New York is sunny.', type='TextMessage'), TextMessage(source='lazy_assistant', models_usage=RequestUsage(prompt_tokens=110, completion_tokens=21), content="Great! Enjoy the sunny weather in New York! Is there anything else you'd like to know?", type='TextMessage'), TextMessage(source='lazy_assistant', models_usage=RequestUsage(prompt_tokens=137, completion_tokens=5), content='TERMINATE', type='TextMessage')], stop_reason="Text 'TERMINATE' mentioned")

You can see the team continued after the user provided the information.

Note

If you are using a Swarm team with HandoffTermination targeting the user, to resume the team you need to set the task to a HandoffMessage with its target set to the next agent you want to run. See Swarm for more details; a sketch is shown below.
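For example, resuming such a Swarm team might look like the following sketch, where swarm_team is the previously created Swarm and "travel_agent" is a hypothetical name of the agent that made the handoff:

from autogen_agentchat.messages import HandoffMessage

# Hand the user's reply back to the agent that asked for it ("travel_agent" is hypothetical).
await Console(
    swarm_team.run_stream(
        task=HandoffMessage(source="user", target="travel_agent", content="The weather is sunny.")
    )
)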