本文档描述了Manifest Committer的架构及其他实现/正确性方面的内容
协议及其正确性在Manifest Committer Protocol中有详细说明。
Manifest提交器是一种工作提交器,它能在ABFS上为"真实世界"查询提供性能,并在GCS上保证性能和正确性。
该提交器使用了为S3A提交器引入的扩展点。用户可以为abfs://
和gcs://
URL声明新的提交器工厂。它可以通过Hadoop MapReduce和Apache Spark使用。
术语 | 含义 |
---|---|
提交者 | 一个可由MR/Spark调用的类,用于执行任务和作业提交操作。 |
Spark Driver | 负责调度工作并编排提交操作的Spark进程。 |
Job | 在MapReduce中指整个应用程序。在Spark中,这表示工作链中的单个阶段 |
作业尝试 | 对作业的一次尝试。MR支持在部分作业失败时进行恢复的多次作业尝试。Spark则表示"从头开始重新启动" |
任务 | 作业的一个子部分,例如处理一个文件或文件的一部分 |
任务ID | 任务的ID,在该作业内唯一。通常从0开始,并用于文件名中(如part-0000、part-001等) |
任务尝试 (TA) | 执行任务的尝试。它可能会失败,在这种情况下MR/spark会调度另一个尝试。 |
任务尝试ID | 任务尝试的唯一标识符。由任务ID + 尝试计数器组成。 |
目标目录 | 工作的最终目的地。 |
作业尝试目录 | 作业尝试使用的临时目录。该目录始终位于目标目录之下,以确保其与HDFS处于相同的加密区域,以及其他文件系统中的存储卷等。 |
任务尝试目录 | (也称为"任务尝试工作目录")。每个任务尝试专用的目录,用于写入文件 |
任务提交 | 获取任务尝试的输出,并将其作为该"成功"任务的最终/独占结果。 |
作业提交 | 汇总所有已提交任务的输出并生成作业的最终结果。 |
提交器(committer)的作用是确保作业的完整输出最终到达目标位置,即使在任务失败的情况下也是如此。
对于Hive经典的层级目录结构表,作业提交要求将所有已提交任务的输出放置到目录树中的正确位置。
内置在hadoop-mapreduce-client-core
模块中的提交器是FileOutputCommitter
。
Manifest Committer是针对ABFS和GCS存储的高性能提交器,适用于通过多个任务在深层目录树中创建文件的作业。
它同样适用于hdfs://
和file://
URL,但该功能针对云存储中的列表操作、重命名性能及限流问题进行了优化。
它与S3 无法正确配合使用,因为它依赖原子性重命名且不覆盖的操作来提交清单文件。此外,它还会存在复制而非移动所有生成数据导致的性能问题。
虽然它可以与MapReduce一起工作,但不支持从之前失败的尝试中恢复的多任务重试处理。
设计了一个清单文件,其中包含(连同IO统计信息和其他一些内容)
任务尝试由以下方式提交:
不会进行重命名操作——文件将保留在其原始位置。
目录树遍历是单线程的,因此其复杂度为O(directories)
,其中每个目录列表会使用一个或多个分页LIST调用。
这很简单,对于大多数任务来说,扫描不在作业的关键路径上。
统计分析可能证明未来转向并行扫描是合理的。
作业提交包含以下内容:
_SUCCESS
文件(用于测试;使用写入和重命名实现原子保存)任务提交阶段支持针对多个任务和每个任务的多个文件进行并行处理,具体包括:
O(files)
复杂度,在使用OAuth认证的ABFS上也是如此。可选扫描所有祖先...如果发现任何文件,则删除。
getFileStatus()
。未找到:创建目录,添加条目及其所有父路径的条目;找到且为目录:添加条目及其所有父路径的条目;找到且为文件:先删除,然后按前述方式创建。高效处理目录的并发创建(或删除+创建)将是一个难点;为此投入了一些努力来构建需要创建的目录集合。
文件被并行重命名。
对目标路径进行预重命名检查(并删除现有内容)将是可选的。由于spark为每个文件生成新的UUID,这种情况不会发生,从而节省了HTTP请求。
可选扫描所有已提交的文件,并验证长度以及已知的etag。用于测试和诊断。
该解决方案对于GCS是必要的,并且在ABFS上应该也有益处,因为任务提交者会承担列表开销。
一个关键目标是保持清单提交器的独立性,既不触及现有的提交器代码,也不涉及hadoop代码库的其他部分。
它必须直接集成到MR和Spark中,除了已为S3A提交者实现的更改外,无需任何其他修改
PathOutputCommitterFactory
进行绑定。因此,这里有一些从其他地方复制粘贴的内容,例如 org.apache.hadoop.util.functional.TaskPool
是基于 S3ACommitter 的 org.apache.hadoop.fs.s3a.commit.Tasks
。
_SUCCESS
文件必须与S3A JSON文件兼容。这是为了确保任何现有的验证S3A提交器输出的测试套件都可以直接重定向到由清单提交器执行的作业,而无需任何修改。
何时?建议:持续发送心跳直到重命名最终完成。
我们希望停止整个作业提交。在处理每个任务提交线程遍历目录(或处理每个文件?)时,需要检查某个原子布尔值“中止作业”。列出或重命名失败需要升级为停止整个作业提交。这意味着在异步重命名操作或任务提交线程中引发的任何IOE必须:
commitJob()
调用结束时重新抛出如果作业提交阶段使用线程池执行每个任务的操作(例如加载文件),则同一线程池不得用于每个任务阶段内的并行操作。
由于每个JobStage
在任务或作业提交过程中都是按顺序执行的,因此在各个阶段共享同一个线程池是安全的。
在当前实现中,除了实际加载文件外,作业提交过程中没有并行的"按清单"操作。创建目录和重命名文件的操作实际上是在不进行单个清单并行处理的情况下执行的。
目录准备:合并所有清单中的目录列表,然后将(希望数量少得多的)唯一目录集排队等待创建。
重命名:遍历所有清单并将其重命名操作加入队列池以待执行。
线程池的生命周期受限于阶段配置,其范围将被限制在PathOutputCommitter
方法内部,用于设置、提交、中止和清理操作。
这避免了S3A提交器的线程池生命周期问题。
这是在terasorting过程中出现的一个故障,其中许多任务各自生成了大量文件;提交的完整文件列表(以及每个块的etag)在执行前被构建在内存中并进行验证。
清单提交器假设存储在内存中的数据量较少,因为不再需要为每个提交文件的每个块存储一个etag。
这一假设在某些作业中并不成立:MAPREDUCE-7435. ManifestCommitter OOM on azure job
这里的策略是读取所有清单文件,并将其条目以Hadoop Writable对象的形式流式传输到本地文件,因此比JSON具有更低的编组开销。
合并所有目录列表以创建并消除重复项。
该实现架构借鉴了S3A连接器的经验教训。
提交器会收集其对文件系统执行/调用的所有操作的持续时间统计信息。* 在任务提交期间收集的统计信息会被保存到清单中(不包括保存和重命名该文件的时间)* 当这些清单在作业提交期间被加载时,这些统计信息会被合并以形成整个作业的聚合统计信息。* 这些统计信息会被保存到_SUCCESS
文件中 * 如果设置了mapreduce.manifest.committer.summary.report.directory
,还会保存到该目录下的任何副本文件中。* 类org.apache.hadoop.mapreduce.lib.output.committer.manifest.files.ManifestPrinter
可以加载并打印这些统计信息。
查询中使用的文件系统及输入输出流的IO统计信息未被收集。
当通过PathOutputCommitter
API调用ManifestCommitter
时,以下属性会被添加到当前(线程)上下文中
键 | 值 |
---|---|
ji |
Job ID |
tai |
Task Attempt ID |
st |
Stage |
这些设置同样适用于作为阶段执行一部分的所有辅助工作线程。
任何支持审计的存储/文件系统都能够收集这些数据并包含在其日志中。
为了简化向后移植,所有审计集成都在单个类 org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.AuditingIntegration
中。