org.apache.hadoop.fs.FSDataInputStream

FSDataInputStream extends DataInputStream

FSDataInputStream的核心行为由java.io.DataInputStream定义,并通过扩展为系统添加了关键假设。

  1. 源可以是本地或远程文件系统。
  2. 读取的流引用了一个有限的字节数组。
  3. 数据长度在读取过程中不会改变。
  4. 数据处理过程中数据内容保持不变。
  5. 读取过程中源文件仍然存在。
  6. 调用者可以使用Seekable.seek()来定位字节数组中的偏移量,后续读取操作将从该偏移位置开始。
  7. 向前和向后查找的成本很低。
  8. 对流的实现没有线程安全的要求。
  9. 但是,如果流实现了PositionedReadable接口,"定位读取"操作必须是线程安全的。

文件通过FileSystem.open(p)打开,如果成功,将返回:

result = FSDataInputStream(0, FS.Files[p])

数据流可以建模为:

FSDIS = (pos, data[], isOpen)

带有访问函数:

pos(FSDIS)
data(FSDIS)
isOpen(FSDIS)

隐式不变量:数据流的大小等于通过FileSystem.getFileStatus(Path p)返回的文件大小

forall p in dom(FS.Files[p]) :
    len(data(FSDIS)) == FS.getFileStatus(p).length

Closeable.close()

java.io.Closeable的语义在JRE中的接口定义里进行了规定。

该操作必须是幂等的;以下序列不是错误:

FSDIS.close();
FSDIS.close();

实现说明

  • 实现应具备对故障的鲁棒性。如果内部流被关闭,应首先检查其是否为null

  • 实现过程中不应在此操作期间抛出IOException异常(或任何其他异常)。客户端应用程序通常会忽略这些异常,或者可能意外失败。

后置条件

FSDIS' = ((undefined), (undefined), False)

Seekable.getPos()

返回当前位置。流关闭时的结果是未定义的。

前提条件

isOpen(FSDIS)

后置条件

result = pos(FSDIS)

InputStream.read()

返回当前位置的数据。

  1. 当流关闭时,实现应该失败。
  2. read() 方法的完成时间没有限制。

前提条件

isOpen(FSDIS)

后置条件

if ( pos < len(data) ):
   FSDIS' = (pos + 1, data, True)
   result = data[pos]
else
    result = -1

InputStream.read(buffer[], offset, length)

从偏移量offset开始,读取length字节的数据到目标缓冲区。数据源是流的当前位置,该位置隐式设置在pos中。

前提条件

isOpen(FSDIS)
buffer != null else raise NullPointerException
length >= 0
offset < len(buffer)
length <= len(buffer) - offset
pos >= 0 else raise EOFException, IOException

在前提条件不满足时可能引发的异常包括

InvalidArgumentException
ArrayIndexOutOfBoundsException
RuntimeException

并非所有文件系统都会检查isOpen状态。

后置条件

if length == 0 :
  result = 0

else if pos > len(data):
  result = -1

else
  let l = min(length, len(data)-length) :
    buffer' = buffer where forall i in [0..l-1]:
       buffer'[o+i] = data[pos+i]
    FSDIS' = (pos+l, data, true)
    result = l

java.io API 规定,如果要读取的数据量(即 length)为零,则调用必须阻塞,直到可用数据量大于零——也就是说,直到有数据可读。该调用不要求在缓冲区填满时返回,也不一定要阻塞到流中完全没有数据为止。

也就是说,l并非简单地定义为min(length, len(data)-length),它严格来说是一个位于1..min(length, len(data)-length)范围内的整数。虽然调用者可能期望尽可能多地填充缓冲区,但根据规范,实现始终可以返回较小的数值,甚至可能每次只返回1字节。

关键在于,除非目标缓冲区大小为0,否则调用必须阻塞直到至少返回一个字节。因此,对于长度大于零的任何数据源,重复调用此read()操作最终将读取所有数据。

Seekable.seek(s)

前提条件

并非所有子类都实现了Seek操作:

supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException]

如果操作被支持,文件应该被打开:

isOpen(FSDIS)

某些文件系统不执行此检查,而是依赖read()契约来拒绝在关闭的流上进行读取操作(例如RawLocalFileSystem)。

seek(0) 必须始终成功,因为查找位置必须为正数且小于流的长度:

s > 0 and ((s==0) or ((s < len(data)))) else raise [EOFException, IOException]

某些文件系统在未满足此条件时不会抛出异常。相反,在任何read()操作中,当读取时len(data(FSDIS)) < pos(FSDIS)的情况下,它们会返回-1。

在搜索失败后,pos(FSDIS)的值可能会发生变化。例如,搜索超过文件末尾可能会将读取位置移动到文件末尾,同时引发EOFException异常。

后置条件

FSDIS' = (s, data, True)

