API Reference

from_openai(client, mode=Mode.TOOLS, **kwargs)

Source code in instructor/client.py
def from_openai(
    client: openai.OpenAI | openai.AsyncOpenAI,
    mode: instructor.Mode = instructor.Mode.TOOLS,
    **kwargs: Any,
) -> Instructor | AsyncInstructor:
    if hasattr(client, "base_url"):
        provider = get_provider(str(client.base_url))
    else:
        provider = Provider.OPENAI

    if not isinstance(client, (openai.OpenAI, openai.AsyncOpenAI)):
        import warnings

        warnings.warn(
            "Client should be an instance of openai.OpenAI or openai.AsyncOpenAI. Unexpected behavior may occur with other client types.",
            stacklevel=2,
        )

    if provider in {Provider.ANYSCALE, Provider.TOGETHER}:
        assert mode in {
            instructor.Mode.TOOLS,
            instructor.Mode.JSON,
            instructor.Mode.JSON_SCHEMA,
            instructor.Mode.MD_JSON,
        }

    if provider in {Provider.OPENAI, Provider.DATABRICKS}:
        assert mode in {
            instructor.Mode.TOOLS,
            instructor.Mode.JSON,
            instructor.Mode.FUNCTIONS,
            instructor.Mode.PARALLEL_TOOLS,
            instructor.Mode.MD_JSON,
            instructor.Mode.TOOLS_STRICT,
            instructor.Mode.JSON_O1,
        }

    if isinstance(client, openai.OpenAI):
        return Instructor(
            client=client,
            create=instructor.patch(create=client.chat.completions.create, mode=mode),
            mode=mode,
            provider=provider,
            **kwargs,
        )

    if isinstance(client, openai.AsyncOpenAI):
        return AsyncInstructor(
            client=client,
            create=instructor.patch(create=client.chat.completions.create, mode=mode),
            mode=mode,
            provider=provider,
            **kwargs,
        )
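
A minimal usage sketch, assuming a configured OpenAI API key (the model name is illustrative). from_openai wraps the client so that chat.completions.create accepts a response_model and returns a validated Pydantic object:

import instructor
import openai
from pydantic import BaseModel

class UserInfo(BaseModel):
    name: str
    age: int

# Wrap the plain OpenAI client; mode defaults to instructor.Mode.TOOLS
client = instructor.from_openai(openai.OpenAI())

user = client.chat.completions.create(
    model="gpt-3.5-turbo",
    response_model=UserInfo,
    messages=[{"role": "user", "content": "John Doe is 30 years old."}],
)
print(user.name, user.age)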

Validator

Bases: OpenAISchema

Validate whether an attribute is correct and, if not, return a new value with an error message.

Source code in instructor/dsl/validators.py
class Validator(OpenAISchema):
    """
    Validate if an attribute is correct and if not,
    return a new value with an error message
    """

    is_valid: bool = Field(
        default=True,
        description="Whether the attribute is valid based on the requirements",
    )
    reason: Optional[str] = Field(
        default=None,
        description="The error message if the attribute is not valid, otherwise None",
    )
    fixed_value: Optional[str] = Field(
        default=None,
        description="If the attribute is not valid, suggest a new value for the attribute",
    )
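
Since Validator is itself an OpenAISchema, a patched client can return it directly as a response_model. A minimal sketch mirroring what llm_validator does internally (the prompt wording is illustrative, and an API key is assumed):

import instructor
import openai
from instructor.dsl.validators import Validator

client = instructor.from_openai(openai.OpenAI())

resp = client.chat.completions.create(
    model="gpt-3.5-turbo",
    response_model=Validator,
    messages=[
        {"role": "system", "content": "You are a world class validation model."},
        {"role": "user", "content": "Does `Jason Liu` follow the rules: must be all lowercase"},
    ],
)
# is_valid, reason and fixed_value are filled in by the model
print(resp.is_valid, resp.reason, resp.fixed_value)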

llm_validator(statement, client, allow_override=False, model='gpt-3.5-turbo', temperature=0)

Create a validator that uses an LLM to validate an attribute.

Usage

from typing import Annotated

import instructor
import openai
from instructor import llm_validator
from pydantic import AfterValidator, BaseModel, Field, ValidationError

client = instructor.from_openai(openai.OpenAI())

class User(BaseModel):
    name: Annotated[str, AfterValidator(llm_validator("The name must be a full name all lowercase", client=client))]
    age: int = Field(description="The age of the person")

try:
    user = User(name="Jason Liu", age=20)
except ValidationError as e:
    print(e)
1 validation error for User
name
    The name is valid but not all lowercase (type=value_error.llm_validator)

Note that the error message is written by the LLM, and the error type is value_error.llm_validator.

Parameters:

    statement (str): The statement to validate. Required.
    client (Instructor): The patched Instructor client to use for validation. Required.
    allow_override (bool): Whether to return the LLM-suggested fixed_value instead of raising when validation fails (default: False).
    model (str): The LLM to use for validation (default: "gpt-3.5-turbo").
    temperature (float): The temperature to use for the LLM (default: 0).
Source code in instructor/dsl/validators.py
def llm_validator(
    statement: str,
    client: Instructor,
    allow_override: bool = False,
    model: str = "gpt-3.5-turbo",
    temperature: float = 0,
) -> Callable[[str], str]:
    """
    Create a validator that uses the LLM to validate an attribute

    ## Usage

    ```python
    from typing import Annotated

    from instructor import llm_validator
    from pydantic import AfterValidator, BaseModel, Field, ValidationError

    class User(BaseModel):
        name: Annotated[str, AfterValidator(llm_validator("The name must be a full name all lowercase", client=client))]
        age: int = Field(description="The age of the person")

    try:
        user = User(name="Jason Liu", age=20)
    except ValidationError as e:
        print(e)
    ```

    ```
    1 validation error for User
    name
        The name is valid but not all lowercase (type=value_error.llm_validator)
    ```

    Note that the error message here is written by the LLM, and the error type is `value_error.llm_validator`.

    Parameters:
        statement (str): The statement to validate
        client (Instructor): The patched Instructor client to use for validation
        allow_override (bool): Whether to return the suggested fixed value when validation fails (default: False)
        model (str): The LLM to use for validation (default: "gpt-3.5-turbo")
        temperature (float): The temperature to use for the LLM (default: 0)
    """

    def llm(v: str) -> str:
        resp = client.chat.completions.create(
            response_model=Validator,
            messages=[
                {
                    "role": "system",
                    "content": "You are a world class validation model. Capable to determine if the following value is valid for the statement, if it is not, explain why and suggest a new value.",
                },
                {
                    "role": "user",
                    "content": f"Does `{v}` follow the rules: {statement}",
                },
            ],
            model=model,
            temperature=temperature,
        )

        # If the value is not valid but override is allowed, return the fixed value
        if allow_override and not resp.is_valid and resp.fixed_value is not None:
            return resp.fixed_value

        # If the response is not valid, raise with the reason; this could be used
        # in the future to generate a better response via a reasking mechanism.
        assert resp.is_valid, resp.reason
        return v

    return llm

openai_moderation(client)

Validate a message using the OpenAI moderation model.

Should only be used for monitoring inputs and outputs of OpenAI APIs; other use cases are disallowed as per: https://platform.openai.com/docs/guides/moderation/overview

Example:

from typing import Annotated

from instructor.dsl.validators import openai_moderation
from pydantic import AfterValidator, BaseModel

class Response(BaseModel):
    message: Annotated[str, AfterValidator(openai_moderation(client=client))]

Response(message="I hate you")

 ValidationError: 1 validation error for Response
 message
Value error, `I hate you.` was flagged for ['harassment'] [type=value_error, input_value='I hate you.', input_type=str]

client (OpenAI): The OpenAI client to use; must be synchronous (default: None)

Source code in instructor/dsl/validators.py
def openai_moderation(client: OpenAI) -> Callable[[str], str]:
    """
    Validates a message using OpenAI moderation model.

    Should only be used for monitoring inputs and outputs of OpenAI APIs
    Other use cases are disallowed as per:
    https://platform.openai.com/docs/guides/moderation/overview

    Example:
    ```python
    from typing import Annotated

    from instructor.dsl.validators import openai_moderation
    from pydantic import AfterValidator, BaseModel

    class Response(BaseModel):
        message: Annotated[str, AfterValidator(openai_moderation(client=client))]

    Response(message="I hate you")
    ```

    ```
     ValidationError: 1 validation error for Response
     message
    Value error, `I hate you.` was flagged for ['harassment'] [type=value_error, input_value='I hate you.', input_type=str]
    ```

    client (OpenAI): The OpenAI client to use, must be sync (default: None)
    """

    def validate_message_with_openai_mod(v: str) -> str:
        response = client.moderations.create(input=v)
        out = response.results[0]
        cats = out.categories.model_dump()
        if out.flagged:
            raise ValueError(
                f"`{v}` was flagged for {', '.join(cat for cat in cats if cats[cat])}"
            )

        return v

    return validate_message_with_openai_mod
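
The returned callable can also be exercised on its own, outside a Pydantic model (a sketch; assumes an API key and a synchronous client):

import openai

from instructor.dsl.validators import openai_moderation

validate = openai_moderation(client=openai.OpenAI())

print(validate("have a nice day"))  # returns the input unchanged when not flagged
validate("I hate you")  # raises ValueError listing the flagged categories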

IterableModel(subtask_class, name=None, description=None)

Dynamically create an IterableModel OpenAISchema that can be used to segment multiple tasks given a base class. This creates a class that can serve as a toolkit for a specific task; names and descriptions are generated automatically, but they can be overridden.

Usage

from pydantic import BaseModel, Field
from instructor import IterableModel

class User(BaseModel):
    name: str = Field(description="The name of the person")
    age: int = Field(description="The age of the person")
    role: str = Field(description="The role of the person")

MultiUser = IterableModel(User)

Result

class MultiUser(OpenAISchema, MultiTaskBase):
    tasks: List[User] = Field(
        default_factory=list,
        repr=False,
        description="Correctly segmented list of `User` tasks",
    )

    @classmethod
    def from_streaming_response(cls, completion) -> Generator[User]:
        '''
        Parse the streaming response from OpenAI and yield a `User` object
        for each task in the response
        '''
        json_chunks = cls.extract_json(completion)
        yield from cls.tasks_from_chunks(json_chunks)

Parameters:

    subtask_class (Type[OpenAISchema]): The base class to use for the MultiTask. Required.
    name (Optional[str]): The name of the generated class; if None, it is derived from the subtask class as Iterable{subtask_class.__name__} (default: None).
    description (Optional[str]): The description of the generated class; if None, it is set to Correct segmentation of `{subtask_class.__name__}` tasks (default: None).

Returns:

    schema (OpenAISchema): A new class that can be used to segment multiple tasks.

Source code in instructor/dsl/iterable.py
def IterableModel(
    subtask_class: type[BaseModel],
    name: Optional[str] = None,
    description: Optional[str] = None,
) -> type[BaseModel]:
    """
    Dynamically create a IterableModel OpenAISchema that can be used to segment multiple
    tasks given a base class. This creates class that can be used to create a toolkit
    for a specific task, names and descriptions are automatically generated. However
    they can be overridden.

    ## Usage

    ```python
    from pydantic import BaseModel, Field
    from instructor import IterableModel

    class User(BaseModel):
        name: str = Field(description="The name of the person")
        age: int = Field(description="The age of the person")
        role: str = Field(description="The role of the person")

    MultiUser = IterableModel(User)
    ```

    ## Result

    ```python
    class MultiUser(OpenAISchema, MultiTaskBase):
        tasks: List[User] = Field(
            default_factory=list,
            repr=False,
            description="Correctly segmented list of `User` tasks",
        )

        @classmethod
        def from_streaming_response(cls, completion) -> Generator[User]:
            '''
            Parse the streaming response from OpenAI and yield a `User` object
            for each task in the response
            '''
            json_chunks = cls.extract_json(completion)
            yield from cls.tasks_from_chunks(json_chunks)
    ```

    Parameters:
        subtask_class (Type[OpenAISchema]): The base class to use for the MultiTask
        name (Optional[str]): The name of the MultiTask class, if None then the name
            of the subtask class is used as `Iterable{subtask_class.__name__}`
        description (Optional[str]): The description of the MultiTask class, if None
            then the description is set to `Correct segmentation of `{subtask_class.__name__}` tasks`

    Returns:
        schema (OpenAISchema): A new class that can be used to segment multiple tasks
    """
    task_name = subtask_class.__name__ if name is None else name

    name = f"Iterable{task_name}"

    list_tasks = (
        list[subtask_class],
        Field(
            default_factory=list,
            repr=False,
            description=f"Correctly segmented list of `{task_name}` tasks",
        ),
    )

    base_models = cast(tuple[type[BaseModel], ...], (OpenAISchema, IterableBase))
    new_cls = create_model(
        name,
        tasks=list_tasks,
        __base__=base_models,
    )
    new_cls = cast(type[IterableBase], new_cls)

    # set the class constructor BaseModel
    new_cls.task_type = subtask_class

    new_cls.__doc__ = (
        f"Correct segmentation of `{task_name}` tasks"
        if description is None
        else description
    )
    assert issubclass(
        new_cls, OpenAISchema
    ), "The new class should be a subclass of OpenAISchema"
    return new_cls
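
A streaming sketch (the model name is illustrative, and it assumes the generated class can be passed directly as response_model, as in the usage above). With stream=True the patched client goes through from_streaming_response and yields each User as soon as its JSON fragment is complete:

import instructor
import openai
from pydantic import BaseModel, Field

from instructor import IterableModel

class User(BaseModel):
    name: str = Field(description="The name of the person")
    age: int = Field(description="The age of the person")

MultiUser = IterableModel(User)

client = instructor.from_openai(openai.OpenAI())

users = client.chat.completions.create(
    model="gpt-3.5-turbo",
    response_model=MultiUser,
    stream=True,
    messages=[{"role": "user", "content": "Jason is 25, Sarah is 30."}],
)
for user in users:
    print(user)  # each task is yielded as soon as it is fully parsed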

Partial

Bases: Generic[T_Model]

Generate a new class which has PartialBase as a base class.

Notes

This enables partial validation of the model while streaming.

Example

Partial[SomeModel]

Source code in instructor/dsl/partial.py
class Partial(Generic[T_Model]):
    """Generate a new class which has PartialBase as a base class.

    Notes:
        This will enable partial validation of the model while streaming.

    Example:
        Partial[SomeModel]
    """

    def __new__(
        cls,
        *args: object,  # noqa :ARG003
        **kwargs: object,  # noqa :ARG003
    ) -> Partial[T_Model]:
        """Cannot instantiate.

        Raises:
            TypeError: Direct instantiation not allowed.
        """
        raise TypeError("Cannot instantiate abstract Partial class.")

    def __init_subclass__(
        cls,
        *args: object,
        **kwargs: object,
    ) -> NoReturn:
        """Cannot subclass.

        Raises:
           TypeError: Subclassing not allowed.
        """
        raise TypeError(f"Cannot subclass {cls.__module__}.Partial")

    def __class_getitem__(
        cls,
        wrapped_class: type[T_Model] | tuple[type[T_Model], type[MakeFieldsOptional]],
    ) -> type[T_Model]:
        """Convert model to one that inherits from PartialBase.

        We don't make the fields optional at this point, we just wrap them with `Partial` so the names of the nested models will be
        `Partial{ModelName}`. We want the output of `model_json_schema()` to
        reflect the name change, but everything else should be the same as the
        original model. During validation, we'll generate a true partial model
        to support partially defined fields.

        """

        make_fields_optional = None
        if isinstance(wrapped_class, tuple):
            wrapped_class, make_fields_optional = wrapped_class

        def _wrap_models(field: FieldInfo) -> tuple[object, FieldInfo]:
            tmp_field = deepcopy(field)

            annotation = field.annotation

            # Handle generics (like List, Dict, etc.)
            if get_origin(annotation) is not None:
                # Get the generic base (like List, Dict) and its arguments (like User in List[User])
                generic_base = get_origin(annotation)
                generic_args = get_args(annotation)

                modified_args = tuple(_process_generic_arg(arg) for arg in generic_args)

                # Reconstruct the generic type with modified arguments
                tmp_field.annotation = (
                    generic_base[modified_args] if generic_base else None
                )
            # If the field is a BaseModel, then recursively convert its
            # attributes to optionals.
            elif isinstance(annotation, type) and issubclass(annotation, BaseModel):
                tmp_field.annotation = Partial[annotation]
            return tmp_field.annotation, tmp_field

        model_name = (
            wrapped_class.__name__
            if wrapped_class.__name__.startswith("Partial")
            else f"Partial{wrapped_class.__name__}"
        )

        return create_model(
            model_name,
            __base__=(wrapped_class, PartialBase),  # type: ignore
            __module__=wrapped_class.__module__,
            **{
                field_name: (
                    _make_field_optional(field_info)
                    if make_fields_optional is not None
                    else _wrap_models(field_info)
                )
                for field_name, field_info in wrapped_class.model_fields.items()
            },  # type: ignore
        )
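
A streaming sketch (model name illustrative): each yielded object is the partial model re-validated against the JSON received so far, so fields that have not streamed yet are None:

import instructor
import openai
from pydantic import BaseModel

from instructor import Partial

class User(BaseModel):
    name: str
    age: int

client = instructor.from_openai(openai.OpenAI())

stream = client.chat.completions.create(
    model="gpt-3.5-turbo",
    response_model=Partial[User],
    stream=True,
    messages=[{"role": "user", "content": "Jason is 25 years old."}],
)
for partial_user in stream:
    print(partial_user)  # e.g. name='Jason' age=None, then name='Jason' age=25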

__class_getitem__(wrapped_class)

Convert the model to one that inherits from PartialBase.

At this point we don't make the fields optional; we just wrap them with Partial so that the names of nested models become Partial{ModelName}. We want the output of model_json_schema() to reflect the name change, but everything else should stay the same as in the original model. During validation, a true partial model is generated to support partially defined fields.

Source code in instructor/dsl/partial.py
def __class_getitem__(
    cls,
    wrapped_class: type[T_Model] | tuple[type[T_Model], type[MakeFieldsOptional]],
) -> type[T_Model]:
    """Convert model to one that inherits from PartialBase.

    We don't make the fields optional at this point, we just wrap them with `Partial` so the names of the nested models will be
    `Partial{ModelName}`. We want the output of `model_json_schema()` to
    reflect the name change, but everything else should be the same as the
    original model. During validation, we'll generate a true partial model
    to support partially defined fields.

    """

    make_fields_optional = None
    if isinstance(wrapped_class, tuple):
        wrapped_class, make_fields_optional = wrapped_class

    def _wrap_models(field: FieldInfo) -> tuple[object, FieldInfo]:
        tmp_field = deepcopy(field)

        annotation = field.annotation

        # Handle generics (like List, Dict, etc.)
        if get_origin(annotation) is not None:
            # Get the generic base (like List, Dict) and its arguments (like User in List[User])
            generic_base = get_origin(annotation)
            generic_args = get_args(annotation)

            modified_args = tuple(_process_generic_arg(arg) for arg in generic_args)

            # Reconstruct the generic type with modified arguments
            tmp_field.annotation = (
                generic_base[modified_args] if generic_base else None
            )
        # If the field is a BaseModel, then recursively convert its
        # attributes to optionals.
        elif isinstance(annotation, type) and issubclass(annotation, BaseModel):
            tmp_field.annotation = Partial[annotation]
        return tmp_field.annotation, tmp_field

    model_name = (
        wrapped_class.__name__
        if wrapped_class.__name__.startswith("Partial")
        else f"Partial{wrapped_class.__name__}"
    )

    return create_model(
        model_name,
        __base__=(wrapped_class, PartialBase),  # type: ignore
        __module__=wrapped_class.__module__,
        **{
            field_name: (
                _make_field_optional(field_info)
                if make_fields_optional is not None
                else _wrap_models(field_info)
            )
            for field_name, field_info in wrapped_class.model_fields.items()
        },  # type: ignore
    )

__init_subclass__(*args, **kwargs)

Cannot subclass.

Raises:

    TypeError: Subclassing is not allowed.

Source code in instructor/dsl/partial.py
def __init_subclass__(
    cls,
    *args: object,
    **kwargs: object,
) -> NoReturn:
    """Cannot subclass.

    Raises:
       TypeError: Subclassing not allowed.
    """
    raise TypeError(f"Cannot subclass {cls.__module__}.Partial")

__new__(*args, **kwargs)

Cannot instantiate.

Raises:

    TypeError: Direct instantiation is not allowed.

Source code in instructor/dsl/partial.py
def __new__(
    cls,
    *args: object,  # noqa :ARG003
    **kwargs: object,  # noqa :ARG003
) -> Partial[T_Model]:
    """Cannot instantiate.

    Raises:
        TypeError: Direct instantiation not allowed.
    """
    raise TypeError("Cannot instantiate abstract Partial class.")

PartialBase

Bases: Generic[T_Model]

Source code in instructor/dsl/partial.py
class PartialBase(Generic[T_Model]):
    @classmethod
    @cache
    def get_partial_model(cls) -> type[T_Model]:
        """Return a partial model we can use to validate partial results."""
        assert issubclass(
            cls, BaseModel
        ), f"{cls.__name__} must be a subclass of BaseModel"

        model_name = (
            cls.__name__
            if cls.__name__.startswith("Partial")
            else f"Partial{cls.__name__}"
        )

        return create_model(
            model_name,
            __base__=cls,
            __module__=cls.__module__,
            **{
                field_name: _make_field_optional(field_info)
                for field_name, field_info in cls.model_fields.items()
            },  # type: ignore[all]
        )

    @classmethod
    def from_streaming_response(
        cls, completion: Iterable[Any], mode: Mode, **kwargs: Any
    ) -> Generator[T_Model, None, None]:
        json_chunks = cls.extract_json(completion, mode)

        if mode in {Mode.MD_JSON, Mode.GEMINI_TOOLS}:
            json_chunks = extract_json_from_stream(json_chunks)

        if mode == Mode.WRITER_TOOLS:
            yield from cls.writer_model_from_chunks(json_chunks, **kwargs)
        else:
            yield from cls.model_from_chunks(json_chunks, **kwargs)

    @classmethod
    async def from_streaming_response_async(
        cls, completion: AsyncGenerator[Any, None], mode: Mode, **kwargs: Any
    ) -> AsyncGenerator[T_Model, None]:
        json_chunks = cls.extract_json_async(completion, mode)

        if mode == Mode.MD_JSON:
            json_chunks = extract_json_from_stream_async(json_chunks)
        elif mode == Mode.WRITER_TOOLS:
            return cls.writer_model_from_chunks_async(json_chunks, **kwargs)

        return cls.model_from_chunks_async(json_chunks, **kwargs)

    @classmethod
    def writer_model_from_chunks(
        cls, json_chunks: Iterable[Any], **kwargs: Any
    ) -> Generator[T_Model, None, None]:
        potential_object = ""
        partial_model = cls.get_partial_model()
        partial_mode = (
            "on" if issubclass(cls, PartialLiteralMixin) else "trailing-strings"
        )
        for chunk in json_chunks:
            if len(chunk) > len(potential_object):
                potential_object = chunk
            else:
                potential_object += chunk
            obj = from_json(
                (potential_object.strip() or "{}").encode(), partial_mode=partial_mode
            )
            obj = partial_model.model_validate(obj, strict=None, **kwargs)
            yield obj

    @classmethod
    async def writer_model_from_chunks_async(
        cls, json_chunks: AsyncGenerator[str, None], **kwargs: Any
    ) -> AsyncGenerator[T_Model, None]:
        potential_object = ""
        partial_model = cls.get_partial_model()
        partial_mode = (
            "on" if issubclass(cls, PartialLiteralMixin) else "trailing-strings"
        )
        async for chunk in json_chunks:
            if len(chunk) > len(potential_object):
                potential_object = chunk
            else:
                potential_object += chunk
            obj = from_json(
                (potential_object.strip() or "{}").encode(), partial_mode=partial_mode
            )
            obj = partial_model.model_validate(obj, strict=None, **kwargs)
            yield obj

    @classmethod
    def model_from_chunks(
        cls, json_chunks: Iterable[Any], **kwargs: Any
    ) -> Generator[T_Model, None, None]:
        potential_object = ""
        partial_model = cls.get_partial_model()
        partial_mode = (
            "on" if issubclass(cls, PartialLiteralMixin) else "trailing-strings"
        )
        for chunk in json_chunks:
            potential_object += chunk
            obj = from_json(
                (potential_object.strip() or "{}").encode(), partial_mode=partial_mode
            )
            obj = partial_model.model_validate(obj, strict=None, **kwargs)
            yield obj

    @classmethod
    async def model_from_chunks_async(
        cls, json_chunks: AsyncGenerator[str, None], **kwargs: Any
    ) -> AsyncGenerator[T_Model, None]:
        potential_object = ""
        partial_model = cls.get_partial_model()
        partial_mode = (
            "on" if issubclass(cls, PartialLiteralMixin) else "trailing-strings"
        )
        async for chunk in json_chunks:
            potential_object += chunk
            obj = from_json(
                (potential_object.strip() or "{}").encode(), partial_mode=partial_mode
            )
            obj = partial_model.model_validate(obj, strict=None, **kwargs)
            yield obj

    @staticmethod
    def extract_json(
        completion: Iterable[Any], mode: Mode
    ) -> Generator[str, None, None]:
        for chunk in completion:
            try:
                if mode == Mode.ANTHROPIC_JSON:
                    if json_chunk := chunk.delta.text:
                        yield json_chunk
                if mode == Mode.ANTHROPIC_TOOLS:
                    yield chunk.delta.partial_json
                if mode == Mode.GEMINI_JSON:
                    yield chunk.text
                if mode == Mode.GEMINI_TOOLS:
                    # Gemini seems to return the entire function_call and not a chunk?
                    import json

                    resp = chunk.candidates[0].content.parts[0].function_call
                    resp_dict = type(resp).to_dict(resp)  # type:ignore
                    if "args" in resp_dict:
                        yield json.dumps(resp_dict["args"])
                elif chunk.choices:
                    if mode == Mode.FUNCTIONS:
                        Mode.warn_mode_functions_deprecation()
                        if json_chunk := chunk.choices[0].delta.function_call.arguments:
                            yield json_chunk
                    elif mode in {
                        Mode.JSON,
                        Mode.MD_JSON,
                        Mode.JSON_SCHEMA,
                        Mode.CEREBRAS_JSON,
                        Mode.FIREWORKS_JSON,
                    }:
                        if json_chunk := chunk.choices[0].delta.content:
                            yield json_chunk
                    elif mode in {
                        Mode.TOOLS,
                        Mode.TOOLS_STRICT,
                        Mode.FIREWORKS_TOOLS,
                        Mode.WRITER_TOOLS,
                    }:
                        if json_chunk := chunk.choices[0].delta.tool_calls:
                            if json_chunk[0].function.arguments:
                                yield json_chunk[0].function.arguments
                    else:
                        raise NotImplementedError(
                            f"Mode {mode} is not supported for MultiTask streaming"
                        )
            except AttributeError:
                pass

    @staticmethod
    async def extract_json_async(
        completion: AsyncGenerator[Any, None], mode: Mode
    ) -> AsyncGenerator[str, None]:
        async for chunk in completion:
            try:
                if mode == Mode.ANTHROPIC_JSON:
                    if json_chunk := chunk.delta.text:
                        yield json_chunk
                if mode == Mode.ANTHROPIC_TOOLS:
                    yield chunk.delta.partial_json
                elif chunk.choices:
                    if mode == Mode.FUNCTIONS:
                        Mode.warn_mode_functions_deprecation()
                        if json_chunk := chunk.choices[0].delta.function_call.arguments:
                            yield json_chunk
                    elif mode in {
                        Mode.JSON,
                        Mode.MD_JSON,
                        Mode.JSON_SCHEMA,
                        Mode.CEREBRAS_JSON,
                        Mode.FIREWORKS_JSON,
                    }:
                        if json_chunk := chunk.choices[0].delta.content:
                            yield json_chunk
                    elif mode in {
                        Mode.TOOLS,
                        Mode.TOOLS_STRICT,
                        Mode.FIREWORKS_TOOLS,
                        Mode.WRITER_TOOLS,
                    }:
                        if json_chunk := chunk.choices[0].delta.tool_calls:
                            if json_chunk[0].function.arguments:
                                yield json_chunk[0].function.arguments
                    else:
                        raise NotImplementedError(
                            f"Mode {mode} is not supported for MultiTask streaming"
                        )
            except AttributeError:
                pass
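
The accumulation logic above can be exercised without any API call by feeding hand-made JSON fragments through model_from_chunks (a sketch; the printed output is approximate):

from pydantic import BaseModel

from instructor import Partial

class User(BaseModel):
    name: str
    age: int

chunks = ['{"name": "Ja', 'son", "a', 'ge": 25}']

# Partial[User] inherits model_from_chunks from PartialBase; every yield is the
# concatenated JSON so far, parsed in partial mode and validated against the
# all-optional model from get_partial_model().
for obj in Partial[User].model_from_chunks(chunks):
    print(obj)
# name='Ja' age=None
# name='Jason' age=None
# name='Jason' age=25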

get_partial_model() cached classmethod

Return a partial model that we can use to validate partial results.

Source code in instructor/dsl/partial.py
@classmethod
@cache
def get_partial_model(cls) -> type[T_Model]:
    """Return a partial model we can use to validate partial results."""
    assert issubclass(
        cls, BaseModel
    ), f"{cls.__name__} must be a subclass of BaseModel"

    model_name = (
        cls.__name__
        if cls.__name__.startswith("Partial")
        else f"Partial{cls.__name__}"
    )

    return create_model(
        model_name,
        __base__=cls,
        __module__=cls.__module__,
        **{
            field_name: _make_field_optional(field_info)
            for field_name, field_info in cls.model_fields.items()
        },  # type: ignore[all]
    )
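
A quick offline check of what get_partial_model produces (a sketch; it assumes, as the streaming helpers above rely on, that _make_field_optional gives every field a None default):

from pydantic import BaseModel

from instructor import Partial

class User(BaseModel):
    name: str
    age: int

PartialUser = Partial[User].get_partial_model()

print(PartialUser.model_fields["age"].is_required())  # False: every field is optional
print(PartialUser(name="Jason"))  # name='Jason' age=None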

MaybeBase

Bases: BaseModel, Generic[T]

Extract a result from a model, if any; otherwise set the error and message fields.

Source code in instructor/dsl/maybe.py
class MaybeBase(BaseModel, Generic[T]):
    """
    Extract a result from a model, if any, otherwise set the error and message fields.
    """

    result: Optional[T]
    error: bool = Field(default=False)
    message: Optional[str]

    def __bool__(self) -> bool:
        return self.result is not None

Maybe(model)

Create a Maybe model for a given Pydantic model. This allows you to return a model that includes result, error, and message fields for situations where the data may not be present in the context.

Usage

from pydantic import BaseModel, Field
from instructor import Maybe

class User(BaseModel):
    name: str = Field(description="The name of the person")
    age: int = Field(description="The age of the person")
    role: str = Field(description="The role of the person")

MaybeUser = Maybe(User)

Result

class MaybeUser(BaseModel):
    result: Optional[User]
    error: bool = Field(default=False)
    message: Optional[str]

    def __bool__(self):
        return self.result is not None

Parameters:

    model (Type[BaseModel]): The Pydantic model to wrap with Maybe. Required.

Returns:

    MaybeModel (Type[BaseModel]): A new Pydantic model that includes `result`, `error`, and `message` fields.

Source code in instructor/dsl/maybe.py
def Maybe(model: type[T]) -> type[MaybeBase[T]]:
    """
    Create a Maybe model for a given Pydantic model. This allows you to return a model that includes fields for `result`, `error`, and `message` for situations where the data may not be present in the context.

    ## Usage

    ```python
    from pydantic import BaseModel, Field
    from instructor import Maybe

    class User(BaseModel):
        name: str = Field(description="The name of the person")
        age: int = Field(description="The age of the person")
        role: str = Field(description="The role of the person")

    MaybeUser = Maybe(User)
    ```

    ## Result

    ```python
    class MaybeUser(BaseModel):
        result: Optional[User]
        error: bool = Field(default=False)
        message: Optional[str]

        def __bool__(self):
            return self.result is not None
    ```

    Parameters:
        model (Type[BaseModel]): The Pydantic model to wrap with Maybe.

    Returns:
        MaybeModel (Type[BaseModel]): A new Pydantic model that includes fields for `result`, `error`, and `message`.
    """
    return create_model(
        f"Maybe{model.__name__}",
        __base__=MaybeBase,
        result=(
            Optional[model],
            Field(
                default=None,
                description="Correctly extracted result from the model, if any, otherwise None",
            ),
        ),
        error=(bool, Field(default=False)),
        message=(
            Optional[str],
            Field(
                default=None,
                description="Error message if no result was found, should be short and concise",
            ),
        ),
    )
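
A usage sketch (model name illustrative): because MaybeBase defines __bool__, the response can be tested directly for the presence of a result:

import instructor
import openai
from pydantic import BaseModel, Field

from instructor import Maybe

class User(BaseModel):
    name: str = Field(description="The name of the person")
    age: int = Field(description="The age of the person")

MaybeUser = Maybe(User)

client = instructor.from_openai(openai.OpenAI())

resp = client.chat.completions.create(
    model="gpt-3.5-turbo",
    response_model=MaybeUser,
    messages=[{"role": "user", "content": "The weather is nice today."}],
)

if resp:  # True only when resp.result is not None
    print(resp.result)
else:
    print(resp.error, resp.message)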

OpenAISchema

Bases: BaseModel

Source code in instructor/function_calls.py
class OpenAISchema(BaseModel):
    # Ignore classproperty, since Pydantic doesn't understand it like it would a normal property.
    model_config = ConfigDict(ignored_types=(classproperty,))

    @classproperty
    def openai_schema(cls) -> dict[str, Any]:
        """
        Return the schema in the format of OpenAI's schema as jsonschema

        Note:
            It's important to add a docstring to describe how to best use this class; it will be included in the description attribute and be part of the prompt.

        Returns:
            model_json_schema (dict): A dictionary in the format of OpenAI's schema as jsonschema
        """
        schema = cls.model_json_schema()
        docstring = parse(cls.__doc__ or "")
        parameters = {
            k: v for k, v in schema.items() if k not in ("title", "description")
        }
        for param in docstring.params:
            if (name := param.arg_name) in parameters["properties"] and (
                description := param.description
            ):
                if "description" not in parameters["properties"][name]:
                    parameters["properties"][name]["description"] = description

        parameters["required"] = sorted(
            k for k, v in parameters["properties"].items() if "default" not in v
        )

        if "description" not in schema:
            if docstring.short_description:
                schema["description"] = docstring.short_description
            else:
                schema["description"] = (
                    f"Correctly extracted `{cls.__name__}` with all "
                    f"the required parameters with correct types"
                )

        return {
            "name": schema["title"],
            "description": schema["description"],
            "parameters": parameters,
        }

    @classproperty
    def anthropic_schema(cls) -> dict[str, Any]:
        return {
            "name": cls.openai_schema["name"],
            "description": cls.openai_schema["description"],
            "input_schema": cls.model_json_schema(),
        }

    @classproperty
    def gemini_schema(cls) -> Any:
        import google.generativeai.types as genai_types

        function = genai_types.FunctionDeclaration(
            name=cls.openai_schema["name"],
            description=cls.openai_schema["description"],
            parameters=map_to_gemini_function_schema(cls.openai_schema["parameters"]),
        )
        return function

    @classmethod
    def from_response(
        cls,
        completion: ChatCompletion,
        validation_context: Optional[dict[str, Any]] = None,
        strict: Optional[bool] = None,
        mode: Mode = Mode.TOOLS,
    ) -> BaseModel:
        """Execute the function from the response of an openai chat completion

        Parameters:
            completion (openai.ChatCompletion): The response from an openai chat completion
            validation_context (dict): The context to use for validating the response
            strict (bool): Whether to use strict json parsing
            mode (Mode): The openai completion mode

        Returns:
            cls (OpenAISchema): An instance of the class
        """
        if mode == Mode.ANTHROPIC_TOOLS:
            return cls.parse_anthropic_tools(completion, validation_context, strict)

        if mode == Mode.ANTHROPIC_JSON:
            return cls.parse_anthropic_json(completion, validation_context, strict)

        if mode in {Mode.VERTEXAI_TOOLS, Mode.GEMINI_TOOLS}:
            return cls.parse_vertexai_tools(completion, validation_context)

        if mode == Mode.VERTEXAI_JSON:
            return cls.parse_vertexai_json(completion, validation_context, strict)

        if mode == Mode.COHERE_TOOLS:
            return cls.parse_cohere_tools(completion, validation_context, strict)

        if mode == Mode.GEMINI_JSON:
            return cls.parse_gemini_json(completion, validation_context, strict)

        if mode == Mode.GEMINI_TOOLS:
            return cls.parse_gemini_tools(completion, validation_context, strict)

        if mode == Mode.COHERE_JSON_SCHEMA:
            return cls.parse_cohere_json_schema(completion, validation_context, strict)

        if mode == Mode.WRITER_TOOLS:
            return cls.parse_writer_tools(completion, validation_context, strict)

        if completion.choices[0].finish_reason == "length":
            raise IncompleteOutputException(last_completion=completion)

        if mode == Mode.FUNCTIONS:
            Mode.warn_mode_functions_deprecation()
            return cls.parse_functions(completion, validation_context, strict)

        if mode in {
            Mode.TOOLS,
            Mode.MISTRAL_TOOLS,
            Mode.TOOLS_STRICT,
            Mode.CEREBRAS_TOOLS,
            Mode.FIREWORKS_TOOLS,
        }:
            return cls.parse_tools(completion, validation_context, strict)

        if mode in {
            Mode.JSON,
            Mode.JSON_SCHEMA,
            Mode.MD_JSON,
            Mode.JSON_O1,
            Mode.CEREBRAS_JSON,
            Mode.FIREWORKS_JSON,
        }:
            return cls.parse_json(completion, validation_context, strict)

        raise ValueError(f"Invalid patch mode: {mode}")

    @classmethod
    def parse_cohere_json_schema(
        cls: type[BaseModel],
        completion: ChatCompletion,
        validation_context: Optional[dict[str, Any]] = None,
        strict: Optional[bool] = None,
    ):
        assert hasattr(
            completion, "text"
        ), "Completion is not of type NonStreamedChatResponse"
        return cls.model_validate_json(
            completion.text, context=validation_context, strict=strict
        )

    @classmethod
    def parse_anthropic_tools(
        cls: type[BaseModel],
        completion: ChatCompletion,
        validation_context: Optional[dict[str, Any]] = None,
        strict: Optional[bool] = None,
    ) -> BaseModel:
        from anthropic.types import Message

        if isinstance(completion, Message) and completion.stop_reason == "max_tokens":
            raise IncompleteOutputException(last_completion=completion)

        # Anthropic returns arguments as a dict, dump to json for model validation below
        tool_calls = [
            json.dumps(c.input) for c in completion.content if c.type == "tool_use"
        ]  # TODO update with anthropic specific types

        tool_calls_validator = TypeAdapter(
            Annotated[list[Any], Field(min_length=1, max_length=1)]
        )
        tool_call = tool_calls_validator.validate_python(tool_calls)[0]

        return cls.model_validate_json(
            tool_call, context=validation_context, strict=strict
        )

    @classmethod
    def parse_anthropic_json(
        cls: type[BaseModel],
        completion: ChatCompletion,
        validation_context: Optional[dict[str, Any]] = None,
        strict: Optional[bool] = None,
    ) -> BaseModel:
        from anthropic.types import Message

        if hasattr(completion, "choices"):
            completion = completion.choices[0]
            if completion.finish_reason == "length":
                raise IncompleteOutputException(last_completion=completion)
            text = completion.message.content
        else:
            assert isinstance(completion, Message)
            if completion.stop_reason == "max_tokens":
                raise IncompleteOutputException(last_completion=completion)
            text = completion.content[0].text

        extra_text = extract_json_from_codeblock(text)

        if strict:
            return cls.model_validate_json(
                extra_text, context=validation_context, strict=True
            )
        else:
            # Allow control characters.
            parsed = json.loads(extra_text, strict=False)
            # Pydantic non-strict: https://docs.pydantic.dev/latest/concepts/strict_mode/
            return cls.model_validate(parsed, context=validation_context, strict=False)

    @classmethod
    def parse_gemini_json(
        cls: type[BaseModel],
        completion: Any,
        validation_context: Optional[dict[str, Any]] = None,
        strict: Optional[bool] = None,
    ) -> BaseModel:
        try:
            text = completion.text
        except ValueError:
            logger.debug(
                f"Error response: {completion.result.candidates[0].finish_reason}\n\n{completion.result.candidates[0].safety_ratings}"
            )

        try:
            extra_text = extract_json_from_codeblock(text)  # type: ignore
        except UnboundLocalError:
            raise ValueError("Unable to extract JSON from completion text") from None

        if strict:
            return cls.model_validate_json(
                extra_text, context=validation_context, strict=True
            )
        else:
            # Allow control characters.
            parsed = json.loads(extra_text, strict=False)
            # Pydantic non-strict: https://docs.pydantic.dev/latest/concepts/strict_mode/
            return cls.model_validate(parsed, context=validation_context, strict=False)

    @classmethod
    def parse_vertexai_tools(
        cls: type[BaseModel],
        completion: ChatCompletion,
        validation_context: Optional[dict[str, Any]] = None,
    ) -> BaseModel:
        tool_call = completion.candidates[0].content.parts[0].function_call.args  # type: ignore
        model = {}
        for field in tool_call:  # type: ignore
            model[field] = tool_call[field]
        # We enable strict=False because the conversion from protobuf -> dict often results in types like ints being cast to floats, as a result in order for model.validate to work we need to disable strict mode.
        return cls.model_validate(model, context=validation_context, strict=False)

    @classmethod
    def parse_vertexai_json(
        cls: type[BaseModel],
        completion: ChatCompletion,
        validation_context: Optional[dict[str, Any]] = None,
        strict: Optional[bool] = None,
    ) -> BaseModel:
        return cls.model_validate_json(
            completion.text, context=validation_context, strict=strict
        )

    @classmethod
    def parse_cohere_tools(
        cls: type[BaseModel],
        completion: ChatCompletion,
        validation_context: Optional[dict[str, Any]] = None,
        strict: Optional[bool] = None,
    ) -> BaseModel:
        text = cast(str, completion.text)  # type: ignore - TODO update with cohere specific types
        extra_text = extract_json_from_codeblock(text)
        return cls.model_validate_json(
            extra_text, context=validation_context, strict=strict
        )

    @classmethod
    def parse_writer_tools(
        cls: type[BaseModel],
        completion: ChatCompletion,
        validation_context: Optional[dict[str, Any]] = None,
        strict: Optional[bool] = None,
    ) -> BaseModel:
        message = completion.choices[0].message
        tool_calls = message.tool_calls
        assert (
            len(tool_calls) == 1
        ), "Instructor does not support multiple tool calls, use List[Model] instead"
        assert (
            tool_calls[0].function.name == cls.openai_schema["name"]
        ), "Tool name does not match"
        return cls.model_validate_json(
            tool_calls[0].function.arguments,
            context=validation_context,
            strict=strict,
        )

    @classmethod
    def parse_functions(
        cls: type[BaseModel],
        completion: ChatCompletion,
        validation_context: Optional[dict[str, Any]] = None,
        strict: Optional[bool] = None,
    ) -> BaseModel:
        message = completion.choices[0].message
        assert (
            message.function_call.name == cls.openai_schema["name"]  # type: ignore[index]
        ), "Function name does not match"
        return cls.model_validate_json(
            message.function_call.arguments,  # type: ignore[attr-defined]
            context=validation_context,
            strict=strict,
        )

    @classmethod
    def parse_tools(
        cls: type[BaseModel],
        completion: ChatCompletion,
        validation_context: Optional[dict[str, Any]] = None,
        strict: Optional[bool] = None,
    ) -> BaseModel:
        message = completion.choices[0].message
        # this field seems to be missing when using instructor with some other tools (e.g. litellm)
        # trying to fix this by adding a check

        if hasattr(message, "refusal"):
            assert (
                message.refusal is None
            ), f"Unable to generate a response due to {message.refusal}"
        assert (
            len(message.tool_calls or []) == 1
        ), f"Instructor does not support multiple tool calls, use List[Model] instead"
        tool_call = message.tool_calls[0]  # type: ignore
        assert (
            tool_call.function.name == cls.openai_schema["name"]  # type: ignore[index]
        ), "Tool name does not match"
        return cls.model_validate_json(
            tool_call.function.arguments,  # type: ignore
            context=validation_context,
            strict=strict,
        )

    @classmethod
    def parse_json(
        cls: type[BaseModel],
        completion: ChatCompletion,
        validation_context: Optional[dict[str, Any]] = None,
        strict: Optional[bool] = None,
    ) -> BaseModel:
        message = completion.choices[0].message.content or ""
        message = extract_json_from_codeblock(message)

        return cls.model_validate_json(
            message,
            context=validation_context,
            strict=strict,
        )

from_response(completion, validation_context=None, strict=None, mode=Mode.TOOLS) classmethod

Execute the function from the response of an openai chat completion.

Parameters:

    completion (ChatCompletion): The response from an openai chat completion. Required.
    validation_context (dict): The context to use for validating the response (default: None).
    strict (bool): Whether to use strict json parsing (default: None).
    mode (Mode): The openai completion mode (default: Mode.TOOLS).

Returns:

    cls (OpenAISchema): An instance of the class.

Source code in instructor/function_calls.py
@classmethod
def from_response(
    cls,
    completion: ChatCompletion,
    validation_context: Optional[dict[str, Any]] = None,
    strict: Optional[bool] = None,
    mode: Mode = Mode.TOOLS,
) -> BaseModel:
    """Execute the function from the response of an openai chat completion

    Parameters:
        completion (openai.ChatCompletion): The response from an openai chat completion
        validation_context (dict): The context to use for validating the response
        strict (bool): Whether to use strict json parsing
        mode (Mode): The openai completion mode

    Returns:
        cls (OpenAISchema): An instance of the class
    """
    if mode == Mode.ANTHROPIC_TOOLS:
        return cls.parse_anthropic_tools(completion, validation_context, strict)

    if mode == Mode.ANTHROPIC_JSON:
        return cls.parse_anthropic_json(completion, validation_context, strict)

    if mode in {Mode.VERTEXAI_TOOLS, Mode.GEMINI_TOOLS}:
        return cls.parse_vertexai_tools(completion, validation_context)

    if mode == Mode.VERTEXAI_JSON:
        return cls.parse_vertexai_json(completion, validation_context, strict)

    if mode == Mode.COHERE_TOOLS:
        return cls.parse_cohere_tools(completion, validation_context, strict)

    if mode == Mode.GEMINI_JSON:
        return cls.parse_gemini_json(completion, validation_context, strict)

    if mode == Mode.GEMINI_TOOLS:
        return cls.parse_gemini_tools(completion, validation_context, strict)

    if mode == Mode.COHERE_JSON_SCHEMA:
        return cls.parse_cohere_json_schema(completion, validation_context, strict)

    if mode == Mode.WRITER_TOOLS:
        return cls.parse_writer_tools(completion, validation_context, strict)

    if completion.choices[0].finish_reason == "length":
        raise IncompleteOutputException(last_completion=completion)

    if mode == Mode.FUNCTIONS:
        Mode.warn_mode_functions_deprecation()
        return cls.parse_functions(completion, validation_context, strict)

    if mode in {
        Mode.TOOLS,
        Mode.MISTRAL_TOOLS,
        Mode.TOOLS_STRICT,
        Mode.CEREBRAS_TOOLS,
        Mode.FIREWORKS_TOOLS,
    }:
        return cls.parse_tools(completion, validation_context, strict)

    if mode in {
        Mode.JSON,
        Mode.JSON_SCHEMA,
        Mode.MD_JSON,
        Mode.JSON_O1,
        Mode.CEREBRAS_JSON,
        Mode.FIREWORKS_JSON,
    }:
        return cls.parse_json(completion, validation_context, strict)

    raise ValueError(f"Invalid patch mode: {mode}")

openai_schema()

Return the schema in the format of OpenAI's schema as jsonschema.

Note

It's important to add a docstring describing how best to use this class; it will be included in the description attribute and become part of the prompt.

Returns:

    model_json_schema (dict): A dictionary in the format of OpenAI's schema as jsonschema.

Source code in instructor/function_calls.py
@classproperty
def openai_schema(cls) -> dict[str, Any]:
    """
    Return the schema in the format of OpenAI's schema as jsonschema

    Note:
        It's important to add a docstring to describe how to best use this class; it will be included in the description attribute and be part of the prompt.

    Returns:
        model_json_schema (dict): A dictionary in the format of OpenAI's schema as jsonschema
    """
    schema = cls.model_json_schema()
    docstring = parse(cls.__doc__ or "")
    parameters = {
        k: v for k, v in schema.items() if k not in ("title", "description")
    }
    for param in docstring.params:
        if (name := param.arg_name) in parameters["properties"] and (
            description := param.description
        ):
            if "description" not in parameters["properties"][name]:
                parameters["properties"][name]["description"] = description

    parameters["required"] = sorted(
        k for k, v in parameters["properties"].items() if "default" not in v
    )

    if "description" not in schema:
        if docstring.short_description:
            schema["description"] = docstring.short_description
        else:
            schema["description"] = (
                f"Correctly extracted `{cls.__name__}` with all "
                f"the required parameters with correct types"
            )

    return {
        "name": schema["title"],
        "description": schema["description"],
        "parameters": parameters,
    }
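
A small offline sketch of the schema generation (no API call needed; the class and fields are illustrative). The docstring becomes the description, and only fields without defaults end up in required:

from pydantic import Field

from instructor import OpenAISchema

class Search(OpenAISchema):
    """Search for a query on the web."""

    query: str = Field(description="The search query")
    limit: int = Field(default=5, description="Maximum number of results")

schema = Search.openai_schema
print(schema["name"])  # Search
print(schema["description"])  # Search for a query on the web.
print(schema["parameters"]["required"])  # ['query']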