通用加载/保存函数
在最简单的形式中,默认数据源(
parquet
,除非通过
spark.sql.sources.default
进行其他配置)将用于所有操作。
df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
数据集<行> usersDF = spark.读取().加载("examples/src/main/resources/users.parquet");
usersDF.选择("name", "favorite_color").写入().保存("namesAndFavColors.parquet");
df <- read.df("examples/src/main/resources/users.parquet")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.parquet")
手动指定选项
您也可以手动指定将与数据源一起使用的数据源,以及您想传递给数据源的任何其他选项。数据源通过其完全限定名称(即,
org.apache.spark.sql.parquet
)进行指定,但对于内置源,您也可以使用其简短名称(
json
、
parquet
、
jdbc
、
orc
、
libsvm
、
csv
、
text
)。从任何数据源类型加载的DataFrames可以使用此语法转换为其他类型。
请参考API文档以获取内置数据源的可用选项,例如,
org.apache.spark.sql.DataFrameReader
和
org.apache.spark.sql.DataFrameWriter
。文档中记录的选项也应适用于非Scala的Spark API(例如PySpark)。对于其他格式,请参阅特定格式的API文档。
要加载一个JSON文件,您可以使用:
df = spark.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
数据集<行> peopleDF =
spark.读().格式("json").加载("examples/src/main/resources/people.json");
peopleDF.选择("name", "age").写入().格式("parquet").保存("namesAndAges.parquet");
df <- read.df("examples/src/main/resources/people.json", "json")
namesAndAges <- select(df, "name", "age")
write.df(namesAndAges, "namesAndAges.parquet", "parquet")
要加载一个CSV文件,你可以使用:
df = spark.read.load("examples/src/main/resources/people.csv",
format="csv", sep=";", inferSchema="true", header="true")
val peopleDFCsv = spark.read.format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("examples/src/main/resources/people.csv")
数据集<行> peopleDFCsv = spark.read().format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("examples/src/main/resources/people.csv");
df <- read.df("examples/src/main/resources/people.csv", "csv", sep = ";", inferSchema = TRUE, header = TRUE)
namesAndAges <- select(df, "name", "age")
额外选项在写入操作中也会被使用。
例如,您可以控制ORC数据源的布隆过滤器和字典编码。
以下ORC示例将创建布隆过滤器,并仅对
favorite_color
使用字典编码。
对于Parquet,也存在
parquet.bloom.filter.enabled
和
parquet.enable.dictionary
。
要找到有关额外ORC/Parquet选项的更详细信息,
请访问官方Apache
ORC
/
Parquet
网站。
ORC 数据源:
df = spark.read.orc("examples/src/main/resources/users.orc")
(df.write.format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc"))
usersDF.write.format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc")
usersDF.write().format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc");
df <- read.df("examples/src/main/resources/users.orc", "orc")
write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")
创建 表 users_with_options (
姓名 字符串,
最喜爱的颜色 字符串,
最喜欢的数字 数组<整数>
) 使用 ORC
选项 (
orc.bloom.filter.columns 'favorite_color',
orc.dictionary.key.threshold '1.0',
orc.column.encoding.direct 'name'
)
Parquet 数据源:
df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df.write.format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet"))
usersDF.write.format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet")
usersDF.write().format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet");
df <- read.df("examples/src/main/resources/users.parquet", "parquet")
write.parquet(df, "users_with_options.parquet", parquet.bloom.filter.enabled#favorite_color = true, parquet.bloom.filter.expected.ndv#favorite_color = 1000000, parquet.enable.dictionary = true, parquet.page.write-checksum.enabled = false)
创建 表 users_with_options (
姓名 字符串,
喜欢的颜色 字符串,
喜欢的数字 数组<整数>
) 使用 parquet
选项 (
`parquet.bloom.filter.enabled#favorite_color` true,
`parquet.bloom.filter.expected.ndv#favorite_color` 1000000,
parquet.启用.字典 true,
parquet.页面.写入-校验和.启用 true
)
直接在文件上运行 SQL
您可以直接使用SQL查询文件,而不是使用读取API将文件加载到DataFrame中并查询它。
df = spark.sql("选择 * 从 parquet.`examples/src/main/resources/users.parquet`")
val sqlDF = spark.sql("从 parquet.`examples/src/main/resources/users.parquet` 中选择 * ")
数据集<行> sqlDF =
spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
df <- sql("选择 * 从 parquet.`examples/src/main/resources/users.parquet`")
保存模式
保存操作可以选择性地采用一个
SaveMode
,用于指定如何处理现有数据(如果存在的话)。重要的是要意识到,这些保存模式不使用任何锁定,并且不是原子的。此外,当执行
Overwrite
操作时,数据将在写出新数据之前被删除。
Scala/Java | 任何语言 | 含义 |
---|---|---|
SaveMode.ErrorIfExists
(默认)
|
"error" 或 "errorifexists"
(默认)
|
当将 DataFrame 保存到数据源时,如果数据已存在, 预计会抛出异常。 |
SaveMode.Append
|
"append"
|
当将 DataFrame 保存到数据源时,如果数据/表已存在, 预计将 DataFrame 的内容附加到现有数据。 |
SaveMode.Overwrite
|
"overwrite"
|
覆盖模式意味着在将 DataFrame 保存到数据源时, 如果数据/表已存在,预计现有数据将被 DataFrame 的内容覆盖。 |
SaveMode.Ignore
|
"ignore"
|
忽略模式意味着在将 DataFrame 保存到数据源时,如果数据已存在,
预计保存操作不会保存 DataFrame 的内容,也不会更改现有数据。这类似于 SQL 中的
CREATE TABLE IF NOT EXISTS
。
|
保存到持久化表
DataFrames
也可以使用
saveAsTable
命令被保存为持久化表到 Hive 元存储中。请注意,使用此功能不需要现有的 Hive 部署。Spark 将为您创建一个默认的本地 Hive 元存储(使用 Derby)。与
createOrReplaceTempView
命令不同,
saveAsTable
会物化 DataFrame 的内容,并在 Hive 元存储中创建数据的指针。持久化表即使在您的 Spark 程序重新启动后仍将存在,只要您保持与同一元存储的连接。可以通过在
SparkSession
上调用
table
方法并提供表名来创建持久化表的 DataFrame。
对于基于文件的数据源,例如文本、parquet、json等,您可以通过
path
选项指定自定义表路径,例如
df.write.option("path", "/some/path").saveAsTable("t")
。当表被删除时,自定义表路径将不会被移除,表数据仍然存在。如果没有指定自定义表路径,Spark将把数据写入仓库目录下的默认表路径。当表被删除时,默认表路径也会被移除。
从 Spark 2.1 开始,持久化数据源表在 Hive 元数据存储中具有每个分区的元数据。这带来了几个好处:
- 由于元存储只会为查询返回必要的分区,因此在第一次查询表时不再需要发现所有分区。
-
像
ALTER TABLE PARTITION ... SET LOCATION
这样的 Hive DDL 现在可以用于通过 Datasource API 创建的表。
请注意,在创建外部数据源表时(那些带有
path
选项的表),默认情况下不会收集分区信息。要同步元存储中的分区信息,您可以调用
MSCK REPAIR TABLE
。
分桶、排序和分区
对于基于文件的数据源,也可以对输出进行分桶和排序或分区。 分桶和排序仅适用于持久表:
df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");
创建 表 users_bucketed_by_name(
名称 字符串,
最喜欢的颜色 字符串,
最喜欢的数字 数组<整数>
) 使用 parquet
按照 由(名称) 分成 42 个桶;
虽然在使用数据集API时,
save
和
saveAsTable
都可以使用分区。
df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
usersDF
.write()
.partitionBy("favorite_color")
.format("parquet")
.save("namesPartByColor.parquet");
创建 表 users_by_favorite_color(
name STRING,
favorite_color STRING,
favorite_numbers array<integer>
) 使用 csv 分区 由(favorite_color);
可以对单个表同时使用分区和桶化:
df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed"))
usersDF
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed")
usersDF
.write()
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed");
创建 表 users_bucketed_and_partitioned(
名字 STRING,
最喜欢的颜色 STRING,
最喜欢的数字 array<integer>
) 使用 parquet
分区 按 (最喜欢的颜色)
聚类 按(名字) 排序 按 (最喜欢的数字) 分入 42 桶;
partitionBy
创建一个如
分区发现
部分所描述的目录结构。因此,它在高基数列上的适用性有限。相反,
bucketBy
将数据分配到固定数量的桶中,当唯一值的数量是无限制时可以使用。