Protobuf 数据源指南
- 部署
- to_protobuf() 和 from_protobuf()
- Protobuf -> Spark SQL 转换的支持类型
- Spark SQL -> Protobuf 转换的支持类型
- 处理循环引用的 Protobuf 字段
自Spark 3.4.0版本发布以来, Spark SQL 提供了内置的支持来读取和写入protobuf数据。
部署
模块
spark-protobuf
是外部模块,默认情况下不包含在
spark-submit
或
spark-shell
中。
与任何Spark应用程序一样,
spark-submit
用于启动您的应用程序。
spark-protobuf_2.12
及其依赖项可以直接通过
--packages
添加到
spark-submit
中,例如,
./bin/spark-submit --packages org.apache.spark:spark-protobuf_2.12:3.5.3 ...
在
spark-shell
上进行实验时,您还可以使用
--packages
直接添加
org.apache.spark:spark-protobuf_2.12
及其依赖项,
./bin/spark-shell --packages org.apache.spark:spark-protobuf_2.12:3.5.3 ...
有关提交带有外部依赖项的应用程序的更多详细信息,请参见 Application Submission Guide 。
to_protobuf() 和 from_protobuf()
spark-protobuf 包提供了函数
to_protobuf
用于将列编码为protobuf格式的二进制数据,以及
from_protobuf()
用于将protobuf二进制数据解码为列。这两个函数将一列转换为另一列,输入/输出的SQL数据类型可以是复杂类型或原始类型。
在从像Kafka这样的流式源读取或写入时,使用protobuf消息作为列是很有用的。每个Kafka键值记录将会附加一些元数据,如进入Kafka的时间戳、Kafka中的偏移量等。
-
如果包含您数据的“value”字段在protobuf中,您可以使用
from_protobuf()
来提取您的数据,丰富它,清理它,然后将其再次推送到Kafka或写入到其他接收端。 -
to_protobuf()
可用于将结构体转换为protobuf消息。当您希望在将数据写入Kafka时将多个列重新编码为单个列时,此方法特别有用。
Spark SQL 架构是基于传递给
from_protobuf
和
to_protobuf
的 protobuf 描述符文件或 protobuf 类生成的。指定的 protobuf 类或 protobuf 描述符文件必须与数据匹配,否则行为是未定义的:可能会失败或返回任意结果。
from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf
<span class="c
import org.apache.spark.sql.protobuf.functions._
// `from_protobuf` 和 `to_protobuf` 提供两种架构选择。通过 protobuf 描述符文件,
// 或通过阴影 Java 类。
// 提供输入 .proto protobuf 架构
// 语法 = "proto3"
// 消息 AppEvent {
// 字符串 name = 1;
// int64 id = 2;
// 字符串 context = 3;
// }
val df = spark
.readStream
.format("kafka")
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.protobuf.functions.*;
// `from_protobuf` 和 `to_protobuf` 提供了两种模式选择。通过 protobuf 描述符文件,
// 或者通过 shaded Java 类。
// 给定输入 .proto protobuf 模式
// 语法 = "proto3"
// message AppEvent {
// string name = 1;
// int64 id = 2;
// string context = 3;
// }
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
// 1. 解码模式 `AppEvent` 的 Protobuf 数据为一个结构;
// 2. 按列 `name` 进行过滤;
// 3. 以 Protobuf 格式编码列 `event`。
// 可以使用 Protobuf protoc 命令为给定的 .proto 文件生成一个 protobuf 描述符文件。
Dataset<Row> output = df
.select(from_protobuf(col("value"), "AppEvent", descriptorFilePath).as("event"))
.where("event.name == \"alice\"")
.select(to_protobuf(col("event"), "AppEvent", descriptorFilePath).as("event"));
// 或者,你可以使用 protobuf 类名将 SQL 列解码和编码为 protobuf 格式。
// 指定的 Protobuf 类必须与数据匹配,否则行为是未定义的:
// 可能会失败或返回任意结果。为了避免冲突,包含 'com.google.protobuf.*' 类的 jar 文件应被 shaded。一个 shading 的示例可以在
// https://github.com/rangadi/shaded-protobuf-classes 找到。
Dataset<Row> output = df
.select(
from_protobuf(col("value"),
"org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"))
.where("event.name == \"alice\"")
output.printSchema()
// root
// |--event: struct (nullable = true)
// | |-- name : string (nullable = true)
// | |-- id: long (nullable = true)
// | |-- context: string (nullable = true)
output = output.select(
to_protobuf(col("event"),
"org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"));
StreamingQuery query = output
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start();
Protobuf -> Spark SQL 转换支持的类型
目前,Spark 支持读取
protobuf 标量类型
、
枚举类型
、
嵌套类型
和
映射类型
,这些都是 Protobuf 消息中的内容。除了这些类型之外,
spark-protobuf
还引入了对 Protobuf
OneOf
字段的支持,这使得您可以处理可能具有多个字段集合的消息,但同时只能存在一个字段集合。这在数据不是总是以相同格式出现的情况中非常有用,并且您需要能够处理具有不同字段集合的消息而不会遇到错误。
Protobuf类型 | Spark SQL类型 |
---|---|
boolean | BooleanType |
int | IntegerType |
long | LongType |
float | FloatType |
double | DoubleType |
string | StringType |
enum | StringType |
bytes | BinaryType |
Message | StructType |
repeated | ArrayType |
map | MapType |
OneOf | Struct |
它还支持读取以下 Protobuf 类型 Timestamp 和 Duration
Protobuf 逻辑类型 | Protobuf 模式 | Spark SQL 类型 |
---|---|---|
持续时间 | MessageType{seconds: Long, nanos: Int} | DayTimeIntervalType |
时间戳 | MessageType{seconds: Long, nanos: Int} | TimestampType |
支持的类型用于 Spark SQL -> Protobuf 转换
Spark支持将所有Spark SQL类型写入Protobuf。对于大多数类型,从Spark类型到Protobuf类型的映射是直接的(例如,IntegerType被转换为int);
Spark SQL 类型 | Protobuf 类型 |
---|---|
BooleanType | boolean |
IntegerType | int |
LongType | long |
FloatType | float |
DoubleType | double |
StringType | string |
StringType | enum |
BinaryType | bytes |
StructType | message |
ArrayType | repeated |
MapType | map |
处理循环引用的protobuf字段
在处理Protobuf数据时,常见的问题之一是循环引用的存在。在Protobuf中,循环引用发生在一个字段引用回自身或引用另一个字段,该字段又引用回原始字段。这可能会在解析数据时导致问题,因为它可能会导致无限循环或其他意外行为。
为了解决这个问题,最新版本的spark-protobuf引入了一个新功能:通过字段类型检查循环引用的能力。这允许用户使用
recursive.fields.max.depth
选项来指定在解析模式时允许的最大递归层级。默认情况下,
spark-protobuf
将不允许递归字段,将
recursive.fields.max.depth
设置为-1。然而,您可以根据需要将此选项设置为0到10。
将
recursive.fields.max.depth
设置为 0 会丢弃所有递归字段,将其设置为 1 允许递归一次,而将其设置为 2 允许递归两次。
recursive.fields.max.depth
的值大于 10 是不允许的,因为这可能导致性能问题甚至堆栈溢出。
下面protobuf消息的SQL架构将根据
recursive.fields.max.depth
的值而有所不同。
语法 = "proto3"
消息 Person {
字符串 名字 = 1;
Person 最好的朋友 = 2
}
// 上述定义的protobuf模式,将根据 `recursive.fields.max.depth` 值转换为Spark SQL列
// 具有以下结构。
0: 结构<名字: 字符串, 最好的朋友: 空>
1: 结构<名字 字符串, 最好的朋友: <名字: 字符串, 最好的朋友: 空>>
2: 结构<名字 字符串, 最好的朋友: <名字: 字符串, 最好的朋友: 结构<名字: 字符串, 最好的朋友: 空>>> ...