pyspark.sql.protobuf.functions.from_protobuf ¶
-
pyspark.sql.protobuf.functions.
from_protobuf
( data : ColumnOrName , messageName : str , descFilePath : Optional [ str ] = None , options : Optional [ Dict [ str , str ] ] = None , binaryDescriptorSet : Optional [ bytes ] = None ) → pyspark.sql.column.Column [source] ¶ -
将Protobuf格式的二进制列转换为其对应的catalyst值。 Protobuf定义通过以下方式之一提供:
-
- Protobuf描述文件:例如,使用以下命令创建的描述文件
-
protoc –include_imports –descriptor_set_out=abc.desc abc.proto
-
Protobuf描述文件的二进制内容:与前一个选项中的文件路径不同,我们可以提供文件的二进制内容。这允许在创建和获取描述集时具有灵活性。
-
包含Protobuf Java类的Jar包:包含Java类的Jar包应该是经过阴影处理的。具体来说, com.google.protobuf.* 应该被阴影处理为 org.sparkproject.spark_protobuf.protobuf.* 。 https://github.com/rangadi/shaded-protobuf-classes 对于从Protobuf文件创建阴影Jar包非常有用。Jar文件可以通过spark-submit选项–jars添加。
新增于版本 3.4.0。
版本3.5.0更改: 支持 binaryDescriptorSet 参数直接传递二进制描述符。 支持Spark Connect。
- Parameters
-
-
data
Column
or str -
二进制列。
- messageName: str, optional
-
要在描述文件中查找的protobuf消息名称,或当descFilePath参数未设置时的Protobuf类名称。例如 com.example.protos.ExampleEvent 。
- descFilePath str, optional
-
Protobuf 描述文件。
- options dict, optional
-
控制protobuf记录解析方式的选项。
- binaryDescriptorSet: bytes, optional
-
Protobuf FileDescriptorSet 序列化为二进制。
-
data
注释
Protobuf 功能作为可插拔的外部模块提供。
示例
>>> import tempfile >>> data = [("1", (2, "Alice", 109200))] >>> ddl_schema = "key STRING, value STRUCT<age: INTEGER, name: STRING, score: LONG>" >>> df = spark.createDataFrame(data, ddl_schema) >>> desc_hex = str('0ACE010A41636F6E6E6563746F722F70726F746F6275662F7372632F746573742F726' ... '5736F75726365732F70726F746F6275662F7079737061726B5F746573742E70726F746F121D6F72672E61' ... '70616368652E737061726B2E73716C2E70726F746F627566224B0A0D53696D706C654D657373616765121' ... '00A03616765180120012805520361676512120A046E616D6518022001280952046E616D6512140A057363' ... '6F7265180320012803520573636F72654215421353696D706C654D65737361676550726F746F736206707' ... '26F746F33') >>> # Writing a protobuf description into a file, generated by using >>> # connector/protobuf/src/test/resources/protobuf/pyspark_test.proto file >>> with tempfile.TemporaryDirectory() as tmp_dir: ... desc_file_path = "%s/pyspark_test.desc" % tmp_dir ... with open(desc_file_path, "wb") as f: ... _ = f.write(bytearray.fromhex(desc_hex)) ... f.flush() ... message_name = 'SimpleMessage' ... proto_df = df.select( ... to_protobuf(df.value, message_name, desc_file_path).alias("value")) ... proto_df.show(truncate=False) ... proto_df_1 = proto_df.select( # With file name for descriptor ... from_protobuf(proto_df.value, message_name, desc_file_path).alias("value")) ... proto_df_1.show(truncate=False) ... proto_df_2 = proto_df.select( # With binary for descriptor ... from_protobuf(proto_df.value, message_name, ... binaryDescriptorSet = bytearray.fromhex(desc_hex)) ... .alias("value")) ... proto_df_2.show(truncate=False) +----------------------------------------+ |value | +----------------------------------------+ |[08 02 12 05 41 6C 69 63 65 18 90 D5 06]| +----------------------------------------+ +------------------+ |value | +------------------+ |{2, Alice, 109200}| +------------------+ +------------------+ |value | +------------------+ |{2, Alice, 109200}| +------------------+ >>> data = [([(1668035962, 2020)])] >>> ddl_schema = "value struct<seconds: LONG, nanos: INT>" >>> df = spark.createDataFrame(data, ddl_schema) >>> message_class_name = "org.sparkproject.spark_protobuf.protobuf.Timestamp" >>> to_proto_df = df.select(to_protobuf(df.value, message_class_name).alias("value")) >>> from_proto_df = to_proto_df.select( ... from_protobuf(to_proto_df.value, message_class_name).alias("value")) >>> from_proto_df.show(truncate=False) +------------------+ |value | +------------------+ |{1668035962, 2020}| +------------------+
-