JDBC 连接其他数据库
Spark SQL 还包括一个数据源,可以通过 JDBC 从其他数据库读取数据。这个功能应该优先于使用 JdbcRDD 。因为结果作为 DataFrame 返回,可以很容易地在 Spark SQL 中处理或与其他数据源连接。JDBC 数据源在 Java 或 Python 中也更易于使用,因为它不要求用户提供 ClassTag。 (注意,这与 Spark SQL JDBC 服务器不同,后者允许其他应用程序使用 Spark SQL 运行查询。)
要开始使用,你需要在 Spark 类路径中包含特定数据库的 JDBC 驱动程序。比如,从 Spark Shell 连接到 Postgres,你可以运行以下命令:
./bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
数据源选项
Spark支持以下不区分大小写的JDBC选项。JDBC的数据源选项可以通过以下方式设置:
-
.option/.options方法-
DataFrameReader -
DataFrameWriter
-
-
OPTIONS子句在 创建使用数据源的表
对于连接属性,用户可以在数据源选项中指定JDBC连接属性。
user
和
password
通常作为连接属性提供,用于登录到数据源。
| 属性名称 | 默认值 | 含义 | 范围 |
|---|---|---|---|
url
|
(无) |
连接的 JDBC URL 形式为
jdbc:subprotocol:subname
。可以在 URL 中指定特定于源的连接属性。例如:
jdbc:postgresql://localhost/test?user=fred&password=secret
|
读/写 |
dbtable
|
(无) |
应从中读取或写入的 JDBC 表。请注意,在读取路径中,任何在 SQL 查询的
FROM
子句中有效的内容都可以使用。例如,您可以使用带括号的子查询,而不是完整的表。不允许同时指定
dbtable
和
query
选项。
|
读/写 |
query
|
(无) |
将用于将数据读取到 Spark 的查询。指定的查询将被括起来,并用作
FROM
子句中的子查询。Spark 还会为子查询子句分配一个别名。例如,spark 将向 JDBC 源发出以下格式的查询。
SELECT
使用此选项时有以下几个限制。
|
读/写 |
prepareQuery
|
(无) |
将与
query
一起形成最终查询的前缀。由于指定的
query
将被括起来,作为
FROM
子句中的子查询,并且某些数据库不支持子查询中的所有子句,
prepareQuery
属性提供了一种运行此类复杂查询的方法。例如,spark 将向 JDBC 源发出以下格式的查询。
下面是几个示例。
|
读/写 |
driver
|
(无) | 用于连接此 URL 的 JDBC 驱动程序的类名。 | 读/写 |
partitionColumn, lowerBound, upperBound
|
(无) |
如果指定了任何一个选项,则必须同时指定所有这些选项。此外,
numPartitions
必须被指定。它们描述了在从多个工作者并行读取时如何对表进行划分。
partitionColumn
必须是相关表中的数字、日期或时间戳列。请注意,
lowerBound
和
upperBound
仅用于决定分区步幅,而不是用于过滤表中的行。因此,表中的所有行将被划分并返回。此选项仅适用于读取。
例如:
spark.read.format("jdbc")
|
读 |
numPartitions
|
(无) |
用于表读取和写入的最大分区数。这还决定了最大并发 JDBC 连接数。如果要写入的分区数超过此限制,则通过在写入之前调用
coalesce(numPartitions)
将其减少到此限制。
|
读/写 |
queryTimeout
|
0
|
驱动程序将在给定的秒数内等待 Statement 对象执行的秒数。零表示没有限制。在写路径中,此选项取决于 JDBC 驱动程序如何实现 API
setQueryTimeout
,例如,h2 JDBC 驱动程序检查每个查询的超时,而不是整个 JDBC 批次。
|
读/写 |
fetchsize
|
0
|
JDBC 获取大小,决定每次往返获取多少行。这可以帮助使用默认获取大小较低的 JDBC 驱动程序(例如 Oracle,默认 10 行)的性能。 | 读 |
batchsize
|
1000
|
JDBC 批量大小,决定每次往返插入多少行。这可以帮助提高 JDBC 驱动程序的性能。此选项仅适用于写入。 | 写 |
isolationLevel
|
READ_UNCOMMITTED
|
事务隔离级别,适用于当前连接。它可以是
NONE
、
READ_COMMITTED
、
READ_UNCOMMITTED
、
REPEATABLE_READ
或
SERIALIZABLE
中的一个,具体对应 JDBC 的 Connection 对象定义的标准事务隔离级别,默认值为
READ_UNCOMMITTED
。有关详细信息,请参见
java.sql.Connection
文档。
|
写 |
sessionInitStatement
|
(无) |
每次数据库会话与远程数据库打开后,在开始读取数据之前,此选项执行自定义 SQL 语句(或 PL/SQL 块)。使用此功能来实现会话初始化代码。示例:
option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")
|
读 |
truncate
|
false
|
这是一个与 JDBC 写入器相关的选项。当启用
SaveMode.Overwrite
时,此选项导致 Spark 截断现有表,而不是删除并重新创建它。这可能会更有效,并防止表元数据(例如,索引)被移除。但是,它可能在某些情况下无效,例如当新数据具有不同的模式时。如果出现故障,用户应关闭
truncate
选项,以再次使用
DROP TABLE
。此外,由于不同数据库管理系统中的
TRUNCATE TABLE
行为不同,因此不一定安全使用。MySQLDialect、DB2Dialect、MsSqlServerDialect、DerbyDialect 和 OracleDialect 支持这一点,而 PostgresDialect 和默认的 JDBCDirect 不支持。对于未知和不支持的 JDBCDirect,用户选项
truncate
被忽略。
|
写 |
cascadeTruncate
|
相关 JDBC 数据库的默认级联截断行为,在每个 JDBCDialect 的
isCascadeTruncate
中指定
|
这是一个与 JDBC 写入器相关的选项。如果启用且 JDBC 数据库支持(目前为 PostgreSQL 和 Oracle),此选项允许执行
TRUNCATE TABLE t CASCADE
(在 PostgreSQL 中,执行
TRUNCATE TABLE ONLY t CASCADE
以防止意外截断子表)。这将影响其他表,因此应该谨慎使用。
|
写 |
createTableOptions
|
|
这是一个与 JDBC 写入器相关的选项。如果指定,此选项允许在创建表时设置特定于数据库的表和分区选项(例如:
CREATE TABLE t (name string) ENGINE=InnoDB.
)。
|
写 |
createTableColumnTypes
|
(无) |
创建表时使用的数据库列数据类型,代替默认数据类型。数据类型信息应以与创建表列语法相同的格式指定(例如:
"name CHAR(64), comments VARCHAR(1024)"
)。指定的类型应为有效的 Spark SQL 数据类型。
|
写 |
customSchema
|
(无) |
从 JDBC 连接器读取数据时使用的自定义模式。例如,
"id DECIMAL(38, 0), name STRING"
。您还可以指定部分字段,其他字段使用默认类型映射。例如,
"id DECIMAL(38, 0)"
。列名称应与 JDBC 表的相应列名称相同。用户可以指定 Spark SQL 的对应数据类型,而不是使用默认值。
|
读 |
pushDownPredicate
|
true
|
启用或禁用谓词下推到 JDBC 数据源的选项。默认值为 true,在这种情况下,Spark 将尽可能将过滤器下推到 JDBC 数据源。否则,如果设置为 false,则不会将过滤器推送到 JDBC 数据源,因此所有过滤器将由 Spark 处理。通常,当 Spark 执行谓词过滤的速度比 JDBC 数据源快时,会关闭谓词下推。 | 读 |
pushDownAggregate
|
true
|
启用或禁用在 V2 JDBC 数据源中下推聚合的选项。默认值为 true,在这种情况下,Spark 将将聚合下推到 JDBC 数据源。否则,如果设置为 false,则不会将聚合下推到 JDBC 数据源。当聚合的执行速度比 JDBC 数据源快时,通常会关闭聚合下推。请注意,只有在所有聚合函数和相关过滤器都可以被下推时,聚合才可以被下推。如果
numPartitions
等于 1 或分组键与
partitionColumn
相同,则 Spark 会将聚合完全下推到数据源,并不对数据源输出应用最终聚合。否则,Spark 会对数据源输出应用最终聚合。
|
读 |
pushDownLimit
|
true
|
启用或禁用 LIMIT 下推到 V2 JDBC 数据源的选项。LIMIT 下推还包括 LIMIT + SORT,即 Top N 操作。默认值为 true,在这种情况下,Spark 将将 LIMIT 或 LIMIT 与 SORT 下推到 JDBC 数据源。否则,如果设置为 false,则 LIMIT 或 LIMIT 与 SORT 不会被下推到 JDBC 数据源。如果
numPartitions
大于 1,Spark 仍会在数据源结果上应用 LIMIT 或 LIMIT 与 SORT,即使 LIMIT 或 LIMIT 与 SORT 被下推。如果 LIMIT 或 LIMIT 与 SORT 被下推,并且
numPartitions
等于 1,Spark 则不会在数据源结果上应用 LIMIT 或 LIMIT 与 SORT。
|
读 |
pushDownOffset
|
true
|
启用或禁用 OFFSET 下推到 V2 JDBC 数据源的选项。默认值为 true,在这种情况下,Spark 将将 OFFSET 下推到 JDBC 数据源。否则,如果设置为 false,Spark 将不会尝试将 OFFSET 下推到 JDBC 数据源。如果
pushDownOffset
为 true 且
numPartitions
等于 1,则 OFFSET 将被下推到 JDBC 数据源。否则,OFFSET 将不会被下推,Spark 仍会在数据源结果上应用 OFFSET。
|
读 |
pushDownTableSample
|
true
|
启用或禁用 TABLESAMPLE 下推到 V2 JDBC 数据源的选项。默认值为 true,在这种情况下,Spark 将 TABLESAMPLE 下推到 JDBC 数据源。否则,如果值设置为 false,则 TABLESAMPLE 不会被下推到 JDBC 数据源。 | 读 |
keytab
|
(无) |
Kerberos keytab 文件的位置(必须通过
--files
选项的 spark-submit 或手动方式预先上传到所有节点)。当发现路径信息时,Spark 会认为 keytab 已手动分发,否则假定为
--files
。如果同时定义了
keytab
和
principal
,则 Spark 尝试进行 Kerberos 身份验证。
|
读/写 |
principal
|
(无) |
为 JDBC 客户端指定 Kerberos 凭证名称。如果同时定义了
keytab
和
principal
,则 Spark 尝试进行 Kerberos 身份验证。
|
读/写 |
refreshKrb5Config
|
false
|
此选项控制在为 JDBC 客户端建立新的连接之前,是否刷新 Kerberos 配置。如果希望刷新配置,则将其设置为 true,否则设置为 false。默认值为 false。请注意,如果将此选项设置为 true 并尝试建立多个连接,可能会发生竞争条件。一种可能的情况如下。
|
读/写 |
connectionProvider
|
(无) |
用于连接到此 URL 的 JDBC 连接提供者的名称,例如
db2
、
mssql
。必须是加载的 JDBC 数据源中的提供者之一。当有多个提供者可以处理指定的驱动程序和选项时,用于消除歧义。所选提供者不得被
spark.sql.sources.disabledJdbcConnProviderList
禁用。
|
读/写 |
preferTimestampNTZ
|
false |
当将选项设置为
true
时,TIMESTAMP WITHOUT TIME ZONE 类型推断为 Spark 的 TimestampNTZ 类型。否则,它被解释为 Spark 的 Timestamp 类型(等同于 TIMESTAMP WITH LOCAL TIME ZONE)。此设置仅影响 TIMESTAMP WITHOUT TIME ZONE 数据类型的推断。TIMESTAMP WITH LOCAL TIME ZONE 和 TIMESTAMP WITH TIME ZONE 数据类型始终被解读为 Spark 的 Timestamp 类型,无论此设置如何。
|
读 |
请注意,带有 keytab 的 kerberos 认证并不总是受到 JDBC 驱动程序的支持。
在使用
keytab
和
principal
配置选项之前,请确保满足以下要求:
- 包含的JDBC驱动程序版本支持带有keytab的kerberos身份验证。
- 有一个内置连接提供程序,支持使用的数据库。
以下数据库有内置的连接提供程序:
- DB2
- 玛丽亚数据库
- 微软 SQL
- 甲骨文
- 后 PostgreSQL
如果不满足要求,请考虑使用
JdbcConnectionProvider
开发者API 来处理自定义认证。
# 注意:通过 load/save 或 jdbc 方法可以实现 JDBC 的加载和保存
# 从 JDBC 源加载数据
jdbcDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.load()
jdbcDF2 = spark.read \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
# 在读取时指定数据框架列的数据类型
jdbcDF3 = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.option("customSchema", "id DECIMAL(38, 0), name STRING") \
.load()
# 将数据保存到 JDBC 源
jdbcDF.write \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.save()
jdbcDF2.write \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
# 在写入时指定创建表的列数据类型
jdbcDF.write \
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
// 注意:JDBC 加载和保存可以通过 load/save 或 jdbc 方法实现
// 从 JDBC 源加载数据
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// 指定读取模式的自定义数据类型
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// 将数据保存到 JDBC 源
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save()
jdbcDF2.write
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// 在写入时指定创建表的列数据类型
jdbcDF.write
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// 注意:JDBC 加载和保存可以通过 load/save 或 jdbc 方法实现
// 从 JDBC 数据源加载数据
Dataset<Row> jdbcDF = spark.read()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load();
Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// 将数据保存到 JDBC 数据源
jdbcDF.write()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save();
jdbcDF2.write()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// 在写入时指定创建表的列数据类型
jdbcDF.write()
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
# 从 JDBC 数据源加载数据
df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
# 将数据保存到 JDBC 数据源
write.jdbc(df, "jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
创建 临时 视图 jdbcTable
使用 org.apache.spark.sql.jdbc
选项 (
url "jdbc:postgresql:dbserver",
dbtable "schema.tablename",
用户 'username',
密码 'password'
)
插入 到 表 jdbcTable
选择 * 从 resultTable