与云基础设施的集成

介绍

所有主要的云服务提供商都提供持久的数据存储在 对象存储 中。 这些并不是经典的“POSIX”文件系统。为了存储数百PB的数据而没有任何单点故障,对象存储用一个更简单的模型 object-name => data 替代了经典的文件系统目录树。为了启用远程访问,通常以(缓慢的)HTTP REST操作提供对对象的操作。

Spark 可以通过在 Hadoop 中实现的文件系统连接器或由基础设施供应商提供的连接器读取和写入对象存储中的数据。这些连接器使得对象存储看起来 几乎 像文件系统,具有目录和文件,以及对它们的经典操作,如列出、删除和重命名。

重要:云对象存储不是实际文件系统

虽然这些存储看起来像文件系统,但实际上它们仍然是对象存储, 而这个区别是显著的

它们不能直接替代集群文件系统,例如 HDFS 除非在此明确说明

关键区别是:

这对Spark有何影响?

  1. 读取和写入数据的速度可能显著慢于使用普通文件系统。
  2. 某些目录结构在查询拆分计算期间可能非常低效。
  3. Spark 通常在保存 RDD、DataFrame 或 Dataset 时使用的基于重命名的算法可能既慢又不可靠。

出于这些原因,直接使用对象存储作为查询的目标或作为查询链中的中间存储并不总是安全的。请查阅对象存储及其连接器的文档,以确定哪些使用方式被认为是安全的。

一致性

截至2021年,亚马逊(S3)、谷歌云(GCS)和微软(Azure Storage,ADLS Gen1,ADLS Gen2)的对象存储都是 一致的

这意味着一旦文件被写入/更新,它就可以被其他进程列出、查看和打开--并且将检索到最新版本。这是AWS S3的一个已知问题,特别是对于在对象创建之前发出的HEAD请求的404缓存。

即便如此:没有任何存储连接器提供关于它们的客户端如何处理在流读取时被覆盖的对象的保证。请不要假设旧文件可以安全读取,也不要假设更改可见所需的时间是有限的——实际上,如果正在读取的文件被覆盖,客户端可能会直接失败。

因此:避免覆盖已知/可能有其他客户端正在积极读取的文件。

其他对象存储是 不一致的

这包括 OpenStack Swift

这样的商店并不总是安全可用作为工作目的地 - 请查阅每个商店的具体文档。

安装

在类路径上有相关库,并且Spark使用有效凭证进行配置后,可以通过使用其URLs作为数据路径来读取或写入对象。例如 sparkContext.textFile("s3a://landsat-pds/scene_list.gz") 将创建一个存储在S3中的文件 scene_list.gz 的RDD,使用s3a连接器。

要将相关库添加到应用程序的类路径中,请包含 hadoop-cloud 模块及其依赖项。

在Maven中,将以下内容添加到 pom.xml 文件中,假设 spark.version 设置为所选的Spark版本:


  ...
  
org.apache.spark
spark-hadoop-cloud_2.12
${spark.version}
提供

  ...

基于 Apache Spark 的商业产品通常直接设置类路径与云基础设施进行交互,在这种情况下,可能不需要该模块。

正在验证

Spark作业必须向对象存储进行认证,以访问其中的数据。

  1. 当 Spark 在云基础设施中运行时,凭据通常会自动设置。
  2. spark-submit 会读取 AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY AWS_SESSION_TOKEN 环境变量,并为 s3n s3a 连接器设置相关的身份验证选项,以访问 Amazon S3。
  3. 在 Hadoop 集群中,设置可能会在 core-site.xml 文件中进行配置。
  4. 身份验证详细信息可以手动添加到 Spark 配置的 spark-defaults.conf 文件中。
  5. 另外,它们可以在用于配置应用程序的 SparkContext SparkConf 实例中以编程方式设置。

重要提示:绝不要将认证密钥检查到源代码仓库中,尤其是公共仓库

请查阅 Hadoop 文档 以获取相关的配置和安全选项。

配置

每个云连接器都有自己的一套配置参数,请参考相关文档。

