转换
描述
TRANSFORM
子句用于指定 Hive 风格的转换查询规范,通过运行用户指定的命令或脚本来转换输入。
Spark的脚本转换支持两种模式:
-
Hive支持已禁用:Spark脚本转换可以在
spark.sql.catalogImplementation=in-memory
下运行,或者在不使用SparkSession.builder.enableHiveSupport()
的情况下运行。在这种情况下,Spark仅使用带有ROW FORMAT DELIMITED
的脚本转换,并将所有传递给脚本的值视为字符串。 -
Hive支持已启用:当Spark以
spark.sql.catalogImplementation=hive
运行时,或者Spark SQL以SparkSession.builder.enableHiveSupport()
启动时,Spark可以使用带有Hive SerDe和ROW FORMAT DELIMITED
的脚本转换。
语法
SELECT TRANSFORM ( expression [ , ... ] )
[ ROW FORMAT row_format ]
[ RECORDWRITER record_writer_class ]
USING command_or_script [ AS ( [ col_name [ col_type ] ] [ , ... ] ) ]
[ ROW FORMAT row_format ]
[ RECORDREADER record_reader_class ]
参数
-
expression
指定一个或多个值、运算符和SQL函数的组合,以产生一个值。
-
row_format
指定输入和输出的行格式。有关更多语法详细信息,请参见 HIVE FORMAT 。
-
RECORDWRITER
指定自定义RecordWriter的完全限定类名。默认值是
org.apache.hadoop.hive.ql.exec.TextRecordWriter
。 -
RECORDREADER
指定自定义RecordReader的完全限定类名。默认值是
org.apache.hadoop.hive.ql.exec.TextRecordReader
。 -
command_or_script
指定处理数据的命令或脚本路径。
行格式定界行为
当Spark使用
ROW FORMAT DELIMITED
格式时:
-
Spark 使用字符
\u0001
作为默认字段分隔符, 可以通过FIELDS TERMINATED BY
来覆盖此分隔符。 -
Spark 使用字符
\n
作为默认行分隔符,可以通过LINES TERMINATED BY
来覆盖此分隔符。 -
Spark 使用字符串
\N
作为默认NULL
值,以区分NULL
值与字面字符串NULL
。该分隔符可以通过NULL DEFINED AS
来覆盖。 -
Spark 将所有列转换为
STRING
,并通过制表符将列组合在一起,然后输入到用户脚本中。对于复杂类型,如ARRAY
/MAP
/STRUCT
,Spark 使用to_json
将其转换为输入的JSON
字符串,并使用from_json
将结果输出的JSON
字符串转换为ARRAY
/MAP
/STRUCT
数据。 -
COLLECTION ITEMS TERMINATED BY
和MAP KEYS TERMINATED BY
是分隔符,用于拆分复杂数据,如ARRAY
/MAP
/STRUCT
,Spark 使用to_json
和from_json
来处理具有JSON
格式的复杂数据类型。因此COLLECTION ITEMS TERMINATED BY
和MAP KEYS TERMINATED BY
在默认行格式中将无效。 -
用户脚本的标准输出被视为用制表符分隔的
STRING
列。任何仅包含字符串\N
的单元格将被重新解释为字面NULL
值,然后结果的STRING
列将转换为col_type
中指定的数据类型。 -
如果实际输出列的数量少于指定的输出列的数量,额外的输出列将填充
NULL
。例如:output tabs: 1, 2 output columns: A: INT, B: INT, C: INT result: +---+---+------+ | a| b| c| +---+---+------+ | 1| 2| NULL| +---+---+------+
-
如果实际输出列的数量超过指定的输出列的数量,输出列仅选择对应的列,剩余部分将被丢弃。例如,如果输出有三个制表符,而只有两个输出列:
output tabs: 1, 2, 3 output columns: A: INT, B: INT result: +---+---+ | a| b| +---+---+ | 1| 2| +---+---+
-
如果在
USING my_script
之后没有AS
子句,输出模式为key: STRING, value: STRING
。key
列包含第一个制表符之前的所有字符,value
列包含第一个制表符之后的剩余字符。如果没有制表符,Spark 将返回NULL
值。例如:output tabs: 1, 2, 3 output columns: result: +-----+-------+ | key| value| +-----+-------+ | 1| 2| +-----+-------+ output tabs: 1, 2 output columns: result: +-----+-------+ | key| value| +-----+-------+ | 1| NULL| +-----+-------+
Hive SerDe 行为
当启用Hive支持并使用Hive SerDe模式时:
-
Spark 默认使用 Hive SerDe
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
,因此列被转换为STRING
并通过制表符组合,然后传递给用户脚本。 -
所有文字
NULL
值都被转换为字符串\N
,以区分文字NULL
值和文字字符串NULL
。 -
用户脚本的标准输出被视为制表符分隔的
STRING
列,任何单元格仅包含字符串\N
将重新解释为NULL
值,然后结果 STRING 列将被转换为col_type
中指定的数据类型。 -
如果实际输出列的数量少于指定的输出列数量,其他输出列将用
NULL
填充。 - 如果实际输出列的数量多于指定的输出列数量,输出列仅选择相应的列,其余部分将被丢弃。
-
如果在
USING my_script
后面没有AS
子句,则输出模式为key: STRING, value: STRING
。key
列包含第一个制表符之前的所有字符,而value
列包含第一个制表符之后的剩余字符。如果没有制表符,Spark 将返回NULL
值。 -
这些默认值可以通过
ROW FORMAT SERDE
或ROW FORMAT DELIMITED
来覆盖。
示例
CREATE TABLE person (zip_code INT, name STRING, age INT);
INSERT INTO person VALUES
(94588, 'Zen Hui', 50),
(94588, 'Dan Li', 18),
(94588, 'Anil K', 27),
(94588, 'John V', NULL),
(94511, 'David K', 42),
(94511, 'Aryan B.', 18),
(94511, 'Lalit B.', NULL);
-- 输出指定,未指定数据类型
SELECT TRANSFORM(zip_code, name, age)
USING 'cat' AS (a, b, c)
FROM person
WHERE zip_code > 94511;
+-------+---------+-----+
| a | b| c|
+-------+---------+-----+
| 94588| Anil K| 27|
| 94588| John V| NULL|
| 94588| Zen Hui| 50|
| 94588| Dan Li| 18|
+-------+---------+-----+
-- 输出指定并指定数据类型
SELECT TRANSFORM(zip_code, name, age)
USING 'cat' AS (a STRING, b STRING, c STRING)
FROM person
WHERE zip_code > 94511;
+-------+---------+-----+
| a | b| c|
+-------+---------+-----+
| 94588| Anil K| 27|
| 94588| John V| NULL|
| 94588| Zen Hui| 50|
| 94588| Dan Li| 18|
+-------+---------+-----+
-- 使用行格式分隔
SELECT TRANSFORM(name, age)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
NULL DEFINED AS 'NULL'
USING 'cat' AS (name_age string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY LINES TERMINATED BY '\nNULL DEFINED AS FROM person;
+---------------+
| name_age|
+---------------+
| Anil K,27|
| John V,null|
| ryan B.,18|
| David K,42|
| Zen Hui,50|
| Dan Li,18|
| Lalit B.,null|
+---------------+
-- 使用 Hive Serde
SELECT TRANSFORM(zip_code, name, age)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
'field.delim' = \t)
USING AS (a STRING, b STRING, c STRING)
ROW FORMAT SERDE WITH SERDEPROPERTIES (
= \t)
FROM person
WHERE zip_code > 94511;
+-------+---------+-----+
| a | b| c|
+-------+---------+-----+
| 94588| Anil K| 27|
| 94588| John V| NULL|
| 94588| Zen Hui| 50|
| 94588| Dan Li| 18|
+-------+---------+-----+
-- 无模式模式
SELECT TRANSFORM(zip_code, name, age)
USING FROM person
WHERE zip_code > 94500;
+-------+---------------------+
| key| value|
+-------+---------------------+
| 94588| Anil K 27|
| 94588| John V \N|
| 94511| Aryan B. 18|
| 94511| David K 42|
| 94588| Zen Hui 50|
| 94588| Dan Li 18|
| 94511| Lalit B. \N|
+-------+---------------------+