存储配置

Delta Lake的ACID保证基于存储系统的原子性和持久性保证。具体来说,Delta Lake在与存储系统交互时依赖以下要素:

  • 原子可见性: 必须有一种方式让文件要么完全可见,要么完全不可见。

  • 互斥性: 只有一个写入者能够在最终目的地创建(或重命名)文件。

  • 一致的列表: 一旦文件被写入目录,该目录的所有未来列表操作都必须返回该文件。

由于存储系统本身不一定能提供所有这些保证,Delta Lake的事务操作通常通过LogStore API而非直接访问存储系统。为了为不同存储系统提供ACID保证,您可能需要使用不同的LogStore实现。本文介绍如何为各种存储系统配置Delta Lake。存储系统分为两大类:

  • 内置支持的存储系统: 对于某些存储系统,您不需要额外配置。Delta Lake 使用路径的 scheme(例如 s3as3a://path 中)来动态识别存储系统,并使用相应的 LogStore 实现来提供事务保证。不过对于 S3,并发写入存在额外注意事项。详情请参阅 关于 S3 的部分

  • 其他存储系统:与Apache Spark类似,LogStore使用Hadoop FileSystem API进行读写操作。因此Delta Lake支持在任何实现了FileSystem API的存储系统上进行并发读取。对于具有事务保证的并发写入,根据FileSystem实现提供的保证分为两种情况。如果该实现提供一致的列表和不覆盖的原子重命名(即rename(... , overwrite = false)会原子性地生成目标文件,或者如果目标已存在则抛出java.nio.file.FileAlreadyExistsException失败),那么使用重命名的默认LogStore实现将允许具有保证的并发写入。否则,您必须通过设置以下Spark配置来配置自定义的LogStore实现。

    spark.delta.logStore.<scheme>.impl=<full-qualified-class-name>
    

    其中是您存储系统路径的方案。这将配置Delta Lake动态地仅对这些路径使用给定的LogStore实现。您可以在应用程序中为不同方案配置多个此类配置,从而允许同时从不同存储系统读取和写入。

    注意

    • 本地文件系统上的Delta Lake可能不支持并发事务写入。这是因为本地文件系统可能无法提供原子重命名操作。因此,您不应使用本地文件系统来测试并发写入。

    • 在1.0版本之前,Delta Lake支持通过设置spark.delta.logStore.class来配置LogStores。这种方法现已弃用。设置此配置将对所有路径使用配置的LogStore,从而禁用基于动态方案的委托。

在本文中:

Amazon S3

Delta Lake支持以两种不同模式读写S3:单集群和多集群。

单集群

多集群

配置

开箱即用Delta Lake

处于实验阶段,需要额外配置

读取

支持从多个集群并发读取

支持从多个集群并发读取

写入

支持来自单个Spark驱动程序的并发写入

支持多集群写入

权限

S3凭证

S3和DynamoDB操作权限

单集群设置(默认)

在默认模式下,Delta Lake支持从多个集群并发读取数据,但对S3的并发写入必须来自单个Spark驱动节点,以便Delta Lake能够提供事务保证。这是因为S3目前不提供互斥机制,也就是说无法确保只有一个写入者能够创建文件。

警告

多个Spark驱动程序并发写入S3存储上的同一Delta表可能导致数据丢失。关于多集群解决方案,请参阅下方的多集群设置章节。

需求 (S3单集群)

  • S3凭证:IAM角色(推荐)或访问密钥

  • 与相应Delta Lake版本关联的Apache Spark。

  • 适用于Apache Spark编译版本的Hadoop AWS连接器(hadoop-aws)

快速入门 (S3 单集群)

本节介绍如何在单集群模式下快速开始读写S3上的Delta表。有关配置的详细说明,请参阅设置配置(S3多集群)

  1. 使用以下命令启动一个支持Delta Lake和S3的Spark shell(假设您使用的是预构建为Hadoop 3.3.4的Spark 3.5.3版本):

    bin/spark-shell \
     --packages io.delta:delta-spark_2.12:3.3.1,org.apache.hadoop:hadoop-aws:3.3.4 \
     --conf spark.hadoop.fs.s3a.access.key= \
     --conf spark.hadoop.fs.s3a.secret.key=
    
  2. 尝试在S3上执行一些基本的Delta表操作(Scala语言):

    // 在S3上创建Delta表:
    spark.range(5).write.format("delta").save("s3a:///")
    
    // 读取S3上的Delta表:
    spark.read.format("delta").load("s3a:///").show()
    

如需了解其他语言及Delta表操作的更多示例,请参阅Quickstart页面。

为了高效列出S3上的Delta Lake元数据文件,请设置配置delta.enableFastS3AListFrom=true。此性能优化处于实验性支持模式,仅适用于S3A文件系统,不适用于Amazon的EMR默认文件系统S3

  bin/spark-shell \
    --packages io.delta:delta-spark_2.12:3.3.1,org.apache.hadoop:hadoop-aws:3.3.4 \
    --conf spark.hadoop.fs.s3a.access.key=<your-s3-access-key> \
    --conf spark.hadoop.fs.s3a.secret.key=<your-s3-secret-key> \
    --conf "spark.hadoop.delta.enableFastS3AListFrom=true

配置 (S3 单集群)

以下是配置Delta Lake以使用S3的步骤。

  1. 在类路径中包含 hadoop-aws JAR包。

    Delta Lake需要来自 hadoop-aws 包的 org.apache.hadoop.fs.s3a.S3AFileSystem 类,该类实现了Hadoop针对S3的 FileSystem API。请确保该包的版本与构建Spark时使用的Hadoop版本相匹配。

  2. 设置S3凭证。

    我们推荐使用IAM角色进行认证和授权。但如果想使用密钥,这里有一种设置Hadoop配置的方法(使用Scala):

    sc.hadoopConfiguration.set("fs.s3a.access.key", "")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "")
    

