Manifest Committer 协议

本文档描述了Manifest Committer的提交协议

背景

术语

术语 含义
提交者 一个可由MR Spark调用的类,用于执行任务和作业提交操作。
Spark Driver 负责调度工作并编排提交操作的Spark进程。
作业:在MapReduce中 整个应用程序。在spark中,这是工作链中的单个阶段
作业尝试 对作业的一次尝试。MR支持在部分作业失败时进行恢复的多次作业尝试。Spark则表示"从头开始重新启动"
任务 作业的一个子部分,例如处理一个文件或文件的一部分
任务ID 任务的ID,在该作业内唯一。通常从0开始,并用于文件名中(如part-0000、part-001等)
任务尝试 (TA) 执行任务的尝试。它可能会失败,在这种情况下MR/spark会调度另一个尝试。
任务尝试ID 任务尝试的唯一标识符。由任务ID + 尝试计数器组成。
目标目录 工作的最终目的地。
作业尝试目录 作业尝试使用的临时目录。该目录始终位于目标目录之下,以确保其与HDFS处于相同的加密区域,以及其他文件系统中的存储卷等。
任务尝试目录 作业尝试目录下的子目录,任务尝试在此创建自己的子目录进行工作
任务尝试工作目录 每个任务尝试专用的目录,用于写入文件
任务提交 获取任务尝试的输出,并将其作为该"成功"任务的最终/独占结果。
作业提交 汇总所有已提交任务的输出并生成作业的最终结果。

提交器(committer)的作用是确保作业的完整输出最终到达目标位置,即使在任务失败的情况下也是如此。

  • 完成: 输出包含所有成功任务的工作。
  • 独占性:失败任务的输出不会显示。
  • 并发性: 当多个任务并行提交时,输出结果与任务串行提交时相同。这不是作业提交的必要条件。
  • 可中止: 作业和任务可以在作业提交前中止,中止后它们的输出将不可见。
  • 正确性连续性: 一旦作业提交,任何失败、中止或未成功任务的输出在未来某个时刻都不得出现。

对于Hive经典的层级目录结构表,作业提交要求将所有已提交任务的输出放置到目录树中的正确位置。

内置在hadoop-mapreduce-client-core模块中的提交器是FileOutputCommitter

它有两种算法,v1和v2。

v1算法能够应对各种任务失败的情况,但在提交最终聚合输出时速度较慢,因为它需要逐个将新创建的文件重命名到表中的正确位置。

v2算法不被认为是安全的,因为当单个任务提交时输出是可见的,而不是延迟到作业提交时才可见。多个任务尝试可能会将其数据放入输出目录树中,如果作业在提交之前失败/中止,这些输出将是可见的。

文件输出提交器 V1 和 V2

文件输出提交器 V1 和 V2 提交算法

任务尝试执行(V1和V2版本)

作业尝试目录位于$dest/__temporary/$jobAttemptId/,包含进行中作业的所有输出。每个任务尝试都会被分配自己的任务尝试目录$dest/__temporary/$jobAttemptId/__temporary/$taskAttemptId

任务的所有工作都写入任务尝试目录下。如果输出是一个深层树状结构且文件位于根目录,那么任务尝试目录最终将具有类似的结构,包含它生成的文件及其上层的目录。

MapReduce V1 算法:

v1 任务提交

任务尝试目录直接在作业尝试目录下重命名

rename(
  $dest/__temporary/$jobAttemptId/__temporary/$taskAttemptId
  $dest/__temporary/$jobAttemptId/$taskId)

V1 作业提交

对于每个已提交的任务,其下的所有文件都会被重命名到目标目录中,文件名相对于任务基目录的路径会被重新映射为目标目录的路径。

也就是说,$dest/__temporary/$jobAttemptId/$taskId下的所有内容都会被转换为$dest下的路径。

递归树遍历识别每个TA目录中需要重命名的路径。如果任务目录树包含一个在目标路径下不存在的子目录,则进行了一些优化:在这种情况下,整个目录可以直接重命名。如果目录已存在,则对该目录执行逐文件合并操作,子目录的处理方式再次取决于目标路径是否存在。

因此,如果每个任务的输出都存放在单独的最终目录中(例如最终分区对单个任务是唯一的),那么无论子文件数量如何,目录的重命名操作都是O(1)复杂度。如果输出要与其他任务共享同一目录(或更新现有目录),那么重命名性能将变为O(文件数)复杂度。

