在Databricks上使用Qdrant
| 时间: 30 分钟 | 级别: 中级 | 完整笔记本 |
|---|
Databricks 是一个用于处理大数据和人工智能的统一分析平台。它围绕 Apache Spark 构建,这是一个强大的开源分布式计算系统,非常适合处理大规模数据集和执行复杂的分析任务。
Apache Spark 设计为水平扩展,意味着它可以通过在机器集群上分布计算来处理昂贵的操作,如生成向量嵌入。这种可扩展性在处理大型数据集时至关重要。
在这个例子中,我们将演示如何使用Qdrant的快速嵌入库对具有密集和稀疏嵌入的数据集进行向量化。然后,我们将使用Databricks上的Qdrant Spark 连接器将这个向量化数据加载到Qdrant集群中。
设置一个Databricks项目
按照官方文档指南设置一个Databricks集群。
安装Qdrant Spark连接器作为库:
导航到集群仪表板中的
Libraries部分。点击右上角的
Install New以打开库安装模态框。在Maven包中搜索
io.qdrant:spark:VERSION并点击Install。
在您的集群上创建一个新的Databricks 笔记本,开始处理您的数据和库。
下载数据集
- 安装所需的依赖项:
%pip install fastembed datasets
- 下载数据集:
from datasets import load_dataset
dataset_name = "tasksource/med"
dataset = load_dataset(dataset_name, split="train")
# We'll use the first 100 entries from this dataset and exclude some unused columns.
dataset = dataset.select(range(100)).remove_columns(["gold_label", "genre"])
- 将数据集转换为Spark dataframe:
dataset.to_parquet("/dbfs/pq.pq")
dataset_df = spark.read.parquet("file:/dbfs/pq.pq")
向量化数据
在本节中,我们将使用快速嵌入为我们的行生成密集和稀疏向量。我们将创建一个用户定义函数(UDF)来处理这一步。
创建向量化函数
from fastembed import TextEmbedding, SparseTextEmbedding
def vectorize(partition_data):
# Initialize dense and sparse models
dense_model = TextEmbedding(model_name="BAAI/bge-small-en-v1.5")
sparse_model = SparseTextEmbedding(model_name="Qdrant/bm25")
for row in partition_data:
# Generate dense and sparse vectors
dense_vector = next(dense_model.embed(row.sentence1))
sparse_vector = next(sparse_model.embed(row.sentence2))
yield [
row.sentence1, # 1st column: original text
row.sentence2, # 2nd column: original text
dense_vector.tolist(), # 3rd column: dense vector
sparse_vector.indices.tolist(), # 4th column: sparse vector indices
sparse_vector.values.tolist(), # 5th column: sparse vector values
]
我们正在使用BAAI/bge-small-en-v1.5模型进行密集嵌入,并使用BM25进行稀疏嵌入。
在我们的数据帧上应用UDF
接下来,让我们在我们的Spark数据帧上应用我们的vectorize UDF来生成嵌入。
embeddings = dataset_df.rdd.mapPartitions(vectorize)
mapPartitions() 方法返回一个 弹性分布式数据集 (RDD),然后应将其转换回 Spark 数据框。
使用向量化数据构建新的Spark数据框
我们现在将使用指定的模式创建一个新的Spark数据框架(embeddings_df),其中包含向量化数据。
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType, IntegerType
# Define the schema for the new dataframe
schema = StructType([
StructField("sentence1", StringType()),
StructField("sentence2", StringType()),
StructField("dense_vector", ArrayType(FloatType())),
StructField("sparse_vector_indices", ArrayType(IntegerType())),
StructField("sparse_vector_values", ArrayType(FloatType()))
])
# Create the new dataframe with the vectorized data
embeddings_df = spark.createDataFrame(data=embeddings, schema=schema)
将数据上传到Qdrant
创建一个Qdrant集合:
- 按照文档创建一个具有适当配置的集合。以下是一个支持密集和稀疏向量的示例请求:
PUT /collections/{collection_name} { "vectors": { "dense": { "size": 384, "distance": "Cosine" } }, "sparse_vectors": { "sparse": {} } }将数据框上传到Qdrant:
options = {
"qdrant_url": "<QDRANT_GRPC_URL>",
"api_key": "<QDRANT_API_KEY>",
"collection_name": "<QDRANT_COLLECTION_NAME>",
"vector_fields": "dense_vector",
"vector_names": "dense",
"sparse_vector_value_fields": "sparse_vector_values",
"sparse_vector_index_fields": "sparse_vector_indices",
"sparse_vector_names": "sparse",
"schema": embeddings_df.schema.json(),
}
embeddings_df.write.format("io.qdrant.spark.Qdrant").options(**options).mode(
"append"
).save()
请确保将占位符值(, , )替换为您的实际值。如果未指定id_field选项,Qdrant Spark连接器将为每个点生成随机UUID。
你应该看到的命令输出类似于:
Command took 40.37 seconds -- by xxxxx90@xxxxxx.com at 4/17/2024, 12:13:28 PM on fastembed
结论
我们的教程到此结束!请随意探索更多功能,并尝试使用Databricks、Spark和Qdrant中提供的不同模型、参数和功能进行实验。
快乐的数据工程!
