使用清单实现Presto、Trino和Athena与Delta Lake的集成
重要
Presto、Trino和Athena均原生支持Delta Lake。支持情况如下:
Presto version 0.269 及以上版本原生支持读取Delta Lake表。有关使用原生Delta Lake连接器的详细信息,请参阅Delta Lake Connector - Presto。对于低于0.269的Presto版本,您可以使用本文中基于清单的方法。
Trino version 373 及以上版本原生支持读写Delta Lake表。有关使用原生Delta Lake连接器的详细信息,请参阅Delta Lake Connector - Trino。对于低于version 373的Trino版本,您可以使用本文中详述的基于清单(manifest)的方法。
Athena 版本 3 及以上版本原生支持读取 Delta Lake 表。有关使用原生 Delta Lake 连接器的详细信息,请参阅查询 Delta Lake 表。对于低于版本 3的 Athena 版本,您可以使用本文中详述的基于清单的方法。
Presto、Trino和Athena支持通过清单文件读取外部表,该文本文件包含了查询表时需要读取的数据文件列表。当在Hive元存储中使用清单文件定义外部表时,Presto、Trino和Athena可以直接使用清单中的文件列表,而无需通过目录列表来查找文件。本文介绍了如何使用清单文件设置Presto、Trino和Athena与Delta Lake的集成,并查询Delta表。
设置Presto、Trino或Athena与Delta Lake的集成并查询Delta表
您可以通过以下步骤设置Presto、Trino或Athena与Delta Lake的集成。
步骤1:使用Apache Spark生成Delta表的清单
使用配置了Delta Lake的Spark,在位于的Delta表上运行以下任一命令:
GENERATE symlink_format_manifest FOR TABLE delta.`<path-to-delta-table>`
val deltaTable = DeltaTable.forPath(<path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")
DeltaTable deltaTable = DeltaTable.forPath(<path-to-delta-table>);
deltaTable.generate("symlink_format_manifest");
deltaTable = DeltaTable.forPath(<path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")
详情请参阅生成清单文件。
generate命令会在路径下生成清单文件。换句话说,该目录中的文件将包含读取Delta表快照时应读取的数据文件(即Parquet文件)名称。
步骤2:配置Presto、Trino或Athena以读取生成的清单
在连接到Presto、Trino或Athena的Hive元存储中,使用格式
SymlinkTextInputFormat和清单位置定义一个新表。/_symlink_format_manifest/ CREATE EXTERNAL TABLE mytable ([(col_name1 col_datatype1, ...)]) [PARTITIONED BY (col_name2 col_datatype2, ...)] ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION '<path-to-delta-table>/_symlink_format_manifest/' -- location of the generated manifest
SymlinkTextInputFormat配置 Presto、Trino 或 Athena 通过读取清单文件而非使用目录列表查找数据文件来计算mytable的文件分片。将mytable替换为外部表的名称,并将替换为 Delta 表的绝对路径。重要
mytable必须与Delta表具有相同的模式和分区。PARTITIONED BY指定的分区列集合必须与非分区列集合不同。此外,不能通过AS <select-statement>来指定分区列。
您不能在Apache Spark中使用此表定义;它只能由Presto、Trino和Athena使用。
您用来运行命令的工具取决于Apache Spark和Presto、Trino或Athena是否使用相同的Hive元存储。
相同的元存储: 如果Apache Spark和Presto、Trino或Athena使用相同的Hive元存储,您可以使用Apache Spark来定义表。
不同的元存储:如果Apache Spark和Presto、Trino或Athena使用不同的元存储,您必须使用其他工具来定义表。
Athena: 您可以在Athena中定义外部表。
Presto: Presto不支持
CREATE EXTERNAL TABLE ... STORED AS ...语法,因此必须使用连接到与Presto相同元存储的其他工具(例如Spark或Hive)来创建表。
如果Delta表已分区,在生成清单后运行
MSCK REPAIR TABLE mytable以强制元存储(连接到Presto、Trino或Athena)发现分区。这是必要的,因为分区表的清单本身会按照与表相同的目录结构进行分区。请使用创建表时使用的相同工具运行此命令。此外,您应该运行此命令:每次生成清单后: 新分区通常在清单文件更新后立即可见。但过于频繁地执行此操作可能会给Hive元存储带来高负载。
按预期新分区出现的频率: 例如,如果一个表按日期分区,那么你可以在每天午夜之后运行一次修复操作,此时新分区已在表中创建且对应的清单文件已生成。
步骤3:更新清单
当Delta表中的数据被更新时,您必须使用以下任一方法重新生成清单:
显式更新:在所有数据更新完成后,您可以运行
generate操作来更新清单文件。自动更新: 您可以配置Delta表,使表上的所有写入操作都能自动更新清单文件。要启用此自动模式,请使用以下SQL命令设置相应的表属性。
ALTER TABLE delta.`<path-to-delta-table>` SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true)
要禁用此自动模式,请将此属性设置为
false。此外,对于分区表,您必须运行MSCK REPAIR以确保连接到Presto、Trino或Athena的元存储更新分区信息。注意
在分区表上启用自动模式后,每次写入操作仅更新该操作所写入分区的对应清单文件。这种增量更新确保写入操作的清单生成开销较低。然而,这也意味着如果其他分区的清单文件已过期,启用自动模式不会自动修复它们。因此,建议您在启用自动模式后立即显式运行
GENERATE命令来更新整个表的清单文件。
是自动更新还是显式更新取决于Delta表上写操作的并发性质以及所需的数据一致性。例如,如果启用了自动模式,并发写操作会导致对清单文件的并发覆盖。由于这种无序写入,无法保证清单文件在写操作完成后指向表的最新版本。因此,如果预期会有并发写入并且希望避免过时的清单,应考虑在预期的写操作完成后显式更新清单。
限制
Presto、Trino和Athena集成在其行为中存在已知的限制。
数据一致性
每当Delta Lake生成更新的清单文件时,它会以原子操作方式覆盖现有的清单文件。因此,Presto、Trino和Athena总能查看数据文件的一致性视图;它们要么看到所有旧版本文件,要么看到所有新版本文件。不过,一致性保证的粒度取决于表是否进行了分区。
未分区表: 所有文件名都写入一个以原子方式更新的清单文件中。在这种情况下,Presto、Trino和Athena将看到完整的表快照一致性。
分区表: 清单文件采用与原始Delta表相同的Hive分区风格目录结构进行分区。这意味着每个分区都是原子更新的,Presto、Trino或Athena将看到每个分区的一致视图,但无法看到跨分区的一致视图。此外,由于无法同时更新所有分区的所有清单,并发生成清单的尝试可能导致不同分区拥有不同版本的清单。虽然这种在数据变更下的一致性保证比使用Spark读取Delta表要弱,但它仍然比Parquet等格式更强,因为后者不提供分区级别的一致性。
根据您为Delta表使用的存储系统类型,当Presto、Trino或Athena在清单文件被重写时并发查询清单,可能会得到不正确的结果。在缺乏原子性文件覆盖功能的文件系统实现中,清单文件可能会暂时不可用。因此,如果清单更新可能与Presto、Trino或Athena的查询同时发生,请谨慎使用清单。
性能
大量文件会影响Presto、Trino和Athena的性能。因此建议在生成清单文件前先压缩文件。文件数量不应超过1000个(对于整个未分区表或分区表中的每个分区而言)。
模式演进
Delta Lake支持模式演进,对Delta表的查询会自动使用最新模式,而不管Hive元存储中定义的表模式如何。然而,Presto、Trino或Athena使用的是Hive元存储中定义的模式,在Presto、Trino或Athena使用的表被重新定义为具有更新模式之前,它们不会使用更新后的模式进行查询。
加密表
Athena不支持从CSE-KMS加密表中读取清单。有关最新信息,请参阅AWS文档。