最后,当mapreduce.fileoutputcommitter.marksuccessfuljobs为true时,会写入一个0字节的_SUCCESS文件。

MapReduce V2算法:

V2 任务提交

任务尝试目录下的文件会逐一重命名并移动到目标目录。这里没有尝试优化目录重命名操作,因为其他任务可能同时也在提交它们的工作。因此时间复杂度是O(files)加上目录树遍历的开销。再次强调:采用递归树遍历方式实现,而非使用listFiles(path, recursive=true)这种在HDFS上更快(虽然此处不相关但在S3上也适用)的深度遍历API。

V2 任务提交

mapreduce.fileoutputcommitter.marksuccessfuljobs为true时,会写入一个0字节的_SUCCESS文件。

为什么V2提交器不正确/不安全

如果任务T1的第一次尝试T1A1在提交前失败,驱动程序将调度新的尝试"T1A2"并提交它。一切正常。

但是:如果T1A1获得了提交权限,但在提交过程中失败,其部分输出可能已经写入目标目录。

如果尝试T1A2随后被告知提交,那么当且仅当其输出具有完全相同的文件名集合时,任何已重命名的文件才会被覆盖。如果生成了不同的文件名,那么输出将包含T1A1和T1A2的文件。

如果在提交过程中T1A1发生分区,那么作业提交器会调度另一次尝试并提交其工作。然而,如果T1A1仍然与文件系统保持连接,它可能仍在重命名文件。即使使用相同的文件名,这两个任务的输出也可能相互混杂。

背景:S3A提交器

论文A Zero-Rename Committer(Loughran等人著)阐述了这些提交器的原理

它还描述了提交问题,定义了正确性,并介绍了v1和v2提交器的算法,以及S3A提交器、IBM Stocator提交器的算法,以及我们所了解的EMR Spark提交器的相关信息。

hadoop-aws JAR包含一对提交器:"Staging"和"Magic"。这两者都是针对同一问题的实现:安全快速地将工作提交到S3对象存储。

提交者利用了S3提供原子化文件创建方式的特性:即PUT请求。

文件要么存在,要么不存在。文件可以直接上传到目标位置,只有当上传完成时文件才会显现——这会覆盖任何现有副本。

对于大文件,多部分上传允许将此上传操作拆分为一系列POST请求

1 initiate-upload (path -> upload ID) 1. upload part(path, upload ID, data[]) -> checksum. 这个过程可以并行化。单个对象最多可上传10,000个分块。除最后一个分块外,其他分块都必须大于等于5MB。1. complete-upload (path, upload ID, List) 这一步将文件具体化,根据校验和列表定义的块顺序从各个分块构建文件。

S3A提交器的秘密在于,最终的POST请求可以延迟到作业提交阶段,尽管文件是在任务尝试执行/提交期间上传的。任务尝试需要确定每个文件的最终目标位置,将数据作为多部分操作的一部分上传,然后将完成上传所需的信息保存在一个文件中,该文件随后由作业提交器读取并在POST请求中使用。

暂存提交器

Staging Committer 基于Netflix的Ryan Blue的贡献。它依赖HDFS作为一致性存储来传播.pendingset文件。

每个任务尝试的工作目录位于本地文件系统中,称为“暂存目录”。完成上传所需的信息通过使用与集群HDFS文件系统配合的v1 FileOutputCommitter,从任务尝试传递给作业提交器。这确保了提交器具有与v1算法相同的正确性保证。

  1. 任务提交包括将本地文件系统任务尝试工作目录下的所有文件上传到其最终目标路径,暂不执行最终的清单POST操作。
  2. 一个包含完成任务尝试中所有文件上传所需全部信息的JSON文件被写入与HDFS配合工作的包装提交器的Job Attempt目录中。
  3. 作业提交:加载HDFS作业尝试目录中的所有清单文件,然后发出POST请求以完成上传。这些操作是并行执行的。

魔法提交器

Magic Committer 是纯S3A的,它利用了作者可以在文件系统客户端内部进行修改这一优势。

定义了一些“魔法”路径,当这些路径被打开以进行写入时,会启动一个多方上传到最终目标目录的过程。当输出流被close()关闭时,会在魔法路径中写入一个零字节的标记文件,并保存一个包含完成上传所需所有信息的JSON .pending文件。

