org.apache.hadoop.fs.FutureDataInputStreamBuilder

一个提供Builder模式的接口,用于创建指向FSDataInputStream及其子类的JavaFuture引用。它用于启动(可能是异步的)操作来打开现有文件进行读取。

历史

Hadoop 3.3.0: API 引入

HADOOP-15229 添加基于FileSystem构建器的openFile() API以匹配createFile()功能

  • 没有可用的 opt(String key, long value) 方法。
  • withFileStatus(status) 调用需要一个非空参数。
  • 处理选项和文件状态的唯一文件系统是S3A;
  • 仅s3a特定的选项是S3 select和fs.s3a.experimental.input.fadvise
  • S3A文件系统在传入文件状态且文件状态的路径与openFile(path)调用的路径不匹配时,会抛出IllegalArgumentException异常。

这是基础实现。要编写保证能在此版本下编译的代码,请使用opt(String, String)must(String, String)方法,并显式将数字转换为字符串。

fs.open("s3a://bucket/file")
  .opt("fs.option.openfile.length", Long.toString(length))
  .build().get()

Hadoop 3.3.5:标准化与扩展

HADOOP-16202 增强openFile()功能以提升对象存储的读取性能

  • withFileStatus(null) 需要被接受(并忽略)
  • 任何提供的FileStatus路径中,只有文件名部分必须与openFile(path)中传入的文件名匹配。
  • 添加了一个opt(String key, long value)选项。*由于导致回归问题,该选项现已弃用
  • 已定义标准的 fs.option.openfile 选项。
  • S3A文件系统将使用openfile长度选项,seek起始/结束选项目前尚未使用。
  • Azure ABFS 连接器接收提供的 VersionedFileStatus 并省略对对象的任何 HEAD 探测。

Hadoop 3.3.6: 修复运算符重载问题的API变更

新增 optLong()optDouble()mustLong()mustDouble() 构建器方法。

  • 参见 HADOOP-18724 S3AFileSystem打开文件时因NumberFormatException失败,该问题由重载的opt(long)方法引发。
  • 规范已更新,声明无法解析的数字必须被视为“未设置”,并使用默认值代替。

不变性

FutureDataInputStreamBuilder 接口在调用 build() 方法之前和/或在异步打开操作期间,不需要参数或 FileSystem 的状态。

文件系统状态的某些方面,可以在初始的openFile()调用中进行检查,前提是这些方面已知是不变量,在openFile()build().get()序列之间不会发生变化。例如,路径验证。

