Apache Spark

Spark 是一个专为大数据处理和分析设计的分布式计算框架。Qdrant-Spark 连接器 使 Qdrant 能够成为 Spark 中的存储目的地。

安装

您可以根据自己的偏好和要求,以几种不同的方式设置Qdrant-Spark连接器。

GitHub 发布

您可以从GitHub 发布下载打包的JAR文件。它包含了所有必需的依赖项。

从源代码构建

要从源代码构建JAR,您需要在系统上安装JDK 8Maven。一旦安装完成,请导航到项目的根目录并运行以下命令:

mvn package -DskipTests

这将编译源代码并生成一个fat JAR,默认情况下它将存储在target目录中。

Maven 中央仓库

该软件包可以在这里找到。

用法

下面,我们将逐步介绍如何创建Spark会话并将数据摄取到Qdrant中。

首先,导入必要的库并创建一个支持Qdrant的Spark会话:

from pyspark.sql import SparkSession

spark = SparkSession.builder.config(
        "spark.jars",
        "spark-VERSION.jar",  # Specify the path to the downloaded JAR file
    )
    .master("local[*]")
    .appName("qdrant")
    .getOrCreate()
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .config("spark.jars", "spark-VERSION.jar") // Specify the path to the downloaded JAR file
  .master("local[*]")
  .appName("qdrant")
  .getOrCreate()
import org.apache.spark.sql.SparkSession;

public class QdrantSparkJavaExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .config("spark.jars", "spark-VERSION.jar") // Specify the path to the downloaded JAR file
                .master("local[*]")
                .appName("qdrant")
                .getOrCreate(); 
    }
}

连接器支持摄取多个命名/未命名、密集/稀疏的向量。

点击每个以展开。

Unnamed/Default vector
  <pyspark.sql.DataFrame>
   .write
   .format("io.qdrant.spark.Qdrant")
   .option("qdrant_url", <QDRANT_GRPC_URL>)
   .option("collection_name", <QDRANT_COLLECTION_NAME>)
   .option("embedding_field", <EMBEDDING_FIELD_NAME>)  # Expected to be a field of type ArrayType(FloatType)
   .option("schema", <pyspark.sql.DataFrame>.schema.json())
   .mode("append")
   .save()
Named vector
  <pyspark.sql.DataFrame>
   .write
   .format("io.qdrant.spark.Qdrant")
   .option("qdrant_url", <QDRANT_GRPC_URL>)
   .option("collection_name", <QDRANT_COLLECTION_NAME>)
   .option("embedding_field", <EMBEDDING_FIELD_NAME>)  # Expected to be a field of type ArrayType(FloatType)
   .option("vector_name", <VECTOR_NAME>)
   .option("schema", <pyspark.sql.DataFrame>.schema.json())
   .mode("append")
   .save()

注意

为了向后兼容,保留了embedding_fieldvector_name选项。建议使用vector_fieldsvector_names来命名向量,如下所示。

Multiple named vectors
  <pyspark.sql.DataFrame>
   .write
   .format("io.qdrant.spark.Qdrant")
   .option("qdrant_url", "<QDRANT_GRPC_URL>")
   .option("collection_name", "<QDRANT_COLLECTION_NAME>")
   .option("vector_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
   .option("vector_names", "<VECTOR_NAME>,<ANOTHER_VECTOR_NAME>")
   .option("schema", <pyspark.sql.DataFrame>.schema.json())
   .mode("append")
   .save()
Sparse vectors
  <pyspark.sql.DataFrame>
   .write
   .format("io.qdrant.spark.Qdrant")
   .option("qdrant_url", "<QDRANT_GRPC_URL>")
   .option("collection_name", "<QDRANT_COLLECTION_NAME>")
   .option("sparse_vector_value_fields", "<COLUMN_NAME>")
   .option("sparse_vector_index_fields", "<COLUMN_NAME>")
   .option("sparse_vector_names", "<SPARSE_VECTOR_NAME>")
   .option("schema", <pyspark.sql.DataFrame>.schema.json())
   .mode("append")
   .save()
Multiple sparse vectors
  <pyspark.sql.DataFrame>
   .write
   .format("io.qdrant.spark.Qdrant")
   .option("qdrant_url", "<QDRANT_GRPC_URL>")
   .option("collection_name", "<QDRANT_COLLECTION_NAME>")
   .option("sparse_vector_value_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
   .option("sparse_vector_index_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
   .option("sparse_vector_names", "<SPARSE_VECTOR_NAME>,<ANOTHER_SPARSE_VECTOR_NAME>")
   .option("schema", <pyspark.sql.DataFrame>.schema.json())
   .mode("append")
   .save()
Combination of named dense and sparse vectors
  <pyspark.sql.DataFrame>
   .write
   .format("io.qdrant.spark.Qdrant")
   .option("qdrant_url", "<QDRANT_GRPC_URL>")
   .option("collection_name", "<QDRANT_COLLECTION_NAME>")
   .option("vector_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
   .option("vector_names", "<VECTOR_NAME>,<ANOTHER_VECTOR_NAME>")
   .option("sparse_vector_value_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
   .option("sparse_vector_index_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
   .option("sparse_vector_names", "<SPARSE_VECTOR_NAME>,<ANOTHER_SPARSE_VECTOR_NAME>")
   .option("schema", <pyspark.sql.DataFrame>.schema.json())
   .mode("append")
   .save()
No vectors - Entire dataframe is stored as payload
  <pyspark.sql.DataFrame>
   .write
   .format("io.qdrant.spark.Qdrant")
   .option("qdrant_url", "<QDRANT_GRPC_URL>")
   .option("collection_name", "<QDRANT_COLLECTION_NAME>")
   .option("schema", <pyspark.sql.DataFrame>.schema.json())
   .mode("append")
   .save()

Databricks

您可以在Databricks中使用qdrant-spark连接器作为库。

  • 转到您的Databricks集群仪表板中的Libraries部分。
  • 选择 Install New 以打开库安装模态框。
  • 在Maven包中搜索io.qdrant:spark:VERSION并点击Install

Databricks

数据类型支持

Qdrant 支持大多数 Spark 数据类型,并且根据提供的模式映射适当的数据类型。

配置选项

选项描述列数据类型是否必需
qdrant_urlQdrant 实例的 GRPC URL。例如:http://localhost:6334-
collection_name要写入数据的集合名称-
schema数据框模式的JSON字符串-
embedding_field保存嵌入的列的名称ArrayType(FloatType)
id_field保存点ID的列名。默认值:随机UUIDStringTypeIntegerType
batch_size上传批次的最大大小。默认值:64-
retries上传重试次数。默认值:3-
api_key用于身份验证的Qdrant API密钥-
vector_name集合中向量的名称。-
vector_fields逗号分隔的列名,这些列包含向量。ArrayType(FloatType)
vector_names集合中向量的逗号分隔名称。-
sparse_vector_index_fields逗号分隔的列名,这些列包含稀疏向量索引。ArrayType(IntegerType)
sparse_vector_value_fields逗号分隔的列名,用于保存稀疏向量的值。ArrayType(FloatType)
sparse_vector_names集合中稀疏向量的逗号分隔名称。-
shard_key_selector在upsert期间使用的自定义分片键名称,以逗号分隔。-

欲了解更多信息,请务必查看Qdrant-Spark GitHub 仓库。Apache Spark 指南可在此处获取。祝您数据处理愉快!

这个页面有用吗?

感谢您的反馈!🙏

我们很抱歉听到这个消息。😔 你可以在GitHub上编辑这个页面,或者创建一个GitHub问题。