org.apache.hadoop.fs.FSDataInputStream
FSDataInputStream extends DataInputStream
FSDataInputStream
的核心行为由java.io.DataInputStream
定义,并通过扩展为系统添加了关键假设。
Seekable.seek()
来定位字节数组中的偏移量,后续读取操作将从该偏移位置开始。文件通过FileSystem.open(p)
打开,如果成功,将返回:
result = FSDataInputStream(0, FS.Files[p])
数据流可以建模为:
FSDIS = (pos, data[], isOpen)
带有访问函数:
pos(FSDIS) data(FSDIS) isOpen(FSDIS)
隐式不变量:数据流的大小等于通过FileSystem.getFileStatus(Path p)
返回的文件大小
forall p in dom(FS.Files[p]) : len(data(FSDIS)) == FS.getFileStatus(p).length
Closeable.close()
java.io.Closeable
的语义在JRE中的接口定义里进行了规定。
该操作必须是幂等的;以下序列不是错误:
FSDIS.close(); FSDIS.close();
实现应具备对故障的鲁棒性。如果内部流被关闭,应首先检查其是否为null
。
实现过程中不应在此操作期间抛出IOException
异常(或任何其他异常)。客户端应用程序通常会忽略这些异常,或者可能意外失败。
FSDIS' = ((undefined), (undefined), False)
Seekable.getPos()
返回当前位置。流关闭时的结果是未定义的。
isOpen(FSDIS)
result = pos(FSDIS)
InputStream.read()
返回当前位置的数据。
read()
方法的完成时间没有限制。isOpen(FSDIS)
if ( pos < len(data) ): FSDIS' = (pos + 1, data, True) result = data[pos] else result = -1
InputStream.read(buffer[], offset, length)
从偏移量offset
开始,读取length
字节的数据到目标缓冲区。数据源是流的当前位置,该位置隐式设置在pos
中。
isOpen(FSDIS) buffer != null else raise NullPointerException length >= 0 offset < len(buffer) length <= len(buffer) - offset pos >= 0 else raise EOFException, IOException
在前提条件不满足时可能引发的异常包括
InvalidArgumentException ArrayIndexOutOfBoundsException RuntimeException
并非所有文件系统都会检查isOpen
状态。
if length == 0 : result = 0 else if pos > len(data): result = -1 else let l = min(length, len(data)-length) : buffer' = buffer where forall i in [0..l-1]: buffer'[o+i] = data[pos+i] FSDIS' = (pos+l, data, true) result = l
java.io
API 规定,如果要读取的数据量(即 length
)为零,则调用必须阻塞,直到可用数据量大于零——也就是说,直到有数据可读。该调用不要求在缓冲区填满时返回,也不一定要阻塞到流中完全没有数据为止。
也就是说,l
并非简单地定义为min(length, len(data)-length)
,它严格来说是一个位于1..min(length, len(data)-length)
范围内的整数。虽然调用者可能期望尽可能多地填充缓冲区,但根据规范,实现始终可以返回较小的数值,甚至可能每次只返回1字节。
关键在于,除非目标缓冲区大小为0,否则调用必须阻塞直到至少返回一个字节。因此,对于长度大于零的任何数据源,重复调用此read()
操作最终将读取所有数据。
Seekable.seek(s)
并非所有子类都实现了Seek操作:
supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException]
如果操作被支持,文件应该被打开:
isOpen(FSDIS)
某些文件系统不执行此检查,而是依赖read()
契约来拒绝在关闭的流上进行读取操作(例如RawLocalFileSystem
)。
seek(0)
必须始终成功,因为查找位置必须为正数且小于流的长度:
s > 0 and ((s==0) or ((s < len(data)))) else raise [EOFException, IOException]
某些文件系统在未满足此条件时不会抛出异常。相反,在任何read()
操作中,当读取时len(data(FSDIS)) < pos(FSDIS)
的情况下,它们会返回-1。
在搜索失败后,pos(FSDIS)
的值可能会发生变化。例如,搜索超过文件末尾可能会将读取位置移动到文件末尾,同时引发EOFException
异常。
FSDIS' = (s, data, True)
存在一个隐式不变性:定位到当前位置的操作是无操作的
seek(getPos())
实现可能会识别此操作并绕过所有其他前置条件检查,保持输入流不变。
最新的对象存储连接器都实现了某种形式的“延迟定位”:seek()
调用可能看似更新了流,且getPos()
的值会被更新,但实际上文件直到真正读取数据时才会被打开/重新打开。延迟定位的实现仍必须根据文件的已知长度验证新的定位位置。然而此时不需要刷新文件的状态(即文件是否存在,其当前长度是多少)。文件被删除或截断的事实可能直到read()
调用时才会显现。
Seekable.seekToNewSource(offset)
此操作指示源从当前源以外的不同源检索data[]
。仅当文件系统支持文件的多副本且偏移量offset
处存在超过1个数据副本时,此操作才相关。
并非所有子类都实现了此操作,有些子类可能会抛出异常或返回False
。
supported(FSDIS, Seekable.seekToNewSource) else raise [UnsupportedOperationException, IOException]
示例:CompressionInputStream
,HttpFSFileSystem
如果支持,文件必须处于打开状态:
isOpen(FSDIS)
大多数未实现此操作的子类直接失败。
if not supported(FSDIS, Seekable.seekToNewSource(s)): result = False
示例:RawLocalFileSystem
, HttpFSFileSystem
如果操作受支持且数据有新位置:
FSDIS' = (pos, data', true) result = True
新数据是原始数据(或其更新版本,如下文一致性部分所述),但包含offset
处数据的块来自不同的副本。
如果没有其他副本,FSDIS
不会被更新;响应会表明这一点:
result = False
在测试方法之外,该方法主要用于{{FSInputChecker}}类中,该类可以通过尝试从其他来源获取数据来应对读取时的校验错误。如果找到新的数据源,它会尝试重新读取并重新检查文件的该部分。
CanUnbuffer.unbuffer()
此操作指示源释放当前持有的所有系统资源,例如缓冲区、套接字、文件描述符等。任何后续的IO操作很可能需要重新获取这些资源。在需要保持流打开但短期内不需要从流中进行IO操作的情况下(例如文件句柄缓存),解除缓冲非常有用。
并非所有子类都实现了此操作。除了实现CanUnbuffer
外,子类还必须实现StreamCapabilities
接口,并且StreamCapabilities.hasCapability(UNBUFFER)
必须返回true。如果子类实现了CanUnbuffer
但没有通过StreamCapabilities
报告该功能,则调用unbuffer
不会执行任何操作。如果子类报告它确实实现了UNBUFFER
,但没有实现CanUnbuffer
接口,则会抛出UnsupportedOperationException
异常。
supported(FSDIS, StreamCapabilities.hasCapability && FSDIS.hasCapability(UNBUFFER) && CanUnbuffer.unbuffer)
此方法不是线程安全的。如果在read
操作进行时调用unbuffer
,结果将是未定义的。
unbuffer
可以在已关闭的文件上调用,这种情况下 unbuffer
不会执行任何操作。
大多数未实现此操作的子类只是不执行任何操作。
如果操作被支持,unbuffer
会释放与流相关的所有系统资源。这些资源的具体列表通常取决于实现方式,但通常可能包括缓冲区、套接字、文件描述符等。
PositionedReadable
PositionedReadable
操作提供"定位读取"("pread")功能。它们支持从数据流中的特定位置将数据读取到缓冲区。定位读取等同于先执行Seekable.seek
到指定偏移量,再执行InputStream.read(buffer[], offset, length)
,但只需单次方法调用而非先seek
再read
,且两个定位读取操作可以选择性地在同一个FSDataInputStream
流实例上并发运行。
该接口声明定位读取是线程安全的(部分实现并未遵循此保证)。
任何与流操作并发执行的位置读取操作(例如Seekable.seek
、Seekable.getPos()
和InputStream.read()
)必须在隔离环境中运行;彼此之间不得产生相互干扰。
并发的位置读取和流操作必须是可序列化的;一个操作可能会阻塞另一个操作,导致它们按顺序运行,但为了获得更好的吞吐量和"活跃性",它们应该并发运行。
给定两个并行位置读取操作,一个在pos1
位置读取len1
长度到缓冲区dest1
,另一个在pos2
位置读取len2
长度到缓冲区dest2
,并且给定一个并发流式读取操作在定位到pos3
后执行,即使这些读取操作在底层流上发生重叠,最终缓冲区也必须按以下方式填充:
// Positioned read #1 read(pos1, dest1, ... len1) -> dest1[0..len1 - 1] = [data(FS, path, pos1), data(FS, path, pos1 + 1) ... data(FS, path, pos1 + len1 - 1] // Positioned read #2 read(pos2, dest2, ... len2) -> dest2[0..len2 - 1] = [data(FS, path, pos2), data(FS, path, pos2 + 1) ... data(FS, path, pos2 + len2 - 1] // Stream read seek(pos3); read(dest3, ... len3) -> dest3[0..len3 - 1] = [data(FS, path, pos3), data(FS, path, pos3 + 1) ... data(FS, path, pos3 + len3 - 1]
请注意,实现不需要保证原子性;操作过程中的中间状态(即getPos()
值的改变)可能是可见的。
并非所有FSDataInputStream
实现都支持这些操作。那些未实现Seekable.seek()
的类也不会实现PositionedReadable
接口。
supported(FSDIS, Seekable.seek) else raise [UnsupportedOperationException, IOException]
这一点可能显而易见:如果流不支持Seekable
,客户端就无法定位到指定位置。这也是基类实现的一个副作用,该实现使用了Seekable.seek()
方法。
隐式不变量:对于所有PositionedReadable
操作,在操作结束时pos
的值保持不变
pos(FSDIS') == pos(FSDIS)
对于任何失败的操作,目标buffer
的内容是未定义的。实现可能会在报告失败之前覆盖部分或全部缓冲区。
int PositionedReadable.read(position, buffer, offset, length)
尽可能将数据读取到为其分配的缓冲区空间中。
position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException] len(buffer) - offset >= length else raise [IndexOutOfBoundException, RuntimeException] length >= 0 offset >= 0
读取的数据量是长度或从指定位置可用的数据量中的较小值:
let available = min(length, len(data)-position) buffer'[offset..(offset+available-1)] = data[position..position+available -1] result = available
length==0
时,隐式表示不读取任何数据;具体实现可能会跳过该操作并省略所有IO。在这种情况下,可以省略对数据流是否到达文件末尾的检查。buffer
的最终状态将是不确定的。void PositionedReadable.readFully(position, buffer, offset, length)
Read exactly length
bytes of data into the buffer, failing if there is not enough data available.
position >= 0 else raise [EOFException, IOException, IllegalArgumentException, RuntimeException] length >= 0 offset >= 0 len(buffer) - offset >= length else raise [IndexOutOfBoundException, RuntimeException] (position + length) <= len(data) else raise [EOFException, IOException]
如果在读取操作期间发生IO异常,buffer
的最终状态将是不确定的。
如果输入流中没有足够的数据来满足请求,buffer
的最终状态将是不确定的。
从偏移量offset
开始的缓冲区被填充以position
位置起始的数据
buffer'[offset..(offset+length-1)] = data[position..(position + length -1)]
PositionedReadable.readFully(position, buffer)
这种语义完全等同于
readFully(position, buffer, 0, len(buffer))
也就是说,缓冲区完全填充了从位置position
开始的输入源内容。
void readVectored(List extends FileRange> ranges, IntFunction allocate)
异步读取一系列范围的完整数据。默认实现会遍历这些范围,尝试根据minSeekForVectorReads
和maxReadSizeForVectorReads
的值合并范围,然后同步读取每个合并后的范围,但目的是让子类可以实现更高效的实现。支持直接和堆字节缓冲区的读取。此外,建议客户端使用WeakReferencedElasticByteBufferPool
来分配缓冲区,这样即使直接缓冲区在没有被引用时也会被垃圾回收。
在readVectored()
之后,getPos()
返回的位置是未定义的。
如果在readVectored()
操作进行期间文件被更改,则输出结果是不确定的。某些范围可能包含旧数据,某些可能包含新数据,还有些可能同时包含新旧数据。
当readVectored()
操作正在进行时,常规读取API调用可能会阻塞;此时getPos(
)的返回值也是未定义的。在等待向量读取结果期间,应用程序不应发起此类请求。
注意:不要使用直接缓冲区从ChecksumFileSystem
读取数据,因为这可能导致内存碎片化,详见HADOOP-18296 ChecksumFileSystem向量化IO实现中的内存碎片问题
列表不能为空。
if ranges = null raise NullPointerException if allocate = null raise NullPointerException
对于按getOffset()
升序排列的范围列表range[0..n]
中的每个请求范围range[i]
,满足以下条件:
对于所有 i where i > 0
:
range[i].getOffset() > range[i-1].getOffset()
对于所有范围 0..i
的前提条件是:
ranges[i] != null else raise IllegalArgumentException ranges[i].getOffset() >= 0 else raise EOFException ranges[i].getLength() >= 0 else raise IllegalArgumentException if i > 0 and ranges[i].getOffset() < (ranges[i-1].getOffset() + ranges[i-1].getLength) : raise IllegalArgumentException
如果在验证阶段已知文件的长度:
if range[i].getOffset + range[i].getLength >= data.length() raise EOFException
对于范围列表 range[0..n]
中的每个请求范围 range[i]
ranges[i]'.getData() = CompletableFuture<buffer: ByteBuffer>
当 getData().get()
完成时:
let buffer = `getData().get() let len = ranges[i].getLength() let data = new byte[len] (buffer.position() - buffer.limit) = len buffer.get(data, 0, len) = readFully(ranges[i].getOffset(), data, 0, len)
也就是说:每次范围读取的结果都是对相同偏移量和长度的PositionedReadable.readFully()
(可能是异步)调用的结果
minSeekForVectorReads()
合理的最小寻道值。如果第一个范围的结束与下一个范围的开始之间的差值超过此值,则不会将这两个范围合并在一起。
maxReadSizeForVectorReads()
合并范围后单次可读取的最大字节数。如果要读取的合并数据超过此值,则不会合并两个范围。实际上,将此值设置为0将禁用范围合并。
readVectored()
时,如果另一个线程正试图通过read()
/readFully()
读取数据,所有操作都必须成功完成。read()
/readFully()
必须得到支持。这些调用返回数据的顺序是未定义的。当调用S3A连接器的synchronized readVectored()
方法时,它会关闭所有打开的流;随后会将读取策略从正常模式切换为随机模式,以便后续调用都针对有限范围。这是因为预期向量IO和大规模顺序读取不会混合使用,保持任何开放的HTTP连接都是浪费资源的。
实现可以短路读取任何range.getLength() = 0
的范围,并返回一个空缓冲区。
在这种情况下,可以省略其他验证检查。
无法保证此类优化一定会发生;因此调用方不应包含空范围。
FileSystem.open(p)
提供的数据流FSDIS
,无论是本地还是远程的所有读取器,都应在打开时获得对FS.Files[p]
数据的访问权限。在时间 t0
FSDIS0 = FS'read(p) = (0, data0[])
在时间 t1
FS' = FS' where FS'.Files[p] = data1
从时间 t >= t1
开始,FSDIS0
的值将变为未定义。
它可能保持不变
FSDIS0.data == data0 forall l in len(FSDIS0.data): FSDIS0.read() == data0[l]
它可能会获取新数据
FSDIS0.data == data1 forall l in len(FSDIS0.data): FSDIS0.read() == data1[l]
可能会出现不一致的情况,例如读取某个偏移量时可能返回来自任一数据集的数据
forall l in len(FSDIS0.data): (FSDIS0.read(l) == data0[l]) or (FSDIS0.read(l) == data1[l]))
也就是说,读取的每个值可能来自原始文件或更新后的文件。
对于同一偏移量的重复读取也可能不一致,即在时间 t2 > t1
时:
r2 = FSDIS0.read(l)
当时间 t3 > t2
时:
r3 = FSDIS0.read(l)
可能出现 r3 != r2
的情况。(也就是说,部分数据可能被缓存或复制,在后续读取时返回了文件内容的不同版本)。
类似地,如果路径p
处的数据被删除,这一变更在对FSDIS0
执行读取操作时可能可见也可能不可见。
readVectored()
API 已在 Hadoop 3.3.5 版本中发布,明确支持本地文件系统、原始本地文件系统和 S3A,其他场景下会回退处理。
重叠范围
最初仅在S3A连接器中强制执行"不允许范围重叠"的限制,否则会抛出UnsupportedOperationException
。将范围检查添加为所有实现的前提条件(原始本地文件系统除外)可确保各处行为一致。原始本地文件系统不需要此前提条件的原因是ChecksumFileSystem会根据校验块大小创建分块范围,然后调用原始本地文件系统的readVectored方法,在某些情况下可能导致范围重叠。详情请参阅HADOOP-19291
为了在旧版hadoop版本中可靠使用API:在调用readVectored()
之前,请先对范围列表进行排序并检查是否有重叠。
直接缓冲区读取
未包含HADOOP-19101补丁后备实现中的向量读取到堆外缓冲区功能损坏的版本,如果缓冲区分配器函数返回堆外"直接"缓冲区,使用默认的"后备"实现时可能会从错误的偏移量读取数据。
本地文件系统和S3A非预取流中的自定义实现是安全的。
任何实现该API支持的人员,除非确信他们仅在修复实现的版本上运行,否则在分配器为直接且输入流未通过显式的hasCapability()
探针明确声明支持时,不应使用该API:
Stream.hasCapability("in:readvectored")
鉴于HADOOP-18296问题涉及ChecksumFileSystem
和直接缓冲区,在所有版本中都存在,最好避免在生产环境中将此API与直接缓冲区一起使用。