data-ingestion-beginners-7

使用LangChain将S3数据发送到Qdrant向量存储

时间: 30 分钟级别: 初学者

将数据摄取到向量存储中对于构建有效的搜索和检索算法至关重要,特别是因为近80%的数据是非结构化的,缺乏任何预定义的格式。

在本教程中,我们将创建一个简化的数据摄取管道,直接从AWS S3提取数据并将其输入到Qdrant中。我们将深入探讨向量嵌入,将非结构化数据转换为允许您以语义方式搜索文档的格式。准备好发现揭示非结构化数据中隐藏见解的新方法!

数据摄取工作流架构

我们将在这个工作流程中设置一个强大的文档摄取和分析管道,使用云存储、自然语言处理(NLP)工具和嵌入技术。从S3桶中的原始数据开始,我们将使用LangChain对其进行预处理,应用嵌入API处理文本和图像,并将结果存储在Qdrant中——一个为相似性搜索优化的向量数据库。

图1:数据摄取工作流架构

data-ingestion-beginners-5

让我们分解这个工作流程的每个组成部分:

  • S3 Bucket: 这是我们的起点——一个集中式、可扩展的存储解决方案,适用于各种文件类型,如PDF、图像和文本。
  • LangChain: 作为管道的协调者,LangChain 负责提取、预处理和管理嵌入生成的数据流。它简化了处理 PDF 的过程,因此您无需担心在此处应用 OCR(光学字符识别)。
  • Text Embeddings API: 该API将文件和PDF中的文本转换为向量表示,捕捉语义以进行高级分析和搜索。
  • Image Embeddings API: 该工具接收图像文件并将其转换为向量表示,使得对视觉内容的相似性搜索和分析成为可能。
  • Qdrant: 作为您的向量数据库,Qdrant 存储嵌入及其有效载荷,支持跨所有内容类型的高效相似性搜索和检索。

先决条件

data-ingestion-beginners-11

在本节中,您将获得一个逐步指南,指导您如何从S3存储桶中摄取数据。但在我们深入之前,让我们确保您已经完成了所有先决条件的设置:

样本数据我们将使用一个样本数据集,其中每个文件夹包含文本格式的产品评论以及相应的图片。
AWS 账户一个活跃的AWS 账户,具有访问 S3 服务的权限。
Qdrant 账户一个Qdrant Cloud 账户,可以访问 WebUI 以管理集合并运行查询。
嵌入模型使用嵌入模型,例如OpenAI的文本嵌入模型用于文本文件,CLIP用于图像。
LangChain你将使用这个流行的框架将所有内容整合在一起。

支持的文档类型

用于摄取的文件可以是各种类型,例如PDF、文本文件或图像。我们将组织一个结构化的S3存储桶,其中包含用于测试和实验的受支持文档类型的文件夹。

Python 环境

确保您有一个安装了这些库的Python环境(Python 3.9或更高版本):

boto3
langchain-community
langchain
python-dotenv
unstructured
unstructured[pdf]
qdrant_client

访问密钥: 将您的AWS访问密钥、S3密钥和Qdrant API密钥存储在.env文件中以便轻松访问。您还需要一个OpenAI API密钥。这里是一个示例.env文件。

ACCESS_KEY = ""
SECRET_ACCESS_KEY = ""
OPENAI_API_KEY = ""
QDRANT_KEY = ""

第一步:从S3摄取数据

data-ingestion-beginners-9.png

LangChain框架使得从AWS S3等存储服务中摄取数据变得容易,内置支持加载PDF、图像和文本文件等格式的文档。

要将LangChain与S3连接,您将使用S3DirectoryLoader,它允许您直接将文件从S3存储桶加载到LangChain的管道中。

示例:配置LangChain以从S3加载文件

以下是设置LangChain以从S3存储桶中摄取数据的方法:

from langchain_community.document_loaders import S3DirectoryLoader

# Initialize the S3 document loader
loader = S3DirectoryLoader(
   "product-dataset",  # S3 bucket name
   "p_1", #S3 Folder name containing the data for the first product
   aws_access_key_id=aws_access_key_id,  # AWS Access Key
   aws_secret_access_key=aws_secret_access_key  # AWS Secret Access Key
)

# Load documents from the specified S3 bucket
docs = loader.load()

步骤2. 将文档转换为嵌入

嵌入 是这里的秘密武器——它们是数据(如文本、图像或音频)的数值表示,以一种易于比较的形式捕捉“意义”。通过将文本和图像转换为嵌入,您将能够快速高效地执行相似性搜索。将嵌入视为在Qdrant中存储和检索有意义见解的桥梁。

我们将用于生成嵌入的模型

为了让事情顺利进行,我们将使用两个强大的模型:

  1. OpenAI Embeddings for transforming text data.
  2. CLIP (Contrastive Language-Image Pretraining) for image data.

文本嵌入

