Apache Spark
Spark 是一个专为大数据处理和分析设计的分布式计算框架。Qdrant-Spark 连接器 使 Qdrant 能够成为 Spark 中的存储目的地。
安装
您可以根据自己的偏好和要求,以几种不同的方式设置Qdrant-Spark连接器。
GitHub 发布
您可以从GitHub 发布下载打包的JAR文件。它包含了所有必需的依赖项。
从源代码构建
要从源代码构建JAR,您需要在系统上安装JDK 8和Maven。一旦安装完成,请导航到项目的根目录并运行以下命令:
mvn package -DskipTests
这将编译源代码并生成一个fat JAR,默认情况下它将存储在target目录中。
Maven 中央仓库
该软件包可以在这里找到。
用法
下面,我们将逐步介绍如何创建Spark会话并将数据摄取到Qdrant中。
首先,导入必要的库并创建一个支持Qdrant的Spark会话:
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(
"spark.jars",
"spark-VERSION.jar", # Specify the path to the downloaded JAR file
)
.master("local[*]")
.appName("qdrant")
.getOrCreate()
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder
.config("spark.jars", "spark-VERSION.jar") // Specify the path to the downloaded JAR file
.master("local[*]")
.appName("qdrant")
.getOrCreate()
import org.apache.spark.sql.SparkSession;
public class QdrantSparkJavaExample {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.config("spark.jars", "spark-VERSION.jar") // Specify the path to the downloaded JAR file
.master("local[*]")
.appName("qdrant")
.getOrCreate();
}
}
连接器支持摄取多个命名/未命名、密集/稀疏的向量。
点击每个以展开。
Unnamed/Default vector
<pyspark.sql.DataFrame>
.write
.format("io.qdrant.spark.Qdrant")
.option("qdrant_url", <QDRANT_GRPC_URL>)
.option("collection_name", <QDRANT_COLLECTION_NAME>)
.option("embedding_field", <EMBEDDING_FIELD_NAME>) # Expected to be a field of type ArrayType(FloatType)
.option("schema", <pyspark.sql.DataFrame>.schema.json())
.mode("append")
.save()
Named vector
<pyspark.sql.DataFrame>
.write
.format("io.qdrant.spark.Qdrant")
.option("qdrant_url", <QDRANT_GRPC_URL>)
.option("collection_name", <QDRANT_COLLECTION_NAME>)
.option("embedding_field", <EMBEDDING_FIELD_NAME>) # Expected to be a field of type ArrayType(FloatType)
.option("vector_name", <VECTOR_NAME>)
.option("schema", <pyspark.sql.DataFrame>.schema.json())
.mode("append")
.save()
注意
为了向后兼容,保留了
embedding_field和vector_name选项。建议使用vector_fields和vector_names来命名向量,如下所示。
Multiple named vectors
<pyspark.sql.DataFrame>
.write
.format("io.qdrant.spark.Qdrant")
.option("qdrant_url", "<QDRANT_GRPC_URL>")
.option("collection_name", "<QDRANT_COLLECTION_NAME>")
.option("vector_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
.option("vector_names", "<VECTOR_NAME>,<ANOTHER_VECTOR_NAME>")
.option("schema", <pyspark.sql.DataFrame>.schema.json())
.mode("append")
.save()
Sparse vectors
<pyspark.sql.DataFrame>
.write
.format("io.qdrant.spark.Qdrant")
.option("qdrant_url", "<QDRANT_GRPC_URL>")
.option("collection_name", "<QDRANT_COLLECTION_NAME>")
.option("sparse_vector_value_fields", "<COLUMN_NAME>")
.option("sparse_vector_index_fields", "<COLUMN_NAME>")
.option("sparse_vector_names", "<SPARSE_VECTOR_NAME>")
.option("schema", <pyspark.sql.DataFrame>.schema.json())
.mode("append")
.save()
Multiple sparse vectors
<pyspark.sql.DataFrame>
.write
.format("io.qdrant.spark.Qdrant")
.option("qdrant_url", "<QDRANT_GRPC_URL>")
.option("collection_name", "<QDRANT_COLLECTION_NAME>")
.option("sparse_vector_value_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
.option("sparse_vector_index_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
.option("sparse_vector_names", "<SPARSE_VECTOR_NAME>,<ANOTHER_SPARSE_VECTOR_NAME>")
.option("schema", <pyspark.sql.DataFrame>.schema.json())
.mode("append")
.save()
Combination of named dense and sparse vectors
<pyspark.sql.DataFrame>
.write
.format("io.qdrant.spark.Qdrant")
.option("qdrant_url", "<QDRANT_GRPC_URL>")
.option("collection_name", "<QDRANT_COLLECTION_NAME>")
.option("vector_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
.option("vector_names", "<VECTOR_NAME>,<ANOTHER_VECTOR_NAME>")
.option("sparse_vector_value_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
.option("sparse_vector_index_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
.option("sparse_vector_names", "<SPARSE_VECTOR_NAME>,<ANOTHER_SPARSE_VECTOR_NAME>")
.option("schema", <pyspark.sql.DataFrame>.schema.json())
.mode("append")
.save()
No vectors - Entire dataframe is stored as payload
<pyspark.sql.DataFrame>
.write
.format("io.qdrant.spark.Qdrant")
.option("qdrant_url", "<QDRANT_GRPC_URL>")
.option("collection_name", "<QDRANT_COLLECTION_NAME>")
.option("schema", <pyspark.sql.DataFrame>.schema.json())
.mode("append")
.save()
Databricks
您可以在Databricks中使用qdrant-spark连接器作为库。
- 转到您的Databricks集群仪表板中的
Libraries部分。 - 选择
Install New以打开库安装模态框。 - 在Maven包中搜索
io.qdrant:spark:VERSION并点击Install。

数据类型支持
Qdrant 支持大多数 Spark 数据类型,并且根据提供的模式映射适当的数据类型。
配置选项
| 选项 | 描述 | 列数据类型 | 是否必需 |
|---|---|---|---|
qdrant_url | Qdrant 实例的 GRPC URL。例如:http://localhost:6334 | - | ✅ |
collection_name | 要写入数据的集合名称 | - | ✅ |
schema | 数据框模式的JSON字符串 | - | ✅ |
embedding_field | 保存嵌入的列的名称 | ArrayType(FloatType) | ❌ |
id_field | 保存点ID的列名。默认值:随机UUID | StringType 或 IntegerType | ❌ |
batch_size | 上传批次的最大大小。默认值:64 | - | ❌ |
retries | 上传重试次数。默认值:3 | - | ❌ |
api_key | 用于身份验证的Qdrant API密钥 | - | ❌ |
vector_name | 集合中向量的名称。 | - | ❌ |
vector_fields | 逗号分隔的列名,这些列包含向量。 | ArrayType(FloatType) | ❌ |
vector_names | 集合中向量的逗号分隔名称。 | - | ❌ |
sparse_vector_index_fields | 逗号分隔的列名,这些列包含稀疏向量索引。 | ArrayType(IntegerType) | ❌ |
sparse_vector_value_fields | 逗号分隔的列名,用于保存稀疏向量的值。 | ArrayType(FloatType) | ❌ |
sparse_vector_names | 集合中稀疏向量的逗号分隔名称。 | - | ❌ |
shard_key_selector | 在upsert期间使用的自定义分片键名称,以逗号分隔。 | - | ❌ |
欲了解更多信息,请务必查看Qdrant-Spark GitHub 仓库。Apache Spark 指南可在此处获取。祝您数据处理愉快!
