Manifest Committer 架构

本文档描述了Manifest Committer的架构及其他实现/正确性方面的内容

协议及其正确性在Manifest Committer Protocol中有详细说明。

Manifest提交器是一种工作提交器,它能在ABFS上为"真实世界"查询提供性能,并在GCS上保证性能和正确性。

该提交器使用了为S3A提交器引入的扩展点。用户可以为abfs://gcs:// URL声明新的提交器工厂。它可以通过Hadoop MapReduce和Apache Spark使用。

背景

术语

术语 含义
提交者 一个可由MR/Spark调用的类,用于执行任务和作业提交操作。
Spark Driver 负责调度工作并编排提交操作的Spark进程。
Job 在MapReduce中指整个应用程序。在Spark中,这表示工作链中的单个阶段
作业尝试 对作业的一次尝试。MR支持在部分作业失败时进行恢复的多次作业尝试。Spark则表示"从头开始重新启动"
任务 作业的一个子部分,例如处理一个文件或文件的一部分
任务ID 任务的ID,在该作业内唯一。通常从0开始,并用于文件名中(如part-0000、part-001等)
任务尝试 (TA) 执行任务的尝试。它可能会失败,在这种情况下MR/spark会调度另一个尝试。
任务尝试ID 任务尝试的唯一标识符。由任务ID + 尝试计数器组成。
目标目录 工作的最终目的地。
作业尝试目录 作业尝试使用的临时目录。该目录始终位于目标目录之下,以确保其与HDFS处于相同的加密区域,以及其他文件系统中的存储卷等。
任务尝试目录 (也称为"任务尝试工作目录")。每个任务尝试专用的目录,用于写入文件
任务提交 获取任务尝试的输出,并将其作为该"成功"任务的最终/独占结果。
作业提交 汇总所有已提交任务的输出并生成作业的最终结果。

提交器(committer)的作用是确保作业的完整输出最终到达目标位置,即使在任务失败的情况下也是如此。

  • 完成: 输出包含所有成功任务的工作。
  • 独占性:失败任务的输出不会显示。
  • 并发性: 当多个任务并行提交时,输出结果与任务串行提交时相同。这不是作业提交的必要条件。
  • 可中止: 作业和任务可以在作业提交前中止,中止后它们的输出将不可见。
  • 正确性连续性: 一旦作业提交,任何失败、中止或未成功任务的输出在未来某个时刻都不得出现。

对于Hive经典的层级目录结构表,作业提交要求将所有已提交任务的输出放置到目录树中的正确位置。

内置在hadoop-mapreduce-client-core模块中的提交器是FileOutputCommitter

清单提交器:面向Azure和Google存储的Spark高性能提交器

Manifest Committer是针对ABFS和GCS存储的高性能提交器,适用于通过多个任务在深层目录树中创建文件的作业。

它同样适用于hdfs://file:// URL,但该功能针对云存储中的列表操作、重命名性能及限流问题进行了优化。

它与S3 无法正确配合使用,因为它依赖原子性重命名且不覆盖的操作来提交清单文件。此外,它还会存在复制而非移动所有生成数据导致的性能问题。

虽然它可以与MapReduce一起工作,但不支持从之前失败的尝试中恢复的多任务重试处理。

清单

设计了一个清单文件,其中包含(连同IO统计信息和其他一些内容)

  1. 如果目标目录不存在,则必须创建的目标目录列表。
  2. 需要重命名的文件列表,记录为(绝对源路径,绝对目标路径,文件大小)条目。

任务提交

任务尝试由以下方式提交:

  1. 递归列出任务尝试工作目录以进行构建
  2. 文件被重命名的一系列目录列表。
  3. 要重命名的文件列表:源文件、目标文件、大小以及可选的etag。
  4. 将该信息保存在作业尝试目录的清单文件中,文件名由任务ID派生。注意:为确保清单创建操作的原子性,将采用先写入临时文件再重命名为最终路径的方式。

不会进行重命名操作——文件将保留在其原始位置。

目录树遍历是单线程的,因此其复杂度为O(directories),其中每个目录列表会使用一个或多个分页LIST调用。

这很简单,对于大多数任务来说,扫描不在作业的关键路径上。

统计分析可能证明未来转向并行扫描是合理的。

任务提交

作业提交包含以下内容:

  1. 列出作业尝试目录中的所有清单文件。
  2. 加载每个清单文件,创建尚不存在的目录,然后重命名重命名列表中的每个文件。
  3. 保存一个与S3A提交器格式相同的JSON _SUCCESS文件(用于测试;使用写入和重命名实现原子保存)

任务提交阶段支持针对多个任务和每个任务的多个文件进行并行处理,具体包括:

  1. 清单任务在一个“清单处理器”线程池中被加载和处理。
  2. 目录创建和文件重命名操作分别在"executor"线程池中处理:由于它们使用最少的网络IO,因此可以并行执行多个重命名操作。
  3. 作业清理可以并行化删除任务尝试目录。这很重要,因为目录删除在Google云存储上是O(files)复杂度,在使用OAuth认证的ABFS上也是如此。

祖先目录准备

可选扫描所有祖先...如果发现任何文件,则删除。

父目录创建

  1. 探测共享目录映射以检查目录是否存在。如果找到:操作完成。
  2. 如果映射为空,则在路径上调用getFileStatus()。未找到:创建目录,添加条目及其所有父路径的条目;找到且为目录:添加条目及其所有父路径的条目;找到且为文件:先删除,然后按前述方式创建。

高效处理目录的并发创建(或删除+创建)将是一个难点;为此投入了一些努力来构建需要创建的目录集合。

文件重命名

文件被并行重命名。