在这里,我们使用OpenAIEmbeddings——一个预训练模型,将文本转换为嵌入,捕捉其潜在含义。有了这些,我们就可以解锁强大的搜索和检索任务。

以下是设置OpenAI文本嵌入模型的方法:

from langchain_openai import OpenAIEmbeddings

# Initialize the text embedding model from OpenAI
text_embedding_model = OpenAIEmbeddings(model="text-embedding-3-small")

text-embedding-3-small 模型生成高质量的文本嵌入,使得查找具有相似含义的文档变得更加容易——即使它们不包含完全相同的单词。

图像嵌入

我们正在使用OpenAI的ClipModel,它设计用于处理图像和文本。在这里,我们将使用CLIP为图像生成嵌入,使您能够基于语义比较图像内容。

以下是设置CLIP模型和处理器的方法:

From transformers import CLIPProcessor, CLIPModel
import torch

# Initialize the CLIP model and processor
clip_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
clip_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

clip-vit-base-patch32 模型专门训练用于将图像和文本对齐到相同的嵌入空间中,这意味着相似的图像将具有彼此接近的嵌入。设置好模型后,您现在可以创建一个函数来根据文档类型处理文档,并使用 CLIP 模型生成嵌入,例如将图像特征转换为向量。

def embed_image_with_clip(image):
    inputs = clip_processor(images=image, return_tensors="pt")
    with torch.no_grad():
        image_features = clip_model.get_image_features(**inputs)
    return image_features.cpu().numpy()

文档处理功能

data-ingestion-beginners-8.png

接下来,我们将创建一个process_document函数,该函数将各种文件类型(如文本、PDF和图像)转换为嵌入。此函数根据文件类型应用不同的方法来提取和嵌入内容,使其能够有效地处理多种格式。

def process_document(doc):
   source = doc.metadata['source']  # Extract document source (e.g., S3 URL)

   # Processing Text Files
   if source.endswith('.txt'):
       text = doc.page_content  # Extract the content from the text file
       print(f"Processing .txt file: {source}")
       return text, text_embedding_model.embed_documents([text])  # Convert to embeddings

   # Processing PDF Files
   elif source.endswith('.pdf'):
       content = doc.page_content  # Extract content from the PDF
       print(f"Processing .pdf file: {source}")
       return content, text_embedding_model.embed_documents([content])  # Convert to embeddings

   # Processing Image Files
   elif source.endswith('.png'):
       print(f"Processing .png file: {source}")
       bucket_name, object_key = parse_s3_url(source)  # Parse the S3 URL
       response = s3.get_object(Bucket=bucket_name, Key=object_key)  # Fetch image from S3
       img_bytes = response['Body'].read()

       # Load the image and convert to embeddings
       img = Image.open(io.BytesIO(img_bytes))
       return source, embed_image_with_clip(img)  # Convert to image embeddings

以下是上述代码的解释:

  • 文件类型检测

首先,函数通过文档源元数据中的文件扩展名(.txt, .pdf, .png)检查文件类型。这告诉它如何处理内容以及使用哪种嵌入模型。

  • 文件处理
    • 文本文件 (.txt): 对于文本文件,处理起来很简单——内容被提取为纯文本并传递给OpenAIEmbeddings模型。然后,embed_documents()函数将此文本转换为数值向量,以嵌入形式捕捉其含义。
    • PDFs: PDF文件通常包含丰富的内容,如多页内容。在这里,我们使用LangChain的文档加载器从每一页提取文本,然后使用OpenAI文本嵌入模型将其转换为嵌入。这使您能够捕捉文档的全部丰富性。
    • 图像 (.png): 对于图像(例如,.png文件),函数通过其URL从S3获取图像。一旦使用Python Imaging Library (PIL)加载图像,图像将被传递给CLIP模型,该模型生成表示图像语义特征的嵌入。

文档处理的辅助函数

要从S3检索图像,辅助函数parse_s3_url将S3 URL分解为其存储桶和关键组件。这对于从S3存储中获取图像至关重要。

def parse_s3_url(s3_url):
    parts = s3_url.replace("s3://", "").split("/", 1)
    bucket_name = parts[0]
    object_key = parts[1]
    return bucket_name, object_key

步骤3:将嵌入加载到Qdrant

data-ingestion-beginners-10

现在您的文档已经处理并转换为嵌入,下一步是将这些嵌入加载到Qdrant中。

在Qdrant中创建集合

在Qdrant中,数据被组织在集合中,每个集合代表一组嵌入(或点)及其相关的元数据(有效载荷)。要存储之前生成的嵌入,您首先需要创建一个集合。

以下是如何在Qdrant中创建一个集合来存储文本和图像嵌入:

def create_collection(collection_name):
    qdrant_client.create_collection(
        collection_name,
        vectors_config={
            "text_embedding": models.VectorParams(
                size=1536,  # Dimension of text embeddings
                distance=models.Distance.COSINE,  # Cosine similarity is used for comparison
            ),
            "image_embedding": models.VectorParams(
                size=512,  # Dimension of image embeddings
                distance=models.Distance.COSINE,  # Cosine similarity is used for comparison
            ),
        },
    )

