Apache Avro 数据源指南
- 部署
- 加载和保存函数
- to_avro() 和 from_avro()
- 数据源选项
- 配置
- 与 Databricks spark-avro 的兼容性
- Avro -> Spark SQL 转换的受支持类型
- Spark SQL -> Avro 转换的受支持类型
自Spark 2.4版本发布以来, Spark SQL 提供了对读取和写入Apache Avro数据的内置支持。
部署
模块
spark-avro
是外部模块,默认情况下不包含在
spark-submit
或
spark-shell
中。
与任何Spark应用程序一样,
spark-submit
用于启动您的应用程序。
spark-avro_2.12
及其依赖项可以通过使用
--packages
直接添加到
spark-submit
中,例如:
./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.5.3 ...
要在
spark-shell
上进行实验,您还可以使用
--packages
来直接添加
org.apache.spark:spark-avro_2.12
及其依赖项,
./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.5.3 ...
请参阅 申请提交指南 以获取有关提交具有外部依赖项的申请的更多详细信息。
加载与保存函数
由于
spark-avro
模块是外部的,因此在
DataFrameReader
或
DataFrameWriter
中没有
.avro
API。
要以Avro格式加载/保存数据,您需要将数据源选项
format
指定为
avro
(或
org.apache.spark.sql.avro
)。
df = spark.read.format("avro").load("examples/src/main/resources/users.avro")
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
数据集<行> usersDF = spark.读().格式("avro").加载("examples/src/main/resources/users.avro");
usersDF.选择("name", "favorite_color").写().格式("avro").保存("namesAndFavColors.avro");
df <- read.df("examples/src/main/resources/users.avro", "avro")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro")
to_avro() 和 from_avro()
Avro包提供了函数
to_avro
来将列编码为Avro格式的二进制数据,以及
from_avro()
来解码Avro二进制数据为列。这两个函数将一列转换为另一列,输入/输出的SQL数据类型可以是复杂类型或原始类型。
将Avro记录作为列使用在从或写入像Kafka这样的流媒体源时非常有用。每个Kafka键值记录将附加一些元数据,例如进入Kafka的时间戳、Kafka中的偏移量等。
-
如果包含您数据的“value”字段是Avro格式的,您可以使用
from_avro()来提取您的数据,增强它,清理它,然后再次将其推送到Kafka或写入文件。 -
to_avro()可以用于将结构体转换为Avro记录。当您希望在将数据写入Kafka时将多个列重新编码为一个单列时,此方法特别有用。
from pyspark.sql.avro.functions import from_avro, to_avro
# `from_avro` 需要以 JSON 字符串格式提供 Avro 模式。
jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()
df = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
.option("subscribe", "topic1")\
.load()
# 1. 解码 Avro 数据为结构;
# 2. 按列 `favorite_color` 过滤;
# 3. 将列 `name` 编码为 Avro 格式。
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
query = output\
.writeStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
.option("topic", "topic2")\
.start()
import org.apache.spark.sql.avro.functions._
// `from_avro` 需要以 JSON 字符串格式提供 Avro 模式。
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
// 1. 将 Avro 数据解码为结构体;
// 2. 按列 `favorite_color` 进行筛选;
// 3. 以 Avro 格式对列 `name` 进行编码。
val output = df
.select(from_avro($"value", jsonFormatSchema) as $"user")
.where("user.favorite_color == \"red\"")
.select(to_avro("user.name") as "value")
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start()
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.avro.functions.*;
// `from_avro` 需要 JSON 字符串格式的 Avro 模式。
String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")));
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
// 1. 解码 Avro 数据为结构;
// 2. 按列 `favorite_color` 过滤;
// 3. 以 Avro 格式编码列 `name`。
Dataset<Row> output = df
.select(from_avro(col("value"), jsonFormatSchema).as("user"))
.where("user.favorite_color == \"red\"")
.select(to_avro(col("user.name")).as("value"));
StreamingQuery query = output
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start();
# `from_avro` 需要 JSON 字符串格式的 Avro 架构。
jsonFormatSchema <- paste0(readLines("examples/src/main/resources/user.avsc"), collapse=" ")
df <- read.stream(
"kafka",
kafka.bootstrap.servers = "host1:port1,host2:port2",
subscribe = "topic1"
)
# 1. 解码 Avro 数据为结构;
# 2. 按列 `favorite_color` 过滤;
# 3. 以 Avro 格式编码列 `name`。
output <- select(
filter(
select(df, alias(from_avro("value", jsonFormatSchema), "user")),
column("user.favorite_color") == "red"
),
alias(to_avro("user.name"), "value")
)
write.stream(
output,
"kafka",
kafka.bootstrap.servers = "host1:port1,host2:port2",
topic = "topic2"
)
数据源选项
Avro的数据源选项可以通过以下方式设置:
-
在
DataFrameReader或DataFrameWriter上的.option方法。 -
函数
from_avro中的options参数。
| 属性名称 | 默认值 | 含义 | 范围 | 自版本 |
|---|---|---|---|---|
avroSchema
|
无 |
用户提供的可选模式,格式为JSON。
|
读取、写入和函数
from_avro
|
2.4.0 |
recordName
|
topLevelRecord | 写入结果中的顶级记录名称,这是Avro规范所要求的。 | 写入 | 2.4.0 |
recordNamespace
|
"" | 写入结果中的记录命名空间。 | 写入 | 2.4.0 |
ignoreExtension
|
true |
该选项控制在读取时是否忽略没有
.avro
扩展名的文件。
如果启用此选项,将加载所有文件(带有和不带有
.avro
扩展名的文件)。
该选项已被弃用,并将在未来的版本中删除。请使用通用数据源选项 pathGlobFilter 来过滤文件名。 |
读取 | 2.4.0 |
compression
|
snappy |
compression
选项允许指定在写入中使用的压缩编解码器。
当前支持的编解码器有
uncompressed
、
snappy
、
deflate
、
bzip2
、
xz
和
zstandard
。
如果未设置该选项,则考虑配置
spark.sql.avro.compression.codec
的配置。
|
写入 | 2.4.0 |
mode
|
FAILFAST |
mode
选项允许为函数
from_avro
指定解析模式。
当前支持的模式有:
|
函数
from_avro
|
2.4.0 |
datetimeRebaseMode
|
(
spark.sql.avro.datetimeRebaseModeInRead
配置的值)
|
datetimeRebaseMode
选项允许为从朱利安历到前瞻性格里高利历的
date
、
timestamp-micros
、
timestamp-millis
逻辑类型的值指定重新基准模式。
当前支持的模式有:
|
读取和函数
from_avro
|
3.2.0 |
positionalFieldMatching
|
false | 这可以与 `avroSchema` 选项配合使用,以调整在提供的Avro模式和SQL模式中匹配字段的行为。默认情况下,将使用字段名称进行匹配,而忽略它们的位置。如果将此选项设置为“true”,则匹配将基于字段的位置。 | 读取和写入 | 3.2.0 |
enableStableIdentifiersForUnionType
|
false | 如果设置为true,Avro模式将被反序列化为Spark SQL模式,而Avro联合类型将转换为结构,其中字段名称与其各自的类型保持一致。生成的字段名称将转换为小写,例如 member_int 或 member_string。如果两个用户定义的类型名称或一个用户定义的类型名称和一个内置类型名称在不考虑大小写的情况下相同,将引发异常。但是,在其他情况下,字段名称可以被唯一识别。 | 读取 | 3.5.0 |
配置
Avro的配置可以通过在SparkSession上使用
setConf
方法或通过使用SQL运行
SET key=value
命令来完成。
| 属性名称 | 默认值 | 含义 | 自版本起 |
|---|---|---|---|
| spark.sql.legacy.replaceDatabricksSparkAvro.enabled | true |
如果设置为true,数据源提供程序
com.databricks.spark.avro
将映射到内置但外部的Avro数据源模块,以保持向后兼容性。
注意: 该SQL配置在Spark 3.2中已被弃用,并可能在将来移除。 |
2.4.0 |
| spark.sql.avro.compression.codec | snappy | 写入AVRO文件时使用的压缩编解码器。支持的编解码器:uncompressed, deflate, snappy, bzip2, xz和zstandard。默认编解码器为snappy。 | 2.4.0 |
| spark.sql.avro.deflate.level | -1 | 写入AVRO文件时用于deflate编解码器的压缩级别。有效值必须在1到9(包括)或-1的范围内。默认值为-1,对应于当前实现中的6级。 | 2.4.0 |
| spark.sql.avro.datetimeRebaseModeInRead |
EXCEPTION
|
将
date
、
timestamp-micros
、
timestamp-millis
逻辑类型的值从儒略历重基到修订的格里高利历的模式:
|
3.0.0 |
| spark.sql.avro.datetimeRebaseModeInWrite |
EXCEPTION
|
将
date
、
timestamp-micros
、
timestamp-millis
逻辑类型的值从修订的格里高利历重基到儒略历的模式:
|
3.0.0 |
| spark.sql.avro.filterPushdown.enabled | true | 当为true时,启用对Avro数据源的过滤器下推。 | 3.1.0 |
与 Databricks spark-avro 的兼容性
该 Avro 数据源模块最初来自于 Databricks 的开源库,并与 spark-avro 兼容。
默认情况下,当SQL配置
spark.sql.legacy.replaceDatabricksSparkAvro.enabled
启用时,数据源提供者
com.databricks.spark.avro
被映射到这个内置的Avro模块。对于在目录元存储中将
Provider
属性设置为
com.databricks.spark.avro
创建的Spark表,映射是加载这些表的关键,如果您使用这个内置的Avro模块。
请注意,在Databricks的
spark-avro
中,隐式类
AvroDataFrameWriter
和
AvroDataFrameReader
是为了快捷函数
.avro()
而创建的。在这个内置但外部的模块中,两个隐式类已被移除。请改用
.format("avro")
在
DataFrameWriter
或
DataFrameReader
中,这将是简洁且足够好的。
如果您更喜欢使用自己的
spark-avro
jar 文件,您可以简单地禁用配置
spark.sql.legacy.replaceDatabricksSparkAvro.enabled
,并在部署您的应用程序时使用选项
--jars
。有关更多详细信息,请阅读申请提交指南中的
高级依赖管理
部分。
Avro -> Spark SQL 转换支持的类型
目前,Spark 支持读取 Avro 的所有 基本类型 和 复杂类型 。
| Avro 类型 | Spark SQL 类型 |
|---|---|
| boolean | BooleanType |
| int | IntegerType |
| long | LongType |
| float | FloatType |
| double | DoubleType |
| string | StringType |
| enum | StringType |
| fixed | BinaryType |
| bytes | BinaryType |
| record | StructType |
| array | ArrayType |
| map | MapType |
| union | 见下文 |
除了上述列出的类型,它还支持读取
union
类型。以下三种类型被视为基本的
union
类型:
-
union(int, long)将映射到 LongType。 -
union(float, double)将映射到 DoubleType。 -
union(something, null),其中 something 是任何支持的 Avro 类型。这将映射到与 something 的 Spark SQL 类型相同,其中 nullable 设置为 true。 所有其他 union 类型被视为复杂类型。它们将被映射到 StructType,其中字段名称为 member0,member1 等,与 union 的成员一致。这与在 Avro 和 Parquet 之间转换时的行为是一致的。
它还支持读取以下 Avro 逻辑类型 :
| Avro 逻辑类型 | Avro 类型 | Spark SQL 类型 |
|---|---|---|
| 日期 | 整型 | DateType |
| 时间戳-毫秒 | 长整型 | TimestampType |
| 时间戳-微秒 | 长整型 | TimestampType |
| 十进制 | 固定 | DecimalType |
| 十进制 | 字节 | DecimalType |
目前,它忽略了Avro文件中存在的文档、别名和其他属性。
Spark SQL 支持的类型 -> Avro 转换
Spark支持将所有Spark SQL类型写入Avro。对于大多数类型,从Spark类型到Avro类型的映射是直接的(例如,IntegerType被转换为int);然而,还有一些特殊情况如下所列:
| Spark SQL类型 | Avro类型 | Avro逻辑类型 |
|---|---|---|
| ByteType | int | |
| ShortType | int | |
| BinaryType | bytes | |
| DateType | int | date |
| TimestampType | long | timestamp-micros |
| DecimalType | fixed | decimal |
您还可以通过选项
avroSchema
指定整个输出的 Avro schema,以便将 Spark SQL 类型转换为其他 Avro 类型。下列转换默认情况下未应用,需用户指定 Avro schema:
| Spark SQL 类型 | Avro 类型 | Avro 逻辑类型 |
|---|---|---|
| BinaryType | fixed | |
| StringType | enum | |
| TimestampType | long | timestamp-millis |
| DecimalType | bytes | decimal |