存在一个隐式不变性:定位到当前位置的操作是无操作的

seek(getPos())

实现可能会识别此操作并绕过所有其他前置条件检查,保持输入流不变。

最新的对象存储连接器都实现了某种形式的“延迟定位”:seek()调用可能看似更新了流,且getPos()的值会被更新,但实际上文件直到真正读取数据时才会被打开/重新打开。延迟定位的实现仍必须根据文件的已知长度验证新的定位位置。然而此时不需要刷新文件的状态(即文件是否存在,其当前长度是多少)。文件被删除或截断的事实可能直到read()调用时才会显现。

Seekable.seekToNewSource(offset)

此操作指示源从当前源以外的不同源检索data[]。仅当文件系统支持文件的多副本且偏移量offset处存在超过1个数据副本时,此操作才相关。

前提条件

并非所有子类都实现了此操作,有些子类可能会抛出异常或返回False

supported(FSDIS, Seekable.seekToNewSource) else raise [UnsupportedOperationException, IOException]

示例:CompressionInputStreamHttpFSFileSystem

如果支持,文件必须处于打开状态:

isOpen(FSDIS)

后置条件

大多数未实现此操作的子类直接失败。

if not supported(FSDIS, Seekable.seekToNewSource(s)):
    result = False

示例:RawLocalFileSystem , HttpFSFileSystem

如果操作受支持且数据有新位置:

FSDIS' = (pos, data', true)
result = True

新数据是原始数据(或其更新版本,如下文一致性部分所述),但包含offset处数据的块来自不同的副本。

如果没有其他副本,FSDIS 不会被更新;响应会表明这一点:

result = False

在测试方法之外,该方法主要用于{{FSInputChecker}}类中,该类可以通过尝试从其他来源获取数据来应对读取时的校验错误。如果找到新的数据源,它会尝试重新读取并重新检查文件的该部分。

CanUnbuffer.unbuffer()

此操作指示源释放当前持有的所有系统资源,例如缓冲区、套接字、文件描述符等。任何后续的IO操作很可能需要重新获取这些资源。在需要保持流打开但短期内不需要从流中进行IO操作的情况下(例如文件句柄缓存),解除缓冲非常有用。

前提条件

并非所有子类都实现了此操作。除了实现CanUnbuffer外,子类还必须实现StreamCapabilities接口,并且StreamCapabilities.hasCapability(UNBUFFER)必须返回true。如果子类实现了CanUnbuffer但没有通过StreamCapabilities报告该功能,则调用unbuffer不会执行任何操作。如果子类报告它确实实现了UNBUFFER,但没有实现CanUnbuffer接口,则会抛出UnsupportedOperationException异常。

supported(FSDIS, StreamCapabilities.hasCapability && FSDIS.hasCapability(UNBUFFER) && CanUnbuffer.unbuffer)

此方法不是线程安全的。如果在read操作进行时调用unbuffer,结果将是未定义的。

unbuffer 可以在已关闭的文件上调用,这种情况下 unbuffer 不会执行任何操作。

后置条件

大多数未实现此操作的子类只是不执行任何操作。

如果操作被支持,unbuffer会释放与流相关的所有系统资源。这些资源的具体列表通常取决于实现方式,但通常可能包括缓冲区、套接字、文件描述符等。

接口 PositionedReadable

PositionedReadable 操作提供"定位读取"("pread")功能。它们支持从数据流中的特定位置将数据读取到缓冲区。定位读取等同于先执行Seekable.seek到指定偏移量,再执行InputStream.read(buffer[], offset, length),但只需单次方法调用而非先seekread,且两个定位读取操作可以选择性地在同一个FSDataInputStream流实例上并发运行。

该接口声明定位读取是线程安全的(部分实现并未遵循此保证)。

任何与流操作并发执行的位置读取操作(例如Seekable.seekSeekable.getPos()InputStream.read())必须在隔离环境中运行;彼此之间不得产生相互干扰。

并发的位置读取和流操作必须是可序列化的;一个操作可能会阻塞另一个操作,导致它们按顺序运行,但为了获得更好的吞吐量和"活跃性",它们应该并发运行。

给定两个并行位置读取操作,一个在pos1位置读取len1长度到缓冲区dest1,另一个在pos2位置读取len2长度到缓冲区dest2,并且给定一个并发流式读取操作在定位到pos3后执行,即使这些读取操作在底层流上发生重叠,最终缓冲区也必须按以下方式填充:

// Positioned read #1
read(pos1, dest1, ... len1) -> dest1[0..len1 - 1] =
  [data(FS, path, pos1), data(FS, path, pos1 + 1) ... data(FS, path, pos1 + len1 - 1]

// Positioned read #2
read(pos2, dest2, ... len2) -> dest2[0..len2 - 1] =
  [data(FS, path, pos2), data(FS, path, pos2 + 1) ... data(FS, path, pos2 + len2 - 1]

// Stream read
seek(pos3);
read(dest3, ... len3) -> dest3[0..len3 - 1] =
  [data(FS, path, pos3), data(FS, path, pos3 + 1) ... data(FS, path, pos3 + len3 - 1]

请注意,实现不需要保证原子性;操作过程中的中间状态(即getPos()值的改变)可能是可见的。

实现前提条件

并非所有FSDataInputStream实现都支持这些操作。那些未实现Seekable.seek()的类也不会实现PositionedReadable接口。

supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException]

这一点可能显而易见:如果流不支持Seekable,客户端就无法定位到指定位置。这也是基类实现的一个副作用,该实现使用了Seekable.seek()方法。

隐式不变量:对于所有PositionedReadable操作,在操作结束时pos的值保持不变

pos(FSDIS') == pos(FSDIS)

故障状态

对于任何失败的操作,目标buffer的内容是未定义的。实现可能会在报告失败之前覆盖部分或全部缓冲区。

int PositionedReadable.read(position, buffer, offset, length)

尽可能将数据读取到为其分配的缓冲区空间中。

前提条件

position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException]
len(buffer) - offset >= length else raise [IndexOutOfBoundException, RuntimeException]
length >= 0
offset >= 0

后置条件

读取的数据量是长度或从指定位置可用的数据量中的较小值:

let available = min(length, len(data)-position)
buffer'[offset..(offset+available-1)] = data[position..position+available -1]
result = available
  1. 返回值为-1表示该流没有更多可用数据。
  2. 当调用length==0时,隐式表示不读取任何数据;具体实现可能会跳过该操作并省略所有IO。在这种情况下,可以省略对数据流是否到达文件末尾的检查。
  3. 如果在读取操作期间发生IO异常,buffer的最终状态将是不确定的。

void PositionedReadable.readFully(position, buffer, offset, length)

Read exactly length bytes of data into the buffer, failing if there is not enough data available.

前提条件

position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException]
length >= 0
offset >= 0
len(buffer) - offset >= length else raise [IndexOutOfBoundException, RuntimeException]
(position + length) <= len(data) else raise [EOFException, IOException]

如果在读取操作期间发生IO异常,buffer的最终状态将是不确定的。

如果输入流中没有足够的数据来满足请求,buffer的最终状态将是不确定的。

后置条件

从偏移量offset开始的缓冲区被填充以position位置起始的数据

buffer'[offset..(offset+length-1)] = data[position..(position + length -1)]

PositionedReadable.readFully(position, buffer)

这种语义完全等同于

readFully(position, buffer, 0, len(buffer))

也就是说,缓冲区完全填充了从位置position开始的输入源内容。

void readVectored(List ranges, IntFunction allocate)

异步读取一系列范围的完整数据。默认实现会遍历这些范围,尝试根据minSeekForVectorReadsmaxReadSizeForVectorReads的值合并范围,然后同步读取每个合并后的范围,但目的是让子类可以实现更高效的实现。支持直接和堆字节缓冲区的读取。此外,建议客户端使用WeakReferencedElasticByteBufferPool来分配缓冲区,这样即使直接缓冲区在没有被引用时也会被垃圾回收。

readVectored()之后,getPos()返回的位置是未定义的。

如果在readVectored()操作进行期间文件被更改,则输出结果是不确定的。某些范围可能包含旧数据,某些可能包含新数据,还有些可能同时包含新旧数据。

readVectored()操作正在进行时,常规读取API调用可能会阻塞;此时getPos()的返回值也是未定义的。在等待向量读取结果期间,应用程序不应发起此类请求。

注意:不要使用直接缓冲区从ChecksumFileSystem读取数据,因为这可能导致内存碎片化,详见HADOOP-18296 ChecksumFileSystem向量化IO实现中的内存碎片问题

前提条件

列表不能为空。

if ranges = null raise NullPointerException
if allocate = null raise NullPointerException

对于按getOffset()升序排列的范围列表range[0..n]中的每个请求范围range[i],满足以下条件:

对于所有 i where i > 0:

range[i].getOffset() > range[i-1].getOffset()

对于所有范围 0..i 的前提条件是:

ranges[i] != null else raise IllegalArgumentException
ranges[i].getOffset() >= 0 else raise EOFException
ranges[i].getLength() >= 0 else raise IllegalArgumentException
if i > 0 and ranges[i].getOffset() < (ranges[i-1].getOffset() + ranges[i-1].getLength) :
   raise IllegalArgumentException

如果在验证阶段已知文件的长度:

if range[i].getOffset + range[i].getLength >= data.length() raise EOFException

后置条件

对于范围列表 range[0..n] 中的每个请求范围 range[i]

ranges[i]'.getData() = CompletableFuture<buffer: ByteBuffer>

getData().get() 完成时:

let buffer = `getData().get()
let len = ranges[i].getLength()
let data = new byte[len]
(buffer.position() - buffer.limit) = len
buffer.get(data, 0, len) = readFully(ranges[i].getOffset(), data, 0, len)

也就是说:每次范围读取的结果都是对相同偏移量和长度的PositionedReadable.readFully()(可能是异步)调用的结果

minSeekForVectorReads()

合理的最小寻道值。如果第一个范围的结束与下一个范围的开始之间的差值超过此值,则不会将这两个范围合并在一起。

maxReadSizeForVectorReads()

合并范围后单次可读取的最大字节数。如果要读取的合并数据超过此值,则不会合并两个范围。实际上,将此值设置为0将禁用范围合并。

并发性

  • 当调用readVectored()时,如果另一个线程正试图通过read()/readFully()读取数据,所有操作都必须成功完成。
  • 在已有待处理向量读取操作进行时,必须支持调用新的向量读取。多个请求中各范围完成的顺序是未定义的。
  • 在向量API调用进行期间调用read()/readFully()必须得到支持。这些调用返回数据的顺序是未定义的。

当调用S3A连接器的synchronized readVectored()方法时,它会关闭所有打开的流;随后会将读取策略从正常模式切换为随机模式,以便后续调用都针对有限范围。这是因为预期向量IO和大规模顺序读取不会混合使用,保持任何开放的HTTP连接都是浪费资源的。

处理零长度范围

实现可以短路读取任何range.getLength() = 0的范围,并返回一个空缓冲区。

在这种情况下,可以省略其他验证检查。

无法保证此类优化一定会发生;因此调用方不应包含空范围。

一致性

  • 对于从FileSystem.open(p)提供的数据流FSDIS,无论是本地还是远程的所有读取器,都应在打开时获得对FS.Files[p]数据的访问权限。
  • 如果在读取过程中底层数据发生变化,这些变化可能可见也可能不可见。
  • 这些可见的更改可能是部分可见的。

在时间 t0

FSDIS0 = FS'read(p) = (0, data0[])

在时间 t1

FS' = FS' where FS'.Files[p] = data1

从时间 t >= t1 开始,FSDIS0 的值将变为未定义。

它可能保持不变

FSDIS0.data == data0

forall l in len(FSDIS0.data):
  FSDIS0.read() == data0[l]

它可能会获取新数据

FSDIS0.data == data1

forall l in len(FSDIS0.data):
  FSDIS0.read() == data1[l]

可能会出现不一致的情况,例如读取某个偏移量时可能返回来自任一数据集的数据

forall l in len(FSDIS0.data):
  (FSDIS0.read(l) == data0[l]) or (FSDIS0.read(l) == data1[l]))

也就是说,读取的每个值可能来自原始文件或更新后的文件。

对于同一偏移量的重复读取也可能不一致,即在时间 t2 > t1 时:

r2 = FSDIS0.read(l)

当时间 t3 > t2 时:

r3 = FSDIS0.read(l)

可能出现 r3 != r2 的情况。(也就是说,部分数据可能被缓存或复制,在后续读取时返回了文件内容的不同版本)。

类似地,如果路径p处的数据被删除,这一变更在对FSDIS0执行读取操作时可能可见也可能不可见。

API 稳定性说明

readVectored() API 已在 Hadoop 3.3.5 版本中发布,明确支持本地文件系统、原始本地文件系统和 S3A,其他场景下会回退处理。

重叠范围

最初仅在S3A连接器中强制执行"不允许范围重叠"的限制,否则会抛出UnsupportedOperationException。将范围检查添加为所有实现的前提条件(原始本地文件系统除外)可确保各处行为一致。原始本地文件系统不需要此前提条件的原因是ChecksumFileSystem会根据校验块大小创建分块范围,然后调用原始本地文件系统的readVectored方法,在某些情况下可能导致范围重叠。详情请参阅HADOOP-19291

为了在旧版hadoop版本中可靠使用API:在调用readVectored()之前,请先对范围列表进行排序并检查是否有重叠。

直接缓冲区读取

未包含HADOOP-19101补丁后备实现中的向量读取到堆外缓冲区功能损坏的版本,如果缓冲区分配器函数返回堆外"直接"缓冲区,使用默认的"后备"实现时可能会从错误的偏移量读取数据。

本地文件系统和S3A非预取流中的自定义实现是安全的。

任何实现该API支持的人员,除非确信他们仅在修复实现的版本上运行,否则在分配器为直接且输入流未通过显式的hasCapability()探针明确声明支持时,不应使用该API:

Stream.hasCapability("in:readvectored")

鉴于HADOOP-18296问题涉及ChecksumFileSystem和直接缓冲区,在所有版本中都存在,最好避免在生产环境中将此API与直接缓冲区一起使用。