对于一致性模型意味着基于重命名的提交是安全的对象存储,请使用 FileOutputCommitter v2 算法以获得性能; v1 以保证安全。

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2

这在作业结束时进行的重命名少于“版本 1”算法。由于它仍然使用 rename() 来提交文件,当对象存储没有一致的元数据/列表时,使用它是不安全的。

提交者还可以被设置为在清理临时文件时忽略失败;这减少了临时网络问题升级为作业失败的风险:

spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored true

原始的 v1 提交算法将成功任务的输出重命名为作业尝试目录,然后在作业提交阶段将该目录中的所有文件重命名为最终目标:

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 1

在 Amazon S3 上模仿重命名的性能缓慢使得这个算法非常非常慢。对此推荐的解决方案是切换到 S3 “零重命名”提交器(见下文)。

作为参考,以下是不同存储和连接器在重命名目录时的性能和安全特性:

商店 连接器 目录重命名安全性 重命名性能
Amazon S3 s3a 不安全 O(data)
Azure Storage wasb 安全 O(files)
Azure Datalake Gen 2 abfs 安全 O(1)
Google Cloud Storage gs 混合 O(files)
  1. 由于存储临时文件可能会产生费用;定期删除名为 "_temporary" 的目录。
  2. 对于 AWS S3,设置一个多部分上传可以保持待处理的时间限制。这可以避免因未完成的上传而产生账单。
  3. 对于 Google cloud,目录重命名是逐文件进行的。考虑使用 v2 提交者,并且仅编写生成幂等输出的代码——包括文件名,因为它 不再不安全 于 v1 提交者,并且更快。

Parquet I/O 设置

在处理Parquet数据时,为了获得最佳性能,请使用以下设置:

spark.hadoop.parquet.enable.summary-metadata false
spark.sql.parquet.mergeSchema false
spark.sql.parquet.filterPushdown true
spark.sql.hive.metastorePartitionPruning true

这些最小化了查询期间读取的数据量。

ORC I/O 设置

在处理ORC数据时,为获得最佳性能,请使用以下设置:

spark.sql.orc.filterPushdown true
spark.sql.orc.splits.include.file.footer true
spark.sql.orc.cache.stripe.details.size 10000
spark.sql.hive.metastorePartitionPruning true

再次,这些最小化了查询期间读取的数据量。

Spark Streaming与对象存储

Spark Streaming可以通过创建一个 FileInputDStream 来监控添加到对象存储中的文件,监控存储中的一个路径,方法是调用 StreamingContext.textFileStream()

  1. 扫描新文件的时间与路径下的文件数量成正比,而不是与 文件的数量成正比,因此这可能会变成一个缓慢的操作。窗口的大小需要进行设置以处理此问题。

  2. 文件只有在完全写入后才会出现在对象存储中;不需要一个写入然后重命名的工作流来确保在文件仍在写入时不会被拾取。应用程序可以直接写入被监视的目录。

  3. 在默认的检查点文件管理器 FileContextBasedCheckpointFileManager 的情况下,流只能检查点到实现快速且原子 rename() 操作的存储。否则,检查点可能会很慢并且潜在不可靠。在使用Hadoop 3.3.1或更高版本的AWS S3上,使用S3A连接器可以使用可终止的基于流的检查点文件管理器(通过将 spark.sql.streaming.checkpointFileManagerClass 配置设置为 org.apache.spark.internal.io.cloud.AbortableStreamBasedCheckpointFileManager ),这消除了缓慢的重命名。在这种情况下,用户必须格外小心,以避免在多个并行运行的查询之间重用检查点位置,因为这可能导致检查点数据的损坏。

安全快速地将工作提交到云存储。

如前所述,在任何表现出最终一致性的对象存储上(例如:S3),通过重命名提交是危险的,并且通常比经典文件系统重命名慢。

某些对象存储连接器提供自定义的提交器,以便在不使用重命名的情况下提交任务和作业。

Hadoop S3A 提交者

在使用Hadoop 3.1或更高版本构建的Spark版本中, hadoop-aws JAR包含可以安全使用的提交者, 用于通过s3a连接器访问S3存储。

