迁移指南
将工作负载迁移至Delta Lake
当您将工作负载迁移到Delta Lake时,应该注意以下与Apache Spark和Apache Hive提供的数据源相比的简化和差异。
Delta Lake 会自动处理以下操作,您永远不应手动执行:
添加和删除分区: Delta Lake 自动跟踪表中存在的分区集合并随着数据的添加或删除更新列表。因此无需运行
ALTER TABLE [ADD|DROP] PARTITION或MSCK。加载单个分区: 作为优化手段,有时您可以直接加载感兴趣的数据分区。例如,
spark.read.format("parquet").load("/data/date=2017-01-01")。这在Delta Lake中是不必要的,因为它可以快速从事务日志中读取文件列表来查找相关文件。如果您只对单个分区感兴趣,请使用WHERE子句指定。例如,spark.read.delta("/data").where("date = '2017-01-01'")。对于分区中包含大量文件的大表,这比从Parquet表加载单个分区(通过直接指定分区路径或使用WHERE子句)要快得多,因为从目录中列出文件通常比从事务日志中读取文件列表要慢。
当您将现有应用程序迁移到Delta Lake时,应避免以下绕过事务日志的操作:
手动修改数据: Delta Lake 使用事务日志以原子方式提交对表的更改。由于日志是唯一真实来源,Spark 不会读取已写出但未添加到事务日志的文件。同样,即使您手动删除文件,事务日志中仍会保留指向该文件的指针。请始终使用本指南中描述的命令来操作 Delta 表中的文件,而非手动修改。
示例
假设您有一个名为/data-pipeline的目录存储了Parquet数据,并且您想创建一个名为events的Delta表。
第一个示例展示了如何:
从原始位置
/data-pipeline读取 Parquet 数据到 DataFrame 中。将DataFrame的内容以Delta格式保存在单独的位置
/tmp/delta/data-pipeline/。基于该独立位置
/tmp/delta/data-pipeline/创建events表。
第二个示例展示了如何使用CONVERT TO TABLE将数据从Parquet格式转换为Delta格式,同时保持其原始位置/data-pipeline/不变。
另存为Delta表
将Parquet数据读取到DataFrame中,然后将DataFrame的内容以
delta格式保存到新目录:data = spark.read.format("parquet").load("/data-pipeline") data.write.format("delta").save("/tmp/delta/data-pipeline/")
创建一个名为
events的Delta表,该表指向新目录中的文件:spark.sql("CREATE TABLE events USING DELTA LOCATION '/tmp/delta/data-pipeline/'")
转换为Delta表
将Parquet表转换为Delta表有两种选择:
将文件转换为Delta Lake格式并创建Delta表:
CONVERT TO DELTA parquet.`/data-pipeline/` CREATE TABLE events USING DELTA LOCATION '/data-pipeline/'
创建一个Parquet表,然后将其转换为Delta表:
CREATE TABLE events USING PARQUET OPTIONS (path '/data-pipeline/') CONVERT TO DELTA events
详情请参阅将Parquet表转换为Delta表。
将Delta Lake工作负载迁移到较新版本
本节讨论从旧版Delta Lake迁移到新版时,用户代码可能需要进行的任何更改。
从Delta Lake 3.0以下版本升级到Delta Lake 3.0或更高版本
请注意,Spark上的Delta Lake Maven构件已从delta-core(3.0之前版本)更名为delta-spark(3.0及更高版本)。
从Delta Lake 2.1.1或更低版本升级到Delta Lake 2.2或更高版本
Delta Lake 2.2在将Parquet表转换为Delta Lake表时默认会收集统计信息(例如使用CONVERT TO DELTA命令)。若要选择不收集统计信息并恢复为2.1.1或更早版本的默认行为,请使用NO STATISTICS SQL API(例如CONVERT TO DELTA parquet.`)
从Delta Lake 1.2.1、2.0.0或2.1.0升级到Delta Lake 2.0.1、2.1.1或更高版本
Delta Lake 1.2.1、2.0.0和2.1.0版本中,基于DynamoDB的S3多集群配置实现存在一个bug,错误的时间戳值被写入DynamoDB。这导致DynamoDB的TTL功能在不安全的情况下清理了已完成的项目。该问题已在Delta Lake 2.0.1和2.1.1版本中修复,并且TTL属性已从commitTime重命名为expireTime。
如果您已经在DynamoDB表上使用旧属性启用了TTL,您需要先禁用该属性的TTL,然后为新属性启用TTL。这两项操作之间可能需要等待一小时,因为TTL设置的更改可能需要一些时间才能生效。请参阅DynamoDB文档此处。如果不这样做,DyanmoDB的TTL功能将不会删除任何新的过期条目。不存在数据丢失的风险。
# Disable TTL on old attribute
aws dynamodb update-time-to-live \
--region <region> \
--table-name <table-name> \
--time-to-live-specification "Enabled=false, AttributeName=commitTime"
# Enable TTL on new attribute
aws dynamodb update-time-to-live \
--region <region> \
--table-name <table-name> \
--time-to-live-specification "Enabled=true, AttributeName=expireTime"
从Delta Lake 2.0或更低版本升级到Delta Lake 2.1或更高版本
当在目录表上调用CONVERT TO DELTA时,Delta Lake 2.1会从目录中推断数据模式。在2.0及以下版本中,Delta Lake是从数据中推断数据模式。这意味着在Delta 2.1中,原始目录表中未定义的数据列将不会出现在转换后的Delta表中。可以通过设置Spark会话配置spark.databricks.delta.convert.useCatalogSchema=false来禁用此行为。
从Delta Lake 1.2或更低版本升级到Delta Lake 2.0或更高版本
Delta Lake 2.0.0 引入了对 DROP CONSTRAINT 的行为变更。在1.2及以下版本中,尝试删除不存在的约束时不会抛出错误。在2.0.0及以上版本中,行为变更为会抛出约束不存在的错误。为避免此错误,请使用 IF EXISTS 结构(例如 ALTER TABLE events DROP CONSTRAINT IF EXISTS constraint_name)。删除现有约束的行为没有变化。
Delta Lake 2.0.0 引入了对动态分区覆盖的支持。在1.2及更早版本中,无论是在Spark会话配置还是DataFrameWriter选项中启用动态分区覆盖模式都无效,使用overwrite模式的写入会替换表中所有现有分区的数据。而在2.0.0及以上版本中,当启用动态分区覆盖模式时,Delta Lake只会替换那些将要提交新数据的逻辑分区中的所有现有数据。
从Delta Lake 1.1或更低版本升级到Delta Lake 1.2或更高版本
与LogStore相关的代码已从delta-core Maven模块中提取出来,作为#951问题的一部分,放入新模块delta-storage以便更好地管理代码。这导致delta-core需要额外依赖一个JAR包delta-storage-。默认情况下,这个额外的JAR会作为delta-core-依赖的一部分被下载。在没有互联网连接的集群中,delta-storage-无法自动下载。建议手动下载delta-storage-并将其放入Java类路径中。
从Delta Lake 1.0或更低版本升级到Delta Lake 1.1或更高版本
如果Delta表中的分区列名称包含无效字符( ,;{}()\n\t=),由于SPARK-36271问题,您将无法在Delta Lake 1.1及以上版本中读取该表。不过这种情况应该很少见,因为从Delta Lake 0.6及以上版本开始就无法创建此类表。如果您仍在使用这类遗留表,可以在将Delta Lake升级到1.1及以上版本之前,使用Delta Lake 1.0或更低版本通过覆盖表的方式来使用新的有效列名,例如以下方式:
spark.read \
.format("delta") \
.load("/the/delta/table/path") \
.withColumnRenamed("column name", "column-name") \
.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.save("/the/delta/table/path")
spark.read
.format("delta")
.load("/the/delta/table/path")
.withColumnRenamed("column name", "column-name")
.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema", "true")
.save("/the/delta/table/path")
从Delta Lake 0.6或更低版本升级到Delta Lake 0.7或更高版本
如果您在Scala、Java或Python中使用DeltaTable API来更新或运行实用程序操作,那么在创建用于执行这些操作的SparkSession时,可能需要添加以下配置。
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("...") \
.master("...") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("...")
.master("...")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate()
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession
.builder()
.appName("...")
.master("...")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate();
或者,您可以在提交Spark应用程序时通过spark-submit,或者在启动spark-shell/pyspark时,通过命令行参数指定额外的配置项。
spark-submit --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" ...
pyspark --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" ...