从/到其他DBMS ¶
在Spark上的pandas API与其他DBMS交互的API与pandas中的略有不同,因为Spark上的pandas API利用PySpark中的JDBC API从其他DBMS读取和写入。
用于从/向外部数据库管理系统读取/写入的API如下:
|
将SQL数据库表读取到DataFrame中。 |
|
将SQL查询读取到DataFrame中。 |
|
将SQL查询或数据库表读取到DataFrame中。 |
pandas-on-Spark 需要一个规范的 JDBC URL 用于
con
,并且能够接受额外的关键字参数用于
PySpark JDBC APIs中的选项
:
ps.read_sql(..., dbtable="...", driver="", keytab="", ...)
读取和写入数据框 ¶
在下面的示例中,您将读取和写入SQLite中的一个表。
首先,通过Python的SQLite库创建如下面所示的
example
数据库。这个数据库将在后续的pandas-on-Spark中读取:
import sqlite3
con = sqlite3.connect('example.db')
cur = con.cursor()
# Create table
cur.execute(
'''CREATE TABLE stocks
(date text, trans text, symbol text, qty real, price real)''')
# Insert a row of data
cur.execute("INSERT INTO stocks VALUES ('2006-01-05','BUY','RHAT',100,35.14)")
# Save (commit) the changes
con.commit()
con.close()
Pandas API on Spark 需要一个 JDBC 驱动程序来读取,因此它需要你特定数据库的驱动程序在 Spark 的类路径上。对于 SQLite JDBC 驱动程序,你可以下载它,例如,如下所示:
curl -O https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.34.0/sqlite-jdbc-3.34.0.jar
之后,您应该首先将其添加到您的 Spark 会话中。一旦添加,Spark 上的 pandas API 将自动检测 Spark 会话并利用它。
import os
from pyspark.sql import SparkSession
(SparkSession.builder
.master("local")
.appName("SQLite JDBC")
.config(
"spark.jars",
"{}/sqlite-jdbc-3.34.0.jar".format(os.getcwd()))
.config(
"spark.driver.extraClassPath",
"{}/sqlite-jdbc-3.34.0.jar".format(os.getcwd()))
.getOrCreate())
现在,您准备好查看表格了:
import pyspark.pandas as ps
df = ps.read_sql("stocks", con="jdbc:sqlite:{}/example.db".format(os.getcwd()))
df
date trans symbol qty price
0 2006-01-05 BUY RHAT 100.0 35.14
您也可以如下面所示将其写回到
stocks
表中:
df.price += 1
df.spark.to_spark_io(
format="jdbc", mode="append",
dbtable="stocks", url="jdbc:sqlite:{}/example.db".format(os.getcwd()))
ps.read_sql("stocks", con="jdbc:sqlite:{}/example.db".format(os.getcwd()))
date trans symbol qty price
0 2006-01-05 BUY RHAT 100.0 35.14
1 2006-01-05 BUY RHAT 100.0 36.14