本文档描述了Manifest Committer的提交协议
术语 | 含义 |
---|---|
提交者 | 一个可由MR Spark调用的类,用于执行任务和作业提交操作。 |
Spark Driver | 负责调度工作并编排提交操作的Spark进程。 |
作业:在MapReduce中 | 整个应用程序。在spark中,这是工作链中的单个阶段 |
作业尝试 | 对作业的一次尝试。MR支持在部分作业失败时进行恢复的多次作业尝试。Spark则表示"从头开始重新启动" |
任务 | 作业的一个子部分,例如处理一个文件或文件的一部分 |
任务ID | 任务的ID,在该作业内唯一。通常从0开始,并用于文件名中(如part-0000、part-001等) |
任务尝试 (TA) | 执行任务的尝试。它可能会失败,在这种情况下MR/spark会调度另一个尝试。 |
任务尝试ID | 任务尝试的唯一标识符。由任务ID + 尝试计数器组成。 |
目标目录 | 工作的最终目的地。 |
作业尝试目录 | 作业尝试使用的临时目录。该目录始终位于目标目录之下,以确保其与HDFS处于相同的加密区域,以及其他文件系统中的存储卷等。 |
任务尝试目录 | 作业尝试目录下的子目录,任务尝试在此创建自己的子目录进行工作 |
任务尝试工作目录 | 每个任务尝试专用的目录,用于写入文件 |
任务提交 | 获取任务尝试的输出,并将其作为该"成功"任务的最终/独占结果。 |
作业提交 | 汇总所有已提交任务的输出并生成作业的最终结果。 |
提交器(committer)的作用是确保作业的完整输出最终到达目标位置,即使在任务失败的情况下也是如此。
对于Hive经典的层级目录结构表,作业提交要求将所有已提交任务的输出放置到目录树中的正确位置。
内置在hadoop-mapreduce-client-core
模块中的提交器是FileOutputCommitter
。
它有两种算法,v1和v2。
v1算法能够应对各种任务失败的情况,但在提交最终聚合输出时速度较慢,因为它需要逐个将新创建的文件重命名到表中的正确位置。
v2算法不被认为是安全的,因为当单个任务提交时输出是可见的,而不是延迟到作业提交时才可见。多个任务尝试可能会将其数据放入输出目录树中,如果作业在提交之前失败/中止,这些输出将是可见的。
作业尝试目录位于$dest/__temporary/$jobAttemptId/
,包含进行中作业的所有输出。每个任务尝试都会被分配自己的任务尝试目录$dest/__temporary/$jobAttemptId/__temporary/$taskAttemptId
任务的所有工作都写入任务尝试目录下。如果输出是一个深层树状结构且文件位于根目录,那么任务尝试目录最终将具有类似的结构,包含它生成的文件及其上层的目录。
任务尝试目录直接在作业尝试目录下重命名
rename( $dest/__temporary/$jobAttemptId/__temporary/$taskAttemptId $dest/__temporary/$jobAttemptId/$taskId)
对于每个已提交的任务,其下的所有文件都会被重命名到目标目录中,文件名相对于任务基目录的路径会被重新映射为目标目录的路径。
也就是说,$dest/__temporary/$jobAttemptId/$taskId
下的所有内容都会被转换为$dest
下的路径。
递归树遍历识别每个TA目录中需要重命名的路径。如果任务目录树包含一个在目标路径下不存在的子目录,则进行了一些优化:在这种情况下,整个目录可以直接重命名。如果目录已存在,则对该目录执行逐文件合并操作,子目录的处理方式再次取决于目标路径是否存在。
因此,如果每个任务的输出都存放在单独的最终目录中(例如最终分区对单个任务是唯一的),那么无论子文件数量如何,目录的重命名操作都是O(1)复杂度。如果输出要与其他任务共享同一目录(或更新现有目录),那么重命名性能将变为O(文件数)复杂度。
最后,当mapreduce.fileoutputcommitter.marksuccessfuljobs
为true时,会写入一个0字节的_SUCCESS
文件。
任务尝试目录下的文件会逐一重命名并移动到目标目录。这里没有尝试优化目录重命名操作,因为其他任务可能同时也在提交它们的工作。因此时间复杂度是O(files)
加上目录树遍历的开销。再次强调:采用递归树遍历方式实现,而非使用listFiles(path, recursive=true)
这种在HDFS上更快(虽然此处不相关但在S3上也适用)的深度遍历API。
当mapreduce.fileoutputcommitter.marksuccessfuljobs
为true时,会写入一个0字节的_SUCCESS
文件。
如果任务T1的第一次尝试T1A1在提交前失败,驱动程序将调度新的尝试"T1A2"并提交它。一切正常。
但是:如果T1A1获得了提交权限,但在提交过程中失败,其部分输出可能已经写入目标目录。
如果尝试T1A2随后被告知提交,那么当且仅当其输出具有完全相同的文件名集合时,任何已重命名的文件才会被覆盖。如果生成了不同的文件名,那么输出将包含T1A1和T1A2的文件。
如果在提交过程中T1A1发生分区,那么作业提交器会调度另一次尝试并提交其工作。然而,如果T1A1仍然与文件系统保持连接,它可能仍在重命名文件。即使使用相同的文件名,这两个任务的输出也可能相互混杂。
论文A Zero-Rename Committer(Loughran等人著)阐述了这些提交器的原理
它还描述了提交问题,定义了正确性,并介绍了v1和v2提交器的算法,以及S3A提交器、IBM Stocator提交器的算法,以及我们所了解的EMR Spark提交器的相关信息。
hadoop-aws
JAR包含一对提交器:"Staging"和"Magic"。这两者都是针对同一问题的实现:安全快速地将工作提交到S3对象存储。
提交者利用了S3提供原子化文件创建方式的特性:即PUT请求。
文件要么存在,要么不存在。文件可以直接上传到目标位置,只有当上传完成时文件才会显现——这会覆盖任何现有副本。
对于大文件,多部分上传允许将此上传操作拆分为一系列POST请求
1 initiate-upload (path -> upload ID)
1. upload part(path, upload ID, data[]) -> checksum.
这个过程可以并行化。单个对象最多可上传10,000个分块。除最后一个分块外,其他分块都必须大于等于5MB。1. complete-upload (path, upload ID, List
这一步将文件具体化,根据校验和列表定义的块顺序从各个分块构建文件。
S3A提交器的秘密在于,最终的POST请求可以延迟到作业提交阶段,尽管文件是在任务尝试执行/提交期间上传的。任务尝试需要确定每个文件的最终目标位置,将数据作为多部分操作的一部分上传,然后将完成上传所需的信息保存在一个文件中,该文件随后由作业提交器读取并在POST请求中使用。
Staging Committer 基于Netflix的Ryan Blue的贡献。它依赖HDFS作为一致性存储来传播.pendingset
文件。
每个任务尝试的工作目录位于本地文件系统中,称为“暂存目录”。完成上传所需的信息通过使用与集群HDFS文件系统配合的v1 FileOutputCommitter,从任务尝试传递给作业提交器。这确保了提交器具有与v1算法相同的正确性保证。
Magic Committer 是纯S3A的,它利用了作者可以在文件系统客户端内部进行修改这一优势。
定义了一些“魔法”路径,当这些路径被打开以进行写入时,会启动一个多方上传到最终目标目录的过程。当输出流被close()
关闭时,会在魔法路径中写入一个零字节的标记文件,并保存一个包含完成上传所需所有信息的JSON .pending文件。
任务提交:1. 列出每个任务尝试的magic目录下所有.pending
文件;1. 聚合为一个.pendingset
文件 1. 使用任务ID保存到作业尝试目录。
任务提交:
.pendingset
文件Magic提交器绝对需要一个一致的S3存储 - 最初是与S3Guard配合使用。现在S3已具备一致性,可以直接使用原生S3。它不需要HDFS或任何其他具有rename()
功能的文件系统。
S3A提交器被认为是正确的,因为
已修复的重大问题包括:
pendingset
已存在而失败spark.sql.sources.writeJobUUID
中获取唯一作业ID在那些影响正确性而非规模/性能/用户体验的问题中:HADOOP-17258涉及TA1任务提交完成后但未能成功上报的故障恢复。SPARK-33402、SPARK-33230和HADOOP-17318都与此相关:如果两个Spark作业/阶段在同一秒内启动,它们会拥有相同的作业ID。这导致暂存提交器使用的HDFS目录相互混杂。
值得注意的是:这些都是最小集成测试套件未能发现的问题。
好消息是:我们现在已经意识到这些问题,并且能够更好地避免再次出现类似情况。同时也清楚需要针对哪些内容编写测试。
V1提交器在ABFS上表现不佳的原因是:
V2提交器在作业提交时速度更快,因为它会在任务提交阶段执行列表和重命名操作。由于这一过程不具备原子性,因此被认为存在使用风险。V2任务提交算法所展示的是:通过专门使用逐文件重命名方式,可以实现并行提交不同任务的输出结果。
V1提交器在GCS上表现不佳,因为即使是任务提交操作——目录重命名——也是一个非原子性的O(files)
操作。这也意味着它是不安全的。
如果任务尝试已经分区且spark驱动程序调度/提交了另一个TA,那么任务目录可能包含来自第一次尝试的1+文件。
该提交器支持的存储/文件系统必须:
O(1)
文件重命名操作。该提交器支持的存储/文件系统应满足以下条件:
EtagSource
接口。该接口用于ABFS重命名恢复,以及最终输出的可选验证。该提交器支持的存储/文件系统可能具备以下功能:
该提交器支持的存储/文件系统可能不支持以下功能:
O(1)
级别的目录删除。CleanupJobStage
假设情况并非如此,因此会并行删除任务尝试目录。create(Path, overwrite=false)
。清单文件通过写入包含任务尝试ID的路径进行提交,然后重命名为最终路径。listFiles(path, recursive=true)
调用。此API调用未被使用。与FileOutputCommitter
相比,已移除的需求包括:
O(1)
目录删除。HDFS满足所有这些要求,因此从这个提交器中获益不大,尽管它仍然可以在那里工作。
S3存储不符合此提交器的重命名要求,即使现在它是一致的。在S3上使用此提交器并不安全。
每个作业都必须有一个唯一的ID。
该实现期望Spark运行时已应用相关补丁以确保这一点。
作业ID用于命名临时目录,而不是使用传统的递增自然编号方案_temporary/0/
。该方案源自MapReduce,其中尝试ID大于1的作业尝试会查找前驱任务提交的内容,并将其合并到自己的结果中。
该提交器针对Spark设计,不尝试进行恢复。通过在路径中使用作业ID,如果配置作业在清理/中止时不删除所有_temporary
目录,则多个作业可能会使用同一个表作为目标执行。
任务ID和任务尝试ID将像往常一样从作业ID派生。
预期写入的文件名应是唯一的。这在Spark处理ORC和Parquet文件时已实现,默认情况下允许省略对目标文件的检查。
给定目标目录 destDir: Path
一个ID为jobID: String
且尝试次数为jobAttemptNumber:int
的任务将使用以下目录:
$destDir/_temporary/manifest_$jobID/$jobAttemptNumber/
为了其工作(注意:它实际上会使用%02d
格式化最终的子目录)。
这被称为作业尝试目录
在作业尝试目录下,会创建一个名为tasks
的子目录。这被称为任务尝试目录。每个任务尝试都会拥有自己的子目录,其工作内容将被保存到对应的子目录中。
在作业尝试目录下,会创建一个名为manifests
的子目录。这被称为y。
所有已提交任务的清单将保存到此目录,文件名为$taskId-manifest.json
完整路径
$destDir/_temporary/manifest_$jobID/$jobAttemptNumber/manifests/$taskId-manifest.json
这是由已提交任务创建的所有文件清单的最终存储位置。它被称为已提交任务的清单路径。
任务尝试将把它们的清单保存到这个目录中,使用临时文件名$taskAttemptId-manifest.json.tmp
。
这被称为任务尝试清单的临时路径。
对于作业和任务操作,定义了以下路径。
let jobDirectory = "$destDir/_temporary/manifest_$jobID/" let jobAttemptDirectory = jobDirectory + "$jobAttemptNumber/" let manifestDirectory = jobAttemptDirectory + "manifests/" let taskAttemptDirectory = jobAttemptDirectory + "tasks/"
对于每个任务尝试,还定义了以下路径
let taskAttemptWorkingDirectory = taskAttemptDirectory + "$taskAttemptId" let taskManifestPath = manifestDirectory + "$taskId-manifest.json" let taskAttemptTemporaryManifestPath = manifestDirectory + "$taskAttemptId-manifest.json"
该JSON文件设计用于包含(包括IOStatistics和一些诊断信息)
mkdir(jobAttemptDirectory) mkdir(manifestDirectory) mkdir(taskAttemptDirectory)
mkdir(taskAttemptWorkingDirectory)
任务尝试由以下方式提交:
此时不会进行重命名操作:文件将保留在其原始位置,直到在作业提交时进行重命名。
let (renames, directories) = scan(taskAttemptWorkingDirectory) let manifest = new Manifest(renames, directories) manifest.save(taskAttemptTemporaryManifestPath) rename(taskAttemptTemporaryManifestPath, taskManifestPath)
delete(taskAttemptWorkingDirectory)
作业提交包含以下内容:
_SUCCESS
文件,其格式与S3A提交器相同(用于测试;使用写入和重命名实现原子保存)任务提交阶段支持针对多个任务和每个任务的多个文件进行并行化处理,具体来说,有一个线程池用于并行存储IO操作
let manifestPaths = list("$manifestDirectory/*-manifest.json") let manifests = manifestPaths.map(p -> loadManifest(p)) let directoriesToCreate = merge(manifests.directories) let filesToRename = concat(manifests.files) directoriesToCreate.map(p -> mkdirs(p)) filesToRename.map((src, dest, etag) -> rename(src, dest, etag)) if mapreduce.fileoutputcommitter.marksuccessfuljobs then success.save("$destDir/_SUCCESS")
实现说明:
为了辅助调试和开发,摘要可以保存到同一或不同文件系统中的某个位置;中间清单可以重命名为目标文件系统中的某个位置。
if summary.report.directory != "" then success.save("${summary.report.directory}/$jobID.json") if diagnostics.manifest.directory != null then rename($manifestDirectory, "${diagnostics.manifest.directory}/$jobID")
即使作业提交因任何原因失败,摘要报告仍会被保存
作业清理主要是删除作业目录
delete(jobDirectory)
为了解决对象存储的扩展性问题,在此之前应(并行)删除所有任务尝试的工作目录
let taskAttemptWorkingDirectories = list("taskAttemptDirectory") taskAttemptWorkingDirectories.map(p -> delete(p))