嵌入器#
你将学到什么?
什么是
Embedder,为什么它被设计成这样?何时使用
Embedder以及如何使用它?如何使用
BatchEmbedder进行批处理?
core.embedder.Embedder 类类似于 Generator,它是一个面向用户的组件,通过 ModelClient 和 output_processors 来协调嵌入模型。
与直接使用 ModelClient 相比,Embedder 进一步简化了接口并输出标准的 EmbedderOutput 格式。
通过切换ModelClient,您可以轻松地在任务管道中使用不同的嵌入模型,甚至可以嵌入不同的数据,如文本、图像等。
嵌入器输出#
core.types.EmbedderOutput 是 Embedder 的标准输出格式。它是 DataClass 的子类,包含以下核心字段:
data: 嵌入列表,每个嵌入的类型为core.types.Embedding。error: 如果在模型推理阶段发生任何错误,则显示错误信息。输出处理阶段的失败将引发异常,而不是设置此字段。raw_response: 用于失败的模型推理。
此外,我们向EmbedderOutput添加了三个属性:
length:data中嵌入的数量。embedding_dim:data中嵌入的维度。is_normalized: 嵌入是否使用numpy归一化为单位向量。
嵌入器在行动#
我们目前支持所有来自OpenAI的嵌入模型和‘thenlper/gte-base’来自HuggingFace的transformers。
我们将使用这两个来演示如何使用Embedder,一个来自API提供商,另一个使用本地模型。对于本地模型,您可能需要确保transformers已安装。
注意
output_processors 可以是一个组件或一个 Sequential 容器,用于将多个组件链接在一起。输出处理器按顺序应用,并且仅适用于 EmbedderOutput 的 data 字段。
使用OpenAI API#
在开始之前,请确保您在环境变量或.env文件中配置了API密钥,或者直接将其传递给OpenAIClient。
from adalflow.core.embedder import Embedder
from adalflow.components.model_client import OpenAIClient
from adalflow.utils import setup_env # ensure you setup OPENAI_API_KEY in your project .env file
model_kwargs = {
"model": "text-embedding-3-small",
"dimensions": 256,
"encoding_format": "float",
}
query = "What is the capital of China?"
queries = [query] * 100
embedder = Embedder(model_client=OpenAIClient(), model_kwargs=model_kwargs)
我们从OpenAI API文档中找到model_kwargs。我们设置query来演示单个查询的调用,并设置queries来演示批量调用。
可视化结构:我们使用 print(embedder)。输出将是:
Embedder(
model_kwargs={'model': 'text-embedding-3-small', 'dimensions': 256, 'encoding_format': 'float'},
(model_client): OpenAIClient()
)
嵌入单个查询: 运行嵌入器并打印输出的长度和嵌入维度。
output = embedder(query)
print(output.length, output.embedding_dim, output.is_normalized)
# 1 256 True
嵌入批量查询:
output = embedder(queries)
print(output.length, output.embedding_dim)
# 100 256
使用本地模型#
使用本地模型设置嵌入器。
from adalflow.core.embedder import Embedder
from adalflow.components.model_client import TransformersClient
model_kwargs = {"model": "thenlper/gte-base"}
local_embedder = Embedder(model_client=TransformersClient(), model_kwargs=model_kwargs)
现在,使用相同的查询和查询调用嵌入器。
output = local_embedder(query)
print(output.length, output.embedding_dim, output.is_normalized)
# 1 768 True
output = local_embedder(queries)
print(output.length, output.embedding_dim, output.is_normalized)
# 100 768 True
使用输出处理器#
如果我们想将嵌入维度减少到仅256以节省内存,我们可以自定义一个额外的输出处理步骤,并通过output_processors参数将其传递给嵌入器。
from adalflow.core.types import Embedding
from adalflow.core.functional import normalize_vector
from typing import List
from adalflow.core.component import Component
from copy import deepcopy
class DecreaseEmbeddingDim(Component):
def __init__(self, old_dim: int, new_dim: int, normalize: bool = True):
super().__init__()
self.old_dim = old_dim
self.new_dim = new_dim
self.normalize = normalize
assert self.new_dim < self.old_dim, "new_dim should be less than old_dim"
def call(self, input: List[Embedding]) -> List[Embedding]:
output: List[Embedding] = deepcopy(input)
for embedding in output:
old_embedding = embedding.embedding
new_embedding = old_embedding[: self.new_dim]
if self.normalize:
new_embedding = normalize_vector(new_embedding)
embedding.embedding = new_embedding
return output
def _extra_repr(self) -> str:
repr_str = f"old_dim={self.old_dim}, new_dim={self.new_dim}, normalize={self.normalize}"
return repr_str
此输出处理器将处理EmbedderOutput的data字段,该字段的类型为List[Embedding]。因此,在call方法中,我们有input: List[Embedding] -> output: List[Embedding]。
综上所述,我们可以创建一个带有输出处理器的新嵌入器。
local_embedder_256 = Embedder(
model_client=TransformersClient(),
model_kwargs=model_kwargs,
output_processors=DecreaseEmbeddingDim(768, 256),
)
print(local_embedder_256)
结构如下:
Embedder(
model_kwargs={'model': 'thenlper/gte-base'},
(model_client): TransformersClient()
(output_processors): DecreaseEmbeddingDim(old_dim=768, new_dim=256, normalize=True)
)
运行查询:
output = local_embedder_256(query)
print(output.length, output.embedding_dim, output.is_normalized)
# 1 256 True
注意
请查找关于直接减少嵌入维度如何影响下游任务性能的相关研究。我们只是以此为例来展示输出处理器。
批量嵌入器#
特别是在数据处理管道中,你经常会有超过1000个查询需要嵌入。我们需要将查询分成较小的批次,以避免内存溢出。
core.embedder.BatchEmbedder 就是为处理这种情况而设计的。目前,代码相当简单,但在未来,当你在生产数据管道中使用AdalFlow时,它可以扩展以支持多进程处理。
BatchEmbedder 协调 Embedder 并处理批处理过程。要使用它,您需要将 Embedder 和批处理大小传递给构造函数。
from adalflow.core.embedder import BatchEmbedder
batch_embedder = BatchEmbedder(embedder=local_embedder, batch_size=100)
queries = [query] * 1000
response = batch_embedder(queries)
# 100%|██████████| 11/11 [00:04<00:00, 2.59it/s]
注意
要集成您自己的嵌入模型或来自API提供商的模型,您需要实现自己的ModelClient子类。
参考文献
transformers: https://huggingface.co/docs/transformers/en/index
thenlper/gte-base 模型: https://huggingface.co/thenlper/gte-base