AWS Redshift Spectrum 连接器

实验性

这是一个实验性集成功能,请谨慎使用。

Delta表可以通过使用清单文件被AWS Redshift Spectrum读取,清单文件是一个文本文件,包含用于查询Delta表的数据文件列表。本文介绍如何使用清单文件设置AWS Redshift Spectrum与Delta Lake的集成并查询Delta表。

设置AWS Redshift Spectrum与Delta Lake集成并查询Delta表

您可以通过以下步骤设置AWS Redshift Spectrum与Delta Lake的集成。

步骤1:使用Apache Spark生成Delta表的清单文件

在位于的Delta表上运行generate操作:

GENERATE symlink_format_manifest FOR TABLE delta.`<path-to-delta-table>`
val deltaTable = DeltaTable.forPath(<path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")
DeltaTable deltaTable = DeltaTable.forPath(<path-to-delta-table>);
deltaTable.generate("symlink_format_manifest");
deltaTable = DeltaTable.forPath(<path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")

详情请参阅生成清单文件

generate操作会在/_symlink_format_manifest/路径下生成清单文件。换句话说,该目录中的文件包含了读取Delta表快照时应读取的数据文件(即Parquet文件)名称。

注意

我们建议您在AWS Redshift Spectrum可以直接读取的位置定义Delta表。

步骤2:配置AWS Redshift Spectrum以读取生成的清单文件

在您的AWS Redshift Spectrum环境中运行以下命令。

  1. 在AWS Redshift Spectrum中定义一个使用SymlinkTextInputFormat格式和清单位置/_symlink_format_manifest/的新外部表。

    CREATE EXTERNAL TABLE mytable ([(col_name1 col_datatype1, ...)])
    [PARTITIONED BY (col_name2 col_datatype2, ...)]
    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION '<path-to-delta-table>/_symlink_format_manifest/'  -- location of the generated manifest
    

    SymlinkTextInputFormat 配置AWS Redshift Spectrum通过读取清单文件而非使用目录列表来查找数据文件,从而为mytable计算文件分片。请将mytable替换为外部表的名称,并将替换为Delta表的绝对路径。

    重要

    • mytable 必须与Delta表具有相同的模式和分区。

    • PARTITIONED BY 指定的分区列集合必须与非分区列集合不同。此外,不能通过 AS 来指定分区列。

  • 您不能在Apache Spark中使用此表定义;它只能由AWS Redshift Spectrum使用。

  1. 如果Delta表已分区,您必须明确将分区添加到AWS Redshift Spectrum表中。这是必要的,因为分区表的清单本身与表具有相同的目录结构分区。

    • 对于表中的每个分区,在AWS Redshift Spectrum中运行以下操作,可以直接在AWS Redshift Spectrum中运行,或使用AWS CLI或Data API

      ALTER TABLE mytable.redshiftdeltatable ADD IF NOT EXISTS PARTITION (col_name=col_value) LOCATION '/_symlink_format_manifest/col_name=col_value'
      

这些步骤将为您提供Delta表的一致性视图。

步骤3:更新清单

当Delta表中的数据被更新时,您必须使用以下任一方法重新生成清单:

  • 显式更新:在所有数据更新完成后,您可以运行generate操作来更新清单文件。

  • 自动更新: 您可以配置Delta表,使表上的所有写入操作自动更新清单文件。要启用此自动模式,请使用以下SQL命令设置相应的表属性。

    ALTER TABLE delta.`<path-to-delta-table>` SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true)
    

    要禁用此自动模式,请将此属性设置为false

    注意

    在分区表上启用自动模式后,每个写入操作仅更新该操作所写入分区的对应清单文件。这种增量更新确保写入操作的清单生成开销很低。然而,这也意味着如果其他分区的清单文件已过时,启用自动模式不会自动修复它们。因此,您应该在启用自动模式后立即显式运行GENERATE命令来更新整个表的清单文件。

是自动更新还是显式更新取决于Delta表上写操作的并发性质以及所需的数据一致性。例如,如果启用了自动模式,并发写操作会导致对清单文件的并发覆盖。由于这种无序写入,无法保证清单文件在写操作完成后指向表的最新版本。因此,如果预期会有并发写入并且希望避免过时的清单,应考虑在预期的写操作完成后显式更新清单。

此外,如果您的表已分区,则必须按照前述步骤中描述的相同流程添加任何新分区或移除已删除的分区。

限制

AWS Redshift Spectrum集成在其行为中存在已知的限制。

数据一致性

每当Delta Lake生成更新的清单文件时,它会以原子操作方式覆盖现有的清单文件。因此,AWS Redshift Spectrum总能保持对数据文件的一致性视图——要么看到全部旧版本文件,要么看到全部新版本文件。不过,这种一致性保证的粒度取决于表是否进行了分区。

  • 未分区表: 所有文件名都写入一个以原子方式更新的清单文件中。在这种情况下,AWS Redshift Spectrum将看到完整的表快照一致性。

  • 分区表: 清单文件采用与原始Delta表相同的Hive分区风格目录结构进行分区。这意味着每个分区都是原子更新的,AWS Redshift Spectrum可以看到每个分区的一致视图,但无法看到跨分区的一致视图。此外,由于所有分区的所有清单无法同时更新,并发生成清单的尝试可能导致不同分区拥有不同版本的清单。虽然这种数据变更下的一致性保证比使用Spark读取Delta表要弱,但仍比Parquet等格式更强,因为它们不提供分区级别的一致性。

根据您为Delta表使用的存储系统类型,当AWS Redshift Spectrum在清单文件被重写时并发查询清单,可能会出现不正确的结果。在缺乏原子性文件覆盖实现的文件系统中,清单文件可能会短暂不可用。因此,如果清单更新可能与AWS Redshift Spectrum的查询同时发生,请谨慎使用清单。

性能

这是一个实验性集成,其性能和可扩展性特性尚未经过测试。

模式演进

Delta Lake支持模式演进,对Delta表的查询会自动使用最新模式,而无需依赖Hive元存储中定义的表模式。然而,AWS Redshift Spectrum使用其表定义中指定的模式,在表定义更新为新模式之前,不会使用更新后的模式进行查询。