通用文件源选项
这些通用选项/配置仅在使用基于文件的源时有效:parquet、orc、avro、json、csv、text。
请注意,下面示例中使用的目录层级为:
dir1/
├── dir2/
│ └── file2.parquet (模式: <文件: 字符串>, 内容: "file2.parquet")
└── file1.parquet (模式: <文件, 字符串>, 内容: "file1.parquet")
└── file3.json (模式: <文件, 字符串>, 内容: "{'文件':'corrupt.json'}")
忽略损坏的文件
Spark允许您使用配置
spark.sql.files.ignoreCorruptFiles
或数据源选项
ignoreCorruptFiles
在读取文件数据时忽略损坏的文件。当设置为true时,在遇到损坏的文件时,Spark作业将继续运行,并且已读取的内容仍然会被返回。
要在读取数据文件时忽略损坏的文件,可以使用:
# 通过数据源选项启用忽略损坏文件
# dir1/file3.json 在 parquet 视图中是损坏的
test_corrupt_df0 = spark.read.option("ignoreCorruptFiles", "true")\
.parquet("examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/")
test_corrupt_df0.show()
# +-------------+
# | file|
# +-------------+
# |file1.parquet|
# |file2.parquet|
# +-------------+
# 通过配置启用忽略损坏文件
spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
# dir1/file3.json 在 parquet 视图中是损坏的
test_corrupt_df1 = spark.read.parquet("examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/")
test_corrupt_df1.show()
# +-------------+
# | file|
# +-------------+
# |file1.parquet|
# |file2.parquet|
# +-------------+
// 通过数据源选项启用忽略损坏的文件
// dir1/file3.json 从 parquet 的视角来看是损坏的
val testCorruptDF0 = spark.read.option("ignoreCorruptFiles", "true").parquet(
"examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/")
testCorruptDF0.show()
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
// 通过配置启用忽略损坏的文件
spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
// dir1/file3.json 从 parquet 的视角来看是损坏的
val testCorruptDF1 = spark.read.parquet(
"examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/")
testCorruptDF1.show()
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
// 通过数据源选项启用忽略损坏的文件
// dir1/file3.json 从 parquet 的视角来看是损坏的
Dataset<Row> testCorruptDF0 = spark.read().option("ignoreCorruptFiles", "true").parquet(
"examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/");
testCorruptDF0.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
// 通过配置启用忽略损坏的文件
spark.sql("set spark.sql.files.ignoreCorruptFiles=true");
// dir1/file3.json 从 parquet 的视角来看是损坏的
Dataset<Row> testCorruptDF1 = spark.read().parquet(
"examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/");
testCorruptDF1.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
# 通过数据源选项启用忽略损坏文件
# dir1/file3.json 在 parquet 视图中是损坏的
testCorruptDF0 <- read.parquet(c("examples/src/main/resources/dir1/", "examples/src/main/resources/dir1/dir2/"), ignoreCorruptFiles = "true")
head(testCorruptDF0)
# 文件
# 1 file1.parquet
# 2 file2.parquet
# 通过配置启用忽略损坏文件
sql("set spark.sql.files.ignoreCorruptFiles=true")
# dir1/file3.json 在 parquet 视图中是损坏的
testCorruptDF1 <- read.parquet(c("examples/src/main/resources/dir1/", "examples/src/main/resources/dir1/dir2/"))
head(testCorruptDF1)
# 文件
# 1 file1.parquet
# 2 file2.parquet
忽略缺失的文件
Spark 允许您使用配置
spark.sql.files.ignoreMissingFiles
或数据源选项
ignoreMissingFiles
来忽略在从文件读取数据时缺失的文件。在这里,缺失的文件实际上指的是在构建
DataFrame
后目录下已被删除的文件。当设置为 true 时,Spark 作业在遇到缺失文件时将继续运行,并且已读取的内容仍然会被返回。
路径通配符过滤器
pathGlobFilter
用于仅包含文件名与模式匹配的文件。语法遵循
org.apache.hadoop.fs.GlobFilter
。它不会改变分区发现的行为。
要加载与给定通配符模式匹配的文件路径,同时保持分区发现的行为,您可以使用:
df = spark.read.load("examples/src/main/resources/dir1",
format="parquet", pathGlobFilter="*.parquet")
df.show()
# +-------------+
# | 文件|
# +-------------+
# |file1.parquet|
# +-------------+
val testGlobFilterDF = spark.read.format("parquet")
.option("pathGlobFilter", "*.parquet") // 应该过滤掉json文件
.load("examples/src/main/resources/dir1")
testGlobFilterDF.show()
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+
Dataset<Row> testGlobFilterDF = spark.read().format("parquet")
.option("pathGlobFilter", "*.parquet") // json 文件应该被过滤掉
.load("examples/src/main/resources/dir1");
testGlobFilterDF.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+
df <- read.df("examples/src/main/resources/dir1", "parquet", pathGlobFilter = "*.parquet")
# 文件
# 1 file1.parquet
递归文件查找
recursiveFileLookup
用于递归加载文件,并且它禁用了分区推断。它的默认值是
false
。如果数据源在
recursiveFileLookup
为真时明确指定
partitionSpec
,将抛出异常。
要递归加载所有文件,可以使用:
recursive_loaded_df = spark.read.format("parquet")\
.option("recursiveFileLookup", "true")\
.load("examples/src/main/resources/dir1")
recursive_loaded_df.show()
# +-------------+
# | 文件|
# +-------------+
# |file1.parquet|
# |file2.parquet|
# +-------------+
val recursiveLoadedDF = spark.read.format("parquet")
.option("recursiveFileLookup", "true")
.load("examples/src/main/resources/dir1")
recursiveLoadedDF.show()
// +-------------+
// | 文件|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
Dataset<Row> recursiveLoadedDF = spark.read().format("parquet")
.option("recursiveFileLookup", "true")
.load("examples/src/main/resources/dir1");
recursiveLoadedDF.show();
// +-------------+
// | 文件|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
recursiveLoadedDF <- read.df("examples/src/main/resources/dir1", "parquet", recursiveFileLookup = "true")
head(recursiveLoadedDF)
# 文件
# 1 file1.parquet
# 2 file2.parquet
修改时间路径过滤器
modifiedBefore
和
modifiedAfter
是可以一起或单独使用的选项,以实现对在 Spark 批量查询中加载哪些文件的更大粒度控制。 (注意,结构化流文件源不支持这些选项。)
-
modifiedBefore
: 一个可选的时间戳,仅包含在指定时间之前进行修改的文件。提供的时间戳必须采用以下格式:YYYY-MM-DDTHH:mm:ss(例如 2020-06-01T13:00:00) -
modifiedAfter
: 一个可选的时间戳,仅包含在指定时间之后进行修改的文件。提供的时间戳必须采用以下格式:YYYY-MM-DDTHH:mm:ss(例如 2020-06-01T13:00:00)
如果没有提供时区选项,时间戳将根据Spark会话时区进行解释 (
spark.sql.session.timeZone
)。
要加载具有匹配给定修改时间范围的文件,可以使用:
# 仅加载在 2050年07月01日08:30:00 之前修改的文件
df = spark.read.load("examples/src/main/resources/dir1",
format="parquet", modifiedBefore="2050-07-01T08:30:00")
df.show()
# +-------------+
# | file|
# +-------------+
# |file1.parquet|
# +-------------+
# 仅加载在 2050年06月01日08:30:00 之后修改的文件
df = spark.read.load("examples/src/main/resources/dir1",
format="parquet", modifiedAfter="2050-06-01T08:30:00")
df.show()
# +-------------+
# | file|
# +-------------+
# +-------------+
val beforeFilterDF = spark.read.format("parquet")
// 允许在2020年07月01日05:30之前修改的文件
.option("modifiedBefore", "2020-07-01T05:30:00")
.load("examples/src/main/resources/dir1");
beforeFilterDF.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+
val afterFilterDF = spark.read.format("parquet")
// 允许在2020年06月01日05:30之后修改的文件
.option("modifiedAfter", "2020-06-01T05:30:00")
.load("examples/src/main/resources/dir1");
afterFilterDF.show();
// +-------------+
// | file|
// +-------------+
// +-------------+
Dataset<Row> beforeFilterDF = spark.read().format("parquet")
// 仅加载在2020年7月1日05:30之前修改的文件
.option("modifiedBefore", "2020-07-01T05:30:00")
// 仅加载在2020年6月1日05:30之后修改的文件
.option("modifiedAfter", "2020-06-01T05:30:00")
// 将以上两个时间相对于CST时区进行解释
.option("timeZone", "CST")
.load("examples/src/main/resources/dir1");
beforeFilterDF.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+
beforeDF <- read.df("examples/src/main/resources/dir1", "parquet", modifiedBefore= "2020-07-01T05:30:00")
# 文件
# 1 file1.parquet
afterDF <- read.df("examples/src/main/resources/dir1", "parquet", modifiedAfter = "2020-06-01T05:30:00")
# 文件