这些提交者将文件写入最终目标,而不是将数据写入存储的临时目录以进行重命名,但不会发出最终的POST命令以使大规模的“多部分”上传可见。这些操作被推迟到作业提交本身。因此,任务和作业提交更快,并且任务失败不会影响结果。

要切换到 S3A 提交者,请使用构建了 Hadoop 3.1 或更高版本的 Spark,并通过以下选项来切换提交者。

spark.hadoop.fs.s3a.committer.name 目录
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter

它已通过Spark支持的最常见格式进行了测试。

mydataframe.write.format("parquet").save("s3a://bucket/destination")

有关这些提交者的更多细节,请参见 最新的Hadoop文档 ,S3A提交者的详细信息包含在 将工作提交到S3与S3A提交者 中。

注意:根据所使用的提交者,使用Hadoop 3.3.1之前的版本时,进行中的统计数据可能会被低报。

Amazon EMR: EMRFS S3 优化提交器

Amazon EMR 有自己针对 parquet 数据的 S3 识别提交者。有关使用说明,请参见 EMRFS S3 优化提交者

有关实现和性能的详细信息,请参见 [“Improve Apache Spark write performance on Apache Parquet formats with the EMRFS S3-optimized committer”](https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/)

Azure 和 Google 云存储:MapReduce 中间清单提交者。

在2022年9月之后发布的hadoop-mapreduce-core JAR版本(3.3.5及更高版本)包含一个针对Azure ADLS Generation 2和Google Cloud Storage的性能和韧性进行了优化的提交者。

这个提交者,即“清单提交者”,使用清单文件将任务提交者的目录列表信息传播到作业提交者。 这些清单可以原子性地写入,而不依赖于原子目录重命名,这在GCS中是缺失的。

作业提交者读取这些清单,并将任务输出目录中的文件直接重命名到目标目录,支持并行处理,并可选择性地限制速率以避免IO瓶颈。这为对象存储提供了性能和可扩展性。

在使用 Azure 存储时,这对作业的正确性并不是至关重要的;经典的 FileOutputCommitter 在那里是安全的 - 然而,这个新的提交器对大型作业和深层宽目录树的扩展性更好。

因为 Google GCS 不支持原子目录重命名,应该在可用时使用清单提交器。

此提交者支持“动态分区覆盖”(见下文)。

有关此提交者的可用性和使用的详细信息,请参考用于该Hadoop版本的Hadoop文档。

在Hadoop 3.3.4或更早版本中不可用。

IBM云对象存储:Stocator

IBM 为 IBM Cloud Object Storage 和 OpenStack Swift 提供了 Stocator 输出提交器。

源代码、文档和发布信息可以在 https://github.com/CODAIT/stocator 找到。

云提交者与 INSERT OVERWRITE TABLE

Spark 具有一个称为“动态分区覆盖”的功能;一个表可以被更新,只有那些添加了新数据的分区,其内容将被替换。

这用于形式为 INSERT OVERWRITE TABLE 的 SQL 语句中,且当数据集以“overwrite”模式写入时

eventDataset.write
.mode("覆盖")
.partitionBy("年", "月")
.format("parquet")
.save(tablePath)

这个功能使用文件重命名,并且对提交者和文件系统有特定要求:

  1. 提交者的工作目录必须位于目标文件系统中。
  2. 目标文件系统必须高效地支持文件重命名。

这些条件是 被 S3A 提交者和 AWS S3 存储满足的。

其他云存储的提交者 可能 支持此功能,并向Spark声明他们是兼容的。如果在通过hadoop提交者写入数据时需要动态分区覆盖,Spark将在使用原始 FileOutputCommitter 时始终允许此操作。对于其他提交者,在它们实例化之后,Spark将探测它们的兼容性声明,如果声明它们是兼容的,则允许该操作。

如果提交者不兼容,则操作将失败,并显示错误消息 PathOutputCommitter does not support dynamicPartitionOverwrite

除非有一个与目标文件系统兼容的提交者,否则唯一的解决方案是使用适合云的格式来存储数据。

进一步阅读

这里是关于Apache和云服务提供商的标准连接器的文档。

云提交者问题及与Hive兼容的解决方案