Lance ❤️ Spark

Lance can be used as a third-party datasource for Apache Spark; see https://spark.apache.org/docs/latest/sql-data-sources.html for background on Spark SQL data sources.

Warning

This feature is experimental, and the APIs may change in the future.

Build from source code

git clone https://github.com/lancedb/lance.git
cd lance/java
mvn clean package -DskipTests -Drust.release.build=true

After building the code, the Spark-related jars are located at lance/java/spark/target/jars/:

arrow-c-data-15.0.0.jar
arrow-dataset-15.0.0.jar
jar-jni-1.1.1.jar
lance-core-0.25.0-SNAPSHOT.jar
lance-spark-0.25.0-SNAPSHOT.jar

Download the pre-built jars

If you do not want to build the jars from source, you can download these five jars from the Maven repository:

wget https://repo1.maven.org/maven2/com/lancedb/lance-core/0.23.0/lance-core-0.23.0.jar
wget https://repo1.maven.org/maven2/com/lancedb/lance-spark/0.23.0/lance-spark-0.23.0.jar
wget https://repo1.maven.org/maven2/org/questdb/jar-jni/1.1.1/jar-jni-1.1.1.jar
wget https://repo1.maven.org/maven2/org/apache/arrow/arrow-c-data/12.0.1/arrow-c-data-12.0.1.jar
wget https://repo1.maven.org/maven2/org/apache/arrow/arrow-dataset/12.0.1/arrow-dataset-12.0.1.jar

Configurations for the Lance Spark connector

Some configurations have to be set in spark-defaults.conf to enable the Lance datasource:

spark.sql.catalog.lance com.lancedb.lance.spark.LanceCatalog

This configuration defines the LanceCatalog, and Spark will then treat Lance as a datasource.
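
As an alternative to editing spark-defaults.conf, the same property can be passed at launch time through Spark's standard --conf flag, for example:

bin/spark-shell --conf "spark.sql.catalog.lance=com.lancedb.lance.spark.LanceCatalog"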

If you work with Lance datasets stored in an object store, the following configurations should also be set:

spark.sql.catalog.lance.access_key_id {your object store access key}
spark.sql.catalog.lance.secret_access_key {your object store secret key}
spark.sql.catalog.lance.aws_region {your object store region (optional)}
spark.sql.catalog.lance.aws_endpoint {your object store endpoint, which should be in virtual-hosted style}
spark.sql.catalog.lance.virtual_hosted_style_request true
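
As a concrete sketch, a spark-defaults.conf section for a hypothetical bucket on S3 in us-east-1 could look like the following; the access key, secret key, and endpoint below are placeholders, not real values:

spark.sql.catalog.lance.access_key_id AKIAIOSFODNN7EXAMPLE
spark.sql.catalog.lance.secret_access_key wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
spark.sql.catalog.lance.aws_region us-east-1
spark.sql.catalog.lance.aws_endpoint https://s3.us-east-1.amazonaws.com
spark.sql.catalog.lance.virtual_hosted_style_request true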

Start the Spark shell

bin/spark-shell --master "local[56]"  --jars "/path_of_code/lance/java/spark/target/jars/*.jar"

Use --jars to include the jars we built or downloaded.
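
If you launch with the downloaded jars instead of a source build, --jars also accepts a comma-separated list of paths; the example below assumes the wget downloads from above sit in the current directory:

bin/spark-shell --master "local[56]" \
  --jars "lance-core-0.23.0.jar,lance-spark-0.23.0.jar,jar-jni-1.1.1.jar,arrow-c-data-12.0.1.jar,arrow-dataset-12.0.1.jar"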

Note

The Spark shell console uses Scala, not Python.

Use the Spark shell to operate on Lance datasets

  • Write a new dataset named test.lance

val df = Seq(
  ("Alice", 1),
  ("Bob", 2)
).toDF("name", "id")
df.write.format("lance").option("path","./test.lance").save()
  • Overwrite the test.lance dataset

val df = Seq(
  ("Alice", 3),
  ("Bob", 4)
).toDF("name", "id")
df.write.format("lance").option("path","./test.lance").mode("overwrite").save()
  • Append data to the test.lance dataset

val df = Seq(
  ("Chris", 5),
  ("Derek", 6)
).toDF("name", "id")
df.write.format("lance").option("path","./test.lance").mode("append").save()
  • Read the test.lance dataset into a Spark DataFrame

val data = spark.read.format("lance").option("path", "./test.lance").load()
data.show()
  • Register the DataFrame as a table and query the test.lance dataset with SQL

data.createOrReplaceTempView("lance_table")
spark.sql("select id, count(*) from lance_table group by id order by id").show()