Snowflake连接器
访问Snowflake Delta Lake支持文档以使用该连接器。
重要
读取Delta表时如果Snowflake性能缓慢该怎么办?
社区中有部分用户反馈,与Trino或Spark不同,Snowflake在读取Delta表时并未利用Delta statistics实现数据跳过功能。由于该缺陷,Snowflake可能会读取大量不必要的parquet文件,导致查询性能下降,并增加云服务提供商的API调用请求。
如果您是Snowflake客户并已订阅其企业支持服务,请提交支持工单。
作为一种变通方案,您可以启用Delta UniForm来生成Iceberg元数据,并将这些表作为Iceberg表从Snowflake读取。
Delta表可以通过清单文件被Snowflake读取,清单文件是一个文本文件,包含用于查询Delta表的数据文件列表。本文介绍如何使用清单文件设置Delta Lake与Snowflake的集成,并查询Delta表。
设置Delta Lake与Snowflake的集成并查询Delta表
您可以通过以下步骤设置Delta Lake与Snowflake的集成。
步骤1:使用Apache Spark生成Delta表的清单
在位于<path-to-delta-table>的Delta表上运行generate操作:
GENERATE symlink_format_manifest FOR TABLE delta.`<path-to-delta-table>`
val deltaTable = DeltaTable.forPath(<path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")
DeltaTable deltaTable = DeltaTable.forPath(<path-to-delta-table>);
deltaTable.generate("symlink_format_manifest");
deltaTable = DeltaTable.forPath(<path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")
详情请参阅生成清单文件。
generate操作会在<path-to-delta-table>/_symlink_format_manifest/路径下生成清单文件。换句话说,该目录中的文件包含了读取Delta表快照时应读取的数据文件(即Parquet文件)名称。
注意
我们建议您在Snowflake可以直接读取的位置定义Delta表。
步骤2:配置Snowflake以读取生成的清单文件
在您的Snowflake环境中运行以下命令。
在清单文件上定义外部表
要在Snowflake中定义一个外部表,您必须首先定义一个外部stage my_staged_table,该stage指向Delta表。在Snowflake中运行以下命令。
create or replace stage my_staged_table url='<path-to-delta-table>'
将替换为Delta表的完整路径。通过这个阶段,您可以定义一个表delta_manifest_table,该表读取清单文件中指定的文件名如下:
CREATE OR REPLACE EXTERNAL TABLE delta_manifest_table(
filename VARCHAR AS split_part(VALUE:c1, '/', -1)
)
WITH LOCATION = @my_staged_table/_symlink_format_manifest/
FILE_FORMAT = (TYPE = CSV)
PATTERN = '.*[/]manifest'
AUTO_REFRESH = true;
注意
在这个查询中:
该位置是清单子目录。
filename列包含清单中定义的文件名(非完整路径)。
在Parquet文件上定义外部表
您可以定义一个表my_parquet_data_table,用于读取Delta表中所有的Parquet文件。
CREATE OR REPLACE EXTERNAL TABLE my_parquet_data_table(
id INT AS (VALUE:id::INT),
part INT AS (VALUE:part::INT),
...
parquet_filename VARCHAR AS split_part(metadata$filename, '/', -1)
)
WITH LOCATION = @my_staged_table/
FILE_FORMAT = (TYPE = PARQUET)
PATTERN = '.*[/]part-[^/]*[.]parquet'
AUTO_REFRESH = true;
注意
在这个查询中:
该位置是Delta表的路径。
parquet_filename列包含存储表中每行数据的文件名。
如果你的Delta表是分区的,那么你必须在表定义中显式提取分区值。例如,如果表是按名为part的单个整数列分区的,你可以按如下方式提取这些值:
CREATE OR REPLACE EXTERNAL TABLE my_parquet_data_partitioned_table(
id INT AS (VALUE:id::INT),
part INT AS (
nullif(
regexp_replace (metadata$filename, '.*part\\=(.*)\\/.*', '\\1'),
'__HIVE_DEFAULT_PARTITION__'
)::INT
),
...
parquet_filename VARCHAR AS split_part(metadata$filename, '/', -1),
)
WITH LOCATION = @my_staged_partitioned_table/
FILE_FORMAT = (TYPE = PARQUET)
PATTERN = '.*[/]part-[^/]*[.]parquet'
AUTO_REFRESH = true;
正则表达式用于提取列part的分区值。
将此Parquet表作为Delta表查询会产生不正确的结果,因为此查询会读取表中的所有Parquet文件,而不仅仅是那些定义表一致性快照的文件。您可以使用清单表来获取一致性快照数据。
定义视图以使用清单表获取Delta表的正确内容
要仅读取属于生成清单中定义的快照一致性的行,您可以应用筛选器,只保留Parquet表中来自清单表定义文件的行。
CREATE OR REPLACE VIEW my_delta_table AS
SELECT id, part, ...
FROM my_parquet_data_table
WHERE parquet_filename IN (SELECT filename FROM delta-manifest-table);
查询此视图将为您提供Delta表的一致性视图。
限制
Snowflake集成在其行为中存在已知的限制。
数据一致性
每当Delta Lake生成更新的清单文件时,它会以原子操作方式覆盖现有的清单文件。因此,Snowflake始终能看到数据文件的一致性视图——要么看到所有旧版本文件,要么看到所有新版本文件。不过,一致性保证的粒度取决于表是否进行了分区。
未分区表: 所有文件名都写入一个以原子方式更新的清单文件中。在这种情况下,Snowflake将看到完整的表快照一致性。
分区表: 清单文件采用与原始Delta表相同的Hive分区风格目录结构进行分区。这意味着每个分区都是原子性更新的,Snowflake可以看到每个分区的一致性视图,但无法看到跨分区的一致性视图。此外,由于所有分区的清单无法同时更新,并发生成清单的尝试可能导致不同分区拥有不同版本的清单。
根据您为Delta表使用的存储系统类型,当Snowflake在清单文件被重写的同时并发查询清单时,可能会出现结果不正确的情况。在缺乏原子性文件覆盖功能的文件系统实现中,清单文件可能会暂时不可用。因此,如果清单更新可能与Snowflake的查询时间重合,请谨慎使用清单。