org.apache.hadoop.fs.FutureDataInputStreamBuilder
一个提供Builder模式的接口,用于创建指向FSDataInputStream
及其子类的JavaFuture
引用。它用于启动(可能是异步的)操作来打开现有文件进行读取。
HADOOP-15229 添加基于FileSystem构建器的openFile() API以匹配createFile()功能
opt(String key, long value)
方法。withFileStatus(status)
调用需要一个非空参数。fs.s3a.experimental.input.fadvise
openFile(path)
调用的路径不匹配时,会抛出IllegalArgumentException
异常。这是基础实现。要编写保证能在此版本下编译的代码,请使用opt(String, String)
和must(String, String)
方法,并显式将数字转换为字符串。
fs.open("s3a://bucket/file") .opt("fs.option.openfile.length", Long.toString(length)) .build().get()
HADOOP-16202 增强openFile()功能以提升对象存储的读取性能
withFileStatus(null)
需要被接受(并忽略)openFile(path)
中传入的文件名匹配。opt(String key, long value)
选项。*由于导致回归问题,该选项现已弃用fs.option.openfile
选项。VersionedFileStatus
并省略对对象的任何 HEAD 探测。新增 optLong()
、optDouble()
、mustLong()
和 mustDouble()
构建器方法。
opt(long)
方法引发。FutureDataInputStreamBuilder
接口在调用 build()
方法之前和/或在异步打开操作期间,不需要参数或 FileSystem
的状态。
文件系统状态的某些方面,可以在初始的openFile()
调用中进行检查,前提是这些方面已知是不变量,在openFile()
和build().get()
序列之间不会发生变化。例如,路径验证。
FutureDataInputStreamBuilder bufferSize(int bufSize)
设置要使用的缓冲区大小。
FutureDataInputStreamBuilder withFileStatus(FileStatus status)
一个FileStatus
实例,指向正在被打开的文件。
实现可能会使用此方法来跳过对文件的检查,从而有可能节省远程调用,尤其是对对象存储的调用。
需求:
status != null
status.getPath().getName()
== 正在打开的文件的名称。如果存储在使用FileStatus
打开文件时,必须进行路径验证,否则可以选择执行验证。验证应推迟到build()
操作时进行。
此操作应被视为对文件系统的提示。
如果文件系统实现扩展了FileStatus
,在其实现中返回的信息可以在打开文件时使用这些信息。
这与那些返回版本/ETag信息的存储相关——它们可以利用此机制确保打开的文件与列表中返回的文件完全一致。
提供的status中最后的status.getPath().getName()
元素必须等于传递给openFile(path)
调用的路径名称值。
文件系统不得验证路径的其余部分。这是为了支持viewfs和其他挂载点包装文件系统,其中模式和路径不同。这些系统通常会创建自己的FileStatus结果
前提条件
status == null or status.getPath().getName() == path.getName()
文件系统不得要求status
类的类型必须等于其在文件状态/列表操作中返回的任何特定子类。这是为了支持包装文件系统以及状态序列化/反序列化。
FutureDataInputStreamBuilder opt(String key, String value) FutureDataInputStreamBuilder opt(String key, int value) FutureDataInputStreamBuilder opt(String key, boolean value) FutureDataInputStreamBuilder optLong(String key, long value) FutureDataInputStreamBuilder optDouble(String key, double value) FutureDataInputStreamBuilder must(String key, String value) FutureDataInputStreamBuilder must(String key, int value) FutureDataInputStreamBuilder must(String key, boolean value) FutureDataInputStreamBuilder mustLong(String key, long value) FutureDataInputStreamBuilder mustDouble(String key, double value)
为构建器设置可选或必需参数。使用opt()
或must()
,客户端可以指定文件系统特定参数,而无需检查FileSystem
的具体类型。
示例:
out = fs.openFile(path) .must("fs.option.openfile.read.policy", "random") .optLong("fs.http.connection.timeout", 30_000L) .withFileStatus(statusFromListing) .build() .get();
这里指定了random
的读取策略,要求文件系统实现必须理解该选项。同时提供了一个http专用选项,该选项可被任何存储系统解析;如果打开文件的文件系统无法识别该选项,则可以安全地忽略它。
opt
与 must
opt
和 must
的区别在于,当文件系统打开文件时遇到无法识别的选项时,必须采取不同的处理方式。
def must(name, value): if not name in known_keys: raise IllegalArgumentException if not name in supported_keys: raise UnsupportedException def opt(name, value): if not name in known_keys: # ignore option
对于任何已知的键,无论(key, value)对是如何声明的,对value
参数的验证必须保持一致。
value
的有效性规范在文件系统规范中定义,并通过契约测试进行验证。检查支持的选项必须在build()
操作中执行。
如果通过must(key, value)
声明的必需参数未被识别,则必须抛出IllegalArgumentException
。
如果通过must(key, value)
声明的必选参数依赖于某个特性,而该特性在特定的FileSystem
/FileContext
实例中被识别但不支持,则必须抛出UnsupportedException
异常。
数值解析应修剪任何字符串,如果无法将值解析为数字,则降级为提供的任何默认值。这是为了解决HADOOP-18724 Open file fails with NumberFormatException for S3AFileSystem问题,该问题是由于传入长整型值时,重载的opt()
构建器参数绑定到opt(String, double)
而非opt(String, long)
所导致的。
构建器方法(如bufferSize()
)与opt()
/must()
所设参数之间冲突的解决行为如下:
最后指定的选项将定义其值及其可选/必填状态。
如果在withFileStatus()
中传入FileStatus
选项,实现必须接受FileStatus
的所有子类,包括LocatedFileStatus
,而不仅仅是实现自定义的特定文件系统子类(例如S3AFileStatus
)。它们可以简单地忽略那些非自定义子类。
这对确保功能的安全使用至关重要:目录列表/状态序列化/反序列化可能导致withFileStatus()
参数不是由文件系统实例自身的getFileStatus()
、listFiles()
、listLocatedStatus()
等调用返回的自定义子类。
在这种情况下,实现必须:
status.getPath().getName()
是否与当前的path.getName()
值匹配。路径的其余部分不得进行验证。即使未使用状态值,参数的存在也可以解释为调用者声明他们相信文件存在且具有给定大小。
CompletableFuture build()
返回一个CompletableFuture
,当成功完成时,将返回一个可以从文件系统读取数据的输入流。
build()
操作可能会执行文件存在性及其类型的验证,从而拒绝尝试从目录或不存在的文件读取。或者*文件存在性/状态检查可能会在返回的CompletableFuture<>
中异步执行。*文件存在性/状态检查可能会推迟到首次读取字节时,例如在read()
或PositionedRead
等读取操作中。
也就是说,前提条件 exists(FS, path)
和 isFile(FS, path)
只有在调用返回的future上的get()
方法并尝试读取流之后,才能保证被满足。
因此,即使文件不存在,或者是一个目录而非文件,以下调用也必须成功,返回一个待评估的CompletableFuture
。
Path p = new Path("file://tmp/file-which-does-not-exist"); CompletableFuture<FSDataInputStream> future = p.getFileSystem(conf) .openFile(p) .build();
无法访问/读取文件时,必须在future的get()
调用中抛出IOException
或其子类异常,或者对于延迟绑定操作,在调用读取数据操作时抛出异常。
因此,当在前一个示例返回的future
上调用时,以下序列必定会失败。
future.get().read();
访问权限检查具有相同的可见性要求:权限失败必须延迟到get()
调用时,并且可以延迟到后续操作中。
注意:对输入流的某些操作,例如seek()
可能完全不会尝试任何IO操作。这些操作在与不存在/不可读的文件交互时可能不会引发异常。
openFile()
选项这些是FileSystem
和FileContext
实现必须识别并可能通过适当改变其输入流行为来支持的选项。
Hadoop 3.3.0 新增了 openFile()
API;这些标准选项是在后续版本中定义的。因此,尽管它们是"众所周知的",除非确信应用程序仅会在支持这些选项的Hadoop版本上运行,否则应用程序应该通过 opt()
调用而非 must()
来设置这些选项。
通过openFile()
构建器API打开文件时,调用者可以同时使用.opt(key, value)
和.must(key, value)
调用来设置标准和文件系统特定的选项。
如果设置为opt()
参数,则必须忽略不受支持的"标准"选项,同样也必须忽略无法识别的标准选项。
如果设置为must()
参数,则必须忽略不受支持的"标准"选项。无法识别的标准选项必须被拒绝。
标准的openFile()
选项定义在org.apache.hadoop.fs.OpenFileOptions
中;所有选项都必须以fs.option.openfile.
开头。
请注意,虽然所有FileSystem
/FileContext
实例都应支持这些选项(即must()
声明不应失败),但具体实现可以选择性地解释这些值。这意味着存储系统并不强制要求在实际打开文件时必须读取或使用读取策略和文件长度值。
除非另有说明,否则它们应被视为提示。
注意:如果添加了一个标准选项,当该选项被设置但不被支持时会导致错误,那么实现必须拒绝它。例如,S3A文件系统客户端支持推送SQL命令的能力。如果类似功能被标准化,那么对于不支持该特性的文件系统,无论是在opt()
还是must()
参数中使用该选项都必须被拒绝。
fs.option.openfile.buffer.size
读取缓冲区大小(以字节为单位)。
这会覆盖配置中使用选项io.file.buffer.size
设置的默认值。
所有支持通过FileSystem.open(path, buffersize)
设置流特定缓冲区大小的文件系统客户端都支持此功能。
fs.option.openfile.read.policy
声明输入流的读取策略。这是对输入流预期读取模式的提示,可能会影响预读、缓冲和其他优化行为。
顺序读取可以通过预取数据和/或使用更大的数据块进行读取来优化。某些应用程序(例如distCp)即使在列式数据上也会执行顺序IO。
相比之下,随机IO通过一系列seek()/read()
操作或通过PositionedReadable
或ByteBufferPositionedReadable
API从文件的不同部分读取数据。
如果几乎没有或没有预取发生,同时结合其他可能的优化措施,随机IO性能可能会达到最佳
对诸如Apache ORC和Apache Parquet等列式格式的查询会执行此类随机IO操作;其他数据格式可能更适合采用顺序或整文件读取策略。
关键在于,针对顺序读取优化可能会损害随机读取性能,反之亦然。
must()
选项,文件系统仍可能忽略它。策略 | 含义 |
---|---|
adaptive |
Any adaptive policy implemented by the store. |
avro |
This is an avro format which will be read sequentially |
csv |
This is CSV data which will be read sequentially |
default |
The default policy for this store. Generally “adaptive”. |
columnar |
This is any columnar format other than ORC/parquet. |
hbase |
This is an HBase Table |
json |
This is a UTF-8 JSON/JSON lines format which will be read sequentially |
orc |
This is an ORC file. Optimize for it. |
parquet |
This is a Parquet file. Optimize for it. |
random |
Optimize for random access. |
sequential |
Optimize for sequential access. |
vector |
The Vectored IO API is intended to be used. |
whole-file |
The whole file will be read. |
为输入源选择错误的读取策略可能效率低下,但绝不会致命。
可以提供一个读取策略列表;文件系统识别/支持的第一个策略将被采用。这允许创建跨版本兼容的配置。策略parquet, columnar, vector, random, adaptive
会优先使用支持parquet策略的文件系统,依次回退到columnar
、vector
、random
,最后是adaptive
。S3A连接器从Hadoop 3.3.5版本开始支持random
策略(即自openFile()
API添加后),并从Hadoop 3.4.0开始支持vector
策略。
S3A和ABFS输入流均实现了IOStatisticsSource API,可以查询其IO性能。
提示:在DEBUG
级别记录输入流的toString()
值。S3A和ABFS输入流会记录读取统计信息,这可以帮助了解读取操作是否高效执行。
延伸阅读
adaptive
尝试根据应用程序的读取模式调整寻道策略。
S3A客户端的normal
策略以及wasb:
客户机唯一支持的策略都是自适应的——它们假设是顺序IO,但一旦进行向后查找/定位读取调用,流就会切换到随机IO。
其他文件系统实现可能希望采用类似的策略,和/或扩展算法以检测向前查找,和/或在认为更高效的情况下从随机IO切换到顺序IO。
自适应读取策略是指在open()
API中无法声明寻道策略,因此如果可配置的话,需要在集群/应用程序配置中进行声明。然而,从顺序寻道策略切换到随机寻道策略可能会带来较大开销。
当应用程序明确设置fs.option.openfile.read.policy
选项时,如果它们知道自己的读取计划,则应该声明哪种策略最合适。
default
文件系统实例的默认策略。具体实现/安装相关。
sequential
预期从读取的第一个字节到文件末尾/直到流关闭的顺序读取。
random
预期会出现 seek()/read()
序列操作,或使用 PositionedReadable
或 ByteBufferPositionedReadable
API接口。
vector
这声明调用者打算使用HADOOP-11867中提出的向量化读取API 添加高性能向量化读取接口。
这是一个提示:使用API时并非强制要求。但它会告知实现方,如果已实现此类功能,应将流配置为最佳向量化IO性能。
这并非互斥的:同一数据流仍可用于传统的InputStream
和PositionedRead
API调用。在这些操作中,实现方案应采用random
读取策略。
whole-file
这表明整个文件需要从头到尾完整读取;文件系统客户端可以自由启用任何能最大化性能的策略。特别是,更大范围的读取/GET操作可以通过减少套接字/TLS建立开销,并提供足够长久的连接让TCP流量控制机制确定最佳下载速率,从而实现更高的带宽。
策略可以包括:
openFile()
操作中发起对整个文件的HTTP GET请求。知道整个文件将从打开的流中读取的应用程序应声明此读取策略。
columnar
声明数据为某种(未指定的)列式格式,并且读取序列预期为对整个列条带/行组的随机IO操作,可能会首先获取相关的列统计信息,以确定是否可以完全跳过对某个条带/行组的扫描。
parquet
和 orc
这些是读取策略,声明文件采用特定的列式格式,并且输入流可以针对这些格式进行读取优化。
特别是 * 文件页脚可能会被获取并缓存。 * 应该预期会有向量IO和随机IO。
这些读取策略是Hadoop 3.4.x新增的功能,因此针对多版本的应用和库,如果无法识别这些策略,应该列出它们的回退策略,例如请求类似parquet, vector, random
这样的策略。
avro
, json
和 csv
这些是读取策略,声明文件具有特定的顺序格式,并且输入流可以针对这些格式进行读取优化。
这些读取策略是Hadoop 3.4.x新增的功能,因此针对多版本的应用和库,如果无法识别这些策略,应该列出它们的回退策略,例如请求像avro, sequential
这样的策略。
hbase
该文件是一个HBase表。对这些文件使用任何合适的策略,其中random
是默认应使用的策略,除非存在与HBase相关的特定优化。
fs.option.openfile.length
: Long
声明文件的长度。
客户端可以使用此功能在打开文件时跳过查询远程存储以获取文件大小或存在状态,类似于通过withFileStatus()
选项声明文件状态。
如果文件系统连接器支持,此选项必须被解释为声明文件的最小长度:
read()
, seek()
和定位读取调用可以使用超过当前长度但小于文件实际长度的位置。在这种情况下,实现可以选择抛出EOFExceptions
异常,或者返回数据。如果此选项被FileSystem实现使用
实现者说明
fs.option.openfile.length
的值若小于 0 则必须被忽略。fs.opt.openfile.length
中的值,则优先采用文件状态值。fs.option.openfile.split.start
和 fs.option.openfile.split.end
: Long
当文件被分割成多个部分进行处理时,声明分割的起始和结束位置。
fs.option.openfile.split.end
的值。fs.option.openfile.split.end
中设置的值时,它们可能会引发异常。分割结束值可以提供关于输入流结束的提示。分割起始值可用于优化文件系统客户端的任何初始读取偏移量。
*实现者须知:当应用程序需要读取到跨越分片边界的记录/行末尾时,它们会读取超出分片末尾的内容。
因此,如果文件实际长度超过fs.option.openfile.split.end
设置的值,则必须允许客户端执行seek()
/read()
操作越过该长度限制。
fs.option.openfile.footer.cache
: Boolean
页脚应该被缓存吗?
此选项允许覆盖默认策略。如果应用程序希望明确声明正在读取Parquet/ORC文件,但不想或不需要文件系统流缓存任何页脚(因为应用程序自身已进行此类缓存),则推荐使用此选项。重复缓存页脚效率低下,如果存在内存/内存缓存冲突,甚至可能适得其反。
S3A连接器支持针对预读和查找策略的自定义选项。
名称 | 类型 | 含义 |
---|---|---|
fs.s3a.readahead.range |
long |
readahead range in bytes |
fs.s3a.experimental.input.fadvise |
String |
seek policy. Superceded by fs.option.openfile.read.policy |
fs.s3a.input.async.drain.threshold |
long |
threshold to switch to asynchronous draining of the stream. (Since 3.3.5) |
如果选项集在fs.s3a.select.sql
语句中包含SQL语句,则该文件将作为S3 Select查询打开。详情请参阅S3A文档。
ABFS连接器支持自定义输入流选项。
名称 | 类型 | 含义 |
---|---|---|
fs.azure.buffered.pread.disable |
boolean |
disable caching on the positioned read operations. |
禁用通过PositionedReadable API读取数据的缓存功能。
更多详情请参阅ABFS文档。
这是一个概念验证示例,使用了org.apache.parquet.hadoop.util.HadoopInputFile
读取器,该读取器利用了(可为空的)文件状态和分片起始/结束位置。
始终会传入FileStatus
值 - 但如果该值为null,则使用分片结束位置来声明文件长度。
protected SeekableInputStream newStream(Path path, FileStatus stat, long splitStart, long splitEnd) throws IOException { FutureDataInputStreamBuilder builder = fs.openFile(path) .opt("fs.option.openfile.read.policy", "vector, random") .withFileStatus(stat); builder.optLong("fs.option.openfile.split.start", splitStart); builder.optLong("fs.option.openfile.split.end", splitEnd); CompletableFuture<FSDataInputStream> streamF = builder.build(); return HadoopStreams.wrap(FutureIO.awaitFuture(streamF)); }
因此,无论是直接由文件列表驱动,还是从查询计划中的(path, splitStart, splitEnd)
打开文件时,都无需探测远程存储以获取文件长度。在处理远程对象存储时,即使此类探测是异步完成的,也能节省数十到数百毫秒的时间。
如果同时设置了文件长度和分片结束位置,那么文件长度必须被视为更具权威性,也就是说它确实应该定义文件长度。如果设置了分片结束位置,调用方可能不会读取超过该位置的内容。
如果CompressedSplitLineReader
在处理压缩记录的过程中,可能会读取超过分片(split)的末尾。也就是说:它假设读取到不完整的记录意味着文件长度大于分片长度,并且必须读取部分读取记录的全部内容。其他读取器可能也有类似行为。
因此
FileStatus
或fs.option.openfile.length
中提供的文件长度应作为文件长度的严格上限fs.option.openfile.split.end
中设置的分片结束位置必须被视为提示,而非文件的严格结束位置。标准和非标准选项可以在同一个openFile()
操作中组合使用。
Future<FSDataInputStream> f = openFile(path) .must("fs.option.openfile.read.policy", "random, adaptive") .opt("fs.s3a.readahead.range", 1024 * 1024) .build(); FSDataInputStream is = f.get();
在must()
中设置的选项必须被所有文件系统理解,或至少被识别并忽略。在这个例子中,S3A特定的选项可能会被所有其他文件系统客户端忽略。
并非所有hadoop版本都支持fs.option.openfile.read.policy
选项。
如果通过opt()
构建器参数添加该选项,则可以安全地在应用程序代码中使用,因为它将被视为未知的可选键,之后可以被丢弃。
Future<FSDataInputStream> f = openFile(path) .opt("fs.option.openfile.read.policy", "vector, random, adaptive") .build(); FSDataInputStream is = f.get();
注意1 如果选项名称是通过引用org.apache.hadoop.fs.Options.OpenFileOptions
中的常量设置的,那么程序将无法链接到不包含该特定选项的Hadoop版本。因此,为了能弹性地链接到旧版本,请使用该值的副本。
注2 由于文件系统连接器会执行选项验证,一个设计用于兼容多个hadoop版本的第三方连接器可能不支持该选项。
Hadoop MapReduce会自动读取以mapreduce.job.input.file.option.
和mapreduce.job.input.file.must.
为前缀的MR作业选项,在移除mapreduce特定前缀后,分别将这些值作为.opt()
和must()
应用。
这使得向MR作业传递选项变得简单直接。例如,要声明一个作业应使用随机IO读取其数据:
JobConf jobConf = (JobConf) job.getConfiguration() jobConf.set( "mapreduce.job.input.file.option.fs.option.openfile.read.policy", "random");
一个记录读取器向所打开文件传入选项的示例。
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit)genericSplit; Configuration job = context.getConfiguration(); start = split.getStart(); end = start + split.getLength(); Path file = split.getPath(); // open the file and seek to the start of the split FutureDataInputStreamBuilder builder = file.getFileSystem(job).openFile(file); // the start and end of the split may be used to build // an input strategy. builder.optLong("fs.option.openfile.split.start", start); builder.optLong("fs.option.openfile.split.end", end); FutureIO.propagateOptions(builder, job, "mapreduce.job.input.file.option", "mapreduce.job.input.file.must"); fileIn = FutureIO.awaitFuture(builder.build()); fileIn.seek(start) /* Rest of the operation on the opened stream */ }
FileContext.openFile
来自org.apache.hadoop.fs.AvroFSInput
;文件以顺序输入方式打开。由于文件长度已被探测,因此长度会被传递下去
public AvroFSInput(FileContext fc, Path p) throws IOException { FileStatus status = fc.getFileStatus(p); this.len = status.getLen(); this.stream = awaitFuture(fc.openFile(p) .opt("fs.option.openfile.read.policy", "sequential") .optLong("fs.option.openfile.length", Long.toString(status.getLen())) .build()); fc.open(p); }
在这个示例中,长度参数是通过字符串形式传递的(通过Long.toString()
),而不是直接以长整型传递。这是为了确保输入格式能够兼容那些不包含opt(String, long)
和must(String, long)
构建参数的Hadoop版本。同样地,这些值以可选形式传递,这样即使遇到无法识别的参数,应用程序仍能成功运行。
这来自 org.apache.hadoop.util.JsonSerialization
。
它的load(FileSystem, Path, FileStatus)
方法 * 声明将从头到尾读取整个文件 * 传递文件状态
public T load(FileSystem fs, Path path, status) throws IOException { try (FSDataInputStream dataInputStream = awaitFuture(fs.openFile(path) .opt("fs.option.openfile.read.policy", "whole-file") .withFileStatus(status) .build())) { return fromJsonStream(dataInputStream); } catch (JsonProcessingException e) { throw new PathIOException(path.toString(), "Failed to read JSON file " + e, e); } }