跳过内容

pydantic_ai.models.function

一个由本地函数控制的模型。

FunctionModel 类似于 TestModel,但允许更大程度上控制模型的行为。

它的主要用例是进行比TestModel更高级的单元测试。

这是一个最小化的示例:

function_model_usage.py
from pydantic_ai import Agent
from pydantic_ai.messages import ModelMessage, ModelResponse, TextPart
from pydantic_ai.models.function import FunctionModel, AgentInfo

# The agent under test; `test_my_agent` below swaps its model for a `FunctionModel` via `override`.
my_agent = Agent('openai:gpt-4o')


async def model_function(
    messages: list[ModelMessage], info: AgentInfo
) -> ModelResponse:
    """Stand-in model: print what it receives and answer with a fixed text.

    Args:
        messages: The messages handed to the model for this request.
        info: The `AgentInfo` supplied as the second argument.

    Returns:
        A `ModelResponse` holding a single `TextPart` with the text 'hello world'.
    """
    # Each `print` is followed by a bare string literal illustrating its output;
    # those strings are plain expression statements and have no runtime effect.
    print(messages)
    """
    [
        ModelRequest(
            parts=[
                UserPromptPart(
                    content='Testing my agent...',
                    timestamp=datetime.datetime(...),
                    part_kind='user-prompt',
                )
            ],
            kind='request',
        )
    ]
    """
    print(info)
    """
    AgentInfo(
        function_tools=[], allow_text_result=True, result_tools=[], model_settings=None
    )
    """
    return ModelResponse(parts=[TextPart('hello world')])


async def test_my_agent():
    """Run `my_agent` with `model_function` standing in for the model (collected by pytest)."""
    stub_model = FunctionModel(model_function)
    with my_agent.override(model=stub_model):
        outcome = await my_agent.run('Testing my agent...')
        # The agent's result must be the text produced by `model_function`.
        assert outcome.data == 'hello world'

请参见 使用 FunctionModel 的单元测试 获取详细文档。

FunctionModel dataclass

基类: Model

由本地函数控制的模型。

除了 __init__,所有方法都是私有的或与基类的方法匹配。

