pyspark.sql.protobuf.functions.from_protobuf

pyspark.sql.protobuf.functions.from_protobuf(data: ColumnOrName, messageName: str, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None, binaryDescriptorSet: Optional[bytes] = None) → pyspark.sql.column.Column

Converts a binary column of Protobuf format into its corresponding catalyst value. The Protobuf definition is provided in one of these ways:

  • Protobuf descriptor file: e.g., a descriptor file created with

    protoc --include_imports --descriptor_set_out=abc.desc abc.proto

  • Binary content of a Protobuf descriptor file: rather than a file path as in the previous option, the binary content of the file can be provided directly. This allows flexibility in how the descriptor set is created and fetched (see the sketch after this list).

  • Jar containing Protobuf Java classes: the jar containing the Java classes should be shaded. Specifically, com.google.protobuf.* should be shaded to org.sparkproject.spark_protobuf.protobuf.*. https://github.com/rangadi/shaded-protobuf-classes is useful for creating shaded jars from Protobuf files. The jar can be added with the spark-submit option --jars.
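A minimal sketch of the first two options, assuming a DataFrame df with a binary column value and a message MyMessage defined in abc.proto (the names df, value, and MyMessage are hypothetical; abc.desc is the descriptor set produced by the protoc command above):

    from pyspark.sql.protobuf.functions import from_protobuf

    # Option 1: point at the descriptor file on disk.
    parsed = df.select(
        from_protobuf(df.value, "MyMessage", "abc.desc").alias("value"))

    # Option 2: read the descriptor set as raw bytes and pass it directly,
    # so no descriptor file has to be shipped with the job.
    with open("abc.desc", "rb") as f:
        desc_bytes = f.read()
    parsed = df.select(
        from_protobuf(df.value, "MyMessage",
                      binaryDescriptorSet=desc_bytes).alias("value"))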

New in version 3.4.0.

Changed in version 3.5.0: Supports the binaryDescriptorSet arg to pass a serialized descriptor set directly. Supports Spark Connect.

Parameters
data : Column or str

The binary column to parse.

messageName : str

The protobuf message name to look for in the descriptor file, or the Protobuf class name when the descFilePath parameter is not set, e.g. com.example.protos.ExampleEvent.

descFilePath : str, optional

The Protobuf descriptor file.

options : dict, optional

Options to control how the protobuf record is parsed (see the sketch after this parameter list).

binaryDescriptorSet : bytes, optional

The Protobuf FileDescriptorSet serialized as binary.
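As an illustration of options, this hedged sketch reuses proto_df, message_name, and desc_file_path from the Examples below and passes the mode option, which controls how malformed records are handled (FAILFAST, the default, raises an error; PERMISSIVE yields a null result instead):

    # Parse with PERMISSIVE mode: records that fail to parse become null
    # rather than failing the query.
    parsed = proto_df.select(
        from_protobuf(proto_df.value, message_name, desc_file_path,
                      options={"mode": "PERMISSIVE"}).alias("value"))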

Notes

The Protobuf functionality is provided as a pluggable external module.
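The module is not on the classpath by default; for example, it can be pulled in when launching pyspark or spark-submit via the --packages option with the org.apache.spark:spark-protobuf_2.12:&lt;spark-version&gt; artifact (the Scala version suffix depends on your build).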

Examples

>>> import tempfile
>>> from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf
>>> data = [("1", (2, "Alice", 109200))]
>>> ddl_schema = "key STRING, value STRUCT<age: INTEGER, name: STRING, score: LONG>"
>>> df = spark.createDataFrame(data, ddl_schema)
>>> desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726'
...    '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61'
...    '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121'
...    '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363'
...    '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707'
...    '26F746F33')
>>> # Writing a protobuf description into a file, generated by using
>>> # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file
>>> with tempfile.TemporaryDirectory() as tmp_dir:
...     desc_file_path = "%s/pyspark_test.desc" % tmp_dir
...     with open(desc_file_path, "wb") as f:
...         _ = f.write(bytearray.fromhex(desc_hex))
...         f.flush()
...         message_name = 'SimpleMessage'
...         proto_df = df.select(
...             to_protobuf(df.value, message_name, desc_file_path).alias("value"))
...         proto_df.show(truncate=False)
...         proto_df_1 = proto_df.select( # With file name for descriptor
...             from_protobuf(proto_df.value, message_name, desc_file_path).alias("value"))
...         proto_df_1.show(truncate=False)
...         proto_df_2 = proto_df.select( # With binary for descriptor
...             from_protobuf(proto_df.value, message_name,
...                           binaryDescriptorSet = bytearray.fromhex(desc_hex))
...             .alias("value"))
...         proto_df_2.show(truncate=False)
+----------------------------------------+
|value                                   |
+----------------------------------------+
|[08 02 12 05 41 6C 69 63 65 18 90 D5 06]|
+----------------------------------------+
+------------------+
|value             |
+------------------+
|{2, Alice, 109200}|
+------------------+
+------------------+
|value             |
+------------------+
|{2, Alice, 109200}|
+------------------+
>>> data = [([(1668035962, 2020)])]
>>> ddl_schema = "value struct<seconds: LONG, nanos: INT>"
>>> df = spark.createDataFrame(data, ddl_schema)
>>> message_class_name = "org.sparkproject.spark_protobuf.protobuf.Timestamp"
>>> to_proto_df = df.select(to_protobuf(df.value, message_class_name).alias("value"))
>>> from_proto_df = to_proto_df.select(
...     from_protobuf(to_proto_df.value, message_class_name).alias("value"))
>>> from_proto_df.show(truncate=False)
+------------------+
|value             |
+------------------+
|{1668035962, 2020}|
+------------------+