Hive 表

Spark SQL 还支持读取和写入存储在 Apache Hive 中的数据。 然而,由于 Hive 有大量的依赖项,这些依赖项未包含在默认的 Spark 发行版中。如果 Hive 依赖项可以在类路径上找到,Spark 将自动加载它们。 请注意,这些 Hive 依赖项也必须在所有工作节点上存在,因为它们需要访问 Hive 序列化和反序列化库(SerDes),以便访问存储在 Hive 中的数据。

Hive的配置是通过将你的 hive-site.xml core-site.xml (用于安全配置),以及 hdfs-site.xml (用于HDFS配置) 文件放置在 conf/ 中完成的。

在使用 Hive 时,必须使用 Hive 支持实例化 SparkSession ,包括连接到持久化的 Hive 元数据库、支持 Hive 序列化/反序列化和 Hive 用户定义函数。没有现有 Hive 部署的用户仍然可以启用 Hive 支持。当未通过 hive-site.xml 配置时,上下文会自动在当前目录中创建 metastore_db ,并创建一个由 spark.sql.warehouse.dir 配置的目录,该目录默认是 Spark 应用程序启动的当前目录中的 spark-warehouse 目录。请注意, hive.metastore.warehouse.dir 属性在 hive-site.xml 中自 Spark 2.0.0 起已被弃用。相反,使用 spark.sql.warehouse.dir 来指定仓库中数据库的默认位置。您可能需要授予启动 Spark 应用程序的用户写入权限。

from os.path import abspath
from pyspark.sql import SparkSession
from pyspark.sql import Row
# warehouse_location 指向默认的托管数据库和表的位置
warehouse_location = abspath('spark-warehouse')
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()
# spark 是一个已存在的 SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# 查询以 HiveQL 表达
spark.sql("SELECT * FROM src").show()
# +---+-------+
# |key|  value|
# +---+-------+
# |238|val_238|
# | 86| val_86|
# |311|val_311|
# ...

# 也支持聚合查询。
spark.sql("SELECT COUNT(*) FROM src").show()
# +--------+
# |count(1)|
# +--------+
# |    500 |
# +--------+

# SQL 查询的结果本身是 DataFrame,并支持所有常规函数。
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
# DataFrame 中的项为 Row 类型,可以通过序号访问每一列。
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
print(record)
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# ...

