Parquet 文件
Parquet 是一种列式格式,受许多其他数据处理系统的支持。Spark SQL 提供对读取和写入 Parquet 文件的支持,这些文件自动保留原始数据的模式。在读取 Parquet 文件时,所有列都被自动转换为可空,以确保兼容性。
以编程方式加载数据
使用上面示例中的数据:
peopleDF = spark.read.json("examples/src/main/resources/people.json")
# 数据框可以保存为 Parquet 文件,保持模式信息。
peopleDF.write.parquet("people.parquet")
# 读取上面创建的 Parquet 文件。
# Parquet 文件是自描述的,因此模式得以保留。
# 加载 Parquet 文件的结果也是一个数据框。
parquetFile = spark.read.parquet("people.parquet")
# Parquet 文件也可以用来创建临时视图,然后用于 SQL 语句。
parquetFile.createOrReplaceTempView("parquetFile")
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()
# +------+
# | name|
# +------+
# |Justin|
# +------+
// 通过导入 spark.implicits._ 自动提供大多数常见类型的编码器
import spark.implicits._
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
// DataFrame 可以作为 Parquet 文件保存,维护模式信息
peopleDF.write.parquet("people.parquet")
// 读取上面创建的 Parquet 文件
// Parquet 文件是自描述的,因此模式得以保留
// 加载 Parquet 文件的结果也是一个 DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")
// Parquet 文件还可以用来创建临时视图,然后在 SQL 语句中使用
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");
// 数据框可以保存为 Parquet 文件,保持架构信息
peopleDF.write().parquet("people.parquet");
// 读取上面创建的 Parquet 文件。
// Parquet 文件是自描述的,因此架构得以保留
// 加载 parquet 文件的结果也是一个 DataFrame
Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");
// Parquet 文件还可以用于创建临时视图,然后可以在 SQL 语句中使用
parquetFileDF.createOrReplaceTempView("parquetFile");
Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
Dataset<String> namesDS = namesDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
Encoders.STRING());
namesDS.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
df <- read.df("examples/src/main/resources/people.json", "json")
# SparkDataFrame 可以保存为 Parquet 文件,保持模式信息。
write.parquet(df, "people.parquet")
# 读取上述创建的 Parquet 文件。Parquet 文件是自描述的,所以模式得以保留。
# 加载 Parquet 文件的结果也是一个 DataFrame。
parquetFile <- read.parquet("people.parquet")
# Parquet 文件也可以用来创建临时视图,然后可以在 SQL 语句中使用。
createOrReplaceTempView(parquetFile, "parquetFile")
teenagers <- sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
head(teenagers)
## name
## 1 Justin
# 我们还可以在 Spark DataFrames 上运行自定义的 R-UDF。在这里我们为所有名称添加前缀 "Name:"
schema <- structType(structField("name", "string"))
teenNames <- dapply(df, function(p) { cbind(paste("Name:", p$name)) }, schema)
for (teenName in collect(teenNames)$name) {
cat(teenName, "\n")
}
## Name: Michael
## Name: Andy
## Name: Justin
创建 临时 视图 parquetTable
使用 org.apache.spark.sql.parquet
选项 (
路径 "examples/src/main/resources/people.parquet"
)
选择 * 从 parquetTable
分区发现
表分区是像 Hive 这样的系统中常用的优化方法。在分区表中,数据通常存储在不同的目录中,分区列值被编码在每个分区目录的路径中。所有内置文件源(包括 Text/CSV/JSON/ORC/Parquet)都能够自动发现和推断分区信息。例如,我们可以使用以下目录结构将我们以前使用的人口数据存储到分区表中,使用两个额外的列,
gender
和
country
作为分区列:
路径
└── 至
└── 表
├── 性别=男性
│ ├── ...
│ │
│ ├── 国家=美国
│ │ └── data.parquet
│ ├── 国家=中国
│ │ └── data.parquet
│ └── ...
└── 性别=女性
├── ...
│
├── 国家=美国
│ └── data.parquet
├── 国家=中国
│ └── data.parquet
└── ...
通过将
path/to/table
传递给
SparkSession.read.parquet
或
SparkSession.read.load
,Spark SQL 将自动从路径中提取分区信息。现在返回的 DataFrame 的模式变为:
root
|-- name: 字符串 (可为空 = true)
|-- age: 长整型 (可为空 = true)
|-- gender: 字符串 (可为空 = true)
|-- country: 字符串 (可为空 = true)
注意,分区列的数据类型是自动推断的。目前,支持数值数据类型、日期、时间戳和字符串类型。有时用户可能不希望自动推断分区列的数据类型。对于这些使用案例,可以通过
spark.sql.sources.partitionColumnTypeInference.enabled
配置自动类型推断,默认值为
true
。当类型推断被禁用时,分区列将使用字符串类型。
从Spark 1.6.0开始,分区发现默认只在给定路径下查找分区。对于上述示例,如果用户将
path/to/table/gender=male
传递给
SparkSession.read.parquet
或
SparkSession.read.load
,
gender
将不会被视为分区列。如果用户需要指定分区发现应该从哪个基础路径开始,可以在数据源选项中设置
basePath
。例如,当
path/to/table/gender=male
是数据的路径并且用户将
basePath
设置为
path/to/table/
时,
gender
将成为一个分区列。
架构合并
像协议缓冲区、Avro 和 Thrift 一样,Parquet 也支持模式演变。用户可以从一个简单的模式开始,并根据需要逐渐添加更多列到模式中。通过这种方式,用户可能最终会得到多个具有不同但相互兼容模式的 Parquet 文件。现在,Parquet 数据源能够自动检测这种情况并合并所有这些文件的模式。
由于模式合并是一项相对昂贵的操作,并且在大多数情况下并不是必要的,从1.5.0开始我们默认将其关闭。您可以通过
-
在读取Parquet文件时,将数据源选项
mergeSchema设置为true(如下例所示),或者 -
将全局SQL选项
spark.sql.parquet.mergeSchema设置为true。
from pyspark.sql import Row
# spark来自前面的例子。
# 创建一个简单的DataFrame,存储到分区目录中
sc = spark.sparkContext
squaresDF = spark.createDataFrame(sc.parallelize(range(1, 6))
.map(lambda i: Row(single=i, double=i ** 2)))
squaresDF.write.parquet("data/test_table/key=1")
# 在一个新的分区目录中创建另一个DataFrame,
# 添加一个新列并删除一个现有列
cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11))
.map(lambda i: Row(single=i, triple=i ** 3)))
cubesDF.write.parquet("data/test_table/key=2")
# 读取分区表
mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()
# 最终的模式由Parquet文件中的所有3列组成
# 以及在分区目录路径中出现的分区列。
# root
# |-- double: long (nullable = true)
# |-- single: long (nullable = true)
# |-- triple: long (nullable = true)
# |-- key: integer (nullable = true)
// 这用于隐式将 RDD 转换为 DataFrame。
import spark.implicits._
// 创建一个简单的 DataFrame,存储到分区目录中
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")
// 在一个新的分区目录中创建另一个 DataFrame,
// 添加一个新列并删除一个现有列
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")
// 读取分区表
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()
// 最终的模式由 Parquet 文件中的所有 3 列组成
// 分区列出现在分区目录路径中
// 根
// |-- value: int (可为空 = true)
// |-- square: int (可为空 = true)
// |-- cube: int (可为空 = true)
// |-- key: int (可为空 = true)
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public static class Square implements Serializable {
private int value;
private int square;
// 获取器和设置器...
}
public static class Cube implements Serializable {
private int value;
private int cube;
// 获取器和设置器...
}
List<Square> squares = new ArrayList<>();
for (int value = 1; value <= 5; value++) {
Square square = new Square();
square.setValue(value);
square.setSquare(value * value);
squares.add(square);
}
// 创建一个简单的数据框,存储到分区目录中
Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class);
squaresDF.write().parquet("data/test_table/key=1");
List<Cube> cubes = new ArrayList<>();
for (int value = 6; value <= 10; value++) {
Cube cube = new Cube();
cube.setValue(value);
cube.setCube(value * value * value);
cubes.add(cube);
}
// 在一个新的分区目录中创建另一个数据框,
// 添加一个新列并删除一个现有列
Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class);
cubesDF.write().parquet("data/test_table/key=2");
// 读取分区表
Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table");
mergedDF.printSchema();
// 最终的模式由所有 3 列组成,这些列在 Parquet 文件中一起
// 以及分区列出现在分区目录路径中
// 根
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key: int (nullable = true)
df1 <- createDataFrame(data.frame(single=c(12, 29), double=c(19, 23)))
df2 <- createDataFrame(data.frame(double=c(19, 23), triple=c(23, 18)))
# 创建一个简单的 DataFrame,存储到一个分区目录
write.df(df1, "data/test_table/key=1", "parquet", "overwrite")
# 在新的分区目录中创建另一个 DataFrame,
# 添加一个新列并删除一个现有列
write.df(df2, "data/test_table/key=2", "parquet", "overwrite")
# 读取分区表
df3 <- read.df("data/test_table", "parquet", mergeSchema = "true")
printSchema(df3)
# 最终的模式包括所有 3 列在 Parquet 文件中一起
# 分区列出现在分区目录路径中
## root
## |-- single: double (nullable = true)
## |-- double: double (nullable = true)
## |-- triple: double (nullable = true)
## |-- key: integer (nullable = true)
Hive metastore Parquet 表转换
当从 Hive 元存储 Parquet 表读取数据并写入非分区的 Hive 元存储 Parquet 表时,Spark SQL 会尝试使用它自己的 Parquet 支持,而不是 Hive SerDe,以获得更好的性能。此行为由
spark.sql.hive.convertMetastoreParquet
配置控制,并默认启用。
Hive/Parquet 架构一致性检查
从表模式处理的角度来看,Hive和Parquet之间有两个关键区别。
- Hive对大小写不敏感,而Parquet对大小写敏感
- Hive认为所有列都是可为空的,而Parquet中的nullability是重要的
由于这个原因,我们必须在将Hive元存储的Parquet表转换为Spark SQL的Parquet表时,对Hive元存储的模式与Parquet模式进行协调。协调规则如下:
-
在两个模式中具有相同名称的字段必须具有相同的数据类型,而不考虑可空性。合并后的字段应具有Parquet侧的数据类型,以确保尊重可空性。
-
合并后的模式恰好包含Hive metastore模式中定义的那些字段。
- 在Parquet模式中仅出现的任何字段将在合并后的模式中被删除。
- 在Hive metastore模式中仅出现的任何字段将作为可空字段添加到合并后的模式中。
元数据刷新
Spark SQL 缓存 Parquet 元数据以提升性能。当启用 Hive 元存储 Parquet 表转换时,转换后的表的元数据也会被缓存。如果这些表被 Hive 或其他外部工具更新,您需要手动刷新它们以确保元数据的一致性。
# spark是一个现有的SparkSession
spark.catalog.refreshTable("my_table")
// spark是一个现有的SparkSession
spark.catalog.refreshTable("my_table")
// spark 是一个现有的 SparkSession
spark.catalog().refreshTable("my_table");
refreshTable("my_table")
刷新 表 my_table;
列式加密
自Spark 3.2以来,Parquet表支持使用Apache Parquet 1.12+进行列式加密。
Parquet 使用信封加密实践,其中文件部分使用“数据加密密钥”(DEK)进行加密,DEK 又使用“主加密密钥”(MEK)进行加密。DEK 是由 Parquet 为每个加密文件/列随机生成的。MEK 在用户选择的密钥管理服务(KMS)中生成、存储和管理。Parquet Maven
仓库
提供了一个带有模拟 KMS 实现的 jar 文件,允许仅使用 spark-shell 运行列加密和解密,而无需部署 KMS 服务器(下载
parquet-hadoop-tests.jar
文件并将其放置在 Spark
jars
文件夹中):
# 设置hadoop配置属性,例如使用Spark作业的配置属性:
# --conf spark.hadoop.parquet.encryption.kms.client.class=\
# "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS"\
# --conf spark.hadoop.parquet.encryption.key.list=\
# "keyA:AAECAwQFBgcICQoLDA0ODw== , keyB:AAECAAECAAECAAECAAECAA=="\
# --conf spark.hadoop.parquet.crypto.factory.class=\
# "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory"
# 写入加密的DataFrame文件。
# 列 "square" 将由主密钥 "keyA" 保护。
# Parquet 文件页脚将由主密钥 "keyB" 保护。
squaresDF.write\
.option("parquet.encryption.column.keys" , "keyA:square")\
.option("parquet.encryption.footer.key" , "keyB")\
.parquet("/path/to/table.parquet.encrypted")
# 读取加密的DataFrame文件
df2 = spark.read.parquet("/path/to/table.parquet.encrypted")
sc.hadoopConfiguration.set("parquet.encryption.kms.client.class" ,
"org.apache.parquet.crypto.keytools.mocks.InMemoryKMS")
// 显式主密钥(base64 编码)- 仅在模拟 InMemoryKMS 时需要
sc.hadoopConfiguration.set("parquet.encryption.key.list" ,
"keyA:AAECAwQFBgcICQoLDA0ODw== , keyB:AAECAAECAAECAAECAAECAA==")
// 激活 Parquet 加密,由 Hadoop 属性驱动
sc.hadoopConfiguration.set("parquet.crypto.factory.class" ,
"org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")
// 生成加密的数据框文件。
// 列 "square" 将使用主密钥 "keyA" 进行保护。
// Parquet 文件的尾部将使用主密钥 "keyB" 进行保护
squaresDF.write.
option("parquet.encryption.column.keys" , "keyA:square").
option("parquet.encryption.footer.key" , "keyB").
parquet("/path/to/table.parquet.encrypted")
// 读取加密的数据框文件
val df2 = spark.read.parquet("/path/to/table.parquet.encrypted")
sc.hadoopConfiguration().set("parquet.encryption.kms.client.class" ,
"org.apache.parquet.crypto.keytools.mocks.InMemoryKMS");
// 显式主密钥(base64 编码) - 仅对于模拟 InMemoryKMS 必需
sc.hadoopConfiguration().set("parquet.encryption.key.list" ,
"keyA:AAECAwQFBgcICQoLDA0ODw== , keyB:AAECAAECAAECAAECAAECAA==");
// 激活 Parquet 加密,由 Hadoop 属性驱动
sc.hadoopConfiguration().set("parquet.crypto.factory.class" ,
"org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory");
// 写入加密的数据框文件。
// 列 "square" 将使用主密钥 "keyA" 进行保护。
// Parquet 文件的页脚将使用主密钥 "keyB" 进行保护。
squaresDF.write().
option("parquet.encryption.column.keys" , "keyA:square").
option("parquet.encryption.footer.key" , "keyB").
parquet("/path/to/table.parquet.encrypted");
// 读取加密的数据框文件
Dataset<Row> df2 = spark.read().parquet("/path/to/table.parquet.encrypted");
KMS 客户端
InMemoryKMS 类仅用于说明和简单演示 Parquet 加密功能。 它不应在实际部署中使用 。主加密密钥必须在生产级 KMS 系统中进行保存和管理,该系统部署在用户的组织中。使用 Parquet 加密的 Spark 部署需要为 KMS 服务器实现一个客户端类。Parquet 提供了一个插件 接口 来开发此类类,
public interface KmsClient {
// 用主密钥包装一个密钥 - 用主密钥加密它。
public String wrapKey(byte[] keyBytes, String masterKeyIdentifier);
// 用主密钥解密(解包)一个密钥。
public byte[] unwrapKey(String wrappedKey, String masterKeyIdentifier);
// 初始化参数的使用是可选的。
public void initialize(Configuration configuration, String kmsInstanceID,
String kmsInstanceURL, String accessToken);
}
在 parquet-mr 仓库中可以找到一个这样的类的
示例
,用于一个开源的
KMS
。生产 KMS 客户端应与组织的安全管理员合作设计,并由具有访问控制管理经验的开发人员构建。一旦创建了这样的类,就可以通过
parquet.encryption.kms.client.class
参数传递给应用程序,并由普通的 Spark 用户利用,如上面的加密数据框写入/读取示例所示。
注意:默认情况下,Parquet 实现了“双重封装加密”模式,这最小化了 Spark 执行者与 KMS 服务器的交互。在这种模式下,DEK 被用“密钥加密密钥”(KEK,随机生成的由 Parquet)加密。KEK 被 KMS 中的 MEK 加密;结果和 KEK 本身被缓存到 Spark 执行者内存中。对常规封装加密感兴趣的用户,可以通过将
parquet.encryption.double.wrapping
参数设置为
false
来切换到它。有关 Parquet 加密参数的更多详细信息,请访问 parquet-hadoop 配置
页面
。
数据源选项
Parquet的数据源选项可以通过以下方式设置:
-
的
.option/.options方法-
DataFrameReader -
DataFrameWriter -
DataStreamReader -
DataStreamWriter
-
-
OPTIONS子句在 使用 DATA_SOURCE 创建表
| 属性名称 | 默认值 | 含义 | 范围 |
|---|---|---|---|
datetimeRebaseMode
|
(
spark.sql.parquet.datetimeRebaseModeInRead
配置的值)
|
datetimeRebaseMode
选项允许指定从儒略历到公历的
DATE
、
TIMESTAMP_MILLIS
、
TIMESTAMP_MICROS
逻辑类型值的重基模式。
当前支持的模式有:
|
读 |
int96RebaseMode
|
(
spark.sql.parquet.int96RebaseModeInRead
配置的值)
|
int96RebaseMode
选项允许指定从儒略历到公历的 INT96 时间戳的重基模式。
当前支持的模式有:
|
读 |
mergeSchema
|
(
spark.sql.parquet.mergeSchema
配置的值)
|
设置是否应合并从所有 Parquet 部件文件收集的模式。这将覆盖
spark.sql.parquet.mergeSchema
。
|
读 |
compression
|
snappy
|
保存到文件时使用的压缩编码器。这可以是已知的、不区分大小写的缩写名称之一(none, uncompressed, snappy, gzip, lzo, brotli, lz4, 和 zstd)。这将覆盖
spark.sql.parquet.compression.codec
。
|
写 |
其他通用选项可以在 Generic Files Source Options 中找到
配置
可以通过在
SparkSession
上使用
setConf
方法或者通过运行
SET key=value
命令来配置 Parquet,使用 SQL。
| 属性名称 | 默认值 | 含义 | 自版本 |
|---|---|---|---|
spark.sql.parquet.binaryAsString
|
false | 一些其他生成 Parquet 的系统,特别是 Impala、Hive 和旧版本的 Spark SQL,在写出 Parquet 架构时不区分二进制数据和字符串。这个标志指示 Spark SQL 将二进制数据解释为字符串,以提供与这些系统的兼容性。 | 1.1.1 |
spark.sql.parquet.int96AsTimestamp
|
true | 一些生成 Parquet 的系统,特别是 Impala 和 Hive,将时间戳存储为 INT96。这个标志指示 Spark SQL 将 INT96 数据解释为时间戳,以提供与这些系统的兼容性。 | 1.3.0 |
spark.sql.parquet.int96TimestampConversion
|
false | 这控制在将 INT96 数据转换为时间戳时是否应应用时间戳调整,适用于由 Impala 写入的数据。这是必要的,因为 Impala 使用与 Hive 和 Spark 不同的时区偏移存储 INT96 数据。 | 2.3.0 |
spark.sql.parquet.outputTimestampType
|
INT96 | 设置 Spark 写入 Parquet 文件时使用的 Parquet 时间戳类型。INT96 是 Parquet 中一种非标准但常用的时间戳类型。TIMESTAMP_MICROS 是 Parquet 中的标准时间戳类型,它存储自 Unix 纪元以来的微秒数。TIMESTAMP_MILLIS 也是标准类型,但具有毫秒精度,这意味着 Spark 必须截断其时间戳值的微秒部分。 | 2.3.0 |
spark.sql.parquet.compression.codec
|
snappy |
设置在写入 Parquet 文件时使用的压缩编解码器。如果在特定于表的选项/属性中指定了
compression
或
parquet.compression
,则优先级为
compression
、
parquet.compression
、
spark.sql.parquet.compression.codec
。可接受的值包括:
none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd。
注意,
brotli
需要安装
BrotliCodec
。
|
1.1.1 |
spark.sql.parquet.filterPushdown
|
true | 启用 Parquet 过滤器下推优化,当设置为 true 时。 | 1.2.0 |
spark.sql.parquet.aggregatePushdown
|
false | 如果为 true,聚合将被下推到 Parquet 进行优化。支持 MIN、MAX 和 COUNT 作为聚合表达式。对于 MIN/MAX,支持布尔、整数、浮点和日期类型。对于 COUNT,支持所有数据类型。如果任何 Parquet 文件的脚注中缺少统计信息,将抛出异常。 | 3.3.0 |
spark.sql.hive.convertMetastoreParquet
|
true | 当设置为 false 时,Spark SQL 将使用 Hive SerDe 对 Parquet 表,而不是内置支持。 | 1.1.1 |
spark.sql.parquet.mergeSchema
|
false |
当为 true 时,Parquet 数据源合并来自所有数据文件的模式,否则将从摘要文件或随机数据文件中选择模式,如果没有摘要文件可用。 |
1.5.0 |
spark.sql.parquet.respectSummaryFiles
|
false | 当为 true 时,我们假设 Parquet 的所有部分文件与摘要文件一致,在合并模式时将忽略它们。否则,如果为 false,即默认值,我们将合并所有部分文件。这应该被视为仅限专家选项,不应在不知道其确切含义之前启用。 | 1.5.0 |
spark.sql.parquet.writeLegacyFormat
|
false | 如果为 true,数据将以 Spark 1.4 及更早版本的方式写入。例如,十进制值将以 Apache Parquet 的固定长度字节数组格式写入,而其他系统(例如 Apache Hive 和 Apache Impala)将使用该格式。如果为 false,将使用 Parquet 中的较新格式。例如,十进制将以基于整型格式写入。如果 Parquet 输出是为了与不支持此较新格式的系统一起使用,则设置为 true。 | 1.6.0 |
spark.sql.parquet.enableVectorizedReader
|
true | 启用矢量化的 Parquet 解码。 | 2.0.0 |
spark.sql.parquet.enableNestedColumnVectorizedReader
|
true |
启用嵌套列(例如,结构、列表、映射)的矢量化 Parquet 解码。
需要启用
spark.sql.parquet.enableVectorizedReader
。
|
3.3.0 |
spark.sql.parquet.recordLevelFilter.enabled
|
false |
如果为 true,启用 Parquet 的原生记录级过滤,使用下推的过滤器。
此配置仅在
spark.sql.parquet.filterPushdown
启用且未使用矢量化读取器时才会生效。您可以通过将
spark.sql.parquet.enableVectorizedReader
设置为 false 来确保不使用矢量化读取器。
|
2.3.0 |
spark.sql.parquet.columnarReaderBatchSize
|
4096 | 包含在一个 Parquet 矢量化读取器批次中的行数。该数字应仔细选择,以最小化开销并避免在读取数据时出现 OOM。 | 2.4.0 |
spark.sql.parquet.fieldId.write.enabled
|
true | 字段 ID 是 Parquet 架构规范的本地字段。当启用时,Parquet 写入器将填充 Spark 架构中字段 ID 元数据(如果存在)到 Parquet 架构中。 | 3.3.0 |
spark.sql.parquet.fieldId.read.enabled
|
false | 字段 ID 是 Parquet 架构规范的本地字段。当启用时,Parquet 读取器将使用请求的 Spark 架构中的字段 ID(如果存在)查找 Parquet 字段,而不是使用列名。 | 3.3.0 |
spark.sql.parquet.fieldId.read.ignoreMissing
|
false | 当 Parquet 文件没有任何字段 ID,但 Spark 读取架构使用字段 ID 进行读取时,当启用此标志时,我们将静默返回 null,否则将返回错误。 | 3.3.0 |
spark.sql.parquet.inferTimestampNTZ.enabled
|
true |
当启用时,带有注释
isAdjustedToUTC = false
的 Parquet 时间戳列在模式推断期间被推断为 TIMESTAMP_NTZ 类型。否则,所有 Parquet 时间戳列被推断为 TIMESTAMP_LTZ 类型。注意,Spark 在写入文件时将输出架构写入 Parquet 的页脚元数据,并在读取文件时利用它。因此,此配置仅影响未由 Spark 写入的 Parquet 文件的模式推断。
|
3.4.0 |
| spark.sql.parquet.datetimeRebaseModeInRead |
EXCEPTION
|
将
DATE
、
TIMESTAMP_MILLIS
、
TIMESTAMP_MICROS
逻辑类型的值从儒略历转换到普罗莱普蒂克公历的重基模式:
|
3.0.0 |
| spark.sql.parquet.datetimeRebaseModeInWrite |
EXCEPTION
|
将
DATE
、
TIMESTAMP_MILLIS
、
TIMESTAMP_MICROS
逻辑类型的值从普罗莱普蒂克公历转换到儒略历的重基模式:
|
3.0.0 |
| spark.sql.parquet.int96RebaseModeInRead |
EXCEPTION
|
将
INT96
时间戳类型的值从儒略历转换到普罗莱普蒂克公历的重基模式:
|
3.1.0 |
| spark.sql.parquet.int96RebaseModeInWrite |
EXCEPTION
|
将
INT96
时间戳类型的值从普罗莱普蒂克公历转换到儒略历的重基模式:
|
3.1.0 |