快速入门
本指南帮助您快速了解Delta Lake的主要功能。它提供了代码片段,展示如何通过交互式、批处理和流式查询读取和写入Delta表。
在本文中:
使用Delta Lake配置Apache Spark
按照以下说明设置Delta Lake与Spark的集成。您可以通过以下两种方式在本地机器上运行本指南中的步骤:
交互式运行:启动带有Delta Lake的Spark shell(Scala或Python),并在shell中交互式运行代码片段。
作为项目运行:设置一个包含Delta Lake的Maven或SBT项目(Scala或Java),将代码片段复制到源文件中,然后运行该项目。或者,您可以使用Github仓库中提供的示例。
重要
对于以下所有说明,请确保安装与Delta Lake 3.3.1兼容的正确版本的Spark或PySpark。详情请参阅release compatibility matrix。
前提条件:设置Java环境
如官方Apache Spark安装说明此处所述,请确保已安装有效的Java版本(8、11或17),并通过系统PATH或环境变量JAVA_HOME正确配置Java。
Windows用户应按照这篇博客中的说明操作,确保使用与Delta Lake兼容的正确版本Apache Spark 3.3.1。
设置交互式Shell
要在Spark SQL、Scala或Python shell中交互式使用Delta Lake,您需要本地安装Apache Spark。根据您想使用SQL、Python还是Scala,可以分别设置SQL shell、PySpark shell或Spark shell。
Spark SQL Shell
按照下载Spark的说明下载兼容版本的Apache Spark,可以使用pip或下载并解压归档文件,然后在解压目录中运行spark-sql。
bin/spark-sql --packages io.delta:delta-spark_2.12:3.3.1 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
PySpark Shell
通过运行以下命令安装与Delta Lake版本兼容的PySpark版本:
pip install pyspark==
运行带有Delta Lake包和其他配置的PySpark:
pyspark --packages io.delta:delta-spark_2.12:3.3.1 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
Spark Scala Shell
按照下载Spark的说明下载兼容版本的Apache Spark,可以使用pip安装,或者下载并解压归档文件,然后在解压目录中运行spark-shell。
bin/spark-shell --packages io.delta:delta-spark_2.12:3.3.1 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
设置项目
如果你想使用Maven中央仓库中的Delta Lake二进制文件构建项目,可以使用以下Maven坐标。
Maven
您可以通过在POM文件中添加Delta Lake作为依赖项来将其包含在Maven项目中。Delta Lake使用Scala 2.12编译。
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-spark_2.12</artifactId>
<version>3.3.1</version>
</dependency>
SBT
您可以通过在build.sbt文件中添加以下行来将Delta Lake包含到您的SBT项目中:
libraryDependencies += "io.delta" %% "delta-spark" % "3.3.1"
Python
要设置一个Python项目(例如用于单元测试),您可以使用pip install delta-spark==3.3.1安装Delta Lake,然后通过Delta Lake中的configure_spark_with_delta_pip()工具函数来配置SparkSession。
import pyspark
from delta import *
builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
创建表
要创建一个Delta表,可以将DataFrame以delta格式写出。你可以使用现有的Spark SQL代码,并将格式从parquet、csv、json等改为delta。
CREATE TABLE delta.`/tmp/delta-table` USING DELTA AS SELECT col1 as id FROM VALUES 0,1,2,3,4;
data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
val data = spark.range(0, 5)
data.write.format("delta").save("/tmp/delta-table")
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
SparkSession spark = ... // create SparkSession
Dataset<Row> data = spark.range(0, 5);
data.write().format("delta").save("/tmp/delta-table");
这些操作会使用从DataFrame中推断出的模式创建一个新的Delta表。有关创建新Delta表时可用的完整选项集,请参阅创建表和写入表。
注意
本快速入门使用本地路径作为Delta表的位置。如需为Delta表配置HDFS或云存储,请参阅存储配置。
读取数据
您可以通过指定文件路径来读取Delta表中的数据:"/tmp/delta-table":
SELECT * FROM delta.`/tmp/delta-table`;
df = spark.read.format("delta").load("/tmp/delta-table")
df.show()
val df = spark.read.format("delta").load("/tmp/delta-table")
df.show()
Dataset<Row> df = spark.read().format("delta").load("/tmp/delta-table");
df.show();
更新表数据
Delta Lake 支持使用标准 DataFrame API 进行多种表修改操作。此示例运行一个批处理作业来覆盖表中的数据:
覆盖写入
INSERT OVERWRITE delta.`/tmp/delta-table` SELECT col1 as id FROM VALUES 5,6,7,8,9;
data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")
val data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("/tmp/delta-table")
df.show()
Dataset<Row> data = spark.range(5, 10);
data.write().format("delta").mode("overwrite").save("/tmp/delta-table");
如果你再次读取这张表,应该只能看到你添加的5-9这些值,因为你已经覆盖了之前的数据。
不覆盖的条件更新
Delta Lake 提供了编程API,用于条件更新、删除以及合并(upsert)数据到表中。以下是几个示例。
-- Update every even value by adding 100 to it
UPDATE delta.`/tmp/delta-table` SET id = id + 100 WHERE id % 2 == 0;
-- Delete every even value
DELETE FROM delta.`/tmp/delta-table` WHERE id % 2 == 0;
-- Upsert (merge) new data
CREATE TEMP VIEW newData AS SELECT col1 AS id FROM VALUES 1,3,5,7,9,11,13,15,17,19;
MERGE INTO delta.`/tmp/delta-table` AS oldData
USING newData
ON oldData.id = newData.id
WHEN MATCHED
THEN UPDATE SET id = newData.id
WHEN NOT MATCHED
THEN INSERT (id) VALUES (newData.id);
SELECT * FROM delta.`/tmp/delta-table`;
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, "/tmp/delta-table")
# Update every even value by adding 100 to it
deltaTable.update(
condition = expr("id % 2 == 0"),
set = { "id": expr("id + 100") })
# Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))
# Upsert (merge) new data
newData = spark.range(0, 20)
deltaTable.alias("oldData") \
.merge(
newData.alias("newData"),
"oldData.id = newData.id") \
.whenMatchedUpdate(set = { "id": col("newData.id") }) \
.whenNotMatchedInsert(values = { "id": col("newData.id") }) \
.execute()
deltaTable.toDF().show()
import io.delta.tables._
import org.apache.spark.sql.functions._
val deltaTable = DeltaTable.forPath("/tmp/delta-table")
// Update every even value by adding 100 to it
deltaTable.update(
condition = expr("id % 2 == 0"),
set = Map("id" -> expr("id + 100")))
// Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))
// Upsert (merge) new data
val newData = spark.range(0, 20).toDF
deltaTable.as("oldData")
.merge(
newData.as("newData"),
"oldData.id = newData.id")
.whenMatched
.update(Map("id" -> col("newData.id")))
.whenNotMatched
.insert(Map("id" -> col("newData.id")))
.execute()
deltaTable.toDF.show()
import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;
DeltaTable deltaTable = DeltaTable.forPath("/tmp/delta-table");
// Update every even value by adding 100 to it
deltaTable.update(
functions.expr("id % 2 == 0"),
new HashMap<String, Column>() {{
put("id", functions.expr("id + 100"));
}}
);
// Delete every even value
deltaTable.delete(condition = functions.expr("id % 2 == 0"));
// Upsert (merge) new data
Dataset<Row> newData = spark.range(0, 20).toDF();
deltaTable.as("oldData")
.merge(
newData.as("newData"),
"oldData.id = newData.id")
.whenMatched()
.update(
new HashMap<String, Column>() {{
put("id", functions.col("newData.id"));
}})
.whenNotMatched()
.insertExpr(
new HashMap<String, Column>() {{
put("id", functions.col("newData.id"));
}})
.execute();
deltaTable.toDF().show();
您应该可以看到部分现有行已被更新,同时插入了新行。
有关这些操作的更多信息,请参阅表删除、更新和合并。
使用时间旅行读取旧版本数据
您可以通过时间旅行功能查询Delta表的历史快照。如果想访问被覆盖前的数据,可以使用versionAsOf选项查询覆盖第一组数据之前的表快照。
SELECT * FROM delta.`/tmp/delta-table` VERSION AS OF 0;
df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df.show()
val df = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-table")
df.show()
Dataset<Row> df = spark.read().format("delta").option("versionAsOf", 0).load("/tmp/delta-table");
df.show();
您应该能看到被覆盖前的第一组数据。时间旅行利用了Delta Lake事务日志的强大功能,可以访问表中已不存在的数据。移除版本0选项(或指定版本1)将让您再次查看较新的数据。更多信息,请参阅查询表的早期快照(时间旅行)。
将数据流写入表
您还可以使用结构化流式处理(Structured Streaming)写入Delta表。Delta Lake的事务日志保证仅一次处理(exactly-once processing),即使有其他流或批处理查询同时对该表进行操作。默认情况下,流以追加模式运行,这会向表中添加新记录:
streamingDf = spark.readStream.format("rate").load()
stream = streamingDf.selectExpr("value as id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")
val streamingDf = spark.readStream.format("rate").load()
val stream = streamingDf.select($"value" as "id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")
import org.apache.spark.sql.streaming.StreamingQuery;
Dataset<Row> streamingDf = spark.readStream().format("rate").load();
StreamingQuery stream = streamingDf.selectExpr("value as id").writeStream().format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table");
当流正在运行时,您可以使用之前的命令读取表。
注意
如果您在shell中运行此操作,可能会看到流式任务进度,这使得在该shell中输入命令变得困难。在新终端中启动另一个shell来查询表可能会很有用。
您可以通过在启动流的同一终端中运行stream.stop()来停止流。
有关Delta Lake与结构化流式处理集成的更多信息,请参阅表流式读取和写入。另请参阅Apache Spark网站上的结构化流式处理编程指南。
从表中读取变更流
当流正在写入Delta表时,您也可以将该表作为流式源进行读取。例如,您可以启动另一个流式查询来打印对Delta表所做的所有更改。您可以通过提供startingVersion或startingTimestamp选项来指定Structured Streaming应从哪个版本开始获取从该点开始的更改。详情请参阅Structured Streaming。
stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start()
val stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start()
StreamingQuery stream2 = spark.readStream().format("delta").load("/tmp/delta-table").writeStream().format("console").start();