多集群设置

注意

此支持是新的且处于实验阶段。

此模式支持从多个集群并发写入S3,必须通过配置Delta Lake使用正确的LogStore实现来显式启用。该实现利用DynamoDB来提供S3所缺乏的互斥机制。

警告

该多集群写入解决方案仅在所有写入器都使用此LogStore实现以及相同的DynamoDB表和区域时才安全。如果某些驱动程序使用开箱即用的Delta Lake,而其他驱动程序使用此实验性LogStore,则可能发生数据丢失。

需求 (S3多集群)

快速入门 (S3 多集群)

本节介绍如何通过多集群模式快速开始读写S3上的Delta表。

  1. 使用以下命令启动一个支持Delta Lake和S3的Spark shell(假设您使用的是预构建了Hadoop 3.3.4的Spark 3.5.3版本):

    bin/spark-shell \
     --packages io.delta:delta-spark_2.12:3,org.apache.hadoop:hadoop-aws:3.3.4,io.delta:delta-storage-s3-dynamodb:3.3.1 \
     --conf spark.hadoop.fs.s3a.access.key= \
     --conf spark.hadoop.fs.s3a.secret.key= \
     --conf spark.delta.logStore.s3a.impl=io.delta.storage.S3DynamoDBLogStore \
     --conf spark.io.delta.storage.S3DynamoDBLogStore.ddb.region=us-west-2
    
  2. 在S3上尝试一些基本的Delta表操作(使用Scala):

    // 在S3上创建Delta表:
    spark.range(5).write.format("delta").save("s3a:///")
    
    // 读取S3上的Delta表:
    spark.read.format("delta").load("s3a:///").show()
    

