跳至内容

数据摄取管道#

一个IngestionPipeline使用Transformations的概念来处理输入数据。这些Transformations会应用于您的输入数据,生成的节点将被返回或插入到向量数据库中(如果提供了数据库)。每个节点+转换的组合都会被缓存,这样后续运行(如果缓存被保留)相同的节点+转换组合时就可以使用缓存结果,从而节省您的时间。

要查看IngestionPipeline实际应用的交互式示例,请查看RAG CLI

使用模式#

最简单的用法是像这样实例化一个IngestionPipeline

from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline, IngestionCache

# create the pipeline with transformations
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
        OpenAIEmbedding(),
    ]
)

# run the pipeline
nodes = pipeline.run(documents=[Document.example()])

请注意,在实际应用场景中,您需要通过SimpleDirectoryReader或Llama Hub中的其他阅读器来获取文档。

连接向量数据库#

运行数据摄取管道时,您还可以选择将生成的节点自动插入远程向量存储库。

然后,您可以稍后从该向量存储中构建索引。

from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline
from llama_index.vector_stores.qdrant import QdrantVectorStore

import qdrant_client

client = qdrant_client.QdrantClient(location=":memory:")
vector_store = QdrantVectorStore(client=client, collection_name="test_store")

pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
        OpenAIEmbedding(),
    ],
    vector_store=vector_store,
)

# Ingest directly into a vector db
pipeline.run(documents=[Document.example()])

# Create your index
from llama_index.core import VectorStoreIndex

index = VectorStoreIndex.from_vector_store(vector_store)

在流水线中计算嵌入#

请注意,在上面的示例中,嵌入计算是管道流程的一部分。如果将管道连接到向量存储,嵌入必须是管道的一个阶段,否则后续索引实例化将失败。

如果不连接到向量存储,可以省略管道中的嵌入步骤,即仅生成节点列表。

缓存#

IngestionPipeline中,每个节点+转换的组合都会被哈希处理并缓存。这可以节省后续使用相同数据时的运行时间。

以下部分介绍了一些关于缓存的基本用法。

本地缓存管理#

一旦你有了一个管道,你可能想要存储和加载缓存。

# save
pipeline.persist("./pipeline_storage")

# load and restore state
new_pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
    ],
)
new_pipeline.load("./pipeline_storage")

# will run instantly due to the cache
nodes = pipeline.run(documents=[Document.example()])

如果缓存变得过大,您可以清除它

# delete all context of the cache
cache.clear()

远程缓存管理#

我们支持多种用于缓存的远程存储后端

  • RedisCache
  • MongoDBCache
  • FirestoreCache

以下是一个使用RedisCache的示例:

from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline, IngestionCache
from llama_index.storage.kvstore.redis import RedisKVStore as RedisCache


ingest_cache = IngestionCache(
    cache=RedisCache.from_host_and_port(host="127.0.0.1", port=6379),
    collection="my_test_cache",
)

pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=25, chunk_overlap=0),
        TitleExtractor(),
        OpenAIEmbedding(),
    ],
    cache=ingest_cache,
)

# Ingest directly into a vector db
nodes = pipeline.run(documents=[Document.example()])

这里不需要持久化步骤,因为所有内容都会在指定的远程集合中实时缓存。

异步支持#

IngestionPipeline 还支持异步操作

nodes = await pipeline.arun(documents=documents)

文档管理#

在数据摄取管道中附加docstore将启用文档管理功能。

使用document.doc_idnode.ref_doc_id作为基准点,数据摄取管道将主动查找重复文档。

它的工作原理是:

  • 存储一个从doc_iddocument_hash的映射关系
  • 如果附加了向量存储:
  • 如果检测到重复的doc_id且哈希值发生变化,该文档将被重新处理并执行更新插入操作
  • 如果检测到重复的doc_id且哈希值未改变,则跳过该节点
  • 如果仅未附加向量存储:
  • 检查每个节点的所有现有哈希值
  • 如果发现重复项,则跳过该节点
  • 否则,节点将被处理

注意: 如果不附加向量存储,我们只能检查并移除重复的输入。

from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.storage.docstore import SimpleDocumentStore

pipeline = IngestionPipeline(
    transformations=[...], docstore=SimpleDocumentStore()
)

完整教程请参阅我们的演示笔记本

也可以查看另一篇指南,其中使用了Redis作为整个数据摄取栈

并行处理#

IngestionPipelinerun方法可以通过并行进程执行。它通过利用multiprocessing.Pool将节点批次分配到多个处理器来实现这一功能。

要启用并行处理执行,请将num_workers设置为你想使用的进程数量:

from llama_index.core.ingestion import IngestionPipeline

pipeline = IngestionPipeline(
    transformations=[...],
)
pipeline.run(documents=[...], num_workers=4)

模块#

优云智算