嵌入器#

你将学到什么?

  1. 什么是Embedder,为什么它被设计成这样?

  2. 何时使用 Embedder 以及如何使用它?

  3. 如何使用BatchEmbedder进行批处理?

core.embedder.Embedder 类类似于 Generator,它是一个面向用户的组件,通过 ModelClientoutput_processors 来协调嵌入模型。 与直接使用 ModelClient 相比,Embedder 进一步简化了接口并输出标准的 EmbedderOutput 格式。

通过切换ModelClient,您可以轻松地在任务管道中使用不同的嵌入模型,甚至可以嵌入不同的数据,如文本、图像等。

嵌入器输出#

core.types.EmbedderOutputEmbedder 的标准输出格式。它是 DataClass 的子类,包含以下核心字段:

  • data: 嵌入列表,每个嵌入的类型为 core.types.Embedding

  • error: 如果在模型推理阶段发生任何错误,则显示错误信息。输出处理阶段的失败将引发异常,而不是设置此字段。

  • raw_response: 用于失败的模型推理。

此外,我们向EmbedderOutput添加了三个属性:

  • length: data 中嵌入的数量。

  • embedding_dim: data 中嵌入的维度。

  • is_normalized: 嵌入是否使用numpy归一化为单位向量。

嵌入器在行动#

我们目前支持所有来自OpenAI的嵌入模型‘thenlper/gte-base’来自HuggingFace的transformers。 我们将使用这两个来演示如何使用Embedder,一个来自API提供商,另一个使用本地模型。对于本地模型,您可能需要确保transformers已安装。

注意

output_processors 可以是一个组件或一个 Sequential 容器,用于将多个组件链接在一起。输出处理器按顺序应用,并且仅适用于 EmbedderOutputdata 字段。

使用OpenAI API#

在开始之前,请确保您在环境变量或.env文件中配置了API密钥,或者直接将其传递给OpenAIClient

from adalflow.core.embedder import Embedder
from adalflow.components.model_client import OpenAIClient
from adalflow.utils import setup_env # ensure you setup OPENAI_API_KEY in your project .env file

model_kwargs = {
    "model": "text-embedding-3-small",
    "dimensions": 256,
    "encoding_format": "float",
}

query = "What is the capital of China?"

queries = [query] * 100


embedder = Embedder(model_client=OpenAIClient(), model_kwargs=model_kwargs)

我们从OpenAI API文档中找到model_kwargs。我们设置query来演示单个查询的调用,并设置queries来演示批量调用。

可视化结构:我们使用 print(embedder)。输出将是:

Embedder(
model_kwargs={'model': 'text-embedding-3-small', 'dimensions': 256, 'encoding_format': 'float'},
(model_client): OpenAIClient()
)

嵌入单个查询: 运行嵌入器并打印输出的长度和嵌入维度。

output = embedder(query)
print(output.length, output.embedding_dim, output.is_normalized)
# 1 256 True

嵌入批量查询:

output = embedder(queries)
print(output.length, output.embedding_dim)
# 100 256

使用本地模型#

使用本地模型设置嵌入器。

from adalflow.core.embedder import Embedder
from adalflow.components.model_client import TransformersClient

model_kwargs = {"model": "thenlper/gte-base"}
local_embedder = Embedder(model_client=TransformersClient(), model_kwargs=model_kwargs)

现在,使用相同的查询和查询调用嵌入器。

output = local_embedder(query)
print(output.length, output.embedding_dim, output.is_normalized)
# 1 768 True

output = local_embedder(queries)
print(output.length, output.embedding_dim, output.is_normalized)
# 100 768 True

使用输出处理器#

如果我们想将嵌入维度减少到仅256以节省内存,我们可以自定义一个额外的输出处理步骤,并通过output_processors参数将其传递给嵌入器。

from adalflow.core.types import Embedding
from adalflow.core.functional import normalize_vector
from typing import List
from adalflow.core.component import Component
from copy import deepcopy

class DecreaseEmbeddingDim(Component):
    def __init__(self, old_dim: int, new_dim: int,  normalize: bool = True):
        super().__init__()
        self.old_dim = old_dim
        self.new_dim = new_dim
        self.normalize = normalize
        assert self.new_dim < self.old_dim, "new_dim should be less than old_dim"

    def call(self, input: List[Embedding]) -> List[Embedding]:
        output: List[Embedding] = deepcopy(input)
        for embedding in output:
            old_embedding = embedding.embedding
            new_embedding = old_embedding[: self.new_dim]
            if self.normalize:
                new_embedding = normalize_vector(new_embedding)
            embedding.embedding = new_embedding
        return output

    def _extra_repr(self) -> str:
        repr_str = f"old_dim={self.old_dim}, new_dim={self.new_dim}, normalize={self.normalize}"
        return repr_str

此输出处理器将处理EmbedderOutputdata字段,该字段的类型为List[Embedding]。因此,在call方法中,我们有input: List[Embedding] -> output: List[Embedding]。 综上所述,我们可以创建一个带有输出处理器的新嵌入器。

local_embedder_256 = Embedder(
     model_client=TransformersClient(),
     model_kwargs=model_kwargs,
     output_processors=DecreaseEmbeddingDim(768, 256),
 )
 print(local_embedder_256)

结构如下:

Embedder(
model_kwargs={'model': 'thenlper/gte-base'},
(model_client): TransformersClient()
(output_processors): DecreaseEmbeddingDim(old_dim=768, new_dim=256, normalize=True)
)

运行查询:

output = local_embedder_256(query)
print(output.length, output.embedding_dim, output.is_normalized)
# 1 256 True

注意

请查找关于直接减少嵌入维度如何影响下游任务性能的相关研究。我们只是以此为例来展示输出处理器。

批量嵌入器#

特别是在数据处理管道中,你经常会有超过1000个查询需要嵌入。我们需要将查询分成较小的批次,以避免内存溢出。 core.embedder.BatchEmbedder 就是为处理这种情况而设计的。目前,代码相当简单,但在未来,当你在生产数据管道中使用AdalFlow时,它可以扩展以支持多进程处理。

BatchEmbedder 协调 Embedder 并处理批处理过程。要使用它,您需要将 Embedder 和批处理大小传递给构造函数。

from adalflow.core.embedder import BatchEmbedder

batch_embedder = BatchEmbedder(embedder=local_embedder, batch_size=100)

queries = [query] * 1000

response = batch_embedder(queries)
# 100%|██████████| 11/11 [00:04<00:00,  2.59it/s]

注意

要集成您自己的嵌入模型或来自API提供商的模型,您需要实现自己的ModelClient子类。