# 你还可以使用 DataFrame 在 SparkSession 中创建临时视图。
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")
# 查询可以将 DataFrame 数据与存储在 Hive 中的数据联接。
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
# +---+------+---+------+
# |key| value|key| value|
# +---+------+---+------+
# |  2| val_2|  2| val_2|
# |  4| val_4|  4| val_4|
# |  5| val_5|  5| val_5|
# ...
Find full example code at "examples/src/main/python/sql/hive.py" in the Spark repo.
import java.io.File
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
case class Record(key: Int, value: String)
// warehouseLocation 指向管理数据库和表的默认位置
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession
.builder()
.appName("Spark Hive 示例")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import spark.sql
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// 查询以 HiveQL 表达
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// 聚合查询也受支持。
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+
// SQL 查询的结果本身是 DataFrame,并支持所有正常的功能。
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
// DataFrame 中的项目类型为 Row,允许您按序号访问每一列。
val stringsDS = sqlDF.map {
case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// 您还可以使用 DataFrame 创建 SparkSession 内的临时视图。
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")
// 查询可以将 DataFrame 数据与存储在 Hive 中的数据进行连接。
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// |  5| val_5|  5| val_5|
// ...
// 创建一个 Hive 管理的 Parquet 表,使用 HQL 语法而不是 Spark SQL 原生语法
// `USING hive`
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
// 将 DataFrame 保存到 Hive 管理的表中
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
// 插入后,Hive 管理的表现在有数据了
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// 准备一个 Parquet 数据目录
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
// 创建一个 Hive 外部 Parquet 表
sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
// Hive 外部表应该已经有数据
sql("SELECT * FROM hive_bigints").show()
// +---+
// | id|
// +---+
// |  0|
// |  1|
// |  2|
// ... 顺序可能会有所不同,因为 Spark 并行处理分区。
// 开启 Hive 动态分区标志
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
// 使用 DataFrame API 创建 Hive 分区表
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
// 分区列 `key` 将被移动到模式的末尾。
sql("SELECT * FROM hive_part_tbl").show()
// +-------+---+
// |  value|key|
// +-------+---+
// |val_238|238|
// | val_86| 86|
// |val_311|311|
// ...
spark.stop()
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala" in the Spark repo.
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public static class Record implements Serializable {
private int key;
private String value;
public int getKey() {
return key;
}
public void setKey(int key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
// warehouseLocation 指向管理数据库和表的默认位置
String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
SparkSession spark = SparkSession
.builder()
.appName("Java Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate();
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
// 查询以 HiveQL 表达
spark.sql("SELECT * FROM src").show();
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// 聚合查询也被支持。
spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+
// SQL 查询的结果本身是 DataFrames,并支持所有正常功能。
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");
// DataFrames 中的项是 Row 类型,这让你可以按序访问每一列。
Dataset<String> stringsDS = sqlDF.map(
(MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
Encoders.STRING());
stringsDS.show();
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// 你也可以使用 DataFrames 在 SparkSession 内创建临时视图。
List<Record> records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
Record record = new Record();
record.setKey(key);
record.setValue("val_" + key);
records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");
// 查询然后可以将 DataFrames 数据与存储在 Hive 中的数据连接。
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// ...
Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java" in the Spark repo.

在使用Hive时,必须实例化 SparkSession ,并支持Hive。这为在MetaStore中查找表和使用HiveQL编写查询提供了支持。

# enableHiveSupport 默认为 TRUE
sparkR.session(enableHiveSupport = TRUE)
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# 查询可以用 HiveQL 表达。
results <- collect(sql("FROM src SELECT key, value"))
Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.

为Hive表指定存储格式

当你创建一个 Hive 表时,你需要定义这个表如何从文件系统读取/写入数据,即“输入格式”和“输出格式”。你还需要定义这个表如何将数据反序列化为行,或将行序列化为数据,即“serde”。可以使用以下选项来指定存储格式(“serde”、“输入格式”、“输出格式”),例如 CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet') 。默认情况下,我们将以纯文本格式读取表文件。请注意,在创建表时,Hive 存储处理程序尚不受支持,你可以在 Hive 端使用存储处理程序创建一个表,并使用 Spark SQL 读取它。

属性名称 含义
fileFormat fileFormat 是一种存储格式规范的包,包括 "serde"、"输入格式" 和 "输出格式"。目前我们支持 6 种 fileFormats: 'sequencefile'、'rcfile'、'orc'、'parquet'、'textfile' 和 'avro'。
inputFormat, outputFormat 这两个选项指定对应的 InputFormat OutputFormat 类的名称,作为字符串文字,例如 org.apache.hadoop.hive.ql.io.orc.OrcInputFormat 。这两个选项必须成对出现,如果您已经指定了 fileFormat 选项,则不能指定它们。
serde 此选项指定一个 serde 类的名称。当指定 fileFormat 选项时,如果给定的 fileFormat 已经包含 serde 信息,则不要指定此选项。目前 "sequencefile"、"textfile" 和 "rcfile" 不包含 serde 信息,您可以将此选项与这三种 fileFormats 一起使用。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim 这些选项仅能与 "textfile" fileFormat 一起使用。它们定义如何将分隔文件读取为行。

所有其他通过 OPTIONS 定义的属性将被视为 Hive serde 属性。

与不同版本的Hive Metastore交互

Spark SQL对Hive支持的最重要部分之一是与Hive元存储的交互,这使得Spark SQL能够访问Hive表的元数据。从Spark 1.4.0开始,单一的Spark SQL二进制构建可以用于查询不同版本的Hive元存储,使用下面描述的配置。请注意,无论使用哪个版本的Hive与元存储进行交互,Spark SQL在内部将编译为内置Hive并使用这些类进行内部执行(序列化/反序列化,UDF,UDAF等)。

可以使用以下选项来配置用于获取元数据的Hive版本:

属性名称 默认 含义 自版本起
spark.sql.hive.metastore.version 2.3.9 Hive 元存储的版本。可用的选项是 0.12.0 2.3.9 3.0.0 3.1.3 1.4.0
spark.sql.hive.metastore.jars builtin 应用于实例化 HiveMetastoreClient 的 jar 的位置。该属性可以是以下四个选项之一:
  1. builtin
  2. 使用与 Spark 组合在一起的 Hive 2.3.9,当 -Phive 被启用时。选择此选项时, spark.sql.hive.metastore.version 必须是 2.3.9 或未定义。
  3. maven
  4. 使用从 Maven 仓库下载的指定版本的 Hive jar。此配置通常不推荐用于生产部署。
  5. path
  6. 使用由 spark.sql.hive.metastore.jars.path 配置的 Hive jar, 采用逗号分隔的格式。支持本地或远程路径。提供的 jar 应与 spark.sql.hive.metastore.version 版本相同。
  7. 标准格式的类路径,以便 JVM 使用。该类路径必须包括 Hive 及其依赖项,包括正确版本的 Hadoop。提供的 jar 应与 spark.sql.hive.metastore.version 版本相同。这些 jar 只需在驱动程序上存在,但如果您在 yarn 集群模式中运行,则必须确保它们与您的应用程序一起打包。
1.4.0
spark.sql.hive.metastore.jars.path (空) 用于实例化 HiveMetastoreClient 的 jar 的逗号分隔路径。 当 spark.sql.hive.metastore.jars 设置为 path 时,此配置非常有用。
路径可以是以下任何格式:
  1. file://path/to/jar/foo.jar
  2. hdfs://nameservice/path/to/jar/foo.jar
  3. /path/to/jar/ (路径不带 URI 方案,遵循配置 fs.defaultFS 的 URI 方案)
  4. [http/https/ftp]://path/to/jar/foo.jar
注意 1、2 和 3 支持通配符。例如:
  1. file://path/to/jar/*,file://path2/to/jar/*/*.jar
  2. hdfs://nameservice/path/to/jar/*,hdfs://nameservice2/path/to/jar/*/*.jar
3.1.0
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc

应该使用在 Spark SQL 和特定版本的 Hive 之间共享的类加载器加载的类前缀的逗号分隔列表。应该共享的类的一个例子是与元存储对话所需的 JDBC 驱动程序。其他需要共享的类是与已经共享的类交互的类。例如,用于 log4j 的自定义附加器。

1.4.0
spark.sql.hive.metastore.barrierPrefixes (空)

应该为 Spark SQL 正在通信的每个 Hive 版本显式重新加载的类前缀的逗号分隔列表。例如,在通常会被共享的前缀中声明的 Hive UDF。

1.4.0