任务提交:1. 列出每个任务尝试的magic目录下所有.pending文件;1. 聚合为一个.pendingset文件 1. 使用任务ID保存到作业尝试目录。

任务提交:

  1. 列出作业尝试目录中的.pendingset文件
  2. 通过POST请求完成上传。

Magic提交器绝对需要一个一致的S3存储 - 最初是与S3Guard配合使用。现在S3已具备一致性,可以直接使用原生S3。它不需要HDFS或任何其他具有rename()功能的文件系统。

正确性

S3A提交器被认为是正确的,因为

  1. 在作业提交之前不会物化任何内容。
  2. 每个任务尝试的清单只能保存到作业尝试目录中。因此:相同任务ID的TA文件必须独占提交。
  3. 暂存提交器使用HDFS将清单从任务智能体传递到作业提交器,确保S3的最终一致性不会导致清单丢失。
  4. 在S3实现一致性之前,magic committer依赖S3Guard来提供任务提交和作业提交期间所需的列表一致性。
  5. 作者和更广泛的社区修复了在生产环境中暴露的所有与提交者相关的问题。

已修复的重大问题包括:

  • HADOOP-15961. S3A提交器:确保定期调用progress()方法。
  • HADOOP-16570. S3A提交器遇到规模扩展问题。
  • HADOOP-16798. S3A提交器线程池关闭问题。
  • HADOOP-17112. S3A提交器无法处理路径中的空格。
  • HADOOP-17318. 支持具有相同应用尝试ID的并发S3A提交作业。
  • HADOOP-17258. MagicS3GuardCommitter 因 pendingset 已存在而失败
  • HADOOP-17414. 魔法提交器文件未收集由Spark写入的字节数
  • SPARK-33230 Hadoop提交者可在spark.sql.sources.writeJobUUID中获取唯一作业ID
  • SPARK-33402 同一秒内启动的作业会出现重复的MapReduce作业ID
  • SPARK-33739. 通过S3A Magic提交器提交的作业未报告写入的字节数(依赖于HADOOP-17414)

在那些影响正确性而非规模/性能/用户体验的问题中:HADOOP-17258涉及TA1任务提交完成后但未能成功上报的故障恢复。SPARK-33402、SPARK-33230和HADOOP-17318都与此相关:如果两个Spark作业/阶段在同一秒内启动,它们会拥有相同的作业ID。这导致暂存提交器使用的HDFS目录相互混杂。

值得注意的是:这些都是最小集成测试套件未能发现的问题。

好消息是:我们现在已经意识到这些问题,并且能够更好地避免再次出现类似情况。同时也清楚需要针对哪些内容编写测试。

V1提交器:在Azure上速度较慢,在GCS上既慢又不安全。

V1提交器在ABFS上表现不佳的原因是:

  1. 与HDFS相比,使用ABFS进行目录列表和文件重命名的速度稍慢一些。
  2. v1提交器通过列出每个已提交任务的输出,依次提交每个任务的输出结果,当目标位置不存在目录时移动目录,并将文件合并到现有目录中。

V2提交器在作业提交时速度更快,因为它会在任务提交阶段执行列表和重命名操作。由于这一过程不具备原子性,因此被认为存在使用风险。V2任务提交算法所展示的是:通过专门使用逐文件重命名方式,可以实现并行提交不同任务的输出结果。

V1提交器在GCS上表现不佳,因为即使是任务提交操作——目录重命名——也是一个非原子性的O(files)操作。这也意味着它是不安全的。

如果任务尝试已经分区且spark驱动程序调度/提交了另一个TA,那么任务目录可能包含来自第一次尝试的1+文件。


清单提交器协议

存储需求

该提交器支持的存储/文件系统必须:

  • 保持一致的列表。
  • 拥有原子级的 O(1) 文件重命名操作。

该提交器支持的存储/文件系统应满足以下条件:

  • 即使在负载情况下也能成功重命名文件。ABFS不具备此功能,因此提供了专门的恢复机制。
  • 实现HADOOP-17979的EtagSource接口。该接口用于ABFS重命名恢复,以及最终输出的可选验证。

该提交器支持的存储/文件系统可能具备以下功能:

  • 具有高延迟的列表操作。
  • 在负载情况下通过节流响应拒绝调用,这必须在文件系统连接器中处理。

