在Databricks上使用Qdrant

时间: 30 分钟级别: 中级完整笔记本

Databricks 是一个用于处理大数据和人工智能的统一分析平台。它围绕 Apache Spark 构建,这是一个强大的开源分布式计算系统,非常适合处理大规模数据集和执行复杂的分析任务。

Apache Spark 设计为水平扩展,意味着它可以通过在机器集群上分布计算来处理昂贵的操作,如生成向量嵌入。这种可扩展性在处理大型数据集时至关重要。

在这个例子中,我们将演示如何使用Qdrant的快速嵌入库对具有密集和稀疏嵌入的数据集进行向量化。然后,我们将使用Databricks上的Qdrant Spark 连接器将这个向量化数据加载到Qdrant集群中。

设置一个Databricks项目

  • 按照官方文档指南设置一个Databricks集群

  • 安装Qdrant Spark连接器作为库:

    • 导航到集群仪表板中的Libraries部分。

    • 点击右上角的Install New以打开库安装模态框。

    • 在Maven包中搜索io.qdrant:spark:VERSION并点击Install

      Install the library

  • 在您的集群上创建一个新的Databricks 笔记本,开始处理您的数据和库。

下载数据集

  • 安装所需的依赖项:
%pip install fastembed datasets
  • 下载数据集:
from datasets import load_dataset

dataset_name = "tasksource/med"
dataset = load_dataset(dataset_name, split="train")
# We'll use the first 100 entries from this dataset and exclude some unused columns.
dataset = dataset.select(range(100)).remove_columns(["gold_label", "genre"])
  • 将数据集转换为Spark dataframe:
dataset.to_parquet("/dbfs/pq.pq")
dataset_df = spark.read.parquet("file:/dbfs/pq.pq")

向量化数据

在本节中,我们将使用快速嵌入为我们的行生成密集和稀疏向量。我们将创建一个用户定义函数(UDF)来处理这一步。

创建向量化函数

from fastembed import TextEmbedding, SparseTextEmbedding

def vectorize(partition_data):
    # Initialize dense and sparse models
    dense_model = TextEmbedding(model_name="BAAI/bge-small-en-v1.5")
    sparse_model = SparseTextEmbedding(model_name="Qdrant/bm25")

    for row in partition_data:
        # Generate dense and sparse vectors
        dense_vector = next(dense_model.embed(row.sentence1))
        sparse_vector = next(sparse_model.embed(row.sentence2))

        yield [
            row.sentence1,  # 1st column: original text
            row.sentence2,  # 2nd column: original text
            dense_vector.tolist(),  # 3rd column: dense vector
            sparse_vector.indices.tolist(),  # 4th column: sparse vector indices
            sparse_vector.values.tolist(),  # 5th column: sparse vector values
        ]

我们正在使用BAAI/bge-small-en-v1.5模型进行密集嵌入,并使用BM25进行稀疏嵌入。

在我们的数据帧上应用UDF

接下来,让我们在我们的Spark数据帧上应用我们的vectorize UDF来生成嵌入。

embeddings = dataset_df.rdd.mapPartitions(vectorize)

mapPartitions() 方法返回一个 弹性分布式数据集 (RDD),然后应将其转换回 Spark 数据框。

使用向量化数据构建新的Spark数据框

我们现在将使用指定的模式创建一个新的Spark数据框架(embeddings_df),其中包含向量化数据。

from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType, IntegerType

# Define the schema for the new dataframe
schema = StructType([
    StructField("sentence1", StringType()),
    StructField("sentence2", StringType()),
    StructField("dense_vector", ArrayType(FloatType())),
    StructField("sparse_vector_indices", ArrayType(IntegerType())),
    StructField("sparse_vector_values", ArrayType(FloatType()))
])

# Create the new dataframe with the vectorized data
embeddings_df = spark.createDataFrame(data=embeddings, schema=schema)

将数据上传到Qdrant

  • 创建一个Qdrant集合:

    • 按照文档创建一个具有适当配置的集合。以下是一个支持密集和稀疏向量的示例请求:
    PUT /collections/{collection_name}
    {
      "vectors": {
        "dense": {
          "size": 384,
          "distance": "Cosine"
        }
      },
      "sparse_vectors": {
        "sparse": {}
      }
    }
    
  • 将数据框上传到Qdrant:

options = {
    "qdrant_url": "<QDRANT_GRPC_URL>",
    "api_key": "<QDRANT_API_KEY>",
    "collection_name": "<QDRANT_COLLECTION_NAME>",
    "vector_fields": "dense_vector",
    "vector_names": "dense",
    "sparse_vector_value_fields": "sparse_vector_values",
    "sparse_vector_index_fields": "sparse_vector_indices",
    "sparse_vector_names": "sparse",
    "schema": embeddings_df.schema.json(),
}

embeddings_df.write.format("io.qdrant.spark.Qdrant").options(**options).mode(
    "append"
).save()

请确保将占位符值(, , )替换为您的实际值。如果未指定id_field选项,Qdrant Spark连接器将为每个点生成随机UUID。

你应该看到的命令输出类似于:

Command took 40.37 seconds -- by xxxxx90@xxxxxx.com at 4/17/2024, 12:13:28 PM on fastembed

结论

我们的教程到此结束!请随意探索更多功能,并尝试使用Databricks、Spark和Qdrant中提供的不同模型、参数和功能进行实验。

快乐的数据工程!

这个页面有用吗?

感谢您的反馈!🙏

我们很抱歉听到这个消息。😔 你可以在GitHub上编辑这个页面,或者创建一个GitHub问题。