Source code in pydantic_ai_slim/pydantic_ai/models/function.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
@dataclass(init=False)
class FunctionModel(Model):
    """Model whose responses are produced by user-supplied local functions.

    `__init__` is the only addition to the public surface; every other method is
    either private or mirrors a method of the base class.
    """

    function: FunctionDef | None = None
    stream_function: StreamFunctionDef | None = None

    _model_name: str = field(repr=False)
    _system: str | None = field(default=None, repr=False)

    @overload
    def __init__(self, function: FunctionDef) -> None: ...

    @overload
    def __init__(self, *, stream_function: StreamFunctionDef) -> None: ...

    @overload
    def __init__(self, function: FunctionDef, *, stream_function: StreamFunctionDef) -> None: ...

    def __init__(self, function: FunctionDef | None = None, *, stream_function: StreamFunctionDef | None = None):
        """Create a `FunctionModel`.

        At least one of `function` and `stream_function` has to be given; passing both is fine.

        Args:
            function: Callable invoked for non-streamed requests.
            stream_function: Callable invoked for streamed requests.

        Raises:
            TypeError: If neither `function` nor `stream_function` is given.
        """
        if function is None and stream_function is None:
            raise TypeError('Either `function` or `stream_function` must be provided')

        self.function = function
        self.stream_function = stream_function

        # A missing callable contributes an empty segment to the model name.
        plain_name = '' if function is None else function.__name__
        stream_name = '' if stream_function is None else stream_function.__name__
        self._model_name = f'function:{plain_name}:{stream_name}'

    async def request(
        self,
        messages: list[ModelMessage],
        model_settings: ModelSettings | None,
        model_request_parameters: ModelRequestParameters,
    ) -> tuple[ModelResponse, usage.Usage]:
        """Answer a non-streamed request by calling `function`.

        Args:
            messages: The messages forwarded to `function`.
            model_settings: Settings stored on the `AgentInfo` given to `function`.
            model_request_parameters: Source of the tool lists and the text-result flag
                copied into the `AgentInfo`.

        Returns:
            The response from `function` (with `model_name` overwritten) and a usage
            estimate computed over `messages` plus that response.

        Raises:
            AssertionError: If no `function` was configured, or if a non-coroutine
                `function` returns something that is not a `ModelResponse`.
        """
        agent_info = AgentInfo(
            function_tools=model_request_parameters.function_tools,
            allow_text_result=model_request_parameters.allow_text_result,
            result_tools=model_request_parameters.result_tools,
            model_settings=model_settings,
        )

        assert self.function is not None, 'FunctionModel must receive a `function` to support non-streamed requests'

        if not inspect.iscoroutinefunction(self.function):
            # Non-coroutine functions are dispatched through `_utils.run_in_executor`.
            outcome = await _utils.run_in_executor(self.function, messages, agent_info)
            assert isinstance(outcome, ModelResponse), outcome
            response = outcome
        else:
            response = await self.function(messages, agent_info)

        # The response is labelled with the non-streaming function's name only.
        response.model_name = f'function:{self.function.__name__}'
        # TODO is `messages` right here? Should it just be new messages?
        return response, _estimate_usage(chain(messages, [response]))

    @asynccontextmanager
    async def request_stream(
        self,
        messages: list[ModelMessage],
        model_settings: ModelSettings | None,
        model_request_parameters: ModelRequestParameters,
    ) -> AsyncIterator[StreamedResponse]:
        """Answer a streamed request by calling `stream_function`.

        Args:
            messages: The messages forwarded to `stream_function`.
            model_settings: Settings stored on the `AgentInfo` given to `stream_function`.
            model_request_parameters: Source of the tool lists and the text-result flag
                copied into the `AgentInfo`.

        Yields:
            A `FunctionStreamedResponse` wrapping the stream produced by `stream_function`.

        Raises:
            AssertionError: If no `stream_function` was configured.
            ValueError: If peeking at the stream shows it has no first item.
        """
        agent_info = AgentInfo(
            function_tools=model_request_parameters.function_tools,
            allow_text_result=model_request_parameters.allow_text_result,
            result_tools=model_request_parameters.result_tools,
            model_settings=model_settings,
        )

        assert self.stream_function is not None, (
            'FunctionModel must receive a `stream_function` to support streamed requests'
        )

        chunks = PeekableAsyncStream(self.stream_function(messages, agent_info))

        # Peek first so an empty stream is rejected before anything is handed to the caller.
        if isinstance(await chunks.peek(), _utils.Unset):
            raise ValueError('Stream function must return at least one item')

        response_name = f'function:{self.stream_function.__name__}'
        yield FunctionStreamedResponse(_model_name=response_name, _iter=chunks)

    @property
    def model_name(self) -> str:
        """Name of the model, as assembled in `__init__`."""
        return self._model_name

    @property
    def system(self) -> str | None:
        """The system / model provider, if any."""
        return self._system

__init__

__init__(function: FunctionDef) -> None
__init__(*, stream_function: StreamFunctionDef) -> None
__init__(
    function: FunctionDef,
    *,
    stream_function: StreamFunctionDef
) -> None
__init__(
    function: FunctionDef | None = None,
    *,
    stream_function: StreamFunctionDef | None = None
)

初始化一个 FunctionModel

必须提供 function 或 stream_function 中的至少一个,同时提供两者也是允许的。

参数:

名称 类型 描述 默认值
function FunctionDef | None

用于非流请求的调用函数。

None
stream_function StreamFunctionDef | None

用于流请求的调用函数。

None
Source code in pydantic_ai_slim/pydantic_ai/models/function.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def __init__(self, function: FunctionDef | None = None, *, stream_function: StreamFunctionDef | None = None):
    """Create a `FunctionModel`.

    At least one of `function` and `stream_function` has to be given; passing both is fine.

    Args:
        function: Callable invoked for non-streamed requests.
        stream_function: Callable invoked for streamed requests.

    Raises:
        TypeError: If neither `function` nor `stream_function` is given.
    """
    if function is None and stream_function is None:
        raise TypeError('Either `function` or `stream_function` must be provided')

    self.function = function
    self.stream_function = stream_function

    # A missing callable contributes an empty segment to the model name.
    plain_name = '' if function is None else function.__name__
    stream_name = '' if stream_function is None else stream_function.__name__
    self._model_name = f'function:{plain_name}:{stream_name}'

model_name property

model_name: str

模型名称。

system property

system: str | None

系统 / 模型提供者。

AgentInfo dataclass

有关代理的信息。

它会作为第二个参数传递给 FunctionModel 中使用的函数。