对目标路径进行预重命名检查(并删除现有内容)将是可选的。由于spark为每个文件生成新的UUID,这种情况不会发生,从而节省了HTTP请求。

验证

可选扫描所有已提交的文件,并验证长度以及已知的etag。用于测试和诊断。

优势

  • 将源树列表操作推送到任务提交阶段,这一阶段通常不在执行的关键路径上
  • 提供对GCS的原子任务提交,因为不期望目录重命名是原子操作
  • 可以在清单中传递来自工作节点的IOStatistics。
  • 允许执行一些类似于S3A"分区暂存提交器"的重命名前操作。可以配置为删除计划创建目录中的所有现有条目——如果这些分区非空则操作会失败。详见Partitioned Staging Committer
  • 允许进行可选的预检验证检查(验证不同任务未创建重复文件)
  • 在开发/调试过程中,可以查看清单、确定输出大小等。

缺点

  • 需要一个全新的清单文件格式。
  • May 使得任务提交更加复杂。

该解决方案对于GCS是必要的,并且在ABFS上应该也有益处,因为任务提交者会承担列表开销。

实现细节

约束条件

一个关键目标是保持清单提交器的独立性,既不触及现有的提交器代码,也不涉及hadoop代码库的其他部分。

它必须直接集成到MR和Spark中,除了已为S3A提交者实现的更改外,无需任何其他修改

  • 自包含:不得要求更改hadoop-common等。
  • 隔离性:不得对现有提交者进行更改
  • 集成:必须通过PathOutputCommitterFactory进行绑定。

因此,这里有一些从其他地方复制粘贴的内容,例如 org.apache.hadoop.util.functional.TaskPool 是基于 S3ACommitter 的 org.apache.hadoop.fs.s3a.commit.Tasks

_SUCCESS文件必须与S3A JSON文件兼容。这是为了确保任何现有的验证S3A提交器输出的测试套件都可以直接重定向到由清单提交器执行的作业,而无需任何修改。

任务提交中的进度回调。

何时?建议:持续发送心跳直到重命名最终完成。

任务提交中的错误处理与中止。

我们希望停止整个作业提交。在处理每个任务提交线程遍历目录(或处理每个文件?)时,需要检查某个原子布尔值“中止作业”。列出或重命名失败需要升级为停止整个作业提交。这意味着在异步重命名操作或任务提交线程中引发的任何IOE必须:

  1. 被捕获
  2. 存储在共享字段/变量中
  3. 触发中止
  4. commitJob() 调用结束时重新抛出

避免死锁

如果作业提交阶段使用线程池执行每个任务的操作(例如加载文件),则同一线程池不得用于每个任务阶段内的并行操作。

由于每个JobStage在任务或作业提交过程中都是按顺序执行的,因此在各个阶段共享同一个线程池是安全的。

在当前实现中,除了实际加载文件外,作业提交过程中没有并行的"按清单"操作。创建目录和重命名文件的操作实际上是在不进行单个清单并行处理的情况下执行的。

目录准备:合并所有清单中的目录列表,然后将(希望数量少得多的)唯一目录集排队等待创建。

重命名:遍历所有清单并将其重命名操作加入队列池以待执行。

线程池生命周期

线程池的生命周期受限于阶段配置,其范围将被限制在PathOutputCommitter方法内部,用于设置、提交、中止和清理操作。

这避免了S3A提交器的线程池生命周期问题。

与S3A HADOOP-16570类似的扩展性问题。

这是在terasorting过程中出现的一个故障,其中许多任务各自生成了大量文件;提交的完整文件列表(以及每个块的etag)在执行前被构建在内存中并进行验证。

清单提交器假设存储在内存中的数据量较少,因为不再需要为每个提交文件的每个块存储一个etag。

这一假设在某些作业中并不成立:MAPREDUCE-7435. ManifestCommitter OOM on azure job

这里的策略是读取所有清单文件,并将其条目以Hadoop Writable对象的形式流式传输到本地文件,因此比JSON具有更低的编组开销。

目标目录中重复创建目录

合并所有目录列表以创建并消除重复项。

实现架构

该实现架构借鉴了S3A连接器的经验教训。

  • 将提交阶段从MR提交类中隔离出来,因为其生命周期较为复杂。
  • 相反,将其分解为一系列可独立测试的阶段,并通过串联这些阶段来提供最终协议。
  • 不要将MR数据类型(如taskID等)传递到各个阶段——应传递包含通用类型(如字符串等)的配置。
  • 同时传入一个用于存储操作的回调函数,以便于实现一个模拟存储。
  • 对于每个阶段:定义前置条件和后置条件、故障模式。单独进行测试。

统计

提交器会收集其对文件系统执行/调用的所有操作的持续时间统计信息。* 在任务提交期间收集的统计信息会被保存到清单中(不包括保存和重命名该文件的时间)* 当这些清单在作业提交期间被加载时,这些统计信息会被合并以形成整个作业的聚合统计信息。* 这些统计信息会被保存到_SUCCESS文件中 * 如果设置了mapreduce.manifest.committer.summary.report.directory,还会保存到该目录下的任何副本文件中。* 类org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestPrinter可以加载并打印这些统计信息。

查询中使用的文件系统及输入输出流的IO统计信息未被收集。

审计

当通过PathOutputCommitter API调用ManifestCommitter时,以下属性会被添加到当前(线程)上下文中

ji Job ID
tai Task Attempt ID
st Stage

这些设置同样适用于作为阶段执行一部分的所有辅助工作线程。

任何支持审计的存储/文件系统都能够收集这些数据并包含在其日志中。

为了简化向后移植,所有审计集成都在单个类 org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration 中。