接口 org.apache.hadoop.fs.MultipartUploader

MultipartUploader可以通过多部分上传方式将文件上传至Hadoop支持的文件系统。多部分上传的优势在于:文件可以从多个客户端或进程并行上传,且在调用complete函数之前,其他客户端将看不到上传结果。

当由对象存储实现时,上传的数据可能会产生存储费用,甚至在文件系统中可见之前。此API的用户必须谨慎行事,始终尽最大努力尝试完成或中止上传。abortUploadsUnderPath(path)操作在此情况下可以提供帮助。

不变性

一个有效的MultipartUploader的所有要求都被视为隐含条件和后置条件:

单个多部分上传的操作可能发生在多部分上传器的不同实例、不同进程和主机之间。因此,需要满足以下要求:

  1. 上传部分、完成上传或中止上传所需的所有状态必须包含在上传句柄中或可从上传句柄中检索。

  2. 该句柄必须是可序列化的;它必须能够被反序列化到运行完全相同版本Hadoop的不同进程中。

  3. 不同的主机/进程可以顺序或同时上传不同的部分。它们上传到文件系统的顺序不得限制数据在最终文件中的存储顺序。

  4. 上传可以在与上传部分不同的实例上完成。

  5. 上传的输出在最终目的地必须不可见,直到上传可能完成。

  6. 如果单个多部分上传实例顺序地向同一目标发起或完成多个文件上传,无论存储是否支持并发上传,这都不算错误。

并发性

多个进程可以同时上传多部分上传的各个部分。

如果对正在进行上传的目标路径调用startUpload(path),实现必须执行以下两种操作之一。

  • 将该调用作为重复项拒绝。
  • 允许两者继续执行,文件的最终输出将是两个上传中的一个

上传成功与否是未定义的。用户不能期望在不同文件系统、不同文件系统实例甚至不同请求之间有一致的行为。

如果在上传分块过程中完成或中止了多部分上传,那么尚未完成的上传分块(无论是完整还是部分)都不应包含在最终文件中。实现方案应在putPart()操作中抛出错误。

序列化兼容性

用户不得期望序列化的PathHandle版本在以下情况下兼容:* 不同的多部分上传实现之间 * 同一实现的不同版本之间。

也就是说:所有客户端必须使用完全相同版本的Hadoop。

模型

