JDBC 连接其他数据库

Spark SQL 还包括一个数据源,可以通过 JDBC 从其他数据库读取数据。这个功能应该优先于使用 JdbcRDD 。因为结果作为 DataFrame 返回,可以很容易地在 Spark SQL 中处理或与其他数据源连接。JDBC 数据源在 Java 或 Python 中也更易于使用,因为它不要求用户提供 ClassTag。 (注意,这与 Spark SQL JDBC 服务器不同,后者允许其他应用程序使用 Spark SQL 运行查询。)

要开始使用,你需要在 Spark 类路径中包含特定数据库的 JDBC 驱动程序。比如,从 Spark Shell 连接到 Postgres,你可以运行以下命令:

./bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

数据源选项

Spark支持以下不区分大小写的JDBC选项。JDBC的数据源选项可以通过以下方式设置:

对于连接属性,用户可以在数据源选项中指定JDBC连接属性。 user password 通常作为连接属性提供,用于登录到数据源。

属性名称 默认值 含义 范围
url (无) 连接的 JDBC URL 形式为 jdbc:subprotocol:subname 。可以在 URL 中指定特定于源的连接属性。例如: jdbc:postgresql://localhost/test?user=fred&password=secret 读/写
dbtable (无) 应从中读取或写入的 JDBC 表。请注意,在读取路径中,任何在 SQL 查询的 FROM 子句中有效的内容都可以使用。例如,您可以使用带括号的子查询,而不是完整的表。不允许同时指定 dbtable query 选项。 读/写
query (无) 将用于将数据读取到 Spark 的查询。指定的查询将被括起来,并用作 FROM 子句中的子查询。Spark 还会为子查询子句分配一个别名。例如,spark 将向 JDBC 源发出以下格式的查询。

SELECT FROM ( ) spark_gen_alias

使用此选项时有以下几个限制。
  1. 不允许同时指定 dbtable query 选项。
  2. 不允许同时指定 query partitionColumn 选项。当指定 partitionColumn 选项时,要求必需,子查询可以改为使用 dbtable 选项,并且可以使用作为 dbtable 一部分提供的子查询别名限定分区列。
    例如:
    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("query", "select c1, c2 from t1")
    .load()
读/写
prepareQuery (无) 将与 query 一起形成最终查询的前缀。由于指定的 query 将被括起来,作为 FROM 子句中的子查询,并且某些数据库不支持子查询中的所有子句, prepareQuery 属性提供了一种运行此类复杂查询的方法。例如,spark 将向 JDBC 源发出以下格式的查询。

SELECT FROM ( ) spark_gen_alias

下面是几个示例。
  1. MSSQL Server 不接受子查询中的 WITH 子句,但可以将此查询拆分为 prepareQuery query :
    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("prepareQuery", "WITH t AS (SELECT x, y FROM tbl)")
    .option("query", "SELECT * FROM t WHERE x > 10")
    .load()
  2. MSSQL Server 不接受子查询中的临时表子句,但可以将此查询拆分为 prepareQuery query :
    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("prepareQuery", "(SELECT * INTO #TempTable FROM (SELECT * FROM tbl) t)")
    .option("query", "SELECT * FROM #TempTable")
    .load()
