使用Spark读写数据集

注意

Spark连接器目前是一个正在快速迭代的实验性功能。

在本示例中,我们将读取本地iris.csv文件并使用Apache Spark将其写入为Lance数据集,然后演示如何查询该数据集。

准备环境与原始数据集

官方网站下载Spark二进制包。我们推荐下载支持Scala 2.12的Spark 3.5+版本(因为当前Spark连接器仅支持Scala 2.12)。

你可以直接通过这个链接下载Spark 3.5.1。

准备数据集,将iris.csv下载到本地机器。

创建一个名为iris_to_lance_via_spark_shell.scala的Scala文件并打开它。

读取原始数据集并写入Lance数据集

添加必要的导入并创建Spark会话:

import org.apache.spark.sql.types.{StructType, StructField, DoubleType, StringType}
import org.apache.spark.sql.{SparkSession, DataFrame}
import com.lancedb.lance.spark.{LanceConfig, LanceDataSource}

val spark = SparkSession.builder()
  .appName("Iris CSV to Lance Converter")
  .config("spark.sql.catalog.lance", "com.lancedb.lance.spark.LanceCatalog")
  .getOrCreate()

指定您的输入和输出路径:

val irisPath = "/path/to/your/input/iris.csv"
val outputPath = "/path/to/your/output/iris.lance"

通过以下代码片段读取iris.csv

val rawDF = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(irisPath)

rawDF.printSchema()

准备lance模式并写入lance数据集:

val lanceSchema = new StructType()
  .add(StructField("sepal_length", DoubleType))
  .add(StructField("sepal_width", DoubleType))
  .add(StructField("petal_length", DoubleType))
  .add(StructField("petal_width", DoubleType))
  .add(StructField("species", StringType))

val lanceDF = spark.createDataFrame(rawDF.rdd, lanceSchema)

lanceDF.write
  .format(LanceDataSource.name)
  .option(LanceConfig.CONFIG_DATASET_URI, outputPath)
  .save()

读取Lance数据集

写入数据集后,我们可以将其读回并检查其属性:

val lanceDF = spark.read
  .format("lance")
  .option(LanceConfig.CONFIG_DATASET_URI, outputPath)
  .load()

println(s"The total count: ${lanceDF.count()}")
lanceDF.printSchema()
println("\n The top 5 data:")
lanceDF.show(5, truncate = false)

println("\n Species distribution statistics:")
lanceDF.groupBy("species").count().show()

首先,我们打开数据集并统计总行数。然后打印数据集的结构模式。最后,我们分析物种分布的统计数据。

运行Spark应用程序

要运行该应用程序,请下载以下依赖项:

  • lance-core JAR: 核心Rust Spark绑定,将Lance功能暴露给Java(可在此处获取

  • lance-spark JAR: 用于读写Lance格式的Spark连接器(可在此获取)

  • jar-jni JAR: 加载嵌入在JAR文件中的JNI依赖项(可在此处here获取)

  • arrow-c-data JAR: C数据接口的Java实现(可在此处获取

  • arrow-dataset JAR: Arrow数据集API/框架的Java实现(可在此处here获取)

将这些JAR文件放入${SPARK_HOME}/jars目录中,然后运行:

./bin/spark-shell --jars ./jars/lance-core-0.23.0.jar,./jars/lance-spark-0.23.0.jar,./jars/jar-jni-1.1.1.jar,./jars/arrow-c-data-12.0.1.jar,./jars/arrow-dataset-12.0.1.jar -i ./iris_to_lance_via_spark_shell.scala

应该可以正常工作!祝您使用愉快!