为Delta表使用行追踪功能

行追踪功能使Delta Lake能够追踪Delta Lake表中的行级血缘关系。当在Delta Lake表上启用时,行追踪会向该表添加两个新的元数据字段:

  • 行ID为表中的每一行提供一个唯一标识符。当使用MERGEUPDATE语句修改行时,该行将保持相同的ID。

  • 行提交版本记录了该行最后一次被修改时的表版本。每当使用MERGEUPDATE语句修改行时,都会为该行分配一个新版本。

注意

此功能在Delta Lake 3.2.0及以上版本中可用。在现有非空表上启用此功能需要Delta Lake 3.3.0及以上版本。

启用行跟踪

警告

启用行追踪功能创建的表在创建时会启用Delta Lake的行追踪表特性,并使用Delta Lake写入器版本7。表协议版本无法降级,且不支持所有已启用Delta Lake写入器协议表特性的Delta Lake客户端无法写入启用了行追踪的表。参见Delta Lake如何管理特性兼容性?

您必须通过以下方法之一显式启用行跟踪:

  • 新建表: 在CREATE TABLE命令中设置表属性delta.enableRowTracking = true

    -- 创建一个空表
    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES ('delta.enableRowTracking' = 'true');
    
    -- 使用CTAS语句
    CREATE TABLE course_new
    TBLPROPERTIES ('delta.enableRowTracking' = 'true')
    AS SELECT * FROM course_old;
    
    -- 使用LIKE语句复制配置
    CREATE TABLE graduate LIKE student;
    
    -- 使用CLONE语句复制配置
    CREATE TABLE graduate CLONE student;
    
  • 现有表: 从Delta 3.3版本开始支持,在ALTER TABLE命令中设置表属性'delta.enableRowTracking' = 'true'

    ALTER TABLE grade SET TBLPROPERTIES ('delta.enableRowTracking' = 'true');
    
  • 所有新表: 在当前会话中使用SET命令设置配置spark.databricks.delta.properties.defaults.enableRowTracking = true

    SET spark.databricks.delta.properties.defaults.enableRowTracking = true;
    
    spark.conf.set("spark.databricks.delta.properties.defaults.enableRowTracking", True)
    
    spark.conf.set("spark.databricks.delta.properties.defaults.enableRowTracking", true)
    

重要

由于克隆Delta Lake表会创建独立的历史记录,克隆表上的行ID和行提交版本与原始表不匹配。

重要

在现有表上启用行跟踪将自动为表中的所有现有行分配行ID和行提交版本。此过程可能会导致创建表的多个新版本,并可能需要较长时间。

行追踪存储

启用行跟踪可能会增加表的大小。Delta Lake将行跟踪元数据字段存储在数据文件中的隐藏元数据列中。某些操作(例如仅插入操作)不使用这些隐藏列,而是使用Delta Lake日志中的元数据来跟踪行ID和行提交版本。数据重组操作(如OPTIMIZEREORG)会导致行ID和行提交版本使用隐藏元数据列进行跟踪,即使它们最初是使用元数据存储的。

读取行追踪元数据字段

行跟踪添加了以下元数据字段,在读取表时可以访问:

列名

类型

_metadata.row_id

长整型

该行的唯一标识符。

_metadata.row_commit_version

长整型

该行最后插入或更新时的表版本。

读取表时,行ID和行提交版本元数据字段不会自动包含在内。相反,必须从Apache Spark中所有表都提供的隐藏_metadata列中手动选择这些元数据字段。

SELECT _metadata.row_id, _metadata.row_commit_version, * FROM table_name;
spark.read.table("table_name") \
  .select("_metadata.row_id", "_metadata.row_commit_version", "*")
spark.read.table("table_name")
  .select("_metadata.row_id", "_metadata.row_commit_version", "*")

禁用行跟踪

可以禁用行跟踪以减少元数据字段的存储开销。禁用行跟踪后,元数据字段仍然可用,但所有行在被操作修改时都会被分配一个新的ID和提交版本号。

ALTER TABLE table_name SET TBLPROPERTIES (delta.enableRowTracking = false);

重要

禁用行跟踪不会移除相应的表功能,也不会降低表的协议版本。

限制

存在以下限制:

  • 在读取变更数据馈送时无法访问行ID和行提交版本元数据字段。

  • 一旦将行追踪功能添加到表中,如果不重新创建表就无法移除该功能。