面向Azure和谷歌云存储的Manifest Committer工具

本文档介绍如何使用Manifest Committer

Manifest提交器是一种工作提交器,它能在ABFS上为“真实世界”查询提供性能优化,并在GCS上保证性能和正确性。它也能与其他文件系统(包括HDFS)协同工作。不过,该设计主要针对对象存储进行了优化,因为在这些存储系统中,列表操作既缓慢又昂贵。

提交器的架构和实现详见Manifest Committer Architecture

协议及其正确性在Manifest Committer Protocol中有详细说明。

该功能于2022年3月添加。截至2024年4月,已发现的问题包括:* 大规模内存使用问题 * 目录删除的可扩展性问题 * 任务提交重命名失败的恢复能力问题

That is: the core algorithms is correct, but task commit robustness was insufficient to some failure conditions. And scale is always a challenge, even with components tested through large TPC-DS test runs.

问题:

从Spark向Azure ADLS Gen 2存储“abfs://”安全提交工作的唯一提交器是“v1文件提交器”。

从某种意义上说这是“正确的”,因为如果任务尝试失败,其输出保证不会包含在最终结果中。而“v2”提交算法无法满足这一保证,这就是它不再是默认选项的原因。

但是:它很慢,尤其是在使用深层输出目录树的任务中。为什么这么慢?很难确定具体原因,主要是因为FileOutputCommitter中缺乏任何监控机制。运行任务的堆栈跟踪通常显示rename()操作,不过列表操作也会出现。

在Google GCS上,无论是v1还是v2算法都不安全,因为谷歌文件系统不具备v1算法所需的原子目录重命名功能。

另一个问题是,Azure和GCS存储在删除包含大量子项的目录时可能会遇到扩展性问题。这可能导致超时,因为FileOutputCommitter假设作业完成后的清理操作只需快速调用delete("_temporary", true)即可。

解决方案。

Intermediate Manifest提交器是一种新的提交器,旨在为ABFS上的"真实世界"查询提供性能优化,并确保在GCS上的性能和正确性。

该提交器利用了为S3A提交器引入的扩展点。用户可以为abfs://gcs:// URL声明新的提交器工厂。经过适当配置的spark部署将自动采用这个新提交器。

作业清理中的目录性能问题可以通过以下选项解决:1. 提交者将在删除_temporary目录之前并行删除任务尝试目录。2. 在并行尝试之前先尝试删除_temporary目录。3. 可以抑制异常,使清理失败不会导致作业失败。4. 可以禁用清理功能。

提交器(committer)可与任何具有"真正"文件重命名(rename())操作的文件系统客户端配合使用。它针对远程对象存储进行了优化,因为这些场景下列表和文件探测操作成本高昂——该设计在HDFS上可能不会带来如此显著的加速效果,不过相比经典的v1算法,并行重命名操作仍将提升作业执行速度。

工作原理

完整细节请参阅Manifest Committer架构

使用提交器

为支持S3A提交器而添加的钩子设计初衷是允许每个文件系统模式提供自己的提交器。详见Switching To an S3A Committer

abfs 方案的工厂将在 mapreduce.outputcommitter.factory.scheme.abfs 中定义;类似的还有 gcs 的方案。

需要进行一些匹配的spark配置更改,特别是针对parquet绑定的配置。如果这些配置未在mapred-default.xml JAR中定义,可以在core-site.xml中进行设置。

<property>
  <name>mapreduce.outputcommitter.factory.scheme.abfs</name>
  <value>org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory</value>
</property>
<property>
  <name>mapreduce.outputcommitter.factory.scheme.gs</name>
  <value>org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory</value>
</property>

在Spark中绑定到manifest提交器。

在Apache Spark中,配置可以通过命令行选项(在--conf之后)或使用spark-defaults.conf文件来完成。以下是一个使用spark-defaults.conf的示例,还包括了Parquet的配置,其中使用了parquet提交器的子类,该子类在内部使用工厂机制。

spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory
spark.hadoop.mapreduce.outputcommitter.factory.scheme.gs org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol

使用Cloudstore的committerinfo命令探查提交者绑定关系。

hadoop提交器设置可以在最新版本的cloudstore及其committerinfo命令中进行验证。该命令通过与MR和spark作业相同的工厂机制为指定路径实例化一个提交器,然后打印其toString值。

hadoop jar cloudstore-1.0.jar committerinfo abfs://testing@ukwest.dfs.core.windows.net/

2021-09-16 19:42:59,731 [main] INFO  commands.CommitterInfo (StoreDurationInfo.java:<init>(53)) - Starting: Create committer
Committer factory for path abfs://testing@ukwest.dfs.core.windows.net/ is
 org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory@3315d2d7
  (classname org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory)
2021-09-16 19:43:00,897 [main] INFO  manifest.ManifestCommitter (ManifestCommitter.java:<init>(144)) - Created ManifestCommitter with
   JobID job__0000, Task Attempt attempt__0000_r_000000_1 and destination abfs://testing@ukwest.dfs.core.windows.net/
Created committer of class org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitter:
 ManifestCommitter{ManifestCommitterConfig{destinationDir=abfs://testing@ukwest.dfs.core.windows.net/,
   role='task committer',
   taskAttemptDir=abfs://testing@ukwest.dfs.core.windows.net/_temporary/manifest_job__0000/0/_temporary/attempt__0000_r_000000_1,
   createJobMarker=true,
   jobUniqueId='job__0000',
   jobUniqueIdSource='JobID',
   jobAttemptNumber=0,
   jobAttemptId='job__0000_0',
   taskId='task__0000_r_000000',
   taskAttemptId='attempt__0000_r_000000_1'},
   iostatistics=counters=();

gauges=();

minimums=();

maximums=();

means=();
}

验证提交者是否被使用

新的提交者将在_SUCCESS文件中写入包含统计信息的操作JSON摘要。

如果该文件存在且长度为零字节:则使用了经典的FileOutputCommitter

如果该文件存在且大小超过零字节,则表明使用了清单提交器(manifest committer),或者在S3A文件系统的情况下使用了某个S3A提交器。它们都采用相同的JSON格式。

配置选项

以下是提交器的主要配置选项。

选项 含义 默认值
mapreduce.manifest.committer.delete.target.files Delete target files? false
mapreduce.manifest.committer.io.threads Thread count for parallel operations 64
mapreduce.manifest.committer.summary.report.directory directory to save reports. ""
mapreduce.manifest.committer.cleanup.parallel.delete Delete temporary directories in parallel true
mapreduce.manifest.committer.cleanup.parallel.delete.base.first Attempt to delete the base directory before parallel task attempts false
mapreduce.fileoutputcommitter.cleanup.skipped Skip cleanup of _temporary directory false
mapreduce.fileoutputcommitter.cleanup-failures.ignored Ignore errors during cleanup false
mapreduce.fileoutputcommitter.marksuccessfuljobs Create a _SUCCESS marker file on successful completion. (and delete any existing one in job setup) true

