Snowflake连接器

访问Snowflake Delta Lake支持文档以使用该连接器。

重要

读取Delta表时如果Snowflake性能缓慢该怎么办?

社区中有部分用户反馈,与TrinoSpark不同,Snowflake在读取Delta表时并未利用Delta statistics实现数据跳过功能。由于该缺陷,Snowflake可能会读取大量不必要的parquet文件,导致查询性能下降,并增加云服务提供商的API调用请求。

如果您是Snowflake客户并已订阅其企业支持服务,请提交支持工单

作为一种变通方案,您可以启用Delta UniForm来生成Iceberg元数据,并将这些表作为Iceberg表从Snowflake读取。

Delta表可以通过清单文件被Snowflake读取,清单文件是一个文本文件,包含用于查询Delta表的数据文件列表。本文介绍如何使用清单文件设置Delta Lake与Snowflake的集成,并查询Delta表。

设置Delta Lake与Snowflake的集成并查询Delta表

您可以通过以下步骤设置Delta Lake与Snowflake的集成。

步骤1:使用Apache Spark生成Delta表的清单

在位于<path-to-delta-table>的Delta表上运行generate操作:

GENERATE symlink_format_manifest FOR TABLE delta.`<path-to-delta-table>`
val deltaTable = DeltaTable.forPath(<path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")
DeltaTable deltaTable = DeltaTable.forPath(<path-to-delta-table>);
deltaTable.generate("symlink_format_manifest");
deltaTable = DeltaTable.forPath(<path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")

详情请参阅生成清单文件

generate操作会在<path-to-delta-table>/_symlink_format_manifest/路径下生成清单文件。换句话说,该目录中的文件包含了读取Delta表快照时应读取的数据文件(即Parquet文件)名称。

注意

我们建议您在Snowflake可以直接读取的位置定义Delta表。

步骤2:配置Snowflake以读取生成的清单文件

在您的Snowflake环境中运行以下命令。

在清单文件上定义外部表

要在Snowflake中定义一个外部表,您必须首先定义一个外部stage my_staged_table,该stage指向Delta表。在Snowflake中运行以下命令。

create or replace stage my_staged_table url='<path-to-delta-table>'

替换为Delta表的完整路径。通过这个阶段,您可以定义一个表delta_manifest_table,该表读取清单文件中指定的文件名如下:

CREATE OR REPLACE EXTERNAL TABLE delta_manifest_table(
    filename VARCHAR AS split_part(VALUE:c1, '/', -1)
  )
  WITH LOCATION = @my_staged_table/_symlink_format_manifest/
  FILE_FORMAT = (TYPE = CSV)
  PATTERN = '.*[/]manifest'
  AUTO_REFRESH = true;

注意

在这个查询中:

  • 该位置是清单子目录。

  • filename 列包含清单中定义的文件名(非完整路径)。

在Parquet文件上定义外部表

您可以定义一个表my_parquet_data_table,用于读取Delta表中所有的Parquet文件。

CREATE OR REPLACE EXTERNAL TABLE my_parquet_data_table(
    id INT AS (VALUE:id::INT),
    part INT AS (VALUE:part::INT),
    ...
    parquet_filename VARCHAR AS split_part(metadata$filename, '/', -1)
  )
  WITH LOCATION = @my_staged_table/
  FILE_FORMAT = (TYPE = PARQUET)
  PATTERN = '.*[/]part-[^/]*[.]parquet'
  AUTO_REFRESH = true;

注意

在这个查询中:

  • 该位置是Delta表的路径。

  • parquet_filename 列包含存储表中每行数据的文件名。

如果你的Delta表是分区的,那么你必须在表定义中显式提取分区值。例如,如果表是按名为part的单个整数列分区的,你可以按如下方式提取这些值:

CREATE OR REPLACE EXTERNAL TABLE my_parquet_data_partitioned_table(
    id INT AS (VALUE:id::INT),
    part INT AS (
        nullif(
            regexp_replace (metadata$filename, '.*part\\=(.*)\\/.*', '\\1'),
            '__HIVE_DEFAULT_PARTITION__'
        )::INT
    ),
    ...
    parquet_filename VARCHAR AS split_part(metadata$filename, '/', -1),
  )
  WITH LOCATION = @my_staged_partitioned_table/
  FILE_FORMAT = (TYPE = PARQUET)
  PATTERN = '.*[/]part-[^/]*[.]parquet'
  AUTO_REFRESH = true;

正则表达式用于提取列part的分区值。

将此Parquet表作为Delta表查询会产生不正确的结果,因为此查询会读取表中的所有Parquet文件,而不仅仅是那些定义表一致性快照的文件。您可以使用清单表来获取一致性快照数据。

定义视图以使用清单表获取Delta表的正确内容

要仅读取属于生成清单中定义的快照一致性的行,您可以应用筛选器,只保留Parquet表中来自清单表定义文件的行。

CREATE OR REPLACE VIEW my_delta_table AS
    SELECT id, part, ...
    FROM my_parquet_data_table
    WHERE parquet_filename IN (SELECT filename FROM delta-manifest-table);

查询此视图将为您提供Delta表的一致性视图。

步骤3:更新清单

当Delta表中的数据被更新时,您必须使用以下任一方法重新生成清单:

  • 显式更新:在所有数据更新完成后,您可以运行generate操作来更新清单文件。

  • 自动更新: 您可以配置Delta表,使对该表的所有写入操作自动更新清单。要启用此自动模式,请使用以下SQL命令设置相应的表属性。

    ALTER TABLE delta.`<path-to-delta-table>` SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true)
    

限制

Snowflake集成在其行为中存在已知的限制。

数据一致性

每当Delta Lake生成更新的清单文件时,它会以原子操作方式覆盖现有的清单文件。因此,Snowflake始终能看到数据文件的一致性视图——要么看到所有旧版本文件,要么看到所有新版本文件。不过,一致性保证的粒度取决于表是否进行了分区。

  • 未分区表: 所有文件名都写入一个以原子方式更新的清单文件中。在这种情况下,Snowflake将看到完整的表快照一致性。

  • 分区表: 清单文件采用与原始Delta表相同的Hive分区风格目录结构进行分区。这意味着每个分区都是原子性更新的,Snowflake可以看到每个分区的一致性视图,但无法看到跨分区的一致性视图。此外,由于所有分区的清单无法同时更新,并发生成清单的尝试可能导致不同分区拥有不同版本的清单。

根据您为Delta表使用的存储系统类型,当Snowflake在清单文件被重写的同时并发查询清单时,可能会出现结果不正确的情况。在缺乏原子性文件覆盖功能的文件系统实现中,清单文件可能会暂时不可用。因此,如果清单更新可能与Snowflake的查询时间重合,请谨慎使用清单。

性能

这是一个实验性集成,其性能和可扩展性特性尚未经过测试。

模式演进

Delta Lake支持模式演进,对Delta表的查询会自动使用最新模式,而不管Hive元存储中定义的表模式如何。然而,Snowflake使用其表定义中定义的模式,在表定义更新为新模式之前,不会使用更新后的模式进行查询。