Protobuf 数据源指南

自Spark 3.4.0版本发布以来, Spark SQL 提供了内置的支持来读取和写入protobuf数据。

部署

模块 spark-protobuf 是外部模块,默认情况下不包含在 spark-submit spark-shell 中。

与任何Spark应用程序一样, spark-submit 用于启动您的应用程序。 spark-protobuf_2.12 及其依赖项可以直接通过 --packages 添加到 spark-submit 中,例如,

./bin/spark-submit --packages org.apache.spark:spark-protobuf_2.12:3.5.3 ...

spark-shell 上进行实验时,您还可以使用 --packages 直接添加 org.apache.spark:spark-protobuf_2.12 及其依赖项,

./bin/spark-shell --packages org.apache.spark:spark-protobuf_2.12:3.5.3 ...

有关提交带有外部依赖项的应用程序的更多详细信息,请参见 Application Submission Guide

to_protobuf() 和 from_protobuf()

spark-protobuf 包提供了函数 to_protobuf 用于将列编码为protobuf格式的二进制数据,以及 from_protobuf() 用于将protobuf二进制数据解码为列。这两个函数将一列转换为另一列,输入/输出的SQL数据类型可以是复杂类型或原始类型。

在从像Kafka这样的流式源读取或写入时,使用protobuf消息作为列是很有用的。每个Kafka键值记录将会附加一些元数据,如进入Kafka的时间戳、Kafka中的偏移量等。

Spark SQL 架构是基于传递给 from_protobuf to_protobuf 的 protobuf 描述符文件或 protobuf 类生成的。指定的 protobuf 类或 protobuf 描述符文件必须与数据匹配,否则行为是未定义的:可能会失败或返回任意结果。

This div is only used to make markdown editor/viewer happy and does not display on web ```python
from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf

<span class="c
This div is only used to make markdown editor/viewer happy and does not display on web ```scala
import org.apache.spark.sql.protobuf.functions._
// `from_protobuf` 和 `to_protobuf` 提供两种架构选择。通过 protobuf 描述符文件,
// 或通过阴影 Java 类。
// 提供输入 .proto protobuf 架构
// 语法 = "proto3"
// 消息 AppEvent {
//   字符串 name = 1;
//   int64 id = 2;
//   字符串 context = 3;
// }
val df = spark
.readStream
.format("kafka")
This div is only used to make markdown editor/viewer happy and does not display on web ```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.protobuf.functions.*;
// `from_protobuf` 和 `to_protobuf` 提供了两种模式选择。通过 protobuf 描述符文件,
// 或者通过 shaded Java 类。
// 给定输入 .proto protobuf 模式
// 语法 = "proto3"
// message AppEvent {
//   string name = 1;
//   int64 id = 2;
//   string context = 3;
// }
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
// 1. 解码模式 `AppEvent` 的 Protobuf 数据为一个结构;
// 2. 按列 `name` 进行过滤;
// 3. 以 Protobuf 格式编码列 `event`。
// 可以使用 Protobuf protoc 命令为给定的 .proto 文件生成一个 protobuf 描述符文件。
Dataset<Row> output = df
.select(from_protobuf(col("value"), "AppEvent", descriptorFilePath).as("event"))
.where("event.name == \"alice\"")
.select(to_protobuf(col("event"), "AppEvent", descriptorFilePath).as("event"));
// 或者,你可以使用 protobuf 类名将 SQL 列解码和编码为 protobuf 格式。
// 指定的 Protobuf 类必须与数据匹配,否则行为是未定义的:
// 可能会失败或返回任意结果。为了避免冲突,包含 'com.google.protobuf.*' 类的 jar 文件应被 shaded。一个 shading 的示例可以在
// https://github.com/rangadi/shaded-protobuf-classes 找到。
Dataset<Row> output = df
.select(
from_protobuf(col("value"),
"org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"))
.where("event.name == \"alice\"")
output.printSchema()
// root
//  |--event: struct (nullable = true)
//  |    |-- name : string (nullable = true)
//  |    |-- id: long (nullable = true)
//  |    |-- context: string (nullable = true)
output = output.select(
to_protobuf(col("event"),
"org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"));
StreamingQuery query = output
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start();
```

Protobuf -> Spark SQL 转换支持的类型

目前,Spark 支持读取 protobuf 标量类型 枚举类型 嵌套类型 映射类型 ,这些都是 Protobuf 消息中的内容。除了这些类型之外, spark-protobuf 还引入了对 Protobuf OneOf 字段的支持,这使得您可以处理可能具有多个字段集合的消息,但同时只能存在一个字段集合。这在数据不是总是以相同格式出现的情况中非常有用,并且您需要能够处理具有不同字段集合的消息而不会遇到错误。

Protobuf类型 Spark SQL类型
boolean BooleanType
int IntegerType
long LongType
float FloatType
double DoubleType
string StringType
enum StringType
bytes BinaryType
Message StructType
repeated ArrayType
map MapType
OneOf Struct

它还支持读取以下 Protobuf 类型 Timestamp Duration

Protobuf 逻辑类型 Protobuf 模式 Spark SQL 类型
持续时间 MessageType{seconds: Long, nanos: Int} DayTimeIntervalType
时间戳 MessageType{seconds: Long, nanos: Int} TimestampType

支持的类型用于 Spark SQL -> Protobuf 转换

Spark支持将所有Spark SQL类型写入Protobuf。对于大多数类型,从Spark类型到Protobuf类型的映射是直接的(例如,IntegerType被转换为int);

Spark SQL 类型 Protobuf 类型
BooleanType boolean
IntegerType int
LongType long
FloatType float
DoubleType double
StringType string
StringType enum
BinaryType bytes
StructType message
ArrayType repeated
MapType map

处理循环引用的protobuf字段

在处理Protobuf数据时,常见的问题之一是循环引用的存在。在Protobuf中,循环引用发生在一个字段引用回自身或引用另一个字段,该字段又引用回原始字段。这可能会在解析数据时导致问题,因为它可能会导致无限循环或其他意外行为。 为了解决这个问题,最新版本的spark-protobuf引入了一个新功能:通过字段类型检查循环引用的能力。这允许用户使用 recursive.fields.max.depth 选项来指定在解析模式时允许的最大递归层级。默认情况下, spark-protobuf 将不允许递归字段,将 recursive.fields.max.depth 设置为-1。然而,您可以根据需要将此选项设置为0到10。

recursive.fields.max.depth 设置为 0 会丢弃所有递归字段,将其设置为 1 允许递归一次,而将其设置为 2 允许递归两次。 recursive.fields.max.depth 的值大于 10 是不允许的,因为这可能导致性能问题甚至堆栈溢出。

下面protobuf消息的SQL架构将根据 recursive.fields.max.depth 的值而有所不同。

This div is only used to make markdown editor/viewer happy and does not display on web ```protobuf
语法 = "proto3"
消息 Person {
字符串 名字 = 1;
Person 最好的朋友 = 2
}
// 上述定义的protobuf模式,将根据 `recursive.fields.max.depth` 值转换为Spark SQL列
// 具有以下结构。
0: 结构<名字: 字符串, 最好的朋友: >
1: 结构<名字 字符串, 最好的朋友: <名字: 字符串, 最好的朋友: >>
2: 结构<名字 字符串, 最好的朋友: <名字: 字符串, 最好的朋友: 结构<名字: 字符串, 最好的朋友: >>> ...
```