更多内容请参阅(高级)[#advanced]部分。

扩展作业 mapreduce.manifest.committer.io.threads

该提交器比经典的FileOutputCommitter更快的核心原因是它尝试在作业提交期间尽可能并行化文件IO操作,具体表现为:

  • 任务清单加载
  • 删除将要创建目录的文件
  • 目录创建
  • 逐个文件重命名
  • 在作业清理中删除任务尝试目录

这些操作都在同一个线程池中执行,其大小通过选项mapreduce.manifest.committer.io.threads设置。

可以使用更大的值。

Hadoop XML配置

<property>
  <name>mapreduce.manifest.committer.io.threads</name>
  <value>32</value>
</property>

spark-defaults.conf 文件中

spark.hadoop.mapreduce.manifest.committer.io.threads 32

大于分配给MapReduce AM或Spark Driver核心数的值不会直接导致CPU过载,因为这些线程通常正在等待对对象存储/文件系统的(缓慢)IO操作完成。

任务提交时的清单加载可能会占用大量内存;线程数量越多,同时加载的清单数量也越多。

注意事项 * 在Spark中,同一个进程可能会提交多个作业,每个作业在提交或清理时都会创建自己的线程池。 * 如果对存储发起过多IO请求,可能会触发Azure的速率限制。通过设置速率限制选项mapreduce.manifest.committer.io.rate可以帮助避免这种情况。

可选:在作业提交时删除目标文件

经典的 FileOutputCommitter 会在将作业文件重命名到位之前,先删除目标路径中的文件。

这在清单提交器中是可选的,通过选项mapreduce.manifest.committer.delete.target.files设置,默认值为false

这可以提高性能,并且当作业创建的所有文件都具有唯一文件名时使用是安全的。

SPARK-8406 在输出文件名中添加UUID以避免意外覆盖以来,Apache Spark确实会为ORC和Parquet生成唯一的文件名

避免检查/删除目标文件可以为每个提交的文件节省一次删除调用,从而显著减少存储IO的开销。

在向现有表追加数据时,如果使用的不是ORC和parquet格式,除非能确保每个文件名都添加了唯一标识符,否则应启用目标文件删除功能。

spark.hadoop.mapreduce.manifest.committer.delete.target.files true

注1:提交者在创建目标重命名目录时会跳过删除操作。这使得效率略有提升,尤其当追加数据的作业正在创建并写入新分区时。

注2:提交者仍然要求单个作业内的任务创建唯一的文件。这是任何作业生成正确数据的基础。

Spark动态分区覆盖

Spark有一个名为“动态分区覆盖”的功能,

这可以在SQL中启动

INSERT OVERWRITE TABLE ...

或者通过DataSet写入,其中模式为overwrite且分区与现有表的分区匹配

sparkConf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
// followed by an overwrite of a Dataset into an existing partitioned table.
eventData2
  .write
  .mode("overwrite")
  .partitionBy("year", "month")
  .format("parquet")
  .save(existingDir)

该功能在Spark中实现,具体步骤如下:1. 将作业的新数据定向写入临时目录 1. 作业提交完成后,扫描输出以识别写入数据的叶子目录"分区" 1. 删除目标表中这些目录的内容 1. 将新文件重命名到对应分区中。

这一切都是在spark中完成的,它负责扫描中间输出树、删除分区以及重命名新文件的任务。

此功能还增加了作业将数据完全写入目标表之外的能力,具体实现方式为:1. 将新文件写入工作目录 2. 由spark在作业提交时将这些文件移动到最终目的地

清单提交器(manifest committer)兼容Azure和谷歌云存储上的动态分区覆盖操作,因为它们共同满足该扩展的核心要求:1. getWorkPath()返回的工作目录与最终输出位于同一文件系统中。2. rename()是一个O(1)操作,在提交作业时使用既安全又快速。

所有S3A提交器均不支持此功能。暂存提交器不满足条件(1),而S3本身不满足条件(2)。

要使用清单提交器(manifest committer)进行动态分区覆盖,spark版本必须包含SPARK-40034PathOutputCommitters以支持动态分区覆盖功能

请注意,如果重命名的文件数量较多,该操作的重命名阶段会比较慢——这是按顺序执行的。并行重命名可以加快速度,但可能会触发abfs过载问题,而manifest提交器的设计正是为了最小化此类风险并支持从中恢复

提交操作的spark端将列出/遍历临时输出目录(会有一些开销),然后进行文件提升,通过经典的rename()文件系统调用来完成。这里不会有明确的速率限制。

这是什么意思?

这意味着在Azure Storage上执行SQL查询/Spark数据集操作时,如果会创建成千上万的文件,就不应使用_dynamic partitioning。在出现节流规模问题之前,这些操作就会遇到性能问题,这一点应被视为警告。

作业摘要位于 _SUCCESS 文件中

原始的hadoop提交者会在输出目录的根目录下创建一个零字节的_SUCCESS文件,除非该功能被禁用。

该提交者会写入一个JSON摘要,内容包括:* 提交者名称 * 诊断信息 * 部分生成文件的列表(用于测试;完整列表因可能过大而被排除) * IO统计信息

如果在运行查询后,这个_SUCCESS文件是零字节大小,则表示未使用新的提交器

如果不为空,则可以进行检查。

通过ManifestPrinter工具查看_SUCCESS文件

摘要文件为JSON格式,可在任何文本编辑器中查看。

如需更简洁的摘要(包括更直观的统计数据展示),请使用ManifestPrinter工具。

hadoop org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestPrinter <path>

这适用于保存在输出目录根部的文件,以及保存到报告目录的任何报告。

来自运行ITestAbfsTerasort MapReduce terasort的示例。

bin/mapred successfile abfs://testing@ukwest.dfs.core.windows.net/terasort/_SUCCESS

Manifest file: abfs://testing@ukwest.dfs.core.windows.net/terasort/_SUCCESS
succeeded: true
created: 2024-04-18T18:34:34.003+01:00[Europe/London]
committer: org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitter
hostname: pi5
jobId: job_1713461587013_0003
jobIdSource: JobID
Diagnostics
  mapreduce.manifest.committer.io.threads = 192
  principal = alice
  stage = committer_commit_job

Statistics:
counters=((commit_file_rename=1)
(committer_bytes_committed=21)
(committer_commit_job=1)
(committer_files_committed=1)
(committer_task_directory_depth=2)
(committer_task_file_count=2)
(committer_task_file_size=21)
(committer_task_manifest_file_size=37157)
(job_stage_cleanup=1)
(job_stage_create_target_dirs=1)
(job_stage_load_manifests=1)
(job_stage_optional_validate_output=1)
(job_stage_rename_files=1)
(job_stage_save_success_marker=1)
(job_stage_setup=1)
(op_create_directories=1)
(op_delete=3)
(op_delete_dir=1)
(op_get_file_status=9)
(op_get_file_status.failures=6)
(op_list_status=3)
(op_load_all_manifests=1)
(op_load_manifest=2)
(op_mkdirs=4)
(op_msync=1)
(op_rename=2)
(op_rename.failures=1)
(task_stage_commit=2)
(task_stage_save_task_manifest=1)
(task_stage_scan_directory=2)
(task_stage_setup=2));

gauges=();

minimums=((commit_file_rename.min=141)
(committer_commit_job.min=2306)
(committer_task_directory_count=0)
(committer_task_directory_depth=1)
(committer_task_file_count=0)
(committer_task_file_size=0)
(committer_task_manifest_file_size=18402)
(job_stage_cleanup.min=196)
(job_stage_create_target_dirs.min=2)
(job_stage_load_manifests.min=687)
(job_stage_optional_validate_output.min=66)
(job_stage_rename_files.min=161)
(job_stage_save_success_marker.min=653)
(job_stage_setup.min=571)
(op_create_directories.min=1)
(op_delete.min=57)
(op_delete_dir.min=129)
(op_get_file_status.failures.min=57)
(op_get_file_status.min=55)
(op_list_status.min=202)
(op_load_all_manifests.min=445)
(op_load_manifest.min=171)
(op_mkdirs.min=67)
(op_msync.min=0)
(op_rename.failures.min=266)
(op_rename.min=139)
(task_stage_commit.min=206)
(task_stage_save_task_manifest.min=651)
(task_stage_scan_directory.min=206)
(task_stage_setup.min=127));

maximums=((commit_file_rename.max=141)
(committer_commit_job.max=2306)
(committer_task_directory_count=0)
(committer_task_directory_depth=1)
(committer_task_file_count=1)
(committer_task_file_size=21)
(committer_task_manifest_file_size=18755)
(job_stage_cleanup.max=196)
(job_stage_create_target_dirs.max=2)
(job_stage_load_manifests.max=687)
(job_stage_optional_validate_output.max=66)
(job_stage_rename_files.max=161)
(job_stage_save_success_marker.max=653)
(job_stage_setup.max=571)
(op_create_directories.max=1)
(op_delete.max=113)
(op_delete_dir.max=129)
(op_get_file_status.failures.max=231)
(op_get_file_status.max=61)
(op_list_status.max=300)
(op_load_all_manifests.max=445)
(op_load_manifest.max=436)
(op_mkdirs.max=123)
(op_msync.max=0)
(op_rename.failures.max=266)
(op_rename.max=139)
(task_stage_commit.max=302)
(task_stage_save_task_manifest.max=651)
(task_stage_scan_directory.max=302)
(task_stage_setup.max=157));

means=((commit_file_rename.mean=(samples=1, sum=141, mean=141.0000))
(committer_commit_job.mean=(samples=1, sum=2306, mean=2306.0000))
(committer_task_directory_count=(samples=4, sum=0, mean=0.0000))
(committer_task_directory_depth=(samples=2, sum=2, mean=1.0000))
(committer_task_file_count=(samples=4, sum=2, mean=0.5000))
(committer_task_file_size=(samples=2, sum=21, mean=10.5000))
(committer_task_manifest_file_size=(samples=2, sum=37157, mean=18578.5000))
(job_stage_cleanup.mean=(samples=1, sum=196, mean=196.0000))
(job_stage_create_target_dirs.mean=(samples=1, sum=2, mean=2.0000))
(job_stage_load_manifests.mean=(samples=1, sum=687, mean=687.0000))
(job_stage_optional_validate_output.mean=(samples=1, sum=66, mean=66.0000))
(job_stage_rename_files.mean=(samples=1, sum=161, mean=161.0000))
(job_stage_save_success_marker.mean=(samples=1, sum=653, mean=653.0000))
(job_stage_setup.mean=(samples=1, sum=571, mean=571.0000))
(op_create_directories.mean=(samples=1, sum=1, mean=1.0000))
(op_delete.mean=(samples=3, sum=240, mean=80.0000))
(op_delete_dir.mean=(samples=1, sum=129, mean=129.0000))
(op_get_file_status.failures.mean=(samples=6, sum=614, mean=102.3333))
(op_get_file_status.mean=(samples=3, sum=175, mean=58.3333))
(op_list_status.mean=(samples=3, sum=671, mean=223.6667))
(op_load_all_manifests.mean=(samples=1, sum=445, mean=445.0000))
(op_load_manifest.mean=(samples=2, sum=607, mean=303.5000))
(op_mkdirs.mean=(samples=4, sum=361, mean=90.2500))
(op_msync.mean=(samples=1, sum=0, mean=0.0000))
(op_rename.failures.mean=(samples=1, sum=266, mean=266.0000))
(op_rename.mean=(samples=1, sum=139, mean=139.0000))
(task_stage_commit.mean=(samples=2, sum=508, mean=254.0000))
(task_stage_save_task_manifest.mean=(samples=1, sum=651, mean=651.0000))
(task_stage_scan_directory.mean=(samples=2, sum=508, mean=254.0000))
(task_stage_setup.mean=(samples=2, sum=284, mean=142.0000)));

收集作业摘要 mapreduce.manifest.committer.summary.report.directory

可以通过在选项mapreduce.manifest.committer.summary.report.directory中设置文件系统路径,将提交器配置为将_SUCCESS摘要文件保存到报告目录中,无论作业成功还是失败。

路径不必与工作目标位于同一存储/文件系统上。例如,可以使用本地文件系统。

XML

<property>
  <name>mapreduce.manifest.committer.summary.report.directory</name>
  <value>file:///tmp/reports</value>
</property>

spark-defaults.conf

spark.hadoop.mapreduce.manifest.committer.summary.report.directory file:///tmp/reports

这样可以收集作业的统计信息,无论其执行结果如何,无论是否启用了保存_SUCCESS标记的功能,也不会因查询链覆盖标记而引起问题。

mapred successfile 操作可用于打印这些报告。

清理

作业清理过程较为复杂,因为它旨在解决云存储中可能出现的若干问题。

  • 删除目录(GCS)时性能缓慢。
  • 删除非常深且宽的目录树时超时(Azure)。
  • 增强对清理问题升级为作业失败的一般性恢复能力。
选项 含义 默认值
mapreduce.fileoutputcommitter.cleanup.skipped Skip cleanup of _temporary directory false
mapreduce.fileoutputcommitter.cleanup-failures.ignored Ignore errors during cleanup false
mapreduce.manifest.committer.cleanup.parallel.delete Delete task attempt directories in parallel true
mapreduce.manifest.committer.cleanup.parallel.delete.base.first Attempt to delete the base directory before parallel task attempts false

算法如下:

if "mapreduce.fileoutputcommitter.cleanup.skipped":
  return
if "mapreduce.manifest.committer.cleanup.parallel.delete":
  if "mapreduce.manifest.committer.cleanup.parallel.delete.base.first" :
    if delete("_temporary"):
      return
  delete(list("$task-directories")) catch any exception
if not "mapreduce.fileoutputcommitter.cleanup.skipped":
  delete("_temporary"); catch any exception
if caught-exception and not "mapreduce.fileoutputcommitter.cleanup-failures.ignored":
  raise caught-exception

虽然有点复杂,但目标是执行快速/可扩展的删除操作,并在失败时抛出有意义的异常。

对于ABFS,将mapreduce.manifest.committer.cleanup.parallel.delete.base.first设置为true,通常可以减少网络IO并加快清理速度。

spark.hadoop.mapreduce.manifest.committer.cleanup.parallel.delete.base.first true

对于GCS,将mapreduce.manifest.committer.cleanup.parallel.delete.base.first设置为false可能会加快清理速度。

如果在清理过程中出现错误,忽略失败将确保作业仍被视为成功。mapreduce.fileoutputcommitter.cleanup-failures.ignored = true

禁用清理功能可以避免清理带来的开销,但需要工作流或手动操作定期清理所有_temporary目录:mapreduce.fileoutputcommitter.cleanup.skipped = true

使用Azure ADLS Gen2存储

要切换到清单提交器(manifest committer),必须将针对abfs:// URL目标的提交器工厂切换为清单提交器工厂,这可以针对单个应用程序或整个集群进行配置。

<property>
  <name>mapreduce.outputcommitter.factory.scheme.abfs</name>
  <value>org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory</value>
</property>

这允许在提交器内部使用ADLS Gen2特有的性能和一致性逻辑。具体包括:* 可以在列表操作中收集Etag标头并在作业提交阶段使用 * IO重命名操作受到速率限制 * 当节流触发重命名失败时会尝试恢复。

警告 此提交器与旧版Azure存储服务(WASB或ADLS Gen 1)不兼容。

Azure优化选项的核心集变为

<property>
  <name>mapreduce.outputcommitter.factory.scheme.abfs</name>
  <value>org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory</value>
</property>

<property>
  <name>fs.azure.io.rate.limit</name>
  <value>1000</value>
</property>

<property>
  <name>mapreduce.manifest.committer.cleanup.parallel.delete.base.first</name>
  <value>true</value>
</property>

以及用于调试/性能分析的可选设置

<property>
  <name>mapreduce.manifest.committer.summary.report.directory</name>
  <value>Path within same store/separate store</value>
  <description>Optional: path to where job summaries are saved</description>
</property>

Spark的完整ABFS选项集

spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory
spark.hadoop.fs.azure.io.rate.limit 1000
spark.hadoop.mapreduce.manifest.committer.cleanup.parallel.delete.base.first true
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol

spark.hadoop.mapreduce.manifest.committer.summary.report.directory  (optional: URI of a directory for job summaries)

ABFS 重命名速率限制 fs.azure.io.rate.limit

为避免触发存储限流和退避延迟,以及其他与限流相关的故障情况,作业提交期间的文件重命名操作通过“速率限制器”进行限流,该限制器控制单个ABFS FileSystem客户端实例每秒可执行的重命名操作数量。

选项 含义
fs.azure.io.rate.limit Rate limit in operations/second for IO operations.

将该选项设置为0以移除所有速率限制。

此参数的默认值设置为1000。

<property>
  <name>fs.azure.io.rate.limit</name>
  <value>1000</value>
  <description>maximum number of renames attempted per second</description>
</property>

此容量是在文件系统客户端级别设置的,因此不会在单个应用程序内的所有进程之间共享,更不用说共享同一存储帐户的其他应用程序了。

它将与由同一个Spark驱动程序提交的所有作业共享,因为这些作业确实共享该文件系统连接器。

如果实施了速率限制,统计量store_io_rate_limited将报告获取提交文件许可所需的时间。

如果发生了服务器端限流,可以通过以下迹象观察到:* 存储服务的日志及其限流状态码(通常是503或500)。* 作业统计项commit_file_rename_recovered。该统计项表明ADLS限流表现为重命名操作失败,而这些失败已在提交器中恢复。

如果出现这些问题——或其他同时运行的应用程序遇到限流/限流触发的问题,请考虑降低fs.azure.io.rate.limit的值,和/或向微软申请更高的IO容量。

重要提示 如果您从微软获得了额外容量,并希望利用它来加速作业提交,请增加fs.azure.io.rate.limit的值,可以在整个集群范围内调整,也可以专门针对您希望分配更高优先级的特定作业进行调整。

这仍是一项进行中的工作;未来可能会扩展以支持单个文件系统实例执行的所有IO操作。

使用Google云存储

清单提交器(manifest committer)通过谷歌的gcs-connector库与Google云存储兼容并经过测试,该库为gs模式提供了Hadoop文件系统客户端。

Google云存储具备确保提交协议安全运行所需的语义。

切换到该提交器的Spark设置如下

spark.hadoop.mapreduce.outputcommitter.factory.scheme.gs org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.hadoop.mapreduce.manifest.committer.cleanup.parallel.delete.base.first false
spark.hadoop.mapreduce.manifest.committer.summary.report.directory  (optional: URI of a directory for job summaries)

存储目录的删除操作是O(files),因此mapreduce.manifest.committer.cleanup.parallel.delete的值应保持默认的true,但需要将mapreduce.manifest.committer.cleanup.parallel.delete.base.first改为false

对于mapreduce,请在core-site.xmlmapred-site.xml中声明绑定

<property>
  <name>mapreduce.outputcommitter.factory.scheme.gcs</name>
  <value>org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory</value>
</property>

使用HDFS

该提交者确实与HDFS协同工作,但其主要针对对象存储进行了优化,某些操作(尤其是列出和重命名)性能有所降低,且语义也简化到经典FileOutputCommitter无法依赖的程度(特别是GCS场景)。

要在HDFS上使用,请将ManifestCommitterFactory设置为hdfs:// URL的提交器工厂。

由于HDFS支持快速目录删除,因此在清理过程中无需并行删除任务尝试目录,所以将mapreduce.manifest.committer.cleanup.parallel.delete设置为false

最终的spark绑定变为

spark.hadoop.mapreduce.outputcommitter.factory.scheme.hdfs org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterFactory
spark.hadoop.mapreduce.manifest.committer.cleanup.parallel.delete false
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol

spark.hadoop.mapreduce.manifest.committer.summary.report.directory  (optional: URI of a directory for job summaries)

高级配置选项

有一些高级选项主要用于开发和测试,而非生产环境使用。

选项 含义 默认值
mapreduce.manifest.committer.manifest.save.attempts How many attempts should be made to commit a task manifest? 5
mapreduce.manifest.committer.store.operations.classname Classname for Manifest Store Operations ""
mapreduce.manifest.committer.validate.output Perform output validation? false
mapreduce.manifest.committer.writer.queue.capacity Queue capacity for writing intermediate file 32

mapreduce.manifest.committer.manifest.save.attempts

保存任务尝试清单的尝试次数,具体步骤如下:1. 将文件写入作业尝试目录中的临时文件。2. 删除任何现有的任务清单。3. 将临时文件重命名为最终文件名。

这可能因不可恢复的原因(权限问题、网络永久丢失、服务宕机等)而失败,也可能是暂时性问题,如果再次尝试写入数据可能不会重现。

尝试次数由mapreduce.manifest.committer.manifest.save.attempts设置;每次尝试后休眠时间会递增。

如果任务尝试无法提交工作且无法从网络问题中恢复,请考虑增加默认值。

验证输出 mapreduce.manifest.committer.validate.output

选项 mapreduce.manifest.committer.validate.output 会触发对每个重命名文件的检查,以验证其是否具有预期长度。

这会为每个文件增加一个HEAD请求的开销,因此仅建议用于测试。

未对实际内容进行验证。

控制存储集成 mapreduce.manifest.committer.store.operations.classname

清单提交器通过实现ManifestStoreOperations接口与文件系统交互。可以为特定存储功能提供自定义实现。针对ABFS有一个这样的实现;当使用abfs特定的提交器工厂时,这会自动设置。

可以显式设置。

<property>
  <name>mapreduce.manifest.committer.store.operations.classname</name>
  <value>org.apache.hadoop.fs.azurebfs.commit.AbfsManifestStoreOperations</value>
</property>

默认实现也可以进行配置。

<property>
  <name>mapreduce.manifest.committer.store.operations.classname</name>
  <value>org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperationsThroughFileSystem</value>
</property>

除非需要为其他存储编写新的实现(仅在存储为提交者提供额外集成支持时才需要),否则无需更改这些值。

mapreduce.manifest.committer.writer.queue.capacity

这是一个次级规模选项。它控制着用于存储待重命名文件列表的队列大小,这些文件列表来自从目标文件系统加载的清单、从工作线程池加载的清单,以及将每个清单条目保存到本地文件系统中间文件的单一线程。

一旦队列已满,所有清单加载线程将被阻塞。

<property>
  <name>mapreduce.manifest.committer.writer.queue.capacity</name>
  <value>32</value>
</property>

由于本地文件系统的写入速度通常远快于任何云存储,因此该队列大小不应成为清单加载性能的限制因素。

它可以帮助限制在作业提交期间加载清单时消耗的内存量。加载清单的最大数量为:

mapreduce.manifest.committer.writer.queue.capacity + mapreduce.manifest.committer.io.threads

支持对同一目录的并发作业

在同一目录树下运行多个作业可能是可行的。

要使此功能正常工作,必须满足以下条件:

  • 使用Spark时,必须设置唯一的作业ID。这意味着Spark发行版必须包含SPARK-33402SPARK-33230的补丁。
  • 必须通过将mapreduce.fileoutputcommitter.cleanup.skipped设置为true来禁用_temporary目录的清理。
  • 所有作业/任务必须创建具有唯一文件名的文件。
  • 所有作业必须创建具有相同目录分区结构的输出。
  • 作业/查询不得使用Spark动态分区"INSERT OVERWRITE TABLE"操作;否则可能导致数据丢失。此限制适用于所有提交器,而不仅仅是清单提交器。
  • 记得稍后删除 _temporary 目录!

未经测试