通用加载/保存函数

在最简单的形式中,默认数据源( parquet ,除非通过 spark.sql.sources.default 进行其他配置)将用于所有操作。

df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
数据集<> usersDF = spark.读取().加载("examples/src/main/resources/users.parquet");
usersDF.选择("name", "favorite_color").写入().保存("namesAndFavColors.parquet");
Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.
df <- read.df("examples/src/main/resources/users.parquet")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.parquet")
Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.

手动指定选项

您也可以手动指定将与数据源一起使用的数据源,以及您想传递给数据源的任何其他选项。数据源通过其完全限定名称(即, org.apache.spark.sql.parquet )进行指定,但对于内置源,您也可以使用其简短名称( json parquet jdbc orc libsvm csv text )。从任何数据源类型加载的DataFrames可以使用此语法转换为其他类型。

请参考API文档以获取内置数据源的可用选项,例如, org.apache.spark.sql.DataFrameReader org.apache.spark.sql.DataFrameWriter 。文档中记录的选项也应适用于非Scala的Spark API(例如PySpark)。对于其他格式,请参阅特定格式的API文档。

要加载一个JSON文件,您可以使用:

df = spark.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
数据集<> peopleDF =
spark.().格式("json").加载("examples/src/main/resources/people.json");
peopleDF.选择("name", "age").写入().格式("parquet").保存("namesAndAges.parquet");
Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.
df <- read.df("examples/src/main/resources/people.json", "json")
namesAndAges <- select(df, "name", "age")
write.df(namesAndAges, "namesAndAges.parquet", "parquet")
Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.

要加载一个CSV文件,你可以使用:

df = spark.read.load("examples/src/main/resources/people.csv",
format="csv", sep=";", inferSchema="true", header="true")
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.
val peopleDFCsv = spark.read.format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("examples/src/main/resources/people.csv")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
数据集<> peopleDFCsv = spark.read().format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("examples/src/main/resources/people.csv");
Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.
df <- read.df("examples/src/main/resources/people.csv", "csv", sep = ";", inferSchema = TRUE, header = TRUE)
namesAndAges <- select(df, "name", "age")
Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.

额外选项在写入操作中也会被使用。 例如,您可以控制ORC数据源的布隆过滤器和字典编码。 以下ORC示例将创建布隆过滤器,并仅对 favorite_color 使用字典编码。 对于Parquet,也存在 parquet.bloom.filter.enabled parquet.enable.dictionary 。 要找到有关额外ORC/Parquet选项的更详细信息, 请访问官方Apache ORC / Parquet 网站。

ORC 数据源:

df = spark.read.orc("examples/src/main/resources/users.orc")
(df.write.format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc"))
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.
usersDF.write.format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
usersDF.write().format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc");
Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.
df <- read.df("examples/src/main/resources/users.orc", "orc")
write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")
Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.
创建  users_with_options (
姓名 字符串,
最喜爱的颜色 字符串,
最喜欢的数字 数组<整数>
) 使用 ORC
选项 (
orc.bloom.filter.columns 'favorite_color',
orc.dictionary.key.threshold '1.0',
orc.column.encoding.direct 'name'
)

Parquet 数据源:

df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df.write.format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet"))
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.
usersDF.write.format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
usersDF.write().format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet");
Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.
df <- read.df("examples/src/main/resources/users.parquet", "parquet")
write.parquet(df, "users_with_options.parquet", parquet.bloom.filter.enabled#favorite_color = true, parquet.bloom.filter.expected.ndv#favorite_color = 1000000, parquet.enable.dictionary = true, parquet.page.write-checksum.enabled = false)
Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.
创建  users_with_options (
姓名 字符串,
喜欢的颜色 字符串,
喜欢的数字 数组<整数>
) 使用 parquet
选项 (
`parquet.bloom.filter.enabled#favorite_color` true,
`parquet.bloom.filter.expected.ndv#favorite_color` 1000000,
parquet.启用.字典 true,
parquet.页面.写入-校验和.启用 true
)

直接在文件上运行 SQL

您可以直接使用SQL查询文件,而不是使用读取API将文件加载到DataFrame中并查询它。

df = spark.sql("选择 * 从 parquet.`examples/src/main/resources/users.parquet`")
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.
val sqlDF = spark.sql("从 parquet.`examples/src/main/resources/users.parquet` 中选择 * ")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
数据集<> sqlDF =
spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.
df <- sql("选择 * 从 parquet.`examples/src/main/resources/users.parquet`")
Find full example code at "examples/src/main/r/RSparkSQLExample.R" in the Spark repo.

保存模式

保存操作可以选择性地采用一个 SaveMode ,用于指定如何处理现有数据(如果存在的话)。重要的是要意识到,这些保存模式不使用任何锁定,并且不是原子的。此外,当执行 Overwrite 操作时,数据将在写出新数据之前被删除。

Scala/Java 任何语言 含义
SaveMode.ErrorIfExists (默认) "error" 或 "errorifexists" (默认) 当将 DataFrame 保存到数据源时,如果数据已存在, 预计会抛出异常。
SaveMode.Append "append" 当将 DataFrame 保存到数据源时,如果数据/表已存在, 预计将 DataFrame 的内容附加到现有数据。
SaveMode.Overwrite "overwrite" 覆盖模式意味着在将 DataFrame 保存到数据源时, 如果数据/表已存在,预计现有数据将被 DataFrame 的内容覆盖。
SaveMode.Ignore "ignore" 忽略模式意味着在将 DataFrame 保存到数据源时,如果数据已存在, 预计保存操作不会保存 DataFrame 的内容,也不会更改现有数据。这类似于 SQL 中的 CREATE TABLE IF NOT EXISTS

保存到持久化表

DataFrames 也可以使用 saveAsTable 命令被保存为持久化表到 Hive 元存储中。请注意,使用此功能不需要现有的 Hive 部署。Spark 将为您创建一个默认的本地 Hive 元存储(使用 Derby)。与 createOrReplaceTempView 命令不同, saveAsTable 会物化 DataFrame 的内容,并在 Hive 元存储中创建数据的指针。持久化表即使在您的 Spark 程序重新启动后仍将存在,只要您保持与同一元存储的连接。可以通过在 SparkSession 上调用 table 方法并提供表名来创建持久化表的 DataFrame。

对于基于文件的数据源,例如文本、parquet、json等,您可以通过 path 选项指定自定义表路径,例如 df.write.option("path", "/some/path").saveAsTable("t") 。当表被删除时,自定义表路径将不会被移除,表数据仍然存在。如果没有指定自定义表路径,Spark将把数据写入仓库目录下的默认表路径。当表被删除时,默认表路径也会被移除。

从 Spark 2.1 开始,持久化数据源表在 Hive 元数据存储中具有每个分区的元数据。这带来了几个好处:

请注意,在创建外部数据源表时(那些带有 path 选项的表),默认情况下不会收集分区信息。要同步元存储中的分区信息,您可以调用 MSCK REPAIR TABLE

分桶、排序和分区

对于基于文件的数据源,也可以对输出进行分桶和排序或分区。 分桶和排序仅适用于持久表:

df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");
Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.
创建  users_bucketed_by_name(
名称 字符串,
最喜欢的颜色 字符串,
最喜欢的数字 数组<整数>
) 使用 parquet
按照 (名称) 分成 42 个桶;

虽然在使用数据集API时, save saveAsTable 都可以使用分区。

df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
usersDF
.write()
.partitionBy("favorite_color")
.format("parquet")
.save("namesPartByColor.parquet");
Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.
创建  users_by_favorite_color(
name STRING,
favorite_color STRING,
favorite_numbers array<integer>
) 使用 csv 分区 (favorite_color);

可以对单个表同时使用分区和桶化:

df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed"))
Find full example code at "examples/src/main/python/sql/datasource.py" in the Spark repo.
usersDF
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed")
Find full example code at "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" in the Spark repo.
usersDF
.write()
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed");
Find full example code at "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" in the Spark repo.
创建  users_bucketed_and_partitioned(
名字 STRING,
最喜欢的颜色 STRING,
最喜欢的数字 array<integer>
) 使用 parquet
分区  (最喜欢的颜色)
聚类 (名字) 排序  (最喜欢的数字) 分入 42 ;

partitionBy 创建一个如 分区发现 部分所描述的目录结构。因此,它在高基数列上的适用性有限。相反, bucketBy 将数据分配到固定数量的桶中,当唯一值的数量是无限制时可以使用。