通用文件源选项

这些通用选项/配置仅在使用基于文件的源时有效:parquet、orc、avro、json、csv、text。

请注意,下面示例中使用的目录层级为:

dir1/
 ├── dir2/
 │    └── file2.parquet (模式: <文件: 字符串>, 内容: "file2.parquet")
 └── file1.parquet (模式: <文件, 字符串>, 内容: "file1.parquet")
 └── file3.json (模式: <文件, 字符串>, 内容: "{'文件':'corrupt.json'}")

忽略损坏的文件

Spark允许您使用配置 spark.sql.files.ignoreCorruptFiles 或数据源选项 ignoreCorruptFiles 在读取文件数据时忽略损坏的文件。当设置为true时,在遇到损坏的文件时,Spark作业将继续运行,并且已读取的内容仍然会被返回。

要在读取数据文件时忽略损坏的文件,可以使用:

# 通过数据源选项启用忽略损坏文件
# dir1/file3.json 在 parquet 视图中是损坏的
test_corrupt_df0 = spark.read.option("ignoreCorruptFiles", "true")\
    .parquet("examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/")
test_corrupt_df0.show()
# +-------------+
# |         file|
# +-------------+
# |file1.parquet|
# |file2.parquet|
# +-------------+

# 通过配置启用忽略损坏文件
spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
# dir1/file3.json 在 parquet 视图中是损坏的
test_corrupt_df1 = spark.read.parquet("examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/")
test_corrupt_df1.show()
# +-------------+
# |         file|
# +-------------+
# |file1.parquet|
# |file2.parquet|
# +-------------+
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.
// 通过数据源选项启用忽略损坏的文件
// dir1/file3.json 从 parquet 的视角来看是损坏的
val testCorruptDF0 = spark.read.option("ignoreCorruptFiles", "true").parquet(
"examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/")
testCorruptDF0.show()
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
// 通过配置启用忽略损坏的文件
spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
// dir1/file3.json 从 parquet 的视角来看是损坏的
val testCorruptDF1 = spark.read.parquet(
"examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/")
testCorruptDF1.show()
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
// 通过数据源选项启用忽略损坏的文件
// dir1/file3.json 从 parquet 的视角来看是损坏的
Dataset<Row> testCorruptDF0 = spark.read().option("ignoreCorruptFiles", "true").parquet(
"examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/");
testCorruptDF0.show();
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
// 通过配置启用忽略损坏的文件
spark.sql("set spark.sql.files.ignoreCorruptFiles=true");
// dir1/file3.json 从 parquet 的视角来看是损坏的
Dataset<Row> testCorruptDF1 = spark.read().parquet(
"examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/");
testCorruptDF1.show();
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.
# 通过数据源选项启用忽略损坏文件
# dir1/file3.json 在 parquet 视图中是损坏的
testCorruptDF0 <- read.parquet(c("examples/src/main/resources/dir1/", "examples/src/main/resources/dir1/dir2/"), ignoreCorruptFiles = "true")
head(testCorruptDF0)
#            文件
# 1 file1.parquet
# 2 file2.parquet
# 通过配置启用忽略损坏文件
sql("set spark.sql.files.ignoreCorruptFiles=true")
# dir1/file3.json 在 parquet 视图中是损坏的
testCorruptDF1 <- read.parquet(c("examples/src/main/resources/dir1/", "examples/src/main/resources/dir1/dir2/"))
head(testCorruptDF1)
#            文件
# 1 file1.parquet
# 2 file2.parquet
Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.

忽略缺失的文件

Spark 允许您使用配置 spark.sql.files.ignoreMissingFiles 或数据源选项 ignoreMissingFiles 来忽略在从文件读取数据时缺失的文件。在这里,缺失的文件实际上指的是在构建 DataFrame 后目录下已被删除的文件。当设置为 true 时,Spark 作业在遇到缺失文件时将继续运行,并且已读取的内容仍然会被返回。

路径通配符过滤器

pathGlobFilter 用于仅包含文件名与模式匹配的文件。语法遵循 org.apache.hadoop.fs.GlobFilter 。它不会改变分区发现的行为。

要加载与给定通配符模式匹配的文件路径,同时保持分区发现的行为,您可以使用:

