表实用工具命令
Delta表支持多种实用命令。
对于许多Delta Lake操作,您可以通过在创建新的SparkSession时设置配置来启用与Apache Spark DataSourceV2和Catalog API(自3.0起)的集成。请参阅Configure SparkSession。
在本文中:
移除Delta表中不再引用的文件
您可以通过在表上运行vacuum命令来删除不再被Delta表引用且超过保留阈值的文件。vacuum不会自动触发。文件的默认保留阈值为7天。要更改此行为,请参阅Data retention。
重要
vacuum会移除Delta Lake未管理的目录中的所有文件,但会忽略以_开头的目录。如果您在Delta表目录中存储了额外的元数据(例如结构化流检查点),请使用类似_checkpoints这样的目录名称。vacuum仅删除数据文件,不删除日志文件。日志文件会在检查点操作后自动异步删除。日志文件的默认保留期为30天,可通过delta.logRetentionDuration属性进行配置,该属性使用ALTER TABLE SET TBLPROPERTIESSQL方法设置。参见Table properties。运行
vacuum后,将失去时间旅行回早于保留期的版本的能力。
VACUUM eventsTable -- This runs VACUUM in ‘FULL’ mode and deletes data files outside of the retention duration and all files in the table directory not referenced by the table.
VACUUM eventsTable LITE -- This VACUUM in ‘LITE’ mode runs faster.
-- Instead of finding all files in the table directory, `VACUUM LITE` uses the Delta transaction log to identify and remove files no longer referenced by any table versions within the retention duration.
-- If `VACUUM LITE` cannot be completed because the Delta log has been pruned a `DELTA_CANNOT_VACUUM_LITE` exception is raised.
-- This mode is available only in Delta 3.3 and above.
VACUUM '/data/events' -- vacuum files in path-based table
VACUUM delta.`/data/events/`
VACUUM delta.`/data/events/` RETAIN 100 HOURS -- vacuum files not required by versions more than 100 hours old
VACUUM eventsTable DRY RUN -- do dry run to get the list of files to be deleted
VACUUM eventsTable USING INVENTORY inventoryTable —- vacuum files based on a provided reservoir of files as a delta table
VACUUM eventsTable USING INVENTORY (select * from inventoryTable) —- vacuum files based on a provided reservoir of files as spark SQL query
查看配置SparkSession了解启用SQL命令支持的步骤。
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, pathToTable) # path-based tables, or
deltaTable = DeltaTable.forName(spark, tableName) # Hive metastore-based tables
deltaTable.vacuum() # vacuum files not required by versions older than the default retention period
deltaTable.vacuum(100) # vacuum files not required by versions more than 100 hours old
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, pathToTable)
deltaTable.vacuum() // vacuum files not required by versions older than the default retention period
deltaTable.vacuum(100) // vacuum files not required by versions more than 100 hours old
import io.delta.tables.*;
import org.apache.spark.sql.functions;
DeltaTable deltaTable = DeltaTable.forPath(spark, pathToTable);
deltaTable.vacuum(); // vacuum files not required by versions older than the default retention period
deltaTable.vacuum(100); // vacuum files not required by versions more than 100 hours old
注意
在使用VACUUM时,如需配置Spark并行删除文件(基于shuffle分区数量),请将会话配置"spark.databricks.delta.vacuum.parallelDelete.enabled"设为"true"。
请参阅Delta Lake APIs获取Scala、Java和Python语法详情。
警告
建议您将保留间隔设置为至少7天,
因为旧的快照和未提交的文件可能仍被表的并发读取者或写入者使用。如果VACUUM清理了活动文件,
并发读取可能会失败,更糟糕的是,当VACUUM删除尚未提交的文件时,表可能会损坏。您必须选择一个
比最长运行的并发事务和任何流可能落后于表最新更新的最长时间更长的间隔。
Delta Lake有一个安全检查机制,防止您运行危险的VACUUM命令。如果您确定当前没有对该表执行任何操作所需时间超过您计划指定的保留间隔,您可以通过将Spark配置属性spark.databricks.delta.retentionDurationCheck.enabled设置为false来关闭此安全检查。
库存表
库存表包含一系列文件路径及其大小、类型(是否为目录)以及最后修改时间。当提供INVENTORY选项时,VACUUM将考虑该表中列出的文件,而不是对表目录进行完整列表,这对于非常大的表来说可能非常耗时。库存表可以指定为delta表或给出预期表模式的spark SQL查询。模式应如下所示:
列名 |
类型 |
描述 |
|---|---|---|
路径 |
字符串 |
完整限定URI |
长度 |
整数 |
字节大小 |
isDir |
boolean |
布尔值,表示是否为目录 |
modificationTime |
integer |
文件更新时间,以毫秒为单位 |
检索Delta表历史记录
您可以通过运行history命令获取每次写入Delta表的操作、用户、时间戳等信息。操作记录按时间倒序返回。默认情况下,表历史记录会保留30天。
DESCRIBE HISTORY '/data/events/' -- get the full history of the table
DESCRIBE HISTORY delta.`/data/events/`
DESCRIBE HISTORY '/data/events/' LIMIT 1 -- get the last operation only
DESCRIBE HISTORY eventsTable
请参阅配置SparkSession了解在Apache Spark中启用SQL命令支持的步骤。
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, pathToTable)
fullHistoryDF = deltaTable.history() # get the full history of the table
lastOperationDF = deltaTable.history(1) # get the last operation
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, pathToTable)
val fullHistoryDF = deltaTable.history() // get the full history of the table
val lastOperationDF = deltaTable.history(1) // get the last operation
import io.delta.tables.*;
DeltaTable deltaTable = DeltaTable.forPath(spark, pathToTable);
DataFrame fullHistoryDF = deltaTable.history(); // get the full history of the table
DataFrame lastOperationDF = deltaTable.history(1); // fetch the last operation on the DeltaTable
查看Delta Lake APIs获取Scala/Java/Python语法详情。
history 操作的输出包含以下列。
列名 |
类型 |
描述 |
|---|---|---|
version |
long |
由操作生成的表版本号。 |
timestamp |
timestamp |
该版本提交的时间。 |
userId |
string |
执行操作的用户ID。 |
userName |
string |
执行操作的用户名称。 |
操作 |
字符串 |
操作的名称。 |
operationParameters |
map |
操作参数(例如谓词条件) |
job |
struct |
运行该操作的作业详情。 |
notebook |
struct |
运行该操作的笔记本详细信息。 |
clusterId |
string |
运行操作的集群ID。 |
readVersion |
long |
执行写入操作时读取的表的版本号。 |
isolationLevel |
string |
此操作使用的隔离级别。 |
isBlindAppend |
boolean |
此操作是否为追加数据。 |
operationMetrics |
map |
操作指标(例如修改的行数和文件数) |
userMetadata |
string |
如果指定了用户定义的提交元数据 |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| null| null| DELETE|[predicate -> ["(...|null| null| null| 4| Serializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| null| null| UPDATE|[predicate -> (id...|null| null| null| 3| Serializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| null| null| DELETE|[predicate -> ["(...|null| null| null| 2| Serializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| null| null| UPDATE|[predicate -> (id...|null| null| null| 1| Serializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| null| null| DELETE|[predicate -> ["(...|null| null| null| 0| Serializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| null| null| WRITE|[mode -> ErrorIfE...|null| null| null| null| Serializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+
注意
某些列可能为空值,因为对应的信息在您的环境中可能不可用。
未来添加的列将始终添加在最后一列之后。
history操作返回一个包含操作指标集合的operationMetrics列映射。
下表按操作列出了映射键的定义。
操作 |
指标名称 |
描述 |
|---|---|---|
写入、创建表并选择、替换表并选择、复制到 |
||
numFiles |
写入的文件数量。 |
|
numOutputBytes |
写入内容的大小(以字节为单位)。 |
|
numOutputRows |
写入的行数。 |
|
流式更新 |
||
numAddedFiles |
新增文件数量。 |
|
numRemovedFiles |
被移除的文件数量。 |
|
numOutputRows |
写入的行数。 |
|
numOutputBytes |
写入的字节大小。 |
|
DELETE |
||
numAddedFiles |
新增文件数量。当删除表的分区时不提供此信息。 |
|
numRemovedFiles |
被移除的文件数量。 |
|
numDeletedRows |
删除的行数。当删除表的分区时不提供此信息。 |
|
numCopiedRows |
在删除文件过程中复制的行数。 |
|
executionTimeMs |
执行整个操作所花费的时间。 |
|
scanTimeMs |
扫描文件以查找匹配项所花费的时间。 |
|
rewriteTimeMs |
重写匹配文件所花费的时间。 |
|
截断 |
||
numRemovedFiles |
被移除的文件数量。 |
|
executionTimeMs |
执行整个操作所花费的时间。 |
|
合并 |
||
numSourceRows |
源DataFrame中的行数。 |
|
numTargetRowsInserted |
插入目标表的行数。 |
|
numTargetRowsUpdated |
目标表中更新的行数。 |
|
numTargetRowsDeleted |
目标表中删除的行数。 |
|
numTargetRowsCopied |
复制的目标行数。 |
|
numOutputRows |
写入的总行数。 |
|
numTargetFilesAdded |
添加到接收端(目标)的文件数量。 |
|
numTargetFilesRemoved |
从接收端(目标)移除的文件数量。 |
|
executionTimeMs |
执行整个操作所花费的时间。 |
|
scanTimeMs |
扫描文件以查找匹配项所花费的时间。 |
|
rewriteTimeMs |
重写匹配文件所花费的时间。 |
|
更新 |
||
numAddedFiles |
新增文件数量。 |
|
numRemovedFiles |
被移除的文件数量。 |
|
numUpdatedRows |
更新的行数。 |
|
numCopiedRows |
在更新文件过程中刚刚复制的行数。 |
|
executionTimeMs |
执行整个操作所花费的时间。 |
|
scanTimeMs |
扫描文件以查找匹配项所花费的时间。 |
|
rewriteTimeMs |
重写匹配文件所花费的时间。 |
|
FSCK |
numRemovedFiles |
已移除的文件数量。 |
CONVERT |
numConvertedFiles |
已转换的Parquet文件数量。 |
优化 |
||
numAddedFiles |
新增文件数量。 |
|
numRemovedFiles |
已优化的文件数量。 |
|
numAddedBytes |
表优化后增加的字节数。 |
|
numRemovedBytes |
移除的字节数。 |
|
minFileSize |
表优化后最小文件的大小。 |
|
p25FileSize |
表优化后第25百分位文件的大小。 |
|
p50FileSize |
表优化后的中位文件大小。 |
|
p75FileSize |
表优化后第75百分位文件的大小。 |
|
maxFileSize |
表优化后最大文件的大小。 |
|
VACUUM |
||
numDeletedFiles |
已删除文件的数量。 |
|
numVacuumedDirectories |
已清理的目录数量。 |
|
numFilesToDelete |
要删除的文件数量。 |
操作 |
指标名称 |
描述 |
|---|---|---|
恢复 |
||
tableSizeAfterRestore |
恢复后的表大小(字节)。 |
|
numOfFilesAfterRestore |
恢复后表中的文件数量。 |
|
numRemovedFiles |
恢复操作移除的文件数量。 |
|
numRestoredFiles |
通过恢复操作新增的文件数量。 |
|
removedFilesSize |
恢复操作删除的文件大小(字节)。 |
|
restoredFilesSize |
通过恢复操作添加的文件大小(以字节为单位)。 |
获取Delta表详细信息
您可以使用DESCRIBE DETAIL获取关于Delta表的详细信息(例如文件数量、数据大小)。
DESCRIBE DETAIL '/data/events/'
DESCRIBE DETAIL eventsTable
请参阅配置SparkSession了解在Apache Spark中启用SQL命令支持的步骤。
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, pathToTable)
detailDF = deltaTable.detail()
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, pathToTable)
val detailDF = deltaTable.detail()
import io.delta.tables.*;
DeltaTable deltaTable = DeltaTable.forPath(spark, pathToTable);
DataFrame detailDF = deltaTable.detail();
查看Delta Lake APIs获取Scala/Java/Python语法详情。
此操作的输出只有一行,其模式如下。
列名 |
类型 |
描述 |
|---|---|---|
格式 |
字符串 |
表的格式,即 |
id |
string |
表的唯一ID。 |
name |
string |
元数据存储中定义的表的名称。 |
描述 |
字符串 |
表的描述信息。 |
location |
string |
表的位置。 |
createdAt |
timestamp |
表创建时间。 |
lastModified |
timestamp |
表最后一次修改的时间。 |
partitionColumns |
array of strings |
如果表已分区,则显示分区列的名称。 |
numFiles |
long |
表中最新版本的文件数量。 |
sizeInBytes |
int |
表最新快照的大小,以字节为单位。 |
properties |
string-string map |
为该表设置的所有属性。 |
minReaderVersion |
int |
能够读取该表的最低读取器版本(根据日志协议)。 |
minWriterVersion |
int |
可以写入该表的写入器最低版本(根据日志协议)。 |
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
|format| id| name|description| location| createdAt| lastModified|partitionColumns|numFiles|sizeInBytes|properties|minReaderVersion|minWriterVersion|
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
| delta|d31f82d2-a69f-42e...|default.deltatable| null|file:/Users/tuor/...|2020-06-05 12:20:...|2020-06-05 12:20:20| []| 10| 12345| []| 1| 2|
+------+--------------------+------------------+-----------+--------------------+--------------------+-------------------+----------------+--------+-----------+----------+----------------+----------------+
生成清单文件
您可以生成Delta表的清单文件,供其他处理引擎(即非Apache Spark)读取Delta表。例如,要生成可供Presto和Athena读取Delta表的清单文件,请运行以下命令:
GENERATE symlink_format_manifest FOR TABLE delta.`<path-to-delta-table>`
请参阅配置SparkSession了解在Apache Spark中启用SQL命令支持的步骤。
deltaTable = DeltaTable.forPath(<path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")
val deltaTable = DeltaTable.forPath(<path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")
DeltaTable deltaTable = DeltaTable.forPath(<path-to-delta-table>);
deltaTable.generate("symlink_format_manifest");
将Parquet表转换为Delta表
将Parquet表原地转换为Delta表。该命令会列出目录中的所有文件,创建一个跟踪这些文件的Delta Lake事务日志,并通过读取所有Parquet文件的页脚自动推断数据模式。如果您的数据是分区的,则必须以DDL格式的字符串(即)指定分区列的模式。
默认情况下,此命令会收集每个文件的统计信息(例如每列的最小值和最大值)。这些统计信息将在查询时用于提供更快的查询速度。您可以在SQL API中使用NO STATISTICS来禁用此统计信息收集。
注意
如果Parquet表是由Structured Streaming创建的,可以通过使用_spark_metadata子目录作为表中所含文件的真实来源来避免文件列表操作,只需将SQL配置spark.databricks.delta.convert.useMetadataLog设置为true。
-- Convert unpartitioned Parquet table at path '<path-to-table>'
CONVERT TO DELTA parquet.`<path-to-table>`
-- Convert unpartitioned Parquet table and disable statistics collection
CONVERT TO DELTA parquet.`<path-to-table>` NO STATISTICS
-- Convert partitioned Parquet table at path '<path-to-table>' and partitioned by integer columns named 'part' and 'part2'
CONVERT TO DELTA parquet.`<path-to-table>` PARTITIONED BY (part int, part2 int)
-- Convert partitioned Parquet table and disable statistics collection
CONVERT TO DELTA parquet.`<path-to-table>` NO STATISTICS PARTITIONED BY (part int, part2 int)
请参阅配置SparkSession了解在Apache Spark中启用SQL命令支持的步骤。
from delta.tables import *
# Convert unpartitioned Parquet table at path '<path-to-table>'
deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`")
# Convert partitioned parquet table at path '<path-to-table>' and partitioned by integer column named 'part'
partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`", "part int")
import io.delta.tables._
// Convert unpartitioned Parquet table at path '<path-to-table>'
val deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`")
// Convert partitioned Parquet table at path '<path-to-table>' and partitioned by integer columns named 'part' and 'part2'
val partitionedDeltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`", "part int, part2 int")
import io.delta.tables.*;
// Convert unpartitioned Parquet table at path '<path-to-table>'
DeltaTable deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`");
// Convert partitioned Parquet table at path '<path-to-table>' and partitioned by integer columns named 'part' and 'part2'
DeltaTable deltaTable = DeltaTable.convertToDelta(spark, "parquet.`<path-to-table>`", "part int, part2 int");
注意
任何未被Delta Lake跟踪的文件都是不可见的,在运行vacuum时可能会被删除。在转换过程中应避免更新或追加数据文件。表转换完成后,请确保所有写入操作都通过Delta Lake进行。
将Iceberg表转换为Delta表
注意
该功能从Delta Lake 2.3及以上版本开始提供。
如果Iceberg表的底层文件格式是Parquet,您可以原地将其转换为Delta表。与从Parquet表转换类似,这种转换是原地进行的,不会发生任何数据复制或重写。原始Iceberg表和转换后的Delta表拥有独立的历史记录,因此只要不触碰或删除源数据Parquet文件,修改Delta表不会影响Iceberg表。
以下命令基于Iceberg表的原生文件清单、模式和分区信息创建Delta Lake事务日志。除非指定NO STATISTICS,否则转换器在转换过程中还会收集列统计信息。
-- Convert the Iceberg table in the path <path-to-table>.
CONVERT TO DELTA iceberg.`<path-to-table>`
-- Convert the Iceberg table in the path <path-to-table> without collecting statistics.
CONVERT TO DELTA iceberg.`<path-to-table>` NO STATISTICS
重要
使用转换器需要额外的jar包delta-iceberg。例如,bin/spark-sql --packages io.delta:delta-spark_2.12:3.0.0,io.delta:delta-iceberg_2.12:3.0.0:...。
delta-iceberg 目前不适用于 Delta Lake 2.4.0 版本,因为 iceberg-spark-runtime 尚未支持 Spark 3.4。该功能适用于 Delta Lake 2.3.0。
注意
不支持转换Iceberg元存储表。
不支持转换经历过分区演化的Iceberg表。
不支持转换经历过更新、删除或合并的Iceberg merge-on-read表。
将Delta表转换为Parquet表
您可以按照以下步骤轻松将Delta表转换回Parquet表:
如果您执行了可能更改数据文件的Delta Lake操作(例如
delete或merge),请运行保留期为0小时的vacuum以删除不属于表最新版本的所有数据文件。删除表目录中的
_delta_log目录。
将Delta表恢复到早期状态
您可以使用RESTORE命令将Delta表恢复到早期状态。Delta表内部会维护表的历史版本,这使得它可以被恢复到之前的状态。
RESTORE命令支持将表恢复到对应早期状态的版本号,或该状态创建时的时间戳作为选项。
重要
您可以恢复一个已经恢复过的表。
将表恢复到数据文件被手动或通过
vacuum删除的旧版本时会失败。如果设置spark.sql.files.ignoreMissingFiles为true,仍然可以部分恢复到该版本。恢复到早期状态的时间戳格式为
yyyy-MM-dd HH:mm:ss。仅提供日期(yyyy-MM-dd)字符串也是支持的。
RESTORE TABLE db.target_table TO VERSION AS OF <version>
RESTORE TABLE delta.`/data/target/` TO TIMESTAMP AS OF <timestamp>
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, <path-to-table>) # path-based tables, or
deltaTable = DeltaTable.forName(spark, <table-name>) # Hive metastore-based tables
deltaTable.restoreToVersion(0) # restore table to oldest version
deltaTable.restoreToTimestamp('2019-02-14') # restore to a specific timestamp
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, <path-to-table>)
val deltaTable = DeltaTable.forName(spark, <table-name>)
deltaTable.restoreToVersion(0) // restore table to oldest version
deltaTable.restoreToTimestamp("2019-02-14") // restore to a specific timestamp
import io.delta.tables.*;
DeltaTable deltaTable = DeltaTable.forPath(spark, <path-to-table>);
DeltaTable deltaTable = DeltaTable.forName(spark, <table-name>);
deltaTable.restoreToVersion(0) // restore table to oldest version
deltaTable.restoreToTimestamp("2019-02-14") // restore to a specific timestamp
重要
恢复操作被视为数据变更操作。Delta Lake日志条目由RESTORE命令添加,其中dataChange设置为true。如果存在下游应用程序(例如处理Delta Lake表更新的结构化流作业),则恢复操作添加的数据变更日志条目将被视为新的数据更新,处理这些条目可能会导致数据重复。
例如:
表版本 |
操作 |
Delta日志更新 |
数据变更日志更新中的记录 |
|---|---|---|---|
0 |
INSERT |
AddFile(/path/to/file-1, dataChange = true) |
(name = Viktor, age = 29, (name = George, age = 55) |
1 |
INSERT |
AddFile(/path/to/file-2, dataChange = true) |
(name = George, age = 39) |
2 |
OPTIMIZE |
AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) |
(无记录,因为Optimize压缩不会更改表中的数据) |
3 |
RESTORE(version=1) |
RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) |
(name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39) |
在前面的示例中,RESTORE命令会导致已在读取Delta表版本0和1时看到的更新。如果流式查询正在读取此表,则这些文件将被视为新添加的数据并会再次处理。
RESTORE 操作完成后会以单行DataFrame的形式报告以下指标:
table_size_after_restore: 恢复后表的大小。num_of_files_after_restore: 恢复后表中的文件数量。num_removed_files: 从表中移除(逻辑删除)的文件数量。num_restored_files: 由于回滚操作而恢复的文件数量。removed_files_size: 从表中移除的文件总大小(以字节为单位)。restored_files_size: 已恢复文件的总大小(以字节为单位)。
浅克隆Delta表
注意
该功能从Delta Lake 2.3及以上版本开始提供。
您可以使用shallow clone命令创建现有Delta表在特定版本的浅拷贝。
对浅克隆所做的任何更改仅影响克隆本身,而不会影响源表,只要它们不触及源数据的Parquet文件。
克隆的元数据包括:模式、分区信息、约束条件、可空性。对于浅克隆,流元数据不会被克隆。未被克隆的元数据包括表描述和用户自定义提交元数据。
重要
浅克隆引用源目录中的数据文件。如果您在源表上运行
vacuum,客户端将无法再读取引用的数据文件,并会抛出FileNotFoundException。在这种情况下,使用replace选项在浅克隆上重新运行clone命令可以修复克隆。如果目标路径已存在非Delta表,使用
replace克隆到该目标将创建Delta日志。然后,你可以通过运行vacuum来清理任何现有数据。如果目标路径中已存在Delta表,则会创建一个新的提交,包含来自源表的新元数据和新数据。对于
replace的情况,需要先清空目标表以避免数据重复。克隆表与
Create Table As Select或CTAS不同。浅克隆会获取源表的元数据。克隆语法也更简单:您无需指定分区、格式、约束条件、可空性等,因为这些都继承自源表。克隆表与其源表拥有独立的历史记录。在克隆表上执行时间旅行查询时,不能使用与源表相同的输入参数。例如,若源表当前处于版本100,而我们通过克隆创建新表时,新表的初始版本号为0,因此无法在新表上执行类似
SELECT * FROM tbl AS OF VERSION 99这样的时间旅行查询。
CREATE TABLE delta.`/data/target/` SHALLOW CLONE delta.`/data/source/` -- Create a shallow clone of /data/source at /data/target
CREATE OR REPLACE TABLE db.target_table SHALLOW CLONE db.source_table -- Replace the target. target needs to be emptied
CREATE TABLE IF NOT EXISTS delta.`/data/target/` SHALLOW CLONE db.source_table -- No-op if the target table exists
CREATE TABLE db.target_table SHALLOW CLONE delta.`/data/source`
CREATE TABLE db.target_table SHALLOW CLONE delta.`/data/source` VERSION AS OF version
CREATE TABLE db.target_table SHALLOW CLONE delta.`/data/source` TIMESTAMP AS OF timestamp_expression -- timestamp can be like “2019-01-01” or like date_sub(current_date(), 1)
CLONE 操作完成后会以单行DataFrame的形式报告以下指标:
source_table_size: 被克隆的源表大小(以字节为单位)。source_num_of_files: 源表中的文件数量。
云服务提供商权限
如果您创建的是浅克隆,任何读取该浅克隆的用户都需要具备读取原表文件的权限,因为数据文件仍保留在我们克隆的源表目录中。若要对克隆进行修改,用户需要拥有克隆目录的写入权限。
机器学习流程复现
在进行机器学习时,您可能希望存档用于训练ML模型的特定版本表格。未来的模型可以使用这个存档的数据集进行测试。
-- Trained model on version 15 of Delta table
CREATE TABLE delta.`/model/dataset` SHALLOW CLONE entire_dataset VERSION AS OF 15
在生产表上进行短期实验
要在不破坏生产表的情况下测试工作流,您可以轻松创建一个浅克隆。这样您就可以在包含所有生产数据但不会影响任何生产工作负载的克隆表上运行任意工作流。
-- Perform shallow clone
CREATE OR REPLACE TABLE my_test SHALLOW CLONE my_prod_table;
UPDATE my_test WHERE user_id is null SET invalid=true;
-- Run a bunch of validations. Once happy:
-- This should leverage the update information in the clone to prune to only
-- changed files in the clone if possible
MERGE INTO my_prod_table
USING my_test
ON my_test.user_id <=> my_prod_table.user_id
WHEN MATCHED AND my_test.user_id is null THEN UPDATE *;
DROP TABLE my_test;
表属性覆盖
表属性覆盖特别适用于:
在与不同业务部门共享数据时,为表格添加所有者或用户信息注释。
需要对Delta表进行归档并支持时间旅行功能。您可以单独为归档表指定日志保留期限。例如:
CREATE OR REPLACE TABLE archive.my_table SHALLOW CLONE prod.my_table
TBLPROPERTIES (
delta.logRetentionDuration = '3650 days',
delta.deletedFileRetentionDuration = '3650 days'
)
LOCATION 'xx://archive/my_table'
将Parquet或Iceberg表克隆为Delta表
注意
该功能从Delta Lake 2.3及以上版本开始提供。
Parquet和Iceberg的浅克隆功能结合了用于克隆Delta表以及将表转换为Delta Lake的功能,您可以使用克隆功能将数据从Parquet或Iceberg数据源转换为托管或外部Delta表,语法基本相同。
replace 与 Delta 浅克隆有相同的限制,目标表必须在应用替换前清空。
CREATE OR REPLACE TABLE <target_table_name> SHALLOW CLONE parquet.`/path/to/data`;
CREATE OR REPLACE TABLE <target_table_name> SHALLOW CLONE iceberg.`/path/to/data`;