使用清单实现Presto、Trino和Athena与Delta Lake的集成

重要

Presto、Trino和Athena均原生支持Delta Lake。支持情况如下:

  • Presto version 0.269 及以上版本原生支持读取Delta Lake表。有关使用原生Delta Lake连接器的详细信息,请参阅Delta Lake Connector - Presto。对于低于0.269的Presto版本,您可以使用本文中基于清单的方法。

  • Trino version 373 及以上版本原生支持读写Delta Lake表。有关使用原生Delta Lake连接器的详细信息,请参阅Delta Lake Connector - Trino。对于低于version 373的Trino版本,您可以使用本文中详述的基于清单(manifest)的方法。

  • Athena 版本 3 及以上版本原生支持读取 Delta Lake 表。有关使用原生 Delta Lake 连接器的详细信息,请参阅查询 Delta Lake 表。对于低于版本 3的 Athena 版本,您可以使用本文中详述的基于清单的方法。

Presto、Trino和Athena支持通过清单文件读取外部表,该文本文件包含了查询表时需要读取的数据文件列表。当在Hive元存储中使用清单文件定义外部表时,Presto、Trino和Athena可以直接使用清单中的文件列表,而无需通过目录列表来查找文件。本文介绍了如何使用清单文件设置Presto、Trino和Athena与Delta Lake的集成,并查询Delta表。

设置Presto、Trino或Athena与Delta Lake的集成并查询Delta表

您可以通过以下步骤设置Presto、Trino或Athena与Delta Lake的集成。

步骤1:使用Apache Spark生成Delta表的清单

使用配置了Delta Lake的Spark,在位于的Delta表上运行以下任一命令:

GENERATE symlink_format_manifest FOR TABLE delta.`<path-to-delta-table>`
val deltaTable = DeltaTable.forPath(<path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")
DeltaTable deltaTable = DeltaTable.forPath(<path-to-delta-table>);
deltaTable.generate("symlink_format_manifest");
deltaTable = DeltaTable.forPath(<path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")

详情请参阅生成清单文件

generate命令会在/_symlink_format_manifest/路径下生成清单文件。换句话说,该目录中的文件将包含读取Delta表快照时应读取的数据文件(即Parquet文件)名称。

步骤2:配置Presto、Trino或Athena以读取生成的清单

  1. 在连接到Presto、Trino或Athena的Hive元存储中,使用格式SymlinkTextInputFormat和清单位置/_symlink_format_manifest/定义一个新表。

    CREATE EXTERNAL TABLE mytable ([(col_name1 col_datatype1, ...)])
    [PARTITIONED BY (col_name2 col_datatype2, ...)]
    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION '<path-to-delta-table>/_symlink_format_manifest/'  -- location of the generated manifest
    

    SymlinkTextInputFormat 配置 Presto、Trino 或 Athena 通过读取清单文件而非使用目录列表查找数据文件来计算 mytable 的文件分片。将 mytable 替换为外部表的名称,并将 替换为 Delta 表的绝对路径。

    重要

    • mytable 必须与Delta表具有相同的模式和分区。

    • PARTITIONED BY 指定的分区列集合必须与非分区列集合不同。此外,不能通过 AS <select-statement> 来指定分区列。

  • 您不能在Apache Spark中使用此表定义;它只能由Presto、Trino和Athena使用。

    您用来运行命令的工具取决于Apache Spark和Presto、Trino或Athena是否使用相同的Hive元存储。

    • 相同的元存储: 如果Apache Spark和Presto、Trino或Athena使用相同的Hive元存储,您可以使用Apache Spark来定义表。

  • 不同的元存储:如果Apache Spark和Presto、Trino或Athena使用不同的元存储,您必须使用其他工具来定义表。

    • Athena: 您可以在Athena中定义外部表。

    • Presto: Presto不支持CREATE EXTERNAL TABLE ... STORED AS ...语法,因此必须使用连接到与Presto相同元存储的其他工具(例如Spark或Hive)来创建表。

  1. 如果Delta表已分区,在生成清单后运行MSCK REPAIR TABLE mytable以强制元存储(连接到Presto、Trino或Athena)发现分区。这是必要的,因为分区表的清单本身会按照与表相同的目录结构进行分区。请使用创建表时使用的相同工具运行此命令。此外,您应该运行此命令:

    • 每次生成清单后: 新分区通常在清单文件更新后立即可见。但过于频繁地执行此操作可能会给Hive元存储带来高负载。

    • 按预期新分区出现的频率: 例如,如果一个表按日期分区,那么你可以在每天午夜之后运行一次修复操作,此时新分区已在表中创建且对应的清单文件已生成。

步骤3:更新清单

当Delta表中的数据被更新时,您必须使用以下任一方法重新生成清单:

  • 显式更新:在所有数据更新完成后,您可以运行generate操作来更新清单文件。

  • 自动更新: 您可以配置Delta表,使表上的所有写入操作都能自动更新清单文件。要启用此自动模式,请使用以下SQL命令设置相应的表属性。

    ALTER TABLE delta.`<path-to-delta-table>` SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true)
    

    要禁用此自动模式,请将此属性设置为false。此外,对于分区表,您必须运行MSCK REPAIR以确保连接到Presto、Trino或Athena的元存储更新分区信息。

    注意

    在分区表上启用自动模式后,每次写入操作仅更新该操作所写入分区的对应清单文件。这种增量更新确保写入操作的清单生成开销较低。然而,这也意味着如果其他分区的清单文件已过期,启用自动模式不会自动修复它们。因此,建议您在启用自动模式后立即显式运行GENERATE命令来更新整个表的清单文件。

是自动更新还是显式更新取决于Delta表上写操作的并发性质以及所需的数据一致性。例如,如果启用了自动模式,并发写操作会导致对清单文件的并发覆盖。由于这种无序写入,无法保证清单文件在写操作完成后指向表的最新版本。因此,如果预期会有并发写入并且希望避免过时的清单,应考虑在预期的写操作完成后显式更新清单。

限制

Presto、Trino和Athena集成在其行为中存在已知的限制。

数据一致性

每当Delta Lake生成更新的清单文件时,它会以原子操作方式覆盖现有的清单文件。因此,Presto、Trino和Athena总能查看数据文件的一致性视图;它们要么看到所有旧版本文件,要么看到所有新版本文件。不过,一致性保证的粒度取决于表是否进行了分区。

  • 未分区表: 所有文件名都写入一个以原子方式更新的清单文件中。在这种情况下,Presto、Trino和Athena将看到完整的表快照一致性。

  • 分区表: 清单文件采用与原始Delta表相同的Hive分区风格目录结构进行分区。这意味着每个分区都是原子更新的,Presto、Trino或Athena将看到每个分区的一致视图,但无法看到跨分区的一致视图。此外,由于无法同时更新所有分区的所有清单,并发生成清单的尝试可能导致不同分区拥有不同版本的清单。虽然这种在数据变更下的一致性保证比使用Spark读取Delta表要弱,但它仍然比Parquet等格式更强,因为后者不提供分区级别的一致性。

根据您为Delta表使用的存储系统类型,当Presto、Trino或Athena在清单文件被重写时并发查询清单,可能会得到不正确的结果。在缺乏原子性文件覆盖功能的文件系统实现中,清单文件可能会暂时不可用。因此,如果清单更新可能与Presto、Trino或Athena的查询同时发生,请谨慎使用清单。

性能

大量文件会影响Presto、Trino和Athena的性能。因此建议在生成清单文件前先压缩文件。文件数量不应超过1000个(对于整个未分区表或分区表中的每个分区而言)。

模式演进

Delta Lake支持模式演进,对Delta表的查询会自动使用最新模式,而不管Hive元存储中定义的表模式如何。然而,Presto、Trino或Athena使用的是Hive元存储中定义的模式,在Presto、Trino或Athena使用的表被重新定义为具有更新模式之前,它们不会使用更新后的模式进行查询。

加密表

Athena不支持从CSE-KMS加密表中读取清单。有关最新信息,请参阅AWS文档。