最佳实践
本文介绍使用Delta Lake时的最佳实践。
选择正确的分区列
您可以按列对Delta表进行分区。最常用的分区列是date。
遵循以下两条经验法则来决定按哪列进行分区:
如果某列的唯一值数量非常多,则不要使用该列进行分区。例如,如果按
userId列进行分区,而该列可能存在100万个不同的用户ID,那么这就是一种糟糕的分区策略。每个分区的数据量:如果预计某个分区的数据至少有1 GB,可以按该列进行分区。
压缩文件
如果您持续向Delta表写入数据,随着时间的推移会积累大量文件,特别是以小批量方式添加数据时。这会对表读取效率产生不利影响,也可能影响文件系统性能。理想情况下,应该定期将大量小文件重写为数量较少的大文件。这个过程被称为压缩(compaction)。
您可以通过将表重新分区为较少数量的文件来压缩表。此外,您可以指定选项dataChange为false,表示该操作不会更改数据,仅重新排列数据布局。这将确保其他并发操作因此压缩操作受到的影响最小。
例如,您可以将表压缩为16个文件:
val path = "..."
val numFiles = 16
spark.read
.format("delta")
.load(path)
.repartition(numFiles)
.write
.option("dataChange", "false")
.format("delta")
.mode("overwrite")
.save(path)
path = "..."
numFiles = 16
(spark.read
.format("delta")
.load(path)
.repartition(numFiles)
.write
.option("dataChange", "false")
.format("delta")
.mode("overwrite")
.save(path))
如果您的表已分区,并且您希望基于谓词仅重新分区某个分区,可以使用where仅读取该分区,并使用replaceWhere将其写回:
val path = "..."
val partition = "year = '2019'"
val numFilesPerPartition = 16
spark.read
.format("delta")
.load(path)
.where(partition)
.repartition(numFilesPerPartition)
.write
.option("dataChange", "false")
.format("delta")
.mode("overwrite")
.option("replaceWhere", partition)
.save(path)
path = "..."
partition = "year = '2019'"
numFilesPerPartition = 16
(spark.read
.format("delta")
.load(path)
.where(partition)
.repartition(numFilesPerPartition)
.write
.option("dataChange", "false")
.format("delta")
.mode("overwrite")
.option("replaceWhere", partition)
.save(path))
警告
在会改变数据的操作上使用dataChange = false可能导致表中的数据损坏。
注意
此操作不会删除旧文件。如需删除它们,请运行VACUUM命令。
替换表的内容或架构
有时您可能想要替换一个Delta表。例如:
您发现表中的数据不正确,想要替换内容。
您想重写整个表以进行不兼容的架构更改(例如更改列类型)。
虽然您可以删除Delta表的整个目录并在同一路径上创建新表,但不建议这样做,因为:
删除目录的效率不高。包含超大文件的目录可能需要数小时甚至数天才能删除。
删除文件后,你将丢失其中的所有内容;如果误删了表,很难恢复。
目录删除操作不是原子性的。当您正在删除表时,并发查询读取该表可能会失败或看到不完整的表。
如果不需要更改表结构,可以从Delta表中删除数据并插入新数据,或者更新表以修正错误值。
如果您想更改表结构,可以原子性地替换整个表。例如:
dataframe.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.partitionBy(<your-partition-columns>) \
.saveAsTable("<your-table>") # Managed table
dataframe.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.option("path", "<your-table-path>") \
.partitionBy(<your-partition-columns>) \
.saveAsTable("<your-table>") # External table
REPLACE TABLE <your-table> USING DELTA PARTITIONED BY (<your-partition-columns>) AS SELECT ... -- Managed table
REPLACE TABLE <your-table> USING DELTA PARTITIONED BY (<your-partition-columns>) LOCATION "<your-table-path>" AS SELECT ... -- External table
dataframe.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema", "true")
.partitionBy(<your-partition-columns>)
.saveAsTable("<your-table>") // Managed table
dataframe.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema", "true")
.option("path", "<your-table-path>")
.partitionBy(<your-partition-columns>)
.saveAsTable("<your-table>") // External table
这种方法有以下多个优势:
覆盖表的操作速度要快得多,因为它不需要递归列出目录或删除任何文件。
旧版本的表格仍然存在。如果您误删了表格,可以使用Time Travel轻松恢复旧数据。
这是一个原子操作。在删除表时,并发查询仍然可以读取该表。
由于Delta Lake的ACID事务保证,如果覆盖表失败,表将恢复到之前的状态。
此外,如果您希望在覆盖表后删除旧文件以节省存储成本,可以使用VACUUM来删除它们。该操作针对文件删除进行了优化,通常比删除整个目录更快。