Lance ❤️ Spark¶
Lance can be used as a third-party datasource for Apache Spark (https://spark.apache.org/docs/latest/sql-data-sources.html).
Warning
This feature is experimental and the APIs may change in the future.
Build from source code¶
git clone https://github.com/lancedb/lance.git
cd lance/java
mvn clean package -DskipTests -Drust.release.build=true
After the build completes, the Spark-related jars are located under lance/java/spark/target/jars/:
arrow-c-data-15.0.0.jar
arrow-dataset-15.0.0.jar
jar-jni-1.1.1.jar
lance-core-0.25.0-SNAPSHOT.jar
lance-spark-0.25.0-SNAPSHOT.jar
Download the pre-built jars¶
If you would rather not build from source, you can download these five jars from the Maven repository:
wget https://repo1.maven.org/maven2/com/lancedb/lance-core/0.23.0/lance-core-0.23.0.jar
wget https://repo1.maven.org/maven2/com/lancedb/lance-spark/0.23.0/lance-spark-0.23.0.jar
wget https://repo1.maven.org/maven2/org/questdb/jar-jni/1.1.1/jar-jni-1.1.1.jar
wget https://repo1.maven.org/maven2/org/apache/arrow/arrow-c-data/12.0.1/arrow-c-data-12.0.1.jar
wget https://repo1.maven.org/maven2/org/apache/arrow/arrow-dataset/12.0.1/arrow-dataset-12.0.1.jar
Configurations for the Lance Spark connector¶
Set the following configuration in spark-defaults.conf to enable the lance datasource:
spark.sql.catalog.lance com.lancedb.lance.spark.LanceCatalog
This configuration defines the LanceCatalog, and Spark will then treat lance as a datasource.
If you are working with lance datasets stored in an object store, the following configurations should be set:
spark.sql.catalog.lance.access_key_id {your object store ak}
spark.sql.catalog.lance.secret_access_key {your object store sk}
spark.sql.catalog.lance.aws_region {your object store region (optional)}
spark.sql.catalog.lance.aws_endpoint {your object store endpoint, which should be in virtual-hosted style}
spark.sql.catalog.lance.virtual_hosted_style_request true
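The same settings can also be passed with --conf when launching the shell, instead of editing spark-defaults.conf. A minimal sketch, assuming an S3-compatible object store (replace the placeholders with your own credentials):
bin/spark-shell --master "local[56]" \
  --conf spark.sql.catalog.lance=com.lancedb.lance.spark.LanceCatalog \
  --conf spark.sql.catalog.lance.access_key_id={your object store ak} \
  --conf spark.sql.catalog.lance.secret_access_key={your object store sk} \
  --conf spark.sql.catalog.lance.virtual_hosted_style_request=true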
Start the Spark shell¶
bin/spark-shell --master "local[56]" --jars "/path_of_code/lance/java/spark/target/jars/*.jar"
Use --jars to include the jars we built or downloaded.
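If you downloaded the five pre-built jars instead, --jars also accepts a comma-separated list; a sketch assuming the jars sit in the current directory:
bin/spark-shell --master "local[56]" --jars "lance-core-0.23.0.jar,lance-spark-0.23.0.jar,jar-jni-1.1.1.jar,arrow-c-data-12.0.1.jar,arrow-dataset-12.0.1.jar"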
Note
The Spark shell console uses Scala rather than Python.
Use the Spark shell to manipulate lance datasets¶
Write a new dataset named test.lance:
val df = Seq(
("Alice", 1),
("Bob", 2)
).toDF("name", "id")
df.write.format("lance").option("path","./test.lance").save()
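The shell auto-imports spark.implicits._, which is what makes toDF available on a Seq; in a standalone application you set up the session and the import yourself. A minimal sketch (the object name LanceWriteExample is just an example, and the jars above are assumed to be on the classpath):
import org.apache.spark.sql.SparkSession

object LanceWriteExample {
  def main(args: Array[String]): Unit = {
    // Build a local session and register the Lance catalog.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("lance-write")
      .config("spark.sql.catalog.lance", "com.lancedb.lance.spark.LanceCatalog")
      .getOrCreate()

    import spark.implicits._ // needed for toDF on local collections

    val df = Seq(("Alice", 1), ("Bob", 2)).toDF("name", "id")
    df.write.format("lance").option("path", "./test.lance").save()

    spark.stop()
  }
}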
Overwrite the test.lance dataset:
val df = Seq(
("Alice", 3),
("Bob", 4)
).toDF("name", "id")
df.write.format("lance").option("path","./test.lance").mode("overwrite").save()
Append data to the test.lance dataset:
val df = Seq(
("Chris", 5),
("Derek", 6)
).toDF("name", "id")
df.write.format("lance").option("path","./test.lance").mode("append").save()
Read the test.lance dataset into a Spark DataFrame:
val data = spark.read.format("lance").option("path", "./test.lance").load()
data.show()
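The loaded DataFrame behaves like any other Spark DataFrame, so the usual transformations apply; for example:
// Keep rows with id >= 2 and project the name column.
data.filter($"id" >= 2).select("name").show()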
Register the DataFrame as a temp view and query the test.lance dataset with SQL:
data.createOrReplaceTempView("lance_table")
spark.sql("select id, count(*) from lance_table group by id order by id").show()