设置配置 (S3多集群)

  1. 创建DynamoDB表。

    您可以选择自行创建DynamoDB表(推荐)或让系统自动为您创建。

    • 自行创建DynamoDB表

      该DynamoDB表将维护多个Delta表的提交元数据,配置合适的读取/写入容量模式(例如按需或预配置)对您的使用场景至关重要。因此,我们强烈建议您自行创建DynamoDB表。以下示例使用AWS CLI。了解更多信息,请参阅create-table命令参考。

      aws dynamodb create-table \
        --region us-east-1 \
        --table-name delta_log \
        --attribute-definitions AttributeName=tablePath,AttributeType=S \
                                AttributeName=fileName,AttributeType=S \
        --key-schema AttributeName=tablePath,KeyType=HASH \
                     AttributeName=fileName,KeyType=RANGE \
        --billing-mode PAY_PER_REQUEST
      

      注意:一旦选择了table-nameregion,您必须在每个Spark会话中指定它们,才能使这种多集群模式正常工作。请参阅下表。

    • 自动创建DynamoDB表

      尽管如此,在指定这个LogStore实现后,如果默认的DynamoDB表尚不存在,那么它将自动为您创建。这个默认表支持每秒5次强一致性读取和5次写入。您可以使用下表中详述的仅用于表创建的配置键来更改这些默认值。

  2. 按照配置(S3单集群)部分列出的配置步骤进行操作。

  3. 在类路径中包含 delta-storage-s3-dynamodb JAR 文件。

  4. 在您的Spark会话中配置LogStore实现。

    首先,为方案s3配置这个LogStore实现。您也可以为方案s3as3n复制此命令。

    spark.delta.logStore.s3.impl=io.delta.storage.S3DynamoDBLogStore
    

    接下来,指定实例化DynamoDB客户端所需的额外信息。为了让这种多集群模式正常工作,您必须在每个Spark会话中使用相同的tableNameregion来实例化DynamoDB客户端。以下是每个会话的配置列表及其默认值:

    配置键

    描述

    默认值

    spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName

    要使用的DynamoDB表名称

    delta_log

    spark.io.delta.storage.S3DynamoDBLogStore.ddb.region

    客户端要使用的区域

    us-east-1

    spark.io.delta.storage.S3DynamoDBLogStore.credentials.provider

    客户端使用的AWSCredentialsProvider*

    DefaultAWSCredentialsProviderChain

    spark.io.delta.storage.S3DynamoDBLogStore.provisionedThroughput.rcu

    (仅表创建时**) 读取容量单位

    5

    spark.io.delta.storage.S3DynamoDBLogStore.provisionedThroughput.wcu

    (仅表创建**) 写入容量单位

    5

    • *有关AWS凭证提供程序的更多详细信息,请参阅AWS文档

    • **这些配置仅在给定的DynamoDB表尚不存在且需要自动创建时使用。

生产配置(S3多集群)

至此,这个多集群设置已经完全可运行。不过,在生产环境中运行时,您还可以进行额外的配置以提高性能并优化存储。

  1. Adjust your Read and Write Capacity Mode.

    If you are using the default DynamoDB table created for you by this LogStore implementation, its default RCU and WCU might not be enough for your workloads. You can adjust the provisioned throughput or update to On-Demand Mode.

  2. 使用生存时间(TTL)清理旧的DynamoDB条目。

    当一个DynamoDB元数据条目被标记为完成后,经过足够时间使得我们现在可以仅依赖S3来防止对其对应的Delta文件意外覆盖时,就可以安全地从DynamoDB中删除该条目。最经济的方式是使用DynamoDB的TTL功能,这是一种免费的自动化方式来从DynamoDB表中删除项目。

    在指定的DynamoDB表上运行以下命令以启用TTL:

    aws dynamodb update-time-to-live \
      --region us-east-1 \
      --table-name delta_log \
      --time-to-live-specification "Enabled=true, AttributeName=expireTime"
    

    默认的expireTime将是DynamoDB条目被标记为完成后的1天。

  3. 使用S3生命周期过期规则清理旧的AWS S3临时文件。

    在这个LogStore实现中,会创建一个临时文件,其中包含要提交到Delta日志的元数据副本。一旦完成对Delta日志的提交,并且在删除相应的DynamoDB条目后,就可以安全地删除这个临时文件。实际上,在恢复失败的提交时,只会使用最新的临时文件。

    以下是删除这些临时文件的两种简单方法。

    1. 使用S3 CLI手动删除。

      这是最安全的选择。以下命令将删除指定

      中除最新临时文件外的所有文件:

      aws s3 ls s3:////_delta_log/.tmp/ --recursive | awk 'NF>1{print $4}' | grep . | sort | head -n -1  | while read -r line ; do
          echo "正在删除 ${line}"
          aws s3 rm s3:////_delta_log/.tmp/${line}
      done
      
    2. 使用S3生命周期过期规则删除

      一个更自动化的选项是使用S3生命周期过期规则,将过滤器前缀指向表路径中的/_delta_log/.tmp/文件夹,并设置过期时间为30天。

      注意:选择一个足够大的过期值非常重要。如上所述,最新的临时文件将在恢复失败提交时使用。如果此临时文件被删除,那么您的DynamoDB表和S3 /_delta_log/.tmp/文件夹将不同步。

      有多种方法可以配置存储桶生命周期配置,AWS文档中此处有描述。

      一种方法是使用S3的put-bucket-lifecycle-configuration命令。详情请参阅S3生命周期配置。下面给出一个示例规则和命令调用:

      在名为file://lifecycle.json的文件中:

      {
        "Rules":[
          {
            "ID":"expire_tmp_files",
            "Filter":{
              "Prefix":"path/to/table/_delta_log/.tmp/"
            },
            "Status":"Enabled",
            "Expiration":{
              "Days":30
            }
          }
        ]
      }
      
      aws s3api put-bucket-lifecycle-configuration \
        --bucket my-bucket \
        --lifecycle-configuration file://lifecycle.json
      