读/写
driver (无) 用于连接此 URL 的 JDBC 驱动程序的类名。 读/写
partitionColumn, lowerBound, upperBound (无) 如果指定了任何一个选项,则必须同时指定所有这些选项。此外, numPartitions 必须被指定。它们描述了在从多个工作者并行读取时如何对表进行划分。 partitionColumn 必须是相关表中的数字、日期或时间戳列。请注意, lowerBound upperBound 仅用于决定分区步幅,而不是用于过滤表中的行。因此,表中的所有行将被划分并返回。此选项仅适用于读取。
例如:
spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "(select c1, c2 from t1) as subq")
.option("partitionColumn", "c1")
.option("lowerBound", "1")
.option("upperBound", "100")
.option("numPartitions", "3")
.load()
numPartitions (无) 用于表读取和写入的最大分区数。这还决定了最大并发 JDBC 连接数。如果要写入的分区数超过此限制,则通过在写入之前调用 coalesce(numPartitions) 将其减少到此限制。 读/写
queryTimeout 0 驱动程序将在给定的秒数内等待 Statement 对象执行的秒数。零表示没有限制。在写路径中,此选项取决于 JDBC 驱动程序如何实现 API setQueryTimeout ,例如,h2 JDBC 驱动程序检查每个查询的超时,而不是整个 JDBC 批次。 读/写
fetchsize 0 JDBC 获取大小,决定每次往返获取多少行。这可以帮助使用默认获取大小较低的 JDBC 驱动程序(例如 Oracle,默认 10 行)的性能。
batchsize 1000 JDBC 批量大小,决定每次往返插入多少行。这可以帮助提高 JDBC 驱动程序的性能。此选项仅适用于写入。
isolationLevel READ_UNCOMMITTED 事务隔离级别,适用于当前连接。它可以是 NONE READ_COMMITTED READ_UNCOMMITTED REPEATABLE_READ SERIALIZABLE 中的一个,具体对应 JDBC 的 Connection 对象定义的标准事务隔离级别,默认值为 READ_UNCOMMITTED 。有关详细信息,请参见 java.sql.Connection 文档。
sessionInitStatement (无) 每次数据库会话与远程数据库打开后,在开始读取数据之前,此选项执行自定义 SQL 语句(或 PL/SQL 块)。使用此功能来实现会话初始化代码。示例: option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""")
truncate false 这是一个与 JDBC 写入器相关的选项。当启用 SaveMode.Overwrite 时,此选项导致 Spark 截断现有表,而不是删除并重新创建它。这可能会更有效,并防止表元数据(例如,索引)被移除。但是,它可能在某些情况下无效,例如当新数据具有不同的模式时。如果出现故障,用户应关闭 truncate 选项,以再次使用 DROP TABLE 。此外,由于不同数据库管理系统中的 TRUNCATE TABLE 行为不同,因此不一定安全使用。MySQLDialect、DB2Dialect、MsSqlServerDialect、DerbyDialect 和 OracleDialect 支持这一点,而 PostgresDialect 和默认的 JDBCDirect 不支持。对于未知和不支持的 JDBCDirect,用户选项 truncate 被忽略。
cascadeTruncate 相关 JDBC 数据库的默认级联截断行为,在每个 JDBCDialect 的 isCascadeTruncate 中指定 这是一个与 JDBC 写入器相关的选项。如果启用且 JDBC 数据库支持(目前为 PostgreSQL 和 Oracle),此选项允许执行 TRUNCATE TABLE t CASCADE (在 PostgreSQL 中,执行 TRUNCATE TABLE ONLY t CASCADE 以防止意外截断子表)。这将影响其他表,因此应该谨慎使用。
createTableOptions 这是一个与 JDBC 写入器相关的选项。如果指定,此选项允许在创建表时设置特定于数据库的表和分区选项(例如: CREATE TABLE t (name string) ENGINE=InnoDB. )。
createTableColumnTypes (无) 创建表时使用的数据库列数据类型,代替默认数据类型。数据类型信息应以与创建表列语法相同的格式指定(例如: "name CHAR(64), comments VARCHAR(1024)" )。指定的类型应为有效的 Spark SQL 数据类型。
customSchema (无) 从 JDBC 连接器读取数据时使用的自定义模式。例如, "id DECIMAL(38, 0), name STRING" 。您还可以指定部分字段,其他字段使用默认类型映射。例如, "id DECIMAL(38, 0)" 。列名称应与 JDBC 表的相应列名称相同。用户可以指定 Spark SQL 的对应数据类型,而不是使用默认值。
pushDownPredicate true 启用或禁用谓词下推到 JDBC 数据源的选项。默认值为 true,在这种情况下,Spark 将尽可能将过滤器下推到 JDBC 数据源。否则,如果设置为 false,则不会将过滤器推送到 JDBC 数据源,因此所有过滤器将由 Spark 处理。通常,当 Spark 执行谓词过滤的速度比 JDBC 数据源快时,会关闭谓词下推。
pushDownAggregate true 启用或禁用在 V2 JDBC 数据源中下推聚合的选项。默认值为 true,在这种情况下,Spark 将将聚合下推到 JDBC 数据源。否则,如果设置为 false,则不会将聚合下推到 JDBC 数据源。当聚合的执行速度比 JDBC 数据源快时,通常会关闭聚合下推。请注意,只有在所有聚合函数和相关过滤器都可以被下推时,聚合才可以被下推。如果 numPartitions 等于 1 或分组键与 partitionColumn 相同,则 Spark 会将聚合完全下推到数据源,并不对数据源输出应用最终聚合。否则,Spark 会对数据源输出应用最终聚合。
pushDownLimit true 启用或禁用 LIMIT 下推到 V2 JDBC 数据源的选项。LIMIT 下推还包括 LIMIT + SORT,即 Top N 操作。默认值为 true,在这种情况下,Spark 将将 LIMIT 或 LIMIT 与 SORT 下推到 JDBC 数据源。否则,如果设置为 false,则 LIMIT 或 LIMIT 与 SORT 不会被下推到 JDBC 数据源。如果 numPartitions 大于 1,Spark 仍会在数据源结果上应用 LIMIT 或 LIMIT 与 SORT,即使 LIMIT 或 LIMIT 与 SORT 被下推。如果 LIMIT 或 LIMIT 与 SORT 被下推,并且 numPartitions 等于 1,Spark 则不会在数据源结果上应用 LIMIT 或 LIMIT 与 SORT。
pushDownOffset true 启用或禁用 OFFSET 下推到 V2 JDBC 数据源的选项。默认值为 true,在这种情况下,Spark 将将 OFFSET 下推到 JDBC 数据源。否则,如果设置为 false,Spark 将不会尝试将 OFFSET 下推到 JDBC 数据源。如果 pushDownOffset 为 true 且 numPartitions 等于 1,则 OFFSET 将被下推到 JDBC 数据源。否则,OFFSET 将不会被下推,Spark 仍会在数据源结果上应用 OFFSET。
pushDownTableSample true 启用或禁用 TABLESAMPLE 下推到 V2 JDBC 数据源的选项。默认值为 true,在这种情况下,Spark 将 TABLESAMPLE 下推到 JDBC 数据源。否则,如果值设置为 false,则 TABLESAMPLE 不会被下推到 JDBC 数据源。
keytab (无) Kerberos keytab 文件的位置(必须通过 --files 选项的 spark-submit 或手动方式预先上传到所有节点)。当发现路径信息时,Spark 会认为 keytab 已手动分发,否则假定为 --files 。如果同时定义了 keytab principal ,则 Spark 尝试进行 Kerberos 身份验证。 读/写
principal (无) 为 JDBC 客户端指定 Kerberos 凭证名称。如果同时定义了 keytab principal ,则 Spark 尝试进行 Kerberos 身份验证。 读/写
refreshKrb5Config false 此选项控制在为 JDBC 客户端建立新的连接之前,是否刷新 Kerberos 配置。如果希望刷新配置,则将其设置为 true,否则设置为 false。默认值为 false。请注意,如果将此选项设置为 true 并尝试建立多个连接,可能会发生竞争条件。一种可能的情况如下。
  1. refreshKrb5Config 标志设置为安全上下文 1
  2. 用于相应 DBMS 的 JDBC 连接提供者被使用
  3. krb5.conf 被修改,但 JVM 尚未意识到必须重新加载
  4. Spark 成功验证安全上下文 1
  5. JVM 从修改后的 krb5.conf 加载安全上下文 2
  6. Spark 还原之前保存的安全上下文 1
  7. 修改后的 krb5.conf 内容已丢失
读/写
connectionProvider (无) 用于连接到此 URL 的 JDBC 连接提供者的名称,例如 db2 mssql 。必须是加载的 JDBC 数据源中的提供者之一。当有多个提供者可以处理指定的驱动程序和选项时,用于消除歧义。所选提供者不得被 spark.sql.sources.disabledJdbcConnProviderList 禁用。 读/写
preferTimestampNTZ false 当将选项设置为 true 时,TIMESTAMP WITHOUT TIME ZONE 类型推断为 Spark 的 TimestampNTZ 类型。否则,它被解释为 Spark 的 Timestamp 类型(等同于 TIMESTAMP WITH LOCAL TIME ZONE)。此设置仅影响 TIMESTAMP WITHOUT TIME ZONE 数据类型的推断。TIMESTAMP WITH LOCAL TIME ZONE 和 TIMESTAMP WITH TIME ZONE 数据类型始终被解读为 Spark 的 Timestamp 类型,无论此设置如何。

请注意,带有 keytab 的 kerberos 认证并不总是受到 JDBC 驱动程序的支持。
在使用 keytab principal 配置选项之前,请确保满足以下要求:

以下数据库有内置的连接提供程序:

如果不满足要求,请考虑使用 JdbcConnectionProvider 开发者API 来处理自定义认证。

# 注意:通过 load/save 或 jdbc 方法可以实现 JDBC 的加载和保存
# 从 JDBC 源加载数据
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()
jdbcDF2 = spark.read \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
# 在读取时指定数据框架列的数据类型
jdbcDF3 = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .option("customSchema", "id DECIMAL(38, 0), name STRING") \
    .load()
# 将数据保存到 JDBC 源
jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()
jdbcDF2.write \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
# 在写入时指定创建表的列数据类型
jdbcDF.write \
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.
// 注意:JDBC 加载和保存可以通过 load/save 或 jdbc 方法实现
// 从 JDBC 源加载数据
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// 指定读取模式的自定义数据类型
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// 将数据保存到 JDBC 源
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save()
jdbcDF2.write
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// 在写入时指定创建表的列数据类型
jdbcDF.write
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
// 注意:JDBC 加载和保存可以通过 load/save 或 jdbc 方法实现
// 从 JDBC 数据源加载数据
Dataset<Row> jdbcDF = spark.read()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load();
Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// 将数据保存到 JDBC 数据源
jdbcDF.write()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save();
jdbcDF2.write()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// 在写入时指定创建表的列数据类型
jdbcDF.write()
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.
# 从 JDBC 数据源加载数据
df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
# 将数据保存到 JDBC 数据源
write.jdbc(df, "jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.
创建 临时 视图 jdbcTable
使用 org.apache.spark.sql.jdbc
选项 (
url "jdbc:postgresql:dbserver",
dbtable "schema.tablename",
用户 'username',
密码 'password'
)
插入   jdbcTable
选择 *  resultTable