Apache Avro 数据源指南

自Spark 2.4版本发布以来, Spark SQL 提供了对读取和写入Apache Avro数据的内置支持。

部署

模块 spark-avro 是外部模块,默认情况下不包含在 spark-submit spark-shell 中。

与任何Spark应用程序一样, spark-submit 用于启动您的应用程序。 spark-avro_2.12 及其依赖项可以通过使用 --packages 直接添加到 spark-submit 中,例如:

./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.5.3 ...

要在 spark-shell 上进行实验,您还可以使用 --packages 来直接添加 org.apache.spark:spark-avro_2.12 及其依赖项,

./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.5.3 ...

请参阅 申请提交指南 以获取有关提交具有外部依赖项的申请的更多详细信息。

加载与保存函数

由于 spark-avro 模块是外部的,因此在 DataFrameReader DataFrameWriter 中没有 .avro API。

要以Avro格式加载/保存数据,您需要将数据源选项 format 指定为 avro (或 org.apache.spark.sql.avro )。

df = spark.read.format("avro").load("examples/src/main/resources/users.avro")
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
数据集<> usersDF = spark.().格式("avro").加载("examples/src/main/resources/users.avro");
usersDF.选择("name", "favorite_color").().格式("avro").保存("namesAndFavColors.avro");
df <- read.df("examples/src/main/resources/users.avro", "avro")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro")

to_avro() 和 from_avro()

Avro包提供了函数 to_avro 来将列编码为Avro格式的二进制数据,以及 from_avro() 来解码Avro二进制数据为列。这两个函数将一列转换为另一列,输入/输出的SQL数据类型可以是复杂类型或原始类型。

将Avro记录作为列使用在从或写入像Kafka这样的流媒体源时非常有用。每个Kafka键值记录将附加一些元数据,例如进入Kafka的时间戳、Kafka中的偏移量等。

from pyspark.sql.avro.functions import from_avro, to_avro
# `from_avro` 需要以 JSON 字符串格式提供 Avro 模式。
jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()
df = spark\
  .readStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("subscribe", "topic1")\
  .load()
# 1. 解码 Avro 数据为结构;
# 2. 按列 `favorite_color` 过滤;
# 3. 将列 `name` 编码为 Avro 格式。
output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))
query = output\
  .writeStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("topic", "topic2")\
  .start()
import org.apache.spark.sql.avro.functions._
// `from_avro` 需要以 JSON 字符串格式提供 Avro 模式。
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
// 1. 将 Avro 数据解码为结构体;
// 2. 按列 `favorite_color` 进行筛选;
// 3. 以 Avro 格式对列 `name` 进行编码。
val output = df
.select(from_avro($"value", jsonFormatSchema) as $"user")
.where("user.favorite_color == \"red\"")
.select(to_avro("user.name") as "value")
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start()
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.avro.functions.*;
// `from_avro` 需要 JSON 字符串格式的 Avro 模式。
String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")));
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
// 1. 解码 Avro 数据为结构;
// 2. 按列 `favorite_color` 过滤;
// 3. 以 Avro 格式编码列 `name`。
Dataset<Row> output = df
.select(from_avro(col("value"), jsonFormatSchema).as("user"))
.where("user.favorite_color == \"red\"")
.select(to_avro(col("user.name")).as("value"));
StreamingQuery query = output
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start();
# `from_avro` 需要 JSON 字符串格式的 Avro 架构。
jsonFormatSchema <- paste0(readLines("examples/src/main/resources/user.avsc"), collapse=" ")
df <- read.stream(
"kafka",
kafka.bootstrap.servers = "host1:port1,host2:port2",
subscribe = "topic1"
)
# 1. 解码 Avro 数据为结构;
# 2. 按列 `favorite_color` 过滤;
# 3. 以 Avro 格式编码列 `name`。
output <- select(
filter(
select(df, alias(from_avro("value", jsonFormatSchema), "user")),
column("user.favorite_color") == "red"
),
alias(to_avro("user.name"), "value")
)
write.stream(
output,
"kafka",
kafka.bootstrap.servers = "host1:port1,host2:port2",
topic = "topic2"
)

数据源选项

Avro的数据源选项可以通过以下方式设置:

属性名称 默认值 含义 范围 自版本
avroSchema 用户提供的可选模式,格式为JSON。
  • 在读取Avro文件或调用函数 from_avro 时,可以将此选项设置为一个发展模式,该模式与实际Avro模式兼容但有所不同。反序列化模式将与发展模式一致。 例如,如果我们设置一个包含一个额外列及其默认值的演变模式,Spark中的读取结果也将包含新列。请注意,当使用此选项与 from_avro 一起时,您仍然需要将实际的Avro模式作为参数传递给函数。
  • 在写入Avro时,如果预计的输出Avro模式与Spark转换的模式不匹配,则可以设置此选项。例如,一个列的预期模式是“枚举”类型,而不是默认转换模式中的“字符串”类型。
读取、写入和函数 from_avro 2.4.0
recordName topLevelRecord 写入结果中的顶级记录名称,这是Avro规范所要求的。 写入 2.4.0
recordNamespace "" 写入结果中的记录命名空间。 写入 2.4.0
ignoreExtension true 该选项控制在读取时是否忽略没有 .avro 扩展名的文件。
如果启用此选项,将加载所有文件(带有和不带有 .avro 扩展名的文件)。
该选项已被弃用,并将在未来的版本中删除。请使用通用数据源选项 pathGlobFilter 来过滤文件名。
读取 2.4.0
compression snappy compression 选项允许指定在写入中使用的压缩编解码器。
当前支持的编解码器有 uncompressed snappy deflate bzip2 xz zstandard
如果未设置该选项,则考虑配置 spark.sql.avro.compression.codec 的配置。
写入 2.4.0
mode FAILFAST mode 选项允许为函数 from_avro 指定解析模式。
当前支持的模式有:
  • FAILFAST :在处理损坏记录时抛出异常。
  • PERMISSIVE :损坏记录作为空结果处理。因此,数据模式强制为完全可为空,这可能与用户提供的模式不同。
函数 from_avro 2.4.0
datetimeRebaseMode ( spark.sql.avro.datetimeRebaseModeInRead 配置的值) datetimeRebaseMode 选项允许为从朱利安历到前瞻性格里高利历的 date timestamp-micros timestamp-millis 逻辑类型的值指定重新基准模式。
当前支持的模式有:
  • EXCEPTION :在读取两种日历之间模糊的古老日期/时间戳时失败。
  • CORRECTED :加载日期/时间戳而不重新基准。
  • LEGACY :在朱利安历和前瞻性格里高利历之间执行古老的日期/时间戳的重新基准。
读取和函数 from_avro 3.2.0
positionalFieldMatching false 这可以与 `avroSchema` 选项配合使用,以调整在提供的Avro模式和SQL模式中匹配字段的行为。默认情况下,将使用字段名称进行匹配,而忽略它们的位置。如果将此选项设置为“true”,则匹配将基于字段的位置。 读取和写入 3.2.0
enableStableIdentifiersForUnionType false 如果设置为true,Avro模式将被反序列化为Spark SQL模式,而Avro联合类型将转换为结构,其中字段名称与其各自的类型保持一致。生成的字段名称将转换为小写,例如 member_int 或 member_string。如果两个用户定义的类型名称或一个用户定义的类型名称和一个内置类型名称在不考虑大小写的情况下相同,将引发异常。但是,在其他情况下,字段名称可以被唯一识别。 读取 3.5.0

配置

Avro的配置可以通过在SparkSession上使用 setConf 方法或通过使用SQL运行 SET key=value 命令来完成。

属性名称 默认值 含义 自版本起
spark.sql.legacy.replaceDatabricksSparkAvro.enabled true 如果设置为true,数据源提供程序 com.databricks.spark.avro 将映射到内置但外部的Avro数据源模块,以保持向后兼容性。
注意: 该SQL配置在Spark 3.2中已被弃用,并可能在将来移除。
2.4.0
spark.sql.avro.compression.codec snappy 写入AVRO文件时使用的压缩编解码器。支持的编解码器:uncompressed, deflate, snappy, bzip2, xz和zstandard。默认编解码器为snappy。 2.4.0
spark.sql.avro.deflate.level -1 写入AVRO文件时用于deflate编解码器的压缩级别。有效值必须在1到9(包括)或-1的范围内。默认值为-1,对应于当前实现中的6级。 2.4.0
spark.sql.avro.datetimeRebaseModeInRead EXCEPTION date timestamp-micros timestamp-millis 逻辑类型的值从儒略历重基到修订的格里高利历的模式:
  • EXCEPTION :如果看到在两种日历中模糊的古老日期/时间戳,Spark将读取失败。
  • CORRECTED :Spark将不进行重基,并按原样读取日期/时间戳。
  • LEGACY :在读取Avro文件时,Spark将从遗留混合(儒略历 + 格里高利历)日历重基日期/时间戳到修订的格里高利历。
