使用Spark读写数据集¶
注意
Spark连接器目前是一个正在快速迭代的实验性功能。
在本示例中,我们将读取本地iris.csv
文件并使用Apache Spark将其写入为Lance数据集,然后演示如何查询该数据集。
准备环境与原始数据集¶
从官方网站下载Spark二进制包。我们推荐下载支持Scala 2.12的Spark 3.5+版本(因为当前Spark连接器仅支持Scala 2.12)。
你可以直接通过这个链接下载Spark 3.5.1。
准备数据集,将iris.csv下载到本地机器。
创建一个名为iris_to_lance_via_spark_shell.scala
的Scala文件并打开它。
读取原始数据集并写入Lance数据集¶
添加必要的导入并创建Spark会话:
import org.apache.spark.sql.types.{StructType, StructField, DoubleType, StringType}
import org.apache.spark.sql.{SparkSession, DataFrame}
import com.lancedb.lance.spark.{LanceConfig, LanceDataSource}
val spark = SparkSession.builder()
.appName("Iris CSV to Lance Converter")
.config("spark.sql.catalog.lance", "com.lancedb.lance.spark.LanceCatalog")
.getOrCreate()
指定您的输入和输出路径:
val irisPath = "/path/to/your/input/iris.csv"
val outputPath = "/path/to/your/output/iris.lance"
通过以下代码片段读取iris.csv
:
val rawDF = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(irisPath)
rawDF.printSchema()
准备lance模式并写入lance数据集:
val lanceSchema = new StructType()
.add(StructField("sepal_length", DoubleType))
.add(StructField("sepal_width", DoubleType))
.add(StructField("petal_length", DoubleType))
.add(StructField("petal_width", DoubleType))
.add(StructField("species", StringType))
val lanceDF = spark.createDataFrame(rawDF.rdd, lanceSchema)
lanceDF.write
.format(LanceDataSource.name)
.option(LanceConfig.CONFIG_DATASET_URI, outputPath)
.save()
读取Lance数据集¶
写入数据集后,我们可以将其读回并检查其属性:
val lanceDF = spark.read
.format("lance")
.option(LanceConfig.CONFIG_DATASET_URI, outputPath)
.load()
println(s"The total count: ${lanceDF.count()}")
lanceDF.printSchema()
println("\n The top 5 data:")
lanceDF.show(5, truncate = false)
println("\n Species distribution statistics:")
lanceDF.groupBy("species").count().show()
首先,我们打开数据集并统计总行数。然后打印数据集的结构模式。最后,我们分析物种分布的统计数据。
运行Spark应用程序¶
要运行该应用程序,请下载以下依赖项:
lance-core JAR: 核心Rust Spark绑定,将Lance功能暴露给Java(可在此处获取)
lance-spark JAR: 用于读写Lance格式的Spark连接器(可在此获取)
jar-jni JAR: 加载嵌入在JAR文件中的JNI依赖项(可在此处here获取)
arrow-c-data JAR: C数据接口的Java实现(可在此处获取)
arrow-dataset JAR: Arrow数据集API/框架的Java实现(可在此处here获取)
将这些JAR文件放入${SPARK_HOME}/jars
目录中,然后运行:
./bin/spark-shell --jars ./jars/lance-core-0.23.0.jar,./jars/lance-spark-0.23.0.jar,./jars/jar-jni-1.1.1.jar,./jars/arrow-c-data-12.0.1.jar,./jars/arrow-dataset-12.0.1.jar -i ./iris_to_lance_via_spark_shell.scala
应该可以正常工作!祝您使用愉快!