Hive 表
Spark SQL 还支持读取和写入存储在 Apache Hive 中的数据。 然而,由于 Hive 有大量的依赖项,这些依赖项未包含在默认的 Spark 发行版中。如果 Hive 依赖项可以在类路径上找到,Spark 将自动加载它们。 请注意,这些 Hive 依赖项也必须在所有工作节点上存在,因为它们需要访问 Hive 序列化和反序列化库(SerDes),以便访问存储在 Hive 中的数据。
Hive的配置是通过将你的
hive-site.xml
、
core-site.xml
(用于安全配置),以及
hdfs-site.xml
(用于HDFS配置) 文件放置在
conf/
中完成的。
在使用 Hive 时,必须使用 Hive 支持实例化
SparkSession
,包括连接到持久化的 Hive 元数据库、支持 Hive 序列化/反序列化和 Hive 用户定义函数。没有现有 Hive 部署的用户仍然可以启用 Hive 支持。当未通过
hive-site.xml
配置时,上下文会自动在当前目录中创建
metastore_db
,并创建一个由
spark.sql.warehouse.dir
配置的目录,该目录默认是 Spark 应用程序启动的当前目录中的
spark-warehouse
目录。请注意,
hive.metastore.warehouse.dir
属性在
hive-site.xml
中自 Spark 2.0.0 起已被弃用。相反,使用
spark.sql.warehouse.dir
来指定仓库中数据库的默认位置。您可能需要授予启动 Spark 应用程序的用户写入权限。
from os.path import abspath
from pyspark.sql import SparkSession
from pyspark.sql import Row
# warehouse_location 指向默认的托管数据库和表的位置
warehouse_location = abspath('spark-warehouse')
spark = SparkSession \
.builder \
.appName("Python Spark SQL Hive integration example") \
.config("spark.sql.warehouse.dir", warehouse_location) \
.enableHiveSupport() \
.getOrCreate()
# spark 是一个已存在的 SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# 查询以 HiveQL 表达
spark.sql("SELECT * FROM src").show()
# +---+-------+
# |key| value|
# +---+-------+
# |238|val_238|
# | 86| val_86|
# |311|val_311|
# ...
# 也支持聚合查询。
spark.sql("SELECT COUNT(*) FROM src").show()
# +--------+
# |count(1)|
# +--------+
# | 500 |
# +--------+
# SQL 查询的结果本身是 DataFrame,并支持所有常规函数。
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
# DataFrame 中的项为 Row 类型,可以通过序号访问每一列。
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
print(record)
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# ...
# 你还可以使用 DataFrame 在 SparkSession 中创建临时视图。
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")
# 查询可以将 DataFrame 数据与存储在 Hive 中的数据联接。
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
# +---+------+---+------+
# |key| value|key| value|
# +---+------+---+------+
# | 2| val_2| 2| val_2|
# | 4| val_4| 4| val_4|
# | 5| val_5| 5| val_5|
# ...
import java.io.File
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
case class Record(key: Int, value: String)
// warehouseLocation 指向管理数据库和表的默认位置
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession
.builder()
.appName("Spark Hive 示例")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import spark.sql
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// 查询以 HiveQL 表达
sql("SELECT * FROM src").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// 聚合查询也受支持。
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
// SQL 查询的结果本身是 DataFrame,并支持所有正常的功能。
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
// DataFrame 中的项目类型为 Row,允许您按序号访问每一列。
val stringsDS = sqlDF.map {
case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// 您还可以使用 DataFrame 创建 SparkSession 内的临时视图。
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")
// 查询可以将 DataFrame 数据与存储在 Hive 中的数据进行连接。
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// | 5| val_5| 5| val_5|
// ...
// 创建一个 Hive 管理的 Parquet 表,使用 HQL 语法而不是 Spark SQL 原生语法
// `USING hive`
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
// 将 DataFrame 保存到 Hive 管理的表中
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
// 插入后,Hive 管理的表现在有数据了
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// 准备一个 Parquet 数据目录
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
// 创建一个 Hive 外部 Parquet 表
sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
// Hive 外部表应该已经有数据
sql("SELECT * FROM hive_bigints").show()
// +---+
// | id|
// +---+
// | 0|
// | 1|
// | 2|
// ... 顺序可能会有所不同,因为 Spark 并行处理分区。
// 开启 Hive 动态分区标志
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
// 使用 DataFrame API 创建 Hive 分区表
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
// 分区列 `key` 将被移动到模式的末尾。
sql("SELECT * FROM hive_part_tbl").show()
// +-------+---+
// | value|key|
// +-------+---+
// |val_238|238|
// | val_86| 86|
// |val_311|311|
// ...
spark.stop()
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public static class Record implements Serializable {
private int key;
private String value;
public int getKey() {
return key;
}
public void setKey(int key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
// warehouseLocation 指向管理数据库和表的默认位置
String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
SparkSession spark = SparkSession
.builder()
.appName("Java Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate();
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
// 查询以 HiveQL 表达
spark.sql("SELECT * FROM src").show();
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// 聚合查询也被支持。
spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
// SQL 查询的结果本身是 DataFrames,并支持所有正常功能。
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");
// DataFrames 中的项是 Row 类型,这让你可以按序访问每一列。
Dataset<String> stringsDS = sqlDF.map(
(MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
Encoders.STRING());
stringsDS.show();
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// 你也可以使用 DataFrames 在 SparkSession 内创建临时视图。
List<Record> records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
Record record = new Record();
record.setKey(key);
record.setValue("val_" + key);
records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");
// 查询然后可以将 DataFrames 数据与存储在 Hive 中的数据连接。
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// ...
在使用Hive时,必须实例化
SparkSession
,并支持Hive。这为在MetaStore中查找表和使用HiveQL编写查询提供了支持。
# enableHiveSupport 默认为 TRUE
sparkR.session(enableHiveSupport = TRUE)
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# 查询可以用 HiveQL 表达。
results <- collect(sql("FROM src SELECT key, value"))
为Hive表指定存储格式
当你创建一个 Hive 表时,你需要定义这个表如何从文件系统读取/写入数据,即“输入格式”和“输出格式”。你还需要定义这个表如何将数据反序列化为行,或将行序列化为数据,即“serde”。可以使用以下选项来指定存储格式(“serde”、“输入格式”、“输出格式”),例如
CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')
。默认情况下,我们将以纯文本格式读取表文件。请注意,在创建表时,Hive 存储处理程序尚不受支持,你可以在 Hive 端使用存储处理程序创建一个表,并使用 Spark SQL 读取它。
| 属性名称 | 含义 |
|---|---|
fileFormat
|
fileFormat 是一种存储格式规范的包,包括 "serde"、"输入格式" 和 "输出格式"。目前我们支持 6 种 fileFormats: 'sequencefile'、'rcfile'、'orc'、'parquet'、'textfile' 和 'avro'。 |
inputFormat, outputFormat
|
这两个选项指定对应的
InputFormat
和
OutputFormat
类的名称,作为字符串文字,例如
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
。这两个选项必须成对出现,如果您已经指定了
fileFormat
选项,则不能指定它们。
|
serde
|
此选项指定一个 serde 类的名称。当指定
fileFormat
选项时,如果给定的
fileFormat
已经包含 serde 信息,则不要指定此选项。目前 "sequencefile"、"textfile" 和 "rcfile" 不包含 serde 信息,您可以将此选项与这三种 fileFormats 一起使用。
|
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim
|
这些选项仅能与 "textfile" fileFormat 一起使用。它们定义如何将分隔文件读取为行。 |
所有其他通过
OPTIONS
定义的属性将被视为 Hive serde 属性。
与不同版本的Hive Metastore交互
Spark SQL对Hive支持的最重要部分之一是与Hive元存储的交互,这使得Spark SQL能够访问Hive表的元数据。从Spark 1.4.0开始,单一的Spark SQL二进制构建可以用于查询不同版本的Hive元存储,使用下面描述的配置。请注意,无论使用哪个版本的Hive与元存储进行交互,Spark SQL在内部将编译为内置Hive并使用这些类进行内部执行(序列化/反序列化,UDF,UDAF等)。
可以使用以下选项来配置用于获取元数据的Hive版本:
| 属性名称 | 默认 | 含义 | 自版本起 |
|---|---|---|---|
spark.sql.hive.metastore.version
|
2.3.9
|
Hive 元存储的版本。可用的选项是
0.12.0
到
2.3.9
和
3.0.0
到
3.1.3
。
|
1.4.0 |
spark.sql.hive.metastore.jars
|
builtin
|
应用于实例化 HiveMetastoreClient 的 jar 的位置。该属性可以是以下四个选项之一:
-Phive
被启用时。选择此选项时,
spark.sql.hive.metastore.version
必须是
2.3.9
或未定义。
spark.sql.hive.metastore.jars.path
配置的 Hive jar,
采用逗号分隔的格式。支持本地或远程路径。提供的 jar 应与
spark.sql.hive.metastore.version
版本相同。
|
1.4.0 |
spark.sql.hive.metastore.jars.path
|
(空)
|
用于实例化 HiveMetastoreClient 的 jar 的逗号分隔路径。
当
spark.sql.hive.metastore.jars
设置为
path
时,此配置非常有用。
路径可以是以下任何格式:
|
3.1.0 |
spark.sql.hive.metastore.sharedPrefixes
|
com.mysql.jdbc,
|
应该使用在 Spark SQL 和特定版本的 Hive 之间共享的类加载器加载的类前缀的逗号分隔列表。应该共享的类的一个例子是与元存储对话所需的 JDBC 驱动程序。其他需要共享的类是与已经共享的类交互的类。例如,用于 log4j 的自定义附加器。 |
1.4.0 |
spark.sql.hive.metastore.barrierPrefixes
|
(空)
|
应该为 Spark SQL 正在通信的每个 Hive 版本显式重新加载的类前缀的逗号分隔列表。例如,在通常会被共享的前缀中声明的 Hive UDF。 |
1.4.0 |