该提交器支持的存储/文件系统可能不支持以下功能:

  • 支持原子目录重命名。此功能仅在清理时可选使用,其他情况下从不使用。
  • 支持O(1)级别的目录删除。CleanupJobStage假设情况并非如此,因此会并行删除任务尝试目录。
  • 支持原子操作create(Path, overwrite=false)。清单文件通过写入包含任务尝试ID的路径进行提交,然后重命名为最终路径。
  • 支持快速的listFiles(path, recursive=true)调用。此API调用未被使用。

FileOutputCommitter相比,已移除的需求包括:

  • 原子性目录重命名。
  • O(1) 目录删除。
  • 快速目录列表。
  • 隐式缺乏节流行为。

HDFS满足所有这些要求,因此从这个提交器中获益不大,尽管它仍然可以在那里工作。

S3存储不符合此提交器的重命名要求,即使现在它是一致的。在S3上使用此提交器并不安全。

任务与作业ID

每个作业都必须有一个唯一的ID。

该实现期望Spark运行时已应用相关补丁以确保这一点。

作业ID用于命名临时目录,而不是使用传统的递增自然编号方案_temporary/0/。该方案源自MapReduce,其中尝试ID大于1的作业尝试会查找前驱任务提交的内容,并将其合并到自己的结果中。

该提交器针对Spark设计,不尝试进行恢复。通过在路径中使用作业ID,如果配置作业在清理/中止时删除所有_temporary目录,则多个作业可能会使用同一个表作为目标执行。

任务ID和任务尝试ID将像往常一样从作业ID派生。

预期写入的文件名应是唯一的。这在Spark处理ORC和Parquet文件时已实现,默认情况下允许省略对目标文件的检查。

目录结构

给定目标目录 destDir: Path

一个ID为jobID: String且尝试次数为jobAttemptNumber:int的任务将使用以下目录:

$destDir/_temporary/manifest_$jobID/$jobAttemptNumber/

为了其工作(注意:它实际上会使用%02d格式化最终的子目录)。

这被称为作业尝试目录

在作业尝试目录下,会创建一个名为tasks的子目录。这被称为任务尝试目录。每个任务尝试都会拥有自己的子目录,其工作内容将被保存到对应的子目录中。

在作业尝试目录下,会创建一个名为manifests的子目录。这被称为y

所有已提交任务的清单将保存到此目录,文件名为$taskId-manifest.json

完整路径

$destDir/_temporary/manifest_$jobID/$jobAttemptNumber/manifests/$taskId-manifest.json

这是由已提交任务创建的所有文件清单的最终存储位置。它被称为已提交任务的清单路径

任务尝试将把它们的清单保存到这个目录中,使用临时文件名$taskAttemptId-manifest.json.tmp

这被称为任务尝试清单的临时路径

对于作业和任务操作,定义了以下路径。

let jobDirectory = "$destDir/_temporary/manifest_$jobID/"
let jobAttemptDirectory = jobDirectory + "$jobAttemptNumber/"
let manifestDirectory = jobAttemptDirectory + "manifests/"
let taskAttemptDirectory = jobAttemptDirectory + "tasks/"

对于每个任务尝试,还定义了以下路径

let taskAttemptWorkingDirectory = taskAttemptDirectory + "$taskAttemptId"
let taskManifestPath = manifestDirectory + "$taskId-manifest.json"
let taskAttemptTemporaryManifestPath = manifestDirectory + "$taskAttemptId-manifest.json"

协议核心算法

  1. 每个任务尝试(Task attempt)都会将其所有文件写入任务尝试目录(Task Attempt Directory)下的唯一目录树中。
  2. 任务提交包括对该任务尝试目录的递归扫描,创建目录列表和文件列表。
  3. 这些列表被保存为一个JSON清单文件。
  4. 作业提交包括列出所有JSON清单文件、加载其内容、创建目标目录的聚合集,并将所有文件重命名至其最终目标位置。

中间清单

该JSON文件设计用于包含(包括IOStatistics和一些诊断信息)

  1. 如果目标目录不存在,则必须创建的目标目录列表。
  2. 要重命名的文件列表,格式为(绝对源路径,绝对目标路径,文件大小)条目。

作业设置

mkdir(jobAttemptDirectory)
mkdir(manifestDirectory)
mkdir(taskAttemptDirectory)

任务设置

mkdir(taskAttemptWorkingDirectory)

任务提交