支持多部分上传的FileSystem/FileContext将现有模型(Directories, Files, Symlinks)扩展为(Directories, Files, Symlinks, Uploads),其中Uploads的类型为Map[UploadHandle -> Map[PartHandle -> UploadPart]

状态元组中的Uploads元素是所有活动上传的映射表。

Uploads: Map[UploadHandle -> Map[PartHandle -> UploadPart]`

UploadHandle是一个非空的字节列表。

UploadHandle: List[byte]
len(UploadHandle) > 0

客户端必须将此视为不透明数据。此功能设计的核心在于,该句柄可在不同客户端间保持有效:句柄可在主机hostA上序列化,在hostB上反序列化,并仍可用于扩展或完成上传。

UploadPart = (Path: path, parts: Map[PartHandle -> byte[]])

类似地,PartHandle类型也是一个非空的不透明字节列表,同样可以在主机之间进行编组传输。

PartHandle: List[byte]

隐含的意思是,FS.Uploads中的每个UploadHandle都是唯一的。同样地,[PartHandle -> UploadPart]映射中的每个PartHandle也必须是唯一的。

  1. 不要求上传的部件句柄(Part Handles)必须唯一。
  2. 不要求上传句柄随时间保持唯一。但是,如果分块句柄被快速回收,名义上幂等的操作abort(FS, uploadHandle)可能会意外取消使用相同上传句柄的后继操作。

异步API

所有操作都返回CompletableFuture<>类型,必须随后对其进行求值才能获取返回值。

  1. 操作的执行可能会阻塞调用线程。
  2. 如果不是,则应在单独的线程中执行,并且必须在未来评估返回之前完成。
  3. 部分/所有前置条件可能在初始调用时进行评估,
  4. 所有那些当时未被评估的内容,必须在未来的执行过程中进行评估。

这意味着,当实现与快速文件系统/存储交互时,包括文件存在性在内的所有前置条件可能会被提前评估;而与远程对象存储交互的实现,由于其探测速度较慢,可能会在异步阶段验证前置条件——尤其是那些需要与远程存储交互的条件。

Java的CompletableFutures与受检异常配合不佳。随着异步API的更多使用,Hadoop代码库仍在完善此处的异常处理细节。请注意:任何声明必须抛出IOException的前置条件失败,如果在未来执行时,该操作可能会被包装在某种形式的RuntimeException中;此规则同样适用于操作过程中抛出的其他IOException

close()

应用程序在使用上传器后必须调用close();这是为了释放其他对象、更新统计信息等。

状态变更操作

CompletableFuture startUpload(Path)

启动一个多部分上传,最终返回一个UploadHandle用于后续操作。

前提条件

if path == "/" : raise IOException

if exists(FS, path) and not isFile(FS, path) raise PathIsDirectoryException, IOException

如果文件系统不支持并发上传到目标位置,则会添加以下前提条件:

if path in values(FS.Uploads) raise PathExistsException, IOException

后置条件

初始化操作完成后,文件系统状态会更新为一个新的活动上传,附带一个新的句柄,该句柄将返回给调用者。

handle' = UploadHandle where not handle' in keys(FS.Uploads)
FS' = FS where FS'.Uploads(handle') == {}
result = handle'

CompletableFuture putPart(UploadHandle uploadHandle, int partNumber, Path filePath, InputStream inputStream, long lengthInBytes)

上传特定多部分上传的一个部分,最终会返回一个不透明的部分句柄,代表指定上传的这一部分。

前提条件

uploadHandle in keys(FS.Uploads)
partNumber >= 1
lengthInBytes >= 0
len(inputStream) >= lengthInBytes

后置条件

data' = inputStream(0..lengthInBytes)
partHandle' = byte[] where not partHandle' in keys(FS.uploads(uploadHandle).parts)
FS' = FS where FS'.uploads(uploadHandle).parts(partHandle') == data'
result = partHandle'

数据存储在文件系统中,等待完成。在目标路径上它必须不可见。它可能在文件系统中某个临时路径可见;这是具体实现相关的,不能依赖于此。

CompletableFuture complete(UploadHandle uploadId, Path filePath, Map handles)

完成分段上传。

文件系统可能会强制规定每个部分的最小大小,最后上传的部分除外。

如果某部分超出此范围,必须抛出IOException异常。

前提条件

uploadHandle in keys(FS.Uploads) else raise FileNotFoundException
FS.Uploads(uploadHandle).path == path
if exists(FS, path) and not isFile(FS, path) raise PathIsDirectoryException, IOException
parts.size() > 0
forall k in keys(parts): k > 0
forall k in keys(parts):
  not exists(k2 in keys(parts)) where (parts[k] == parts[k2])

所有键值必须大于零,且不得对同一路径句柄存在任何重复引用。这些验证可以在操作过程中的任何阶段执行。一旦失败后,无法保证对该上传操作调用complete()方法(即使传入有效的路径映射)能够完成。调用方应在任何此类失败后调用abort()以确保资源清理。

如果针对此uploadHandle执行了putPart()操作,但其PathHandle句柄未包含在此请求中,则被省略的部分不应成为最终文件的一部分。

MultipartUploader 必须清理所有此类未完成的条目。

对于支持目录的备份存储(如本地文件系统、HDFS等),如果在完成操作时目标位置已存在目录,则必须抛出PathIsDirectoryException或其他IOException异常。

后置条件

UploadData' == ordered concatention of all data in the map of parts, ordered by key
exists(FS', path') and result = PathHandle(path')
FS' = FS where FS.Files(path) == UploadData' and not uploadHandle in keys(FS'.uploads)

PathHandle由完整操作返回,因此后续操作将能够识别在此期间数据未被更改。

上传文件中各部分的顺序遵循映射中的自然顺序:第1部分在第2部分之前,依此类推。

CompletableFuture abort(UploadHandle uploadId, Path filePath)

中止一个多部分上传。该句柄将失效且不可重复使用。

前提条件

uploadHandle in keys(FS.Uploads) else raise FileNotFoundException

后置条件

上传句柄已不再有效。

FS' = FS where not uploadHandle in keys(FS'.uploads)

后续使用相同句柄调用abort()将会失败,除非该句柄已被回收。

CompletableFuture abortUploadsUnderPath(Path path)

尽力清理路径下的所有上传内容。

返回一个解析为的future。

-1 if unsuppported
>= 0 if supported

由于这是尽力而为的操作,无法保证严格的后置条件。理想的后置条件是路径下的所有上传都被中止,且计数表示被中止的上传数量:

FS'.uploads forall upload in FS.uploads:
    not isDescendant(FS, path, upload.path)
return len(forall upload in FS.uploads:
               isDescendant(FS, path, upload.path))