df = spark.read.load("examples/src/main/resources/dir1",
format="parquet", pathGlobFilter="*.parquet")
df.show()
# +-------------+
# |         文件|
# +-------------+
# |file1.parquet|
# +-------------+
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.
val testGlobFilterDF = spark.read.format("parquet")
.option("pathGlobFilter", "*.parquet") // 应该过滤掉json文件
.load("examples/src/main/resources/dir1")
testGlobFilterDF.show()
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// +-------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Dataset<Row> testGlobFilterDF = spark.read().format("parquet")
.option("pathGlobFilter", "*.parquet") // json 文件应该被过滤掉
.load("examples/src/main/resources/dir1");
testGlobFilterDF.show();
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// +-------------+
Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.
df <- read.df("examples/src/main/resources/dir1", "parquet", pathGlobFilter = "*.parquet")
#            文件
# 1 file1.parquet
Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.

递归文件查找

recursiveFileLookup 用于递归加载文件,并且它禁用了分区推断。它的默认值是 false 。如果数据源在 recursiveFileLookup 为真时明确指定 partitionSpec ,将抛出异常。

要递归加载所有文件,可以使用:

recursive_loaded_df = spark.read.format("parquet")\
    .option("recursiveFileLookup", "true")\
    .load("examples/src/main/resources/dir1")
recursive_loaded_df.show()
# +-------------+
# |         文件|
# +-------------+
# |file1.parquet|
# |file2.parquet|
# +-------------+
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.
val recursiveLoadedDF = spark.read.format("parquet")
.option("recursiveFileLookup", "true")
.load("examples/src/main/resources/dir1")
recursiveLoadedDF.show()
// +-------------+
// |         文件|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Dataset<Row> recursiveLoadedDF = spark.read().format("parquet")
.option("recursiveFileLookup", "true")
.load("examples/src/main/resources/dir1");
recursiveLoadedDF.show();
// +-------------+
// |         文件|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.
recursiveLoadedDF <- read.df("examples/src/main/resources/dir1", "parquet", recursiveFileLookup = "true")
head(recursiveLoadedDF)
#            文件
# 1 file1.parquet
# 2 file2.parquet
Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.

修改时间路径过滤器

modifiedBefore modifiedAfter 是可以一起或单独使用的选项,以实现对在 Spark 批量查询中加载哪些文件的更大粒度控制。 (注意,结构化流文件源不支持这些选项。)

如果没有提供时区选项,时间戳将根据Spark会话时区进行解释 ( spark.sql.session.timeZone )。

要加载具有匹配给定修改时间范围的文件,可以使用:

# 仅加载在 2050年07月01日08:30:00 之前修改的文件
df = spark.read.load("examples/src/main/resources/dir1",
format="parquet", modifiedBefore="2050-07-01T08:30:00")
df.show()
# +-------------+
# |         file|
# +-------------+
# |file1.parquet|
# +-------------+
# 仅加载在 2050年06月01日08:30:00 之后修改的文件
df = spark.read.load("examples/src/main/resources/dir1",
format="parquet", modifiedAfter="2050-06-01T08:30:00")
df.show()
# +-------------+
# |         file|
# +-------------+
# +-------------+
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.
val beforeFilterDF = spark.read.format("parquet")
// 允许在2020年07月01日05:30之前修改的文件
.option("modifiedBefore", "2020-07-01T05:30:00")
.load("examples/src/main/resources/dir1");
beforeFilterDF.show();
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// +-------------+
val afterFilterDF = spark.read.format("parquet")
// 允许在2020年06月01日05:30之后修改的文件
.option("modifiedAfter", "2020-06-01T05:30:00")
.load("examples/src/main/resources/dir1");
afterFilterDF.show();
// +-------------+
// |         file|
// +-------------+
// +-------------+
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
Dataset<Row> beforeFilterDF = spark.read().format("parquet")
// 仅加载在2020年7月1日05:30之前修改的文件
.option("modifiedBefore", "2020-07-01T05:30:00")
// 仅加载在2020年6月1日05:30之后修改的文件
.option("modifiedAfter", "2020-06-01T05:30:00")
// 将以上两个时间相对于CST时区进行解释
.option("timeZone", "CST")
.load("examples/src/main/resources/dir1");
beforeFilterDF.show();
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// +-------------+
Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.
beforeDF <- read.df("examples/src/main/resources/dir1", "parquet", modifiedBefore= "2020-07-01T05:30:00")
#            文件
# 1 file1.parquet
afterDF <- read.df("examples/src/main/resources/dir1", "parquet", modifiedAfter = "2020-06-01T05:30:00")
#            文件
Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.