注意

AWS S3可能对每个存储桶的规则数量有限制。详情请参阅PutBucketLifecycleConfiguration

Microsoft Azure 存储

Delta Lake 内置支持各种 Azure 存储系统,可为来自多个集群的并发读写提供完整的事务保证。

Delta Lake依赖Hadoop FileSystem API来访问Azure存储服务。具体来说,Delta Lake要求FileSystem.rename()的实现必须是原子性的,这一特性仅在较新的Hadoop版本中支持(Hadoop-15156Hadoop-15086)。因此,您可能需要使用较新的Hadoop版本构建Spark,并将其用于部署应用程序。关于使用特定Hadoop版本构建Spark,请参阅Specifying the Hadoop Version and Enabling YARN;关于设置Spark与Delta Lake,请参阅Quickstart

以下是每种Azure存储系统的具体要求列表:

Azure Blob storage

要求 (Azure Blob 存储)

例如,一个可行的组合是Delta 0.7.0或更高版本,配合使用Hadoop 3.2编译和部署的Apache Spark 3.0。

配置 (Azure Blob存储)

以下是配置Delta Lake在Azure Blob存储上的步骤。

  1. 在类路径中包含 hadoop-azure JAR。有关版本详情,请参阅上述要求。

  2. 设置凭证。

    您可以在Spark配置属性中设置凭证。

    建议使用SAS令牌。在Scala中,可以使用以下代码:

    spark.conf.set(
      "fs.azure.sas...blob.core.windows.net",
       "")
    

    或者可以指定账户访问密钥:

    spark.conf.set(
      "fs.azure.account.key..blob.core.windows.net",
       "")
    

使用方法 (Azure Blob存储)

spark.range(5).write.format("delta").save("wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<path-to-delta-table>")
spark.read.format("delta").load("wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<path-to-delta-table>").show()

Azure Data Lake Storage Gen1

要求 (ADLS Gen1)

  • 用于OAuth 2.0访问的服务主体

  • Delta Lake 0.2.0 或更高版本

  • Hadoop的Azure Data Lake Storage Gen1库支持以下版本部署:

    • Hadoop 2的2.9.1+版本

    • Hadoop 3的3.0.1+版本

  • 与对应Delta Lake版本兼容的Apache Spark(参见相关Delta版本文档的快速入门页面)以及使用与所选Hadoop库兼容的Hadoop版本编译

例如,一个可行的组合是Delta 0.7.0或更高版本,配合使用基于Hadoop 3.2编译部署的Apache Spark 3.0。

配置 (ADLS Gen1)

以下是配置Delta Lake在Azure Data Lake Storage Gen1上的步骤。

  1. 在类路径中包含hadoop-azure-datalake JAR。有关版本详情,请参阅上述要求。

  2. 设置Azure Data Lake Storage Gen1凭证。

    您可以使用以下Hadoop配置设置您的凭证(Scala语言):

    spark.conf.set("dfs.adls.oauth2.access.token.provider.type", "ClientCredential")
    spark.conf.set("dfs.adls.oauth2.client.id", "")
    spark.conf.set("dfs.adls.oauth2.credential", "")
    spark.conf.set("dfs.adls.oauth2.refresh.url", "https://login.microsoftonline.com//oauth2/token")
    

使用方式 (ADLS Gen1)

spark.range(5).write.format("delta").save("adl://<your-adls-account>.azuredatalakestore.net/<path-to-delta-table>")

spark.read.format("delta").load("adl://<your-adls-account>.azuredatalakestore.net/<path-to-delta-table>").show()

Azure Data Lake Storage Gen2

要求 (ADLS Gen2)

  • Azure Data Lake Storage Gen2中创建的账户

  • 服务主体已创建被分配存储Blob数据贡献者角色给存储账户。

    • 注意记录存储账户名称、目录ID(也称为租户ID)、应用程序ID以及主体的密码。这些将用于配置Spark。

  • Delta Lake 0.7.0 或更高版本

  • Apache Spark 3.0 或更高版本

  • 使用的Apache Spark必须基于Hadoop 3.2或更高版本构建。

例如,一个可行的组合是Delta 0.7.0或更高版本,配合使用基于Hadoop 3.2编译部署的Apache Spark 3.0。

配置 (ADLS Gen2)

