Custom Agents#
You may have agents with behaviors that do not fall into a preset. In such cases, you can build custom agents.

All agents in AgentChat inherit from the BaseChatAgent class and implement the following abstract methods and attributes:

- on_messages(): The abstract method that defines the behavior of the agent in response to messages. This method is called by run() when the agent is asked to provide a response. It returns a Response object.
- on_reset(): The abstract method that resets the agent to its initial state. This method is called when the agent is asked to reset itself.
- produced_message_types: The list of ChatMessage message types the agent can produce in its response.

Optionally, you can implement the on_messages_stream() method to stream messages as they are generated by the agent. If this method is not implemented, the agent uses the default implementation of on_messages_stream(), which calls on_messages() and yields all the messages in the response.
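Before the full examples below, here is a minimal sketch of a custom agent that implements only the required members. The EchoAgent class is a hypothetical illustration, not part of the AgentChat API:

from typing import Sequence

from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import ChatMessage, TextMessage
from autogen_core import CancellationToken


class EchoAgent(BaseChatAgent):
    """A hypothetical agent that echoes the last message back to the caller."""

    @property
    def produced_message_types(self) -> Sequence[type[ChatMessage]]:
        # The only message type this agent ever produces.
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
        # Echo the content of the last incoming message, or a default reply.
        content = messages[-1].content if messages else "(no input)"
        return Response(chat_message=TextMessage(content=str(content), source=self.name))

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        # This agent keeps no state, so there is nothing to reset.
        pass


# Usage: EchoAgent inherits BaseChatAgent's constructor (name, description).
echo_agent = EchoAgent("echo", "An agent that echoes the last message.")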
CountDownAgent#
In this example, we create a simple agent that counts down from a given number to zero and produces a stream of messages with the current count.
from typing import AsyncGenerator, List, Sequence

from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import AgentEvent, ChatMessage, TextMessage
from autogen_core import CancellationToken


class CountDownAgent(BaseChatAgent):
    def __init__(self, name: str, count: int = 3):
        super().__init__(name, "A simple agent that counts down.")
        self._count = count

    @property
    def produced_message_types(self) -> Sequence[type[ChatMessage]]:
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
        # Calls the on_messages_stream.
        response: Response | None = None
        async for message in self.on_messages_stream(messages, cancellation_token):
            if isinstance(message, Response):
                response = message
        assert response is not None
        return response

    async def on_messages_stream(
        self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[AgentEvent | ChatMessage | Response, None]:
        inner_messages: List[AgentEvent | ChatMessage] = []
        for i in range(self._count, 0, -1):
            msg = TextMessage(content=f"{i}...", source=self.name)
            inner_messages.append(msg)
            yield msg
        # The response is returned at the end of the stream.
        # It contains the final message and all the inner messages.
        yield Response(chat_message=TextMessage(content="Done!", source=self.name), inner_messages=inner_messages)

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        pass


async def run_countdown_agent() -> None:
    # Create a countdown agent.
    countdown_agent = CountDownAgent("countdown")

    # Run the agent with a given task and stream the response.
    async for message in countdown_agent.on_messages_stream([], CancellationToken()):
        if isinstance(message, Response):
            print(message.chat_message.content)
        else:
            print(message.content)


# Use asyncio.run(run_countdown_agent()) when running in a script.
await run_countdown_agent()
3...
2...
1...
Done!
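Because CountDownAgent inherits from BaseChatAgent, it also gets the run() and run_stream() convenience methods for free. A minimal usage sketch, assuming the class defined above (this agent ignores incoming messages, so the task text is arbitrary):

# Drive the same agent through the inherited run() method, which returns a TaskResult.
result = await CountDownAgent("countdown").run(task="Count down from 3.")
for message in result.messages:
    print(message.content)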
ArithmeticAgent#
In this example, we create an agent class that can perform simple arithmetic operations on a given integer. Then, we will use different instances of this agent class in a SelectorGroupChat to transform a given integer into another integer by applying a sequence of arithmetic operations.

The ArithmeticAgent class takes an operator_func that applies an arithmetic operation to an input integer and returns the result. In its on_messages method, it applies operator_func to the integer in the input message and returns a response with the result.
from typing import Callable, List, Sequence

from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.messages import ChatMessage, TextMessage
from autogen_agentchat.teams import SelectorGroupChat
from autogen_agentchat.ui import Console
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient


class ArithmeticAgent(BaseChatAgent):
    def __init__(self, name: str, description: str, operator_func: Callable[[int], int]) -> None:
        super().__init__(name, description=description)
        self._operator_func = operator_func
        self._message_history: List[ChatMessage] = []

    @property
    def produced_message_types(self) -> Sequence[type[ChatMessage]]:
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
        # Update the message history.
        # NOTE: it is possible the messages is an empty list, which means the agent was selected previously.
        self._message_history.extend(messages)
        # Parse the number in the last message.
        assert isinstance(self._message_history[-1], TextMessage)
        number = int(self._message_history[-1].content)
        # Apply the operator function to the number.
        result = self._operator_func(number)
        # Create a new message with the result.
        response_message = TextMessage(content=str(result), source=self.name)
        # Update the message history.
        self._message_history.append(response_message)
        # Return the response.
        return Response(chat_message=response_message)

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        pass
Note: The on_messages method may be called with an empty message list, which means the agent was called previously and is now being called again without any new messages from the caller. So it is important to keep a history of the previous messages the agent has received and use that history to generate a response.
Now we can create a SelectorGroupChat with 5 instances of ArithmeticAgent:

- one that adds 1 to the input integer,
- one that subtracts 1 from the input integer,
- one that multiplies the input integer by 2,
- one that divides the input integer by 2 and rounds down to the nearest integer, and
- one that returns the input integer unchanged.

We then create a SelectorGroupChat with these agents and set the relevant selector settings:

- allow the same agent to be selected consecutively, to allow for repeated operations, and
- customize the selector prompt to tailor the model's response to the specific task.
async def run_number_agents() -> None:
    # Create agents for number operations.
    add_agent = ArithmeticAgent("add_agent", "Adds 1 to the number.", lambda x: x + 1)
    multiply_agent = ArithmeticAgent("multiply_agent", "Multiplies the number by 2.", lambda x: x * 2)
    subtract_agent = ArithmeticAgent("subtract_agent", "Subtracts 1 from the number.", lambda x: x - 1)
    divide_agent = ArithmeticAgent("divide_agent", "Divides the number by 2 and rounds down.", lambda x: x // 2)
    identity_agent = ArithmeticAgent("identity_agent", "Returns the number as is.", lambda x: x)

    # The termination condition is to stop after 10 messages.
    termination_condition = MaxMessageTermination(10)

    # Create a selector group chat.
    selector_group_chat = SelectorGroupChat(
        [add_agent, multiply_agent, subtract_agent, divide_agent, identity_agent],
        model_client=OpenAIChatCompletionClient(model="gpt-4o"),
        termination_condition=termination_condition,
        allow_repeated_speaker=True,  # Allow the same agent to speak multiple times, necessary for this task.
        selector_prompt=(
            "Available roles:\n{roles}\nTheir job descriptions:\n{participants}\n"
            "Current conversation history:\n{history}\n"
            "Please select the most appropriate role for the next message, and only return the role name."
        ),
    )

    # Run the selector group chat with a given task and stream the response.
    task: List[ChatMessage] = [
        TextMessage(content="Apply the operations to turn the given number into 25.", source="user"),
        TextMessage(content="10", source="user"),
    ]
    stream = selector_group_chat.run_stream(task=task)
    await Console(stream)


# Use asyncio.run(run_number_agents()) when running in a script.
await run_number_agents()
---------- user ----------
Apply the operations to turn the given number into 25.
---------- user ----------
10
---------- multiply_agent ----------
20
---------- add_agent ----------
21
---------- multiply_agent ----------
42
---------- divide_agent ----------
21
---------- add_agent ----------
22
---------- add_agent ----------
23
---------- add_agent ----------
24
---------- add_agent ----------
25
---------- Summary ----------
Number of messages: 10
Finish reason: Maximum number of messages 10 reached, current message count: 10
Total prompt tokens: 0
Total completion tokens: 0
Duration: 2.40 seconds
From the output, we can see that the agents have successfully transformed the input integer from 10 to 25 by selecting the appropriate agents to apply the arithmetic operations in sequence.
Using Custom Model Clients in Custom Agents#
One of the key features of the AssistantAgent preset in AgentChat is that it takes a model_client argument and can use it to respond to messages. However, in some cases, you may want your agent to use a custom model client that is not currently supported (see supported model clients), or custom model behaviors.

You can accomplish this with a custom agent that implements your custom model client.

In the example below, we will walk through a custom agent that uses the Google Gemini SDK directly to respond to messages.
Note: You will need to install the Google Gemini SDK to run this example. You can install it using the following command:

pip install google-genai
# !pip install google-genai
import os
from typing import AsyncGenerator, Sequence

from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import AgentEvent, ChatMessage, TextMessage
from autogen_core import CancellationToken
from autogen_core.model_context import UnboundedChatCompletionContext
from autogen_core.models import AssistantMessage, RequestUsage, UserMessage
from google import genai
from google.genai import types


class GeminiAssistantAgent(BaseChatAgent):
    def __init__(
        self,
        name: str,
        description: str = "An agent that provides assistance with ability to use tools.",
        model: str = "gemini-1.5-flash-002",
        api_key: str = os.environ["GEMINI_API_KEY"],
        system_message: str
        | None = "You are a helpful assistant that can respond to messages. Reply with TERMINATE when the task has been completed.",
    ):
        super().__init__(name=name, description=description)
        self._model_context = UnboundedChatCompletionContext()
        self._model_client = genai.Client(api_key=api_key)
        self._system_message = system_message
        self._model = model

    @property
    def produced_message_types(self) -> Sequence[type[ChatMessage]]:
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
        final_response = None
        async for message in self.on_messages_stream(messages, cancellation_token):
            if isinstance(message, Response):
                final_response = message
        if final_response is None:
            raise AssertionError("The stream should have returned the final result.")
        return final_response

    async def on_messages_stream(
        self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[AgentEvent | ChatMessage | Response, None]:
        # Add messages to the model context.
        for msg in messages:
            await self._model_context.add_message(UserMessage(content=msg.content, source=msg.source))

        # Get conversation history.
        history = [
            (msg.source if hasattr(msg, "source") else "system")
            + ": "
            + (msg.content if isinstance(msg.content, str) else "")
            + "\n"
            for msg in await self._model_context.get_messages()
        ]

        # Generate response using Gemini.
        response = self._model_client.models.generate_content(
            model=self._model,
            contents=f"History: {history}\nGiven the history, please provide a response",
            config=types.GenerateContentConfig(
                system_instruction=self._system_message,
                temperature=0.3,
            ),
        )

        # Create usage metadata.
        usage = RequestUsage(
            prompt_tokens=response.usage_metadata.prompt_token_count,
            completion_tokens=response.usage_metadata.candidates_token_count,
        )

        # Add response to model context.
        await self._model_context.add_message(AssistantMessage(content=response.text, source=self.name))

        # Yield the final response.
        yield Response(
            chat_message=TextMessage(content=response.text, source=self.name, models_usage=usage),
            inner_messages=[],
        )

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        """Reset the assistant by clearing the model context."""
        await self._model_context.clear()
gemini_assistant = GeminiAssistantAgent("gemini_assistant")
await Console(gemini_assistant.run_stream(task="What is the capital of New York?"))
---------- user ----------
What is the capital of New York?
---------- gemini_assistant ----------
Albany
TERMINATE
TaskResult(messages=[TextMessage(source='user', models_usage=None, content='What is the capital of New York?', type='TextMessage'), TextMessage(source='gemini_assistant', models_usage=RequestUsage(prompt_tokens=46, completion_tokens=5), content='Albany\nTERMINATE\n', type='TextMessage')], stop_reason=None)
In the example above, we have chosen to provide model, api_key, and system_message as arguments. You can choose to provide any other arguments that are required by the model client you are using, or that fit your application design.
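The constructor arguments defined above can also be overridden per instance. A brief usage sketch (the alternate model name is an illustrative assumption; use any model name the Gemini API accepts):

# Construct the agent with non-default constructor arguments.
gemini_assistant = GeminiAssistantAgent(
    "gemini_assistant",
    model="gemini-1.5-pro",  # Assumed example model name.
    system_message="You are a concise assistant. Reply with TERMINATE when the task is done.",
)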
Now, let's explore how to use this custom agent as part of a team in AgentChat.
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.ui import Console

# Create the primary agent.
primary_agent = AssistantAgent(
    "primary",
    model_client=OpenAIChatCompletionClient(model="gpt-4o-mini"),
    system_message="You are a helpful AI assistant.",
)

# Create a critic agent based on our new GeminiAssistantAgent.
gemini_critic_agent = GeminiAssistantAgent(
    "gemini_critic",
    system_message="Provide constructive feedback. Respond with 'APPROVE' when your feedback is addressed.",
)

# Define a termination condition that stops the task if the critic approves or after 10 messages.
termination = TextMentionTermination("APPROVE") | MaxMessageTermination(10)

# Create a team with the primary and critic agents.
team = RoundRobinGroupChat([primary_agent, gemini_critic_agent], termination_condition=termination)

await Console(team.run_stream(task="Write a Haiku poem with 4 lines about the fall season."))
---------- user ----------
Write a Haiku poem with 4 lines about the fall season.
---------- primary ----------
Crimson leaves cascade,
Whispering winds sing of change,
Chill wraps the fading,
Nature's quilt, rich and warm.
---------- gemini_critic ----------
The poem is good, but it has four lines instead of three. A haiku must have three lines with a 5-7-5 syllable structure. The content is evocative of autumn, but the form is incorrect. Please revise to adhere to the haiku's syllable structure.
---------- primary ----------
Thank you for your feedback! Here’s a revised haiku that follows the 5-7-5 syllable structure:
Crimson leaves drift down,
Chill winds whisper through the gold,
Autumn’s breath is near.
---------- gemini_critic ----------
The revised haiku is much improved. It correctly follows the 5-7-5 syllable structure and maintains the evocative imagery of autumn. APPROVE
TaskResult(messages=[TextMessage(source='user', models_usage=None, content='Write a Haiku poem with 4 lines about the fall season.', type='TextMessage'), TextMessage(source='primary', models_usage=RequestUsage(prompt_tokens=33, completion_tokens=31), content="Crimson leaves cascade, \nWhispering winds sing of change, \nChill wraps the fading, \nNature's quilt, rich and warm.", type='TextMessage'), TextMessage(source='gemini_critic', models_usage=RequestUsage(prompt_tokens=86, completion_tokens=60), content="The poem is good, but it has four lines instead of three. A haiku must have three lines with a 5-7-5 syllable structure. The content is evocative of autumn, but the form is incorrect. Please revise to adhere to the haiku's syllable structure.\n", type='TextMessage'), TextMessage(source='primary', models_usage=RequestUsage(prompt_tokens=141, completion_tokens=49), content='Thank you for your feedback! Here’s a revised haiku that follows the 5-7-5 syllable structure:\n\nCrimson leaves drift down, \nChill winds whisper through the gold, \nAutumn’s breath is near.', type='TextMessage'), TextMessage(source='gemini_critic', models_usage=RequestUsage(prompt_tokens=211, completion_tokens=32), content='The revised haiku is much improved. It correctly follows the 5-7-5 syllable structure and maintains the evocative imagery of autumn. APPROVE\n', type='TextMessage')], stop_reason="Text 'APPROVE' mentioned")
In the section above, we have demonstrated several very important concepts:

- We developed a custom agent that uses the Google Gemini SDK to respond to messages.
- We showed that this custom agent can be used as part of the broader AgentChat ecosystem: in this case, as a participant in a RoundRobinGroupChat, as long as it inherits from BaseChatAgent.
Making the Custom Agent Declarative#
AutoGen provides a Component interface for making the configuration of components serializable to a declarative format. This is useful for saving and loading configurations, and for sharing configurations with others.

We accomplish this by inheriting from the Component class and implementing the _from_config and _to_config methods. The declarative class can be serialized to a JSON format using the dump_component method, and deserialized from JSON using the load_component method.
import os
from typing import AsyncGenerator, Sequence

from autogen_agentchat.agents import BaseChatAgent
from autogen_agentchat.base import Response
from autogen_agentchat.messages import AgentEvent, ChatMessage, TextMessage
from autogen_core import CancellationToken, Component
from autogen_core.model_context import UnboundedChatCompletionContext
from autogen_core.models import AssistantMessage, RequestUsage, UserMessage
from google import genai
from google.genai import types
from pydantic import BaseModel
from typing_extensions import Self


class GeminiAssistantAgentConfig(BaseModel):
    name: str
    description: str = "An agent that provides assistance with ability to use tools."
    model: str = "gemini-1.5-flash-002"
    system_message: str | None = None


class GeminiAssistantAgent(BaseChatAgent, Component[GeminiAssistantAgentConfig]):  # type: ignore[no-redef]
    component_config_schema = GeminiAssistantAgentConfig
    # component_provider_override = "mypackage.agents.GeminiAssistantAgent"

    def __init__(
        self,
        name: str,
        description: str = "An agent that provides assistance with ability to use tools.",
        model: str = "gemini-1.5-flash-002",
        api_key: str = os.environ["GEMINI_API_KEY"],
        system_message: str
        | None = "You are a helpful assistant that can respond to messages. Reply with TERMINATE when the task has been completed.",
    ):
        super().__init__(name=name, description=description)
        self._model_context = UnboundedChatCompletionContext()
        self._model_client = genai.Client(api_key=api_key)
        self._system_message = system_message
        self._model = model

    @property
    def produced_message_types(self) -> Sequence[type[ChatMessage]]:
        return (TextMessage,)

    async def on_messages(self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken) -> Response:
        final_response = None
        async for message in self.on_messages_stream(messages, cancellation_token):
            if isinstance(message, Response):
                final_response = message
        if final_response is None:
            raise AssertionError("The stream should have returned the final result.")
        return final_response

    async def on_messages_stream(
        self, messages: Sequence[ChatMessage], cancellation_token: CancellationToken
    ) -> AsyncGenerator[AgentEvent | ChatMessage | Response, None]:
        # Add messages to the model context.
        for msg in messages:
            await self._model_context.add_message(UserMessage(content=msg.content, source=msg.source))

        # Get conversation history.
        history = [
            (msg.source if hasattr(msg, "source") else "system")
            + ": "
            + (msg.content if isinstance(msg.content, str) else "")
            + "\n"
            for msg in await self._model_context.get_messages()
        ]

        # Generate response using Gemini.
        response = self._model_client.models.generate_content(
            model=self._model,
            contents=f"History: {history}\nGiven the history, please provide a response",
            config=types.GenerateContentConfig(
                system_instruction=self._system_message,
                temperature=0.3,
            ),
        )

        # Create usage metadata.
        usage = RequestUsage(
            prompt_tokens=response.usage_metadata.prompt_token_count,
            completion_tokens=response.usage_metadata.candidates_token_count,
        )

        # Add response to model context.
        await self._model_context.add_message(AssistantMessage(content=response.text, source=self.name))

        # Yield the final response.
        yield Response(
            chat_message=TextMessage(content=response.text, source=self.name, models_usage=usage),
            inner_messages=[],
        )

    async def on_reset(self, cancellation_token: CancellationToken) -> None:
        """Reset the assistant by clearing the model context."""
        await self._model_context.clear()

    @classmethod
    def _from_config(cls, config: GeminiAssistantAgentConfig) -> Self:
        return cls(
            name=config.name, description=config.description, model=config.model, system_message=config.system_message
        )

    def _to_config(self) -> GeminiAssistantAgentConfig:
        return GeminiAssistantAgentConfig(
            name=self.name,
            description=self.description,
            model=self._model,
            system_message=self._system_message,
        )
Now that we have implemented the required methods, we can dump the custom agent to a JSON format and then load the agent back from JSON.

Note: You should set the component_provider_override class variable to the full path of the module containing the custom agent class (e.g., mypackage.agents.GeminiAssistantAgent). This is used by the load_component method to determine how to instantiate the class.
gemini_assistant = GeminiAssistantAgent("gemini_assistant")
config = gemini_assistant.dump_component()
print(config.model_dump_json(indent=2))
loaded_agent = GeminiAssistantAgent.load_component(config)
print(loaded_agent)
{
  "provider": "__main__.GeminiAssistantAgent",
  "component_type": "agent",
  "version": 1,
  "component_version": 1,
  "description": null,
  "label": "GeminiAssistantAgent",
  "config": {
    "name": "gemini_assistant",
    "description": "An agent that provides assistance with ability to use tools.",
    "model": "gemini-1.5-flash-002",
    "system_message": "You are a helpful assistant that can respond to messages. Reply with TERMINATE when the task has been completed."
  }
}
<__main__.GeminiAssistantAgent object at 0x11a5c5a90>
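Since dump_component() returns a pydantic model, the configuration can also be persisted to disk and reloaded later. A minimal sketch, assuming ComponentModel from autogen_core for deserialization; the file name is arbitrary:

from autogen_core import ComponentModel

# Save the component configuration to a JSON file.
with open("gemini_assistant.json", "w") as f:
    f.write(config.model_dump_json(indent=2))

# Load the configuration back and reconstruct the agent from it.
with open("gemini_assistant.json") as f:
    loaded_config = ComponentModel.model_validate_json(f.read())
loaded_agent = GeminiAssistantAgent.load_component(loaded_config)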
Next Steps#
So far, we have learned how to create custom agents, add custom model clients to agents, and make custom agents declarative. There are a few ways in which this basic sample can be extended:

- Extend the Gemini model client to handle function calling, similar to the AssistantAgent class (https://ai.google.dev/gemini-api/docs/function-calling); a rough sketch follows this list.
- Implement a package with a custom agent and experiment with using its declarative format in a tool like AutoGen Studio.
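As a starting point for the first item, here is a rough sketch of calling a plain Python function as a tool via the google-genai SDK's automatic function calling. The get_weather function is a made-up example, and this snippet is separate from the agent above:

def get_weather(city: str) -> str:
    """Return a canned weather report for the given city."""
    return f"It is always sunny in {city}."


# The google-genai SDK can call Python functions passed via the tools config.
client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])
response = client.models.generate_content(
    model="gemini-1.5-flash-002",
    contents="What is the weather in Albany?",
    config=types.GenerateContentConfig(tools=[get_weather]),
)
print(response.text)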