任务尝试由以下方式提交:

  1. 递归列出任务尝试工作目录以进行构建
  2. 目标目录列表,文件将在这些目录下重命名,及其状态(存在、未找到、文件)
  3. 要重命名的文件列表:源文件、目标文件、大小以及可选的etag。
  4. 这些列表会填充一个JSON文件,即Intermediate Manifest
  5. 任务尝试将此文件保存到其任务尝试清单的临时路径
  6. 任务尝试随后删除已提交任务的清单路径,并将其自身的清单文件重命名为该路径。
  7. 如果重命名成功,则认为任务提交成功。

此时不会进行重命名操作:文件将保留在其原始位置,直到在作业提交时进行重命名。

let (renames, directories) = scan(taskAttemptWorkingDirectory)
let manifest = new Manifest(renames, directories)

manifest.save(taskAttemptTemporaryManifestPath)
rename(taskAttemptTemporaryManifestPath, taskManifestPath)

任务中止/清理

delete(taskAttemptWorkingDirectory)

任务提交

作业提交包含以下内容:

  1. 列出作业尝试目录中的所有清单文件。
  2. 加载每个清单文件,创建尚不存在的目录,然后重命名重命名列表中的每个文件。
  3. 可选地保存一个JSON格式的_SUCCESS文件,其格式与S3A提交器相同(用于测试;使用写入和重命名实现原子保存)

任务提交阶段支持针对多个任务和每个任务的多个文件进行并行化处理,具体来说,有一个线程池用于并行存储IO操作

  1. 清单任务被并行加载和处理。
  2. 在预期创建目录的位置删除了文件。
  3. 创建叶子目录。
  4. 文件重命名。
  5. 在清理和终止过程中:删除任务尝试目录
  6. 如果为测试/调试启用了输出验证:调用getFileStatus来比较文件长度,并在可能的情况下比较etags。
let manifestPaths = list("$manifestDirectory/*-manifest.json")
let manifests = manifestPaths.map(p -> loadManifest(p))
let directoriesToCreate = merge(manifests.directories)
let filesToRename = concat(manifests.files)

directoriesToCreate.map(p -> mkdirs(p))
filesToRename.map((src, dest, etag) -> rename(src, dest, etag))

if mapreduce.fileoutputcommitter.marksuccessfuljobs then
  success.save("$destDir/_SUCCESS")

实现说明:

为了辅助调试和开发,摘要可以保存到同一或不同文件系统中的某个位置;中间清单可以重命名为目标文件系统中的某个位置。

if summary.report.directory != "" then
  success.save("${summary.report.directory}/$jobID.json")
if diagnostics.manifest.directory != null then
  rename($manifestDirectory, "${diagnostics.manifest.directory}/$jobID")

即使作业提交因任何原因失败,摘要报告仍会被保存

任务中止/清理

作业清理主要是删除作业目录

delete(jobDirectory)

为了解决对象存储的扩展性问题,在此之前应(并行)删除所有任务尝试的工作目录

let taskAttemptWorkingDirectories = list("taskAttemptDirectory")
taskAttemptWorkingDirectories.map(p -> delete(p))

新协议的优势

  • 将源树列表操作推送到任务提交阶段,这一阶段通常不在执行的关键路径上。
  • 将需要探测/创建的目录数量减少到输出目录的聚合集合,并消除所有重复项。
  • 文件重命名操作可以并行化,其限制因素为配置的线程池大小和/或任何速率限制约束。
  • 提供对GCS的原子任务提交,因为不期望目录重命名是原子操作。
  • 允许通过清单将任务尝试的IOStatistics传递给作业提交器。
  • 允许在Job Committer中执行一些预重命名操作,类似于S3A的"分区暂存提交器"。可以配置为删除计划创建目录中的所有现有条目——如果这些分区非空则操作会失败。详见Partitioned Staging Committer
  • 允许进行可选的预检验证检查(验证不同任务未创建重复文件)。
  • 在开发/调试过程中,可以查看清单、确定输出大小等。

新协议相比v1算法的缺点

  • 需要一个全新的清单文件格式。
  • 如果任务创建了大量文件和/或子目录,或者收集了etag且这些标签长度较大,清单文件可能会变得很大。HTTP协议将每个etag限制为8 KiB,因此每个文件的成本可能达到8 KiB。
  • 使得任务提交比v1算法更加复杂。
  • 对于单个任务创建唯一输出目录的作业可能不是最优的,因为目录重命名永远不会用于提交目录。