create_collection("products-data")

此函数创建一个用于存储文本(1536维)和图像(512维)嵌入的集合,使用余弦相似度来比较集合内的嵌入。

一旦集合设置完成,您可以将嵌入加载到Qdrant中。这涉及将嵌入及其相关的元数据(有效载荷)插入(或更新)到指定的集合中。

以下是将嵌入加载到Qdrant的代码:

def ingest_data(points):
    operation_info = qdrant_client.upsert(
        collection_name="products-data",  # Collection where data is being inserted
        points=points
    )
    return operation_info

数据摄入的解释

  1. Upserting the Data Point: The upsert method on the qdrant_client inserts each PointStruct into the specified collection. If a point with the same ID already exists, it will be updated with the new values.
  2. Operation Info: The function returns operation_info, which contains details about the upsert operation, such as success status or any potential errors.

运行摄取代码

以下是如何调用函数并摄取数据:

if __name__ == "__main__":
    collection_name = "products-data"
    create_collection(collection_name)
    for i in range(1,6): # Five documents
        folder = f"p_{i}"
        loader = S3DirectoryLoader(
            "product-dataset",
            folder,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key
        )
        docs = loader.load()
        text_embedding, image_embedding, points, text_review, product_image = [], [], [], "", ""
        for idx, doc in enumerate(docs):
            source = doc.metadata['source']
            if source.endswith(".txt"):
                text_review, text_embedding = process_document(doc)
            elif source.endswith(".png"):
                product_image, image_embedding = process_document(doc)
        if text_review:
            point = PointStruct(
                id=idx,  # Unique identifier for each point
                vector={
                    "text_embedding": text_embedding[0],                      
                    "image_embedding": image_embedding[0].tolist(), 
                },
                payload={
                    "review": text_review,                           
                    "product_image": product_image 
                }
            )
            points.append(point)
    operation_info = ingest_data(points)
    print(operation_info)

PointStruct 使用这些关键参数进行实例化:

  • id: 每个嵌入的唯一标识符,通常是一个递增的索引。
  • vector: 一个包含从每个文档生成的文本和图像嵌入的字典。
  • payload: 一个存储额外元数据的字典,如产品评论和图像引用,这在搜索过程中对于检索和上下文非常宝贵。

代码动态地从S3存储桶加载文件夹,分别处理文本和图像文件,并将它们的嵌入和相关数据存储在专用列表中。然后,它为每个数据条目创建一个PointStruct,并调用摄取函数将其加载到Qdrant中。

探索Qdrant WebUI仪表板

一旦嵌入被加载到Qdrant中,您可以使用WebUI仪表板来可视化和管理您的集合。仪表板提供了一个清晰、结构化的界面,用于查看集合及其数据。让我们在下一节中更详细地看一下。

步骤4:在Qdrant WebUI中可视化数据

要在Qdrant WebUI中开始可视化您的数据,请前往概览部分并选择访问数据库

图2:从Qdrant UI访问数据库 data-ingestion-beginners-2.png

当提示时,输入您的API密钥。进入后,您将能够查看您的集合和相应的数据点。您应该会看到您的集合显示如下:

图3: Qdrant中的产品数据集合 data-ingestion-beginners-4.png

这是最近摄入到Qdrant的最新数据点:

图4:最新添加到product-data集合的点 data-ingestion-beginners-6.png

Qdrant WebUI 的搜索功能允许您在集合中执行向量搜索。通过应用过滤器和参数的选项,检索相关嵌入并探索数据中的关系变得容易。要开始,请前往左侧面板中的控制台,您可以在其中创建查询:

图5: Qdrant控制台概览 data-ingestion-beginners-1.png

第一个查询检索所有集合,第二个从产品数据集合中获取点,第三个执行示例查询。这展示了在Qdrant UI中与数据交互是多么简单。

现在,让我们使用查询从数据库中检索一些文档!

图6:查询Qdrant客户端以检索相关文档 data-ingestion-beginners-3.png

在这个例子中,我们查询了设计改进的手机。然后,我们使用OpenAI将文本转换为向量,并检索了一篇相关的手机评论,重点介绍了设计改进。

结论

在本指南中,我们设置了一个S3存储桶,摄取了各种数据类型,并将嵌入存储在Qdrant中。使用LangChain,我们动态处理了文本和图像文件,使得处理每种文件类型变得容易。

现在,轮到你了。尝试使用不同的数据类型,比如视频,并探索Qdrant的高级功能来增强你的应用程序。要开始使用,注册 Qdrant。

data-ingestion-beginners-12

这个页面有用吗?

感谢您的反馈!🙏

我们很抱歉听到这个消息。😔 你可以在GitHub上编辑这个页面,或者创建一个GitHub问题。