此配置仅在Avro文件的写入者信息(如Spark、Hive)未知时有效。
3.0.0
spark.sql.avro.datetimeRebaseModeInWrite EXCEPTION date timestamp-micros timestamp-millis 逻辑类型的值从修订的格里高利历重基到儒略历的模式:
  • EXCEPTION :如果看到在两种日历中模糊的古老日期/时间戳,Spark将写入失败。
  • CORRECTED :Spark将不进行重基,并按原样写入日期/时间戳。
  • LEGACY :在写入Avro文件时,Spark将从修订的格里高利历重基日期/时间戳到遗留混合(儒略历 + 格里高利历)日历。
3.0.0
spark.sql.avro.filterPushdown.enabled true 当为true时,启用对Avro数据源的过滤器下推。 3.1.0

与 Databricks spark-avro 的兼容性

该 Avro 数据源模块最初来自于 Databricks 的开源库,并与 spark-avro 兼容。

默认情况下,当SQL配置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled 启用时,数据源提供者 com.databricks.spark.avro 被映射到这个内置的Avro模块。对于在目录元存储中将 Provider 属性设置为 com.databricks.spark.avro 创建的Spark表,映射是加载这些表的关键,如果您使用这个内置的Avro模块。

请注意,在Databricks的 spark-avro 中,隐式类 AvroDataFrameWriter AvroDataFrameReader 是为了快捷函数 .avro() 而创建的。在这个内置但外部的模块中,两个隐式类已被移除。请改用 .format("avro") DataFrameWriter DataFrameReader 中,这将是简洁且足够好的。

如果您更喜欢使用自己的 spark-avro jar 文件,您可以简单地禁用配置 spark.sql.legacy.replaceDatabricksSparkAvro.enabled ,并在部署您的应用程序时使用选项 --jars 。有关更多详细信息,请阅读申请提交指南中的 高级依赖管理 部分。

Avro -> Spark SQL 转换支持的类型

目前,Spark 支持读取 Avro 的所有 基本类型 复杂类型

Avro 类型 Spark SQL 类型
boolean BooleanType
int IntegerType
long LongType
float FloatType
double DoubleType
string StringType
enum StringType
fixed BinaryType
bytes BinaryType
record StructType
array ArrayType
map MapType
union 见下文

除了上述列出的类型,它还支持读取 union 类型。以下三种类型被视为基本的 union 类型:

  1. union(int, long) 将映射到 LongType。
  2. union(float, double) 将映射到 DoubleType。
  3. union(something, null) ,其中 something 是任何支持的 Avro 类型。这将映射到与 something 的 Spark SQL 类型相同,其中 nullable 设置为 true。 所有其他 union 类型被视为复杂类型。它们将被映射到 StructType,其中字段名称为 member0,member1 等,与 union 的成员一致。这与在 Avro 和 Parquet 之间转换时的行为是一致的。

它还支持读取以下 Avro 逻辑类型 :

Avro 逻辑类型 Avro 类型 Spark SQL 类型
日期 整型 DateType
时间戳-毫秒 长整型 TimestampType
时间戳-微秒 长整型 TimestampType
十进制 固定 DecimalType
十进制 字节 DecimalType

目前,它忽略了Avro文件中存在的文档、别名和其他属性。

Spark SQL 支持的类型 -> Avro 转换

Spark支持将所有Spark SQL类型写入Avro。对于大多数类型,从Spark类型到Avro类型的映射是直接的(例如,IntegerType被转换为int);然而,还有一些特殊情况如下所列:

Spark SQL类型 Avro类型 Avro逻辑类型
ByteType int
ShortType int
BinaryType bytes
DateType int date
TimestampType long timestamp-micros
DecimalType fixed decimal

您还可以通过选项 avroSchema 指定整个输出的 Avro schema,以便将 Spark SQL 类型转换为其他 Avro 类型。下列转换默认情况下未应用,需用户指定 Avro schema:

Spark SQL 类型 Avro 类型 Avro 逻辑类型
BinaryType fixed
StringType enum
TimestampType long timestamp-millis
DecimalType bytes decimal