变更数据馈送
变更数据馈送(CDF)功能允许Delta表跟踪Delta表不同版本之间的行级变更。当在Delta表上启用该功能时,运行时会记录所有写入该表数据的"变更事件"。这包括行数据以及元数据,元数据会指示指定行是被插入、删除还是更新。
您可以使用DataFrame API(即df.read)在批处理查询中读取变更事件,也可以使用DataFrame API(即df.readStream)在流式查询中读取变更事件。
使用案例
变更数据馈送默认未启用。以下用例将指导您何时启用变更数据馈送。
银表和金表: 通过仅处理初始
MERGE、UPDATE或DELETE操作后的行级变更来提升Delta性能,从而加速并简化ETL和ELT操作。传输变更: 将变更数据流发送给下游系统(如Kafka或RDBMS),这些系统可以利用它在数据管道的后续阶段进行增量处理。
审计追踪表: 将变更数据流捕获为Delta表,可提供永久存储和高效查询能力,用于查看随时间发生的所有变更,包括删除操作的发生时间和更新内容。
启用变更数据馈送
您必须使用以下方法之一显式启用变更数据馈送选项:
新建表: 在
CREATE TABLE命令中设置表属性delta.enableChangeDataFeed = true。CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
现有表: 在
ALTER TABLE命令中设置表属性delta.enableChangeDataFeed = true。ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
所有新表:
set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
重要
一旦为表启用变更数据馈送选项,您将无法再使用Delta Lake 1.2.1或更低版本写入该表。但您始终可以读取该表。
只有在启用变更数据馈送后所做的更改才会被记录;表中过去的更改不会被捕获。
变更数据存储
Delta Lake 会将 UPDATE、DELETE 和 MERGE 操作的变更数据记录在 Delta 表目录下的 _change_data 文件夹中。
当 Delta Lake 检测到可以直接从事务日志高效计算变更数据时,可能会跳过这些记录。
特别是仅插入操作和全分区删除操作不会在 _change_data 目录中生成数据。
_change_data 文件夹中的文件遵循表的保留策略。因此,如果运行 VACUUM 命令,变更数据馈送数据也会被删除。
批量查询中的读取变更
您可以为起始和结束提供版本号或时间戳。查询中包含起始和结束的版本号及时间戳。 要从特定起始版本读取到表的最新版本变更,只需指定起始版本或时间戳。
您可以将版本指定为整数,将时间戳指定为格式为yyyy-MM-dd[ HH:mm:ss[.SSS]]的字符串。
如果您提供的版本号低于或时间戳早于已记录变更事件的版本,即变更数据馈送功能启用前的版本,系统将抛出错误,提示变更数据馈送功能尚未启用。
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)
-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')
# version as ints or longs
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
# timestamps as formatted timestamp
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
# providing only the startingVersion/timestamp
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
# path based tables
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.load("pathToMyDeltaTable")
// version as ints or longs
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
// timestamps as formatted timestamp
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
// providing only the startingVersion/timestamp
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
// path based tables
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.load("pathToMyDeltaTable")
读取流式查询中的变更
# providing a starting version
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
# providing a starting timestamp
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", "2021-04-21 05:35:43") \
.load("/pathToMyDeltaTable")
# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.table("myDeltaTable")
// providing a starting version
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
// providing a starting timestamp
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", "2021-04-21 05:35:43")
.load("/pathToMyDeltaTable")
// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.table("myDeltaTable")
要在读取表时获取变更数据,请将选项readChangeFeed设置为true。
startingVersion或startingTimestamp是可选的,如果未提供,流将返回流处理时表的最新快照作为INSERT,并将后续变更作为变更数据。
读取变更数据时也支持速率限制(maxFilesPerTrigger, maxBytesPerTrigger)和excludeRegex等选项。
注意
对于起始快照版本之外的版本,速率限制可以是原子性的。也就是说,整个提交版本要么会被速率限制,要么整个提交会被返回。
默认情况下,如果用户传入的版本或时间戳超过表的最后一次提交,则会抛出错误timestampGreaterThanLatestCommit。如果用户将以下配置设置为true,CDF可以处理超出范围的版本情况。
set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
如果您提供的起始版本号大于表上的最后一次提交,或起始时间戳晚于表上的最后一次提交,那么当启用上述配置时,系统将返回空的读取结果。
如果您提供的结束版本号大于表的最后一次提交版本,或结束时间戳晚于表的最后一次提交时间,那么当在批量读取模式下启用上述配置时,将返回起始版本与最后一次提交之间的所有变更。
变更数据馈送的模式是什么?
当您从表的变更数据源读取时,将使用最新表版本的架构。
注意
大多数模式变更和演进操作都得到完全支持。启用了列映射的表不支持所有用例,并表现出不同的行为。请参阅启用了列映射的表的数据变更反馈限制。
除了Delta表模式中的数据列外,变更数据馈送还包含用于标识变更事件类型的元数据列:
列名 |
类型 |
值 |
|---|---|---|
|
字符串 |
|
|
长整型 |
包含变更的Delta日志或表版本。 |
|
时间戳 |
提交创建时关联的时间戳。 |
(1) preimage 表示更新前的值,postimage 表示更新后的值。
启用了列映射功能的表在变更数据馈送方面的限制
在启用了列映射功能的Delta表上,您可以删除或重命名表中的列,而无需为现有数据重写数据文件。启用列映射后,在执行非添加性模式变更(如重命名或删除列、更改数据类型或可空性变更)后,变更数据馈送功能将受到限制。
重要
在Delta Lake 2.0及更早版本中,启用了列映射的表不支持对变更数据流进行流式读取或批量读取。
在Delta Lake 2.1中,启用了列映射的表支持对变更数据流的批量读取,只要没有非累加性的模式变更。不支持对启用了列映射的表的变更数据流进行流式读取。
在Delta Lake 2.2中,启用了列映射的表支持对变更数据流的批处理和流式读取,只要没有非附加性的模式变更。
在Delta Lake 2.3及以上版本中,您可以对启用了列映射且经历了非附加模式变更的表执行变更数据馈送的批量读取。读取操作不再使用表最新版本的模式,而是使用查询中指定的表结束版本的模式。如果指定的版本范围跨越了非附加模式变更,查询仍然会失败。
在Delta Lake 3.0及以上版本中,您可以通过启用模式跟踪来对启用了列映射且经历了非附加模式变更的表执行变更数据流的流式读取。