Source code in pydantic_ai_slim/pydantic_ai/models/function.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
@dataclass(frozen=True)
class AgentInfo:
    """Details about the agent that is issuing a model request.

    An instance is passed as the second argument to the functions used within
    [`FunctionModel`][pydantic_ai.models.function.FunctionModel].
    """

    function_tools: list[ToolDefinition]
    """Function tools that this agent makes available.

    These are the tools registered through the [`tool`][pydantic_ai.Agent.tool] and
    [`tool_plain`][pydantic_ai.Agent.tool_plain] decorators.
    """
    allow_text_result: bool
    """Whether a plain text result is allowed."""
    result_tools: list[ToolDefinition]
    """Tools that can be called as the final result of the run."""
    model_settings: ModelSettings | None
    """Model settings passed to the run call."""

function_tools instance-attribute

function_tools: list[ToolDefinition]

此代理可用的功能工具。

这些是通过 tool 和 tool_plain 装饰器注册的工具。

allow_text_result instance-attribute

allow_text_result: bool

是否允许纯文本结果。

result_tools instance-attribute

result_tools: list[ToolDefinition]

可以作为运行的最终结果被调用的工具。

model_settings instance-attribute

model_settings: ModelSettings | None

传递给运行调用的模型设置。

DeltaToolCall dataclass

对工具调用的增量更改。

用于描述在流式传输结构化响应时的一段内容。

Source code in pydantic_ai_slim/pydantic_ai/models/function.py
160
161
162
163
164
165
166
167
168
169
170
@dataclass
class DeltaToolCall:
    """Incremental change to a tool call.

    Used to describe a chunk when streaming structured responses.
    """

    name: str | None = None
    """Incremental change to the name of the tool."""
    json_args: str | None = None
    """Incremental change to the arguments as JSON"""

name class-attribute instance-attribute

name: str | None = None

对工具名称的增量更改。

json_args class-attribute instance-attribute

json_args: str | None = None

以 JSON 形式表示的对参数的增量更改。

DeltaToolCalls module-attribute

DeltaToolCalls: TypeAlias = dict[int, DeltaToolCall]

工具调用ID与增量变化的映射。

FunctionDef module-attribute

用于生成非流式响应的函数。

StreamFunctionDef module-attribute

用于生成流式响应的函数。

虽然它的返回类型被定义为 AsyncIterator[Union[str, DeltaToolCalls]],但实际上应被视为 Union[AsyncIterator[str], AsyncIterator[DeltaToolCalls]]。

也就是说,你产出(yield)的内容要么全部是文本,要么全部是 DeltaToolCalls,不能将两者混在一起。

FunctionStreamedResponse dataclass

基类: StreamedResponse

针对 FunctionModel 的 StreamedResponse 实现。

Source code in pydantic_ai_slim/pydantic_ai/models/function.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
@dataclass
class FunctionStreamedResponse(StreamedResponse):
    """`StreamedResponse` implementation used by [FunctionModel][pydantic_ai.models.function.FunctionModel]."""

    _model_name: str
    _iter: AsyncIterator[str | DeltaToolCalls]
    _timestamp: datetime = field(default_factory=_utils.now_utc)

    def __post_init__(self):
        # Fold the usage estimate for an empty message list into the running total.
        self._usage += _estimate_usage([])

    async def _get_event_iterator(self) -> AsyncIterator[ModelResponseStreamEvent]:
        """Turn the items of the wrapped stream into response stream events.

        String items become text deltas; any other item is treated as a mapping of
        index to `DeltaToolCall` and becomes tool-call deltas. Estimated response
        tokens are added to the usage before each delta is passed to the parts manager.
        """
        async for chunk in self._iter:
            if not isinstance(chunk, str):
                for part_index, delta in chunk.items():
                    # Only non-empty argument fragments count towards token usage.
                    if delta.json_args:
                        arg_tokens = _estimate_string_tokens(delta.json_args)
                        self._usage += usage.Usage(response_tokens=arg_tokens, total_tokens=arg_tokens)
                    event = self._parts_manager.handle_tool_call_delta(
                        vendor_part_id=part_index,
                        tool_name=delta.name,
                        args=delta.json_args,
                        tool_call_id=None,
                    )
                    # The parts manager may return no event for a delta; skip those.
                    if event is not None:
                        yield event
                continue

            text_tokens = _estimate_string_tokens(chunk)
            self._usage += usage.Usage(response_tokens=text_tokens, total_tokens=text_tokens)
            yield self._parts_manager.handle_text_delta(vendor_part_id='content', content=chunk)

    @property
    def model_name(self) -> str:
        """Model name attached to this response."""
        return self._model_name

    @property
    def timestamp(self) -> datetime:
        """Timestamp recorded for this response."""
        return self._timestamp

model_name property

model_name: str

获取响应的模型名称。

timestamp property

timestamp: datetime

获取响应的时间戳。