`与实现无关的参数。

FutureDataInputStreamBuilder bufferSize(int bufSize)

设置要使用的缓冲区大小。

FutureDataInputStreamBuilder withFileStatus(FileStatus status)

一个FileStatus实例,指向正在被打开的文件。

实现可能会使用此方法来跳过对文件的检查,从而有可能节省远程调用,尤其是对对象存储的调用。

需求:

  • status != null
  • status.getPath().getName() == 正在打开的文件的名称。

如果存储在使用FileStatus打开文件时,必须进行路径验证,否则可以选择执行验证。验证应推迟到build()操作时进行。

此操作应被视为对文件系统的提示。

如果文件系统实现扩展了FileStatus,在其实现中返回的信息可以在打开文件时使用这些信息。

这与那些返回版本/ETag信息的存储相关——它们可以利用此机制确保打开的文件与列表中返回的文件完全一致。

提供的status中最后的status.getPath().getName()元素必须等于传递给openFile(path)调用的路径名称值。

文件系统不得验证路径的其余部分。这是为了支持viewfs和其他挂载点包装文件系统,其中模式和路径不同。这些系统通常会创建自己的FileStatus结果

前提条件

status == null or status.getPath().getName() == path.getName()

文件系统不得要求status类的类型必须等于其在文件状态/列表操作中返回的任何特定子类。这是为了支持包装文件系统以及状态序列化/反序列化。

设置可选或必选参数

FutureDataInputStreamBuilder opt(String key, String value)
FutureDataInputStreamBuilder opt(String key, int value)
FutureDataInputStreamBuilder opt(String key, boolean value)
FutureDataInputStreamBuilder optLong(String key, long value)
FutureDataInputStreamBuilder optDouble(String key, double value)
FutureDataInputStreamBuilder must(String key, String value)
FutureDataInputStreamBuilder must(String key, int value)
FutureDataInputStreamBuilder must(String key, boolean value)
FutureDataInputStreamBuilder mustLong(String key, long value)
FutureDataInputStreamBuilder mustDouble(String key, double value)

为构建器设置可选或必需参数。使用opt()must(),客户端可以指定文件系统特定参数,而无需检查FileSystem的具体类型。

示例:

out = fs.openFile(path)
    .must("fs.option.openfile.read.policy", "random")
    .optLong("fs.http.connection.timeout", 30_000L)
    .withFileStatus(statusFromListing)
    .build()
    .get();

这里指定了random的读取策略,要求文件系统实现必须理解该选项。同时提供了一个http专用选项,该选项可被任何存储系统解析;如果打开文件的文件系统无法识别该选项,则可以安全地忽略它。

何时使用 optmust

optmust 的区别在于,当文件系统打开文件时遇到无法识别的选项时,必须采取不同的处理方式。

def must(name, value):
  if not name in known_keys:
    raise IllegalArgumentException
  if not name in supported_keys:
    raise UnsupportedException


def opt(name, value):
  if not name in known_keys:
     # ignore option

对于任何已知的键,无论(key, value)对是如何声明的,对value参数的验证必须保持一致。

  1. 对于特定于文件系统的选项,如何验证条目由实现自行决定。
  2. 对于标准选项,value的有效性规范在文件系统规范中定义,并通过契约测试进行验证。

实现说明

检查支持的选项必须在build()操作中执行。

  1. 如果通过must(key, value)声明的必需参数未被识别,则必须抛出IllegalArgumentException

  2. 如果通过must(key, value)声明的必选参数依赖于某个特性,而该特性在特定的FileSystem/FileContext实例中被识别但不支持,则必须抛出UnsupportedException异常。

数值解析应修剪任何字符串,如果无法将值解析为数字,则降级为提供的任何默认值。这是为了解决HADOOP-18724 Open file fails with NumberFormatException for S3AFileSystem问题,该问题是由于传入长整型值时,重载的opt()构建器参数绑定到opt(String, double)而非opt(String, long)所导致的。

构建器方法(如bufferSize())与opt()/must()所设参数之间冲突的解决行为如下:

最后指定的选项将定义其值及其可选/必填状态。

如果在withFileStatus()中传入FileStatus选项,实现必须接受FileStatus的所有子类,包括LocatedFileStatus,而不仅仅是实现自定义的特定文件系统子类(例如S3AFileStatus)。它们可以简单地忽略那些非自定义子类。

这对确保功能的安全使用至关重要:目录列表/状态序列化/反序列化可能导致withFileStatus()参数不是由文件系统实例自身的getFileStatus()listFiles()listLocatedStatus()等调用返回的自定义子类。

在这种情况下,实现必须:

  1. 验证status.getPath().getName()是否与当前的path.getName()值匹配。路径的其余部分不得进行验证。
  2. 根据需要可使用任何状态字段 - 例如文件长度。

即使未使用状态值,参数的存在也可以解释为调用者声明他们相信文件存在且具有给定大小。

构建器接口

CompletableFuture build()

返回一个CompletableFuture,当成功完成时,将返回一个可以从文件系统读取数据的输入流。

build()操作可能会执行文件存在性及其类型的验证,从而拒绝尝试从目录或不存在的文件读取。或者*文件存在性/状态检查可能会在返回的CompletableFuture<>中异步执行。*文件存在性/状态检查可能会推迟到首次读取字节时,例如在read()PositionedRead等读取操作中。

也就是说,前提条件 exists(FS, path)isFile(FS, path) 只有在调用返回的future上的get()方法并尝试读取流之后,才能保证被满足。

因此,即使文件不存在,或者是一个目录而非文件,以下调用也必须成功,返回一个待评估的CompletableFuture

Path p = new Path("file://tmp/file-which-does-not-exist");

CompletableFuture<FSDataInputStream> future = p.getFileSystem(conf)
      .openFile(p)
      .build();

无法访问/读取文件时,必须在future的get()调用中抛出IOException或其子类异常,或者对于延迟绑定操作,在调用读取数据操作时抛出异常。

因此,当在前一个示例返回的future上调用时,以下序列必定会失败。

  future.get().read();

访问权限检查具有相同的可见性要求:权限失败必须延迟到get()调用时,并且可以延迟到后续操作中。

注意:对输入流的某些操作,例如seek()可能完全不会尝试任何IO操作。这些操作在与不存在/不可读的文件交互时可能不会引发异常。

自hadoop branch-3.3版本起标准的openFile()选项

这些是FileSystemFileContext实现必须识别并可能通过适当改变其输入流行为来支持的选项。

Hadoop 3.3.0 新增了 openFile() API;这些标准选项是在后续版本中定义的。因此,尽管它们是"众所周知的",除非确信应用程序仅会在支持这些选项的Hadoop版本上运行,否则应用程序应该通过 opt() 调用而非 must() 来设置这些选项。

通过openFile()构建器API打开文件时,调用者可以同时使用.opt(key, value).must(key, value)调用来设置标准和文件系统特定的选项。

如果设置为opt()参数,则必须忽略不受支持的"标准"选项,同样也必须忽略无法识别的标准选项。

如果设置为must()参数,则必须忽略不受支持的"标准"选项。无法识别的标准选项必须被拒绝。

标准的openFile()选项定义在org.apache.hadoop.fs.OpenFileOptions中;所有选项都必须以fs.option.openfile.开头。

请注意,虽然所有FileSystem/FileContext实例都应支持这些选项(即must()声明不应失败),但具体实现可以选择性地解释这些值。这意味着存储系统并不强制要求在实际打开文件时必须读取或使用读取策略和文件长度值。

除非另有说明,否则它们应被视为提示。

注意:如果添加了一个标准选项,当该选项被设置但不被支持时会导致错误,那么实现必须拒绝它。例如,S3A文件系统客户端支持推送SQL命令的能力。如果类似功能被标准化,那么对于不支持该特性的文件系统,无论是在opt()还是must()参数中使用该选项都必须被拒绝。

选项: fs.option.openfile.buffer.size

读取缓冲区大小(以字节为单位)。

这会覆盖配置中使用选项io.file.buffer.size设置的默认值。

所有支持通过FileSystem.open(path, buffersize)设置流特定缓冲区大小的文件系统客户端都支持此功能。

选项: fs.option.openfile.read.policy

声明输入流的读取策略。这是对输入流预期读取模式的提示,可能会影响预读、缓冲和其他优化行为。

顺序读取可以通过预取数据和/或使用更大的数据块进行读取来优化。某些应用程序(例如distCp)即使在列式数据上也会执行顺序IO。

相比之下,随机IO通过一系列seek()/read()操作或通过PositionedReadableByteBufferPositionedReadable API从文件的不同部分读取数据。

如果几乎没有或没有预取发生,同时结合其他可能的优化措施,随机IO性能可能会达到最佳

对诸如Apache ORC和Apache Parquet等列式格式的查询会执行此类随机IO操作;其他数据格式可能更适合采用顺序或整文件读取策略。

关键在于,针对顺序读取优化可能会损害随机读取性能,反之亦然。

  1. seek策略仅作为提示;即使声明为must()选项,文件系统仍可能忽略它。
  2. 策略的解释/实现是文件系统特定的行为 - 它可能随着Hadoop版本和/或特定存储子系统的变化而改变。
  3. 如果策略未被识别,文件系统客户端必须忽略它。
策略 含义
adaptive Any adaptive policy implemented by the store.
avro This is an avro format which will be read sequentially
csv This is CSV data which will be read sequentially
default The default policy for this store. Generally “adaptive”.
columnar This is any columnar format other than ORC/parquet.
hbase This is an HBase Table
json This is a UTF-8 JSON/JSON lines format which will be read sequentially
orc This is an ORC file. Optimize for it.
parquet This is a Parquet file. Optimize for it.
random Optimize for random access.
sequential Optimize for sequential access.
vector The Vectored IO API is intended to be used.
whole-file The whole file will be read.

为输入源选择错误的读取策略可能效率低下,但绝不会致命。

可以提供一个读取策略列表;文件系统识别/支持的第一个策略将被采用。这允许创建跨版本兼容的配置。策略parquet, columnar, vector, random, adaptive会优先使用支持parquet策略的文件系统,依次回退到columnarvectorrandom,最后是adaptive。S3A连接器从Hadoop 3.3.5版本开始支持random策略(即自openFile() API添加后),并从Hadoop 3.4.0开始支持vector策略。

S3A和ABFS输入流均实现了IOStatisticsSource API,可以查询其IO性能。

提示:DEBUG级别记录输入流的toString()值。S3A和ABFS输入流会记录读取统计信息,这可以帮助了解读取操作是否高效执行。

延伸阅读

读取策略 adaptive

尝试根据应用程序的读取模式调整寻道策略。

S3A客户端的normal策略以及wasb:客户机唯一支持的策略都是自适应的——它们假设是顺序IO,但一旦进行向后查找/定位读取调用,流就会切换到随机IO。

其他文件系统实现可能希望采用类似的策略,和/或扩展算法以检测向前查找,和/或在认为更高效的情况下从随机IO切换到顺序IO。

自适应读取策略是指在open() API中无法声明寻道策略,因此如果可配置的话,需要在集群/应用程序配置中进行声明。然而,从顺序寻道策略切换到随机寻道策略可能会带来较大开销。

当应用程序明确设置fs.option.openfile.read.policy选项时,如果它们知道自己的读取计划,则应该声明哪种策略最合适。

读取策略 default

文件系统实例的默认策略。具体实现/安装相关。

读取策略 sequential

预期从读取的第一个字节到文件末尾/直到流关闭的顺序读取。

读取策略 random

预期会出现 seek()/read() 序列操作,或使用 PositionedReadableByteBufferPositionedReadable API接口。

读取策略 vector

这声明调用者打算使用HADOOP-11867中提出的向量化读取API 添加高性能向量化读取接口

这是一个提示:使用API时并非强制要求。但它会告知实现方,如果已实现此类功能,应将流配置为最佳向量化IO性能。

并非互斥的:同一数据流仍可用于传统的InputStreamPositionedRead API调用。在这些操作中,实现方案应采用random读取策略。

读取策略 whole-file

这表明整个文件需要从头到尾完整读取;文件系统客户端可以自由启用任何能最大化性能的策略。特别是,更大范围的读取/GET操作可以通过减少套接字/TLS建立开销,并提供足够长久的连接让TCP流量控制机制确定最佳下载速率,从而实现更高的带宽。

策略可以包括:

  • openFile()操作中发起对整个文件的HTTP GET请求。
  • 以大块预取数据,可能通过并行读取操作实现。

知道整个文件将从打开的流中读取的应用程序应声明此读取策略。

读取策略 columnar

声明数据为某种(未指定的)列式格式,并且读取序列预期为对整个列条带/行组的随机IO操作,可能会首先获取相关的列统计信息,以确定是否可以完全跳过对某个条带/行组的扫描。

文件格式读取策略 parquetorc

这些是读取策略,声明文件采用特定的列式格式,并且输入流可以针对这些格式进行读取优化。

特别是 * 文件页脚可能会被获取并缓存。 * 应该预期会有向量IO和随机IO。

这些读取策略是Hadoop 3.4.x新增的功能,因此针对多版本的应用和库,如果无法识别这些策略,应该列出它们的回退策略,例如请求类似parquet, vector, random这样的策略。

文件格式读取策略 avro, jsoncsv

这些是读取策略,声明文件具有特定的顺序格式,并且输入流可以针对这些格式进行读取优化。

这些读取策略是Hadoop 3.4.x新增的功能,因此针对多版本的应用和库,如果无法识别这些策略,应该列出它们的回退策略,例如请求像avro, sequential这样的策略。

文件格式读取策略 hbase

该文件是一个HBase表。对这些文件使用任何合适的策略,其中random是默认应使用的策略,除非存在与HBase相关的特定优化。

选项: fs.option.openfile.length: Long

声明文件的长度。

客户端可以使用此功能在打开文件时跳过查询远程存储以获取文件大小或存在状态,类似于通过withFileStatus()选项声明文件状态。

如果文件系统连接器支持,此选项必须被解释为声明文件的最小长度:

  1. 如果值为负数,则该选项应视为未设置。
  2. 如果文件的实际长度大于此值,不应视为错误。
  3. read(), seek() 和定位读取调用可以使用超过当前长度但小于文件实际长度的位置。在这种情况下,实现可以选择抛出EOFExceptions异常,或者返回数据。

如果此选项被FileSystem实现使用

实现者说明

  • fs.option.openfile.length 的值若小于 0 则必须被忽略。
  • 如果提供了文件状态以及fs.opt.openfile.length中的值,则优先采用文件状态值。

选项: fs.option.openfile.split.startfs.option.openfile.split.end: Long

当文件被分割成多个部分进行处理时,声明分割的起始和结束位置。

  1. 如果值为负数,则该选项应视为未设置。
  2. 文件系统可以假设文件长度大于或等于fs.option.openfile.split.end的值。
  3. 并且如果客户端应用程序读取超过fs.option.openfile.split.end中设置的值时,它们可能会引发异常。
  4. 这对选项可用于优化读取计划,例如设置GET请求的内容范围,或将拆分结束位置作为文件保证最小长度的隐式声明。
  5. 如果同时设置了这两个选项,并且分割起始点被声明为大于分割结束点,那么分割起始点应重置为零,而不是拒绝该操作。

分割结束值可以提供关于输入流结束的提示。分割起始值可用于优化文件系统客户端的任何初始读取偏移量。

*实现者须知:当应用程序需要读取到跨越分片边界的记录/行末尾时,它们会读取超出分片末尾的内容。

因此,如果文件实际长度超过fs.option.openfile.split.end设置的值,则必须允许客户端执行seek()/read()操作越过该长度限制。

选项: fs.option.openfile.footer.cache: Boolean

页脚应该被缓存吗?

  • 这是为缓存页脚的客户端提供的提示。
  • 如果在读取策略中声明了已知页脚的格式,则应使用该文件类型的默认页脚缓存策略。

此选项允许覆盖默认策略。如果应用程序希望明确声明正在读取Parquet/ORC文件,但不想或不需要文件系统流缓存任何页脚(因为应用程序自身已进行此类缓存),则推荐使用此选项。重复缓存页脚效率低下,如果存在内存/内存缓存冲突,甚至可能适得其反。

S3A专属配置选项

S3A连接器支持针对预读和查找策略的自定义选项。

名称 类型 含义
fs.s3a.readahead.range long readahead range in bytes
fs.s3a.experimental.input.fadvise String seek policy. Superceded by fs.option.openfile.read.policy
fs.s3a.input.async.drain.threshold long threshold to switch to asynchronous draining of the stream. (Since 3.3.5)

如果选项集在fs.s3a.select.sql语句中包含SQL语句,则该文件将作为S3 Select查询打开。详情请参阅S3A文档。

ABFS 特定选项

ABFS连接器支持自定义输入流选项。

名称 类型 含义
fs.azure.buffered.pread.disable boolean disable caching on the positioned read operations.

禁用通过PositionedReadable API读取数据的缓存功能。

更多详情请参阅ABFS文档。

示例

在打开文件时声明搜索策略和分割限制。

这是一个概念验证示例,使用了org.apache.parquet.hadoop.util.HadoopInputFile读取器,该读取器利用了(可为空的)文件状态和分片起始/结束位置。

始终会传入FileStatus值 - 但如果该值为null,则使用分片结束位置来声明文件长度。

protected SeekableInputStream newStream(Path path, FileStatus stat,
     long splitStart, long splitEnd)
     throws IOException {

   FutureDataInputStreamBuilder builder = fs.openFile(path)
   .opt("fs.option.openfile.read.policy", "vector, random")
   .withFileStatus(stat);

   builder.optLong("fs.option.openfile.split.start", splitStart);
   builder.optLong("fs.option.openfile.split.end", splitEnd);
   CompletableFuture<FSDataInputStream> streamF = builder.build();
   return HadoopStreams.wrap(FutureIO.awaitFuture(streamF));
}

因此,无论是直接由文件列表驱动,还是从查询计划中的(path, splitStart, splitEnd)打开文件时,都无需探测远程存储以获取文件长度。在处理远程对象存储时,即使此类探测是异步完成的,也能节省数十到数百毫秒的时间。

如果同时设置了文件长度和分片结束位置,那么文件长度必须被视为更具权威性,也就是说它确实应该定义文件长度。如果设置了分片结束位置,调用方可能不会读取超过该位置的内容。

如果CompressedSplitLineReader在处理压缩记录的过程中,可能会读取超过分片(split)的末尾。也就是说:它假设读取到不完整的记录意味着文件长度大于分片长度,并且必须读取部分读取记录的全部内容。其他读取器可能也有类似行为。

因此

  1. FileStatusfs.option.openfile.length中提供的文件长度应作为文件长度的严格上限
  2. fs.option.openfile.split.end中设置的分片结束位置必须被视为提示,而非文件的严格结束位置。

使用标准和非标准选项打开文件

标准和非标准选项可以在同一个openFile()操作中组合使用。

Future<FSDataInputStream> f = openFile(path)
  .must("fs.option.openfile.read.policy", "random, adaptive")
  .opt("fs.s3a.readahead.range", 1024 * 1024)
  .build();

FSDataInputStream is = f.get();

must()中设置的选项必须被所有文件系统理解,或至少被识别并忽略。在这个例子中,S3A特定的选项可能会被所有其他文件系统客户端忽略。

使用旧版本打开文件

并非所有hadoop版本都支持fs.option.openfile.read.policy选项。

如果通过opt()构建器参数添加该选项,则可以安全地在应用程序代码中使用,因为它将被视为未知的可选键,之后可以被丢弃。

Future<FSDataInputStream> f = openFile(path)
  .opt("fs.option.openfile.read.policy", "vector, random, adaptive")
  .build();

FSDataInputStream is = f.get();

注意1 如果选项名称是通过引用org.apache.hadoop.fs.Options.OpenFileOptions中的常量设置的,那么程序将无法链接到不包含该特定选项的Hadoop版本。因此,为了能弹性地链接到旧版本,请使用该值的副本。

注2 由于文件系统连接器会执行选项验证,一个设计用于兼容多个hadoop版本的第三方连接器可能不支持该选项。

向MapReduce传递选项

Hadoop MapReduce会自动读取以mapreduce.job.input.file.option.mapreduce.job.input.file.must.为前缀的MR作业选项,在移除mapreduce特定前缀后,分别将这些值作为.opt()must()应用。

这使得向MR作业传递选项变得简单直接。例如,要声明一个作业应使用随机IO读取其数据:

JobConf jobConf = (JobConf) job.getConfiguration()
jobConf.set(
    "mapreduce.job.input.file.option.fs.option.openfile.read.policy",
    "random");

MapReduce输入格式传播选项

一个记录读取器向所打开文件传入选项的示例。

  public void initialize(InputSplit genericSplit,
                     TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit)genericSplit;
    Configuration job = context.getConfiguration();
    start = split.getStart();
    end = start + split.getLength();
    Path file = split.getPath();

    // open the file and seek to the start of the split
    FutureDataInputStreamBuilder builder =
      file.getFileSystem(job).openFile(file);
    // the start and end of the split may be used to build
    // an input strategy.
    builder.optLong("fs.option.openfile.split.start", start);
    builder.optLong("fs.option.openfile.split.end", end);
    FutureIO.propagateOptions(builder, job,
        "mapreduce.job.input.file.option",
        "mapreduce.job.input.file.must");

    fileIn = FutureIO.awaitFuture(builder.build());
    fileIn.seek(start)
    /* Rest of the operation on the opened stream */
  }

FileContext.openFile

来自org.apache.hadoop.fs.AvroFSInput;文件以顺序输入方式打开。由于文件长度已被探测,因此长度会被传递下去

  public AvroFSInput(FileContext fc, Path p) throws IOException {
    FileStatus status = fc.getFileStatus(p);
    this.len = status.getLen();
    this.stream = awaitFuture(fc.openFile(p)
        .opt("fs.option.openfile.read.policy",
            "sequential")
        .optLong("fs.option.openfile.length",
            Long.toString(status.getLen()))
        .build());
    fc.open(p);
  }

在这个示例中,长度参数是通过字符串形式传递的(通过Long.toString()),而不是直接以长整型传递。这是为了确保输入格式能够兼容那些不包含opt(String, long)must(String, long)构建参数的Hadoop版本。同样地,这些值以可选形式传递,这样即使遇到无法识别的参数,应用程序仍能成功运行。

示例:读取整个文件

这来自 org.apache.hadoop.util.JsonSerialization

它的load(FileSystem, Path, FileStatus)方法 * 声明将从头到尾读取整个文件 * 传递文件状态

public T load(FileSystem fs,
        Path path,
        status)
        throws IOException {

 try (FSDataInputStream dataInputStream =
          awaitFuture(fs.openFile(path)
              .opt("fs.option.openfile.read.policy", "whole-file")
              .withFileStatus(status)
              .build())) {
   return fromJsonStream(dataInputStream);
 } catch (JsonProcessingException e) {
   throw new PathIOException(path.toString(),
       "Failed to read JSON file " + e, e);
 }
}