以下是配置Delta Lake在Azure Data Lake Storage Gen1上的步骤。

  1. 在类路径中包含Maven构件hadoop-azure-datalake的JAR包。版本详情请参阅requirements。此外,您可能还需要包含Maven构件hadoop-azurewildfly-openssl的JAR包。

  2. 设置Azure Data Lake Storage Gen2凭证。

     spark.conf.set("fs.azure.account.auth.type..dfs.core.windows.net", "OAuth")
     spark.conf.set("fs.azure.account.oauth.provider.type..dfs.core.windows.net",  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
     spark.conf.set("fs.azure.account.oauth2.client.id..dfs.core.windows.net", "")
     spark.conf.set("fs.azure.account.oauth2.client.secret..dfs.core.windows.net","")
     spark.conf.set("fs.azure.account.oauth2.client.endpoint..dfs.core.windows.net", "https://login.microsoftonline.com//oauth2/token")
    

    其中是我们之前设置为需求的服务主体详细信息。

  3. 如果需要的话初始化文件系统

spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
dbutils.fs.ls("abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/")
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false")

使用方法 (ADLS Gen2)

spark.range(5).write.format("delta").save("abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-delta-table>")

spark.read.format("delta").load("abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-delta-table>").show()

其中 是容器下的文件系统名称。

HDFS

Delta Lake 内置支持HDFS,可在多集群并发读写时提供完整的事务保证。有关配置凭证的详细信息,请参阅Hadoop和Spark文档。

Google Cloud Storage

您必须配置Delta Lake以使用正确的LogStore来从GCS并发读写。

要求 (GCS)

配置 (GCS)

  1. 对于 Delta Lake 1.2.0 及以下版本,您必须为 gs 方案显式配置 LogStore 实现。

    spark.delta.logStore.gs.impl=io.delta.storage.GCSLogStore
    
  2. 在类路径中包含gcs-connector的JAR文件。有关如何配置Spark与GCS的详细信息,请参阅文档

使用方式 (GCS)

spark.range(5, 10).write.format("delta").mode("append").save("gs://<bucket-name>/<path-to-delta-table>")

spark.read.format("delta").load("gs://<bucket-name>/<path-to-delta-table>").show()

Oracle Cloud Infrastructure

注意

此支持是新的且处于实验阶段。

您需要配置Delta Lake以使用正确的LogStore来实现并发读写。

配置 (OCI)

  1. 为方案 oci 配置 LogStore 实现。

    spark.delta.logStore.oci.impl=io.delta.storage.OracleCloudLogStore
    
  2. 在类路径中包含delta-contribshadoop-oci-connector的JAR包。有关如何使用OCI配置Spark的详细信息,请参阅Using the HDFS Connector with Spark

  3. 按照文档中的说明设置OCI对象存储凭证。

使用方式 (OCI)

spark.range(5).write.format("delta").save("oci://<ociBucket>@<ociNameSpace>/<path-to-delta-table>")

spark.read.format("delta").load("oci://<ociBucket>@<ociNameSpace>/<path-to-delta-table>").show()

IBM Cloud Object Storage

注意

此支持是新的且处于实验阶段。

您需要配置Delta Lake以使用正确的LogStore来实现并发读写。

需求 (IBM)

配置 (IBM)

  1. 为方案 cos 配置 LogStore 实现。

    spark.delta.logStore.cos.impl=io.delta.storage.IBMCOSLogStore
    
  2. 在类路径中包含delta-contribsStocator的JAR文件。

  3. 通过以下Hadoop配置属性来配置支持原子写入的Stocator

    fs.stocator.scheme.list=cos
    fs.cos.impl=com.ibm.stocator.fs.ObjectStoreFileSystem
    fs.stocator.cos.impl=com.ibm.stocator.fs.cos.COSAPIClient
    fs.stocator.cos.scheme=cos
    fs.cos.atomic.write=true
    
  4. 设置IBM COS凭证。以下示例使用名为service的服务访问密钥(Scala语言):

    sc.hadoopConfiguration.set("fs.cos.service.endpoint", "")
    sc.hadoopConfiguration.set("fs.cos.service.access.key", "")
    sc.hadoopConfiguration.set("fs.cos.service.secret.key", "")
    

使用方式 (IBM)

spark.range(5).write.format("delta").save("cos://<your-cos-bucket>.service/<path-to-delta-table>")
spark.read.format("delta").load("cos://<your-cos-bucket>.service/<path-to-delta-table>").show()