表的批量读写
Delta Lake 支持 Apache Spark DataFrame 读写 API 提供的大多数选项,用于对表执行批量读写操作。
对于Delta Lake表上的许多操作,您可以通过在创建新的SparkSession时设置配置来启用与Apache Spark DataSourceV2和Catalog API(自3.0起)的集成。请参阅Configure SparkSession。
在本文中:
创建表
Delta Lake 支持创建两种类型的表——在元存储中定义的表和通过路径定义的表。
要使用元存储定义的表,您必须在创建新的SparkSession时通过设置配置来启用与Apache Spark DataSourceV2和Catalog API的集成。请参阅Configure SparkSession。
您可以通过以下方式创建表格。
SQL DDL命令: 您可以使用Apache Spark支持的标准SQL DDL命令(例如
CREATE TABLE和REPLACE TABLE)来创建Delta表。CREATE TABLE IF NOT EXISTS default.people10m ( id INT, firstName STRING, middleName STRING, lastName STRING, gender STRING, birthDate TIMESTAMP, ssn STRING, salary INT ) USING DELTA CREATE OR REPLACE TABLE default.people10m ( id INT, firstName STRING, middleName STRING, lastName STRING, gender STRING, birthDate TIMESTAMP, ssn STRING, salary INT ) USING DELTA
SQL 还支持在路径上创建表,而无需在 Hive 元存储中创建条目。
```sql
– 使用路径创建或替换表
创建或替换表 delta./tmp/delta/people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn 字符串,
salary INT
) USING DELTA
```
DataFrameWriterAPI: 如果您想同时创建表并从Spark DataFrames或Datasets插入数据,可以使用Spark的DataFrameWriter(Scala或Java 和 Python)。# 使用DataFrame的schema在元存储中创建表并写入数据 df.write.format("delta").saveAsTable("default.people10m") # 使用DataFrame的schema创建或替换带路径的分区表并写入/覆盖数据 df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
// 使用DataFrame的schema在元存储中创建表并写入数据 df.write.format("delta").saveAsTable("default.people10m") // 使用DataFrame的schema创建带路径的表并写入数据 df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
您还可以使用Spark的DataFrameWriterV2 API来创建Delta表。
DeltaTableBuilderAPI: 您也可以使用Delta Lake中的DeltaTableBuilderAPI来创建表。与DataFrameWriter API相比,该API可以更轻松地指定列注释、表属性和generated columns等附加信息。
注意
该功能是新增的,目前处于预览阶段。
# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.execute()
# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.property("description", "table with people data") \
.location("/tmp/delta/people10m") \
.execute()
// Create table in the metastore
DeltaTable.createOrReplace(spark)
.tableName("default.people10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.property("description", "table with people data")
.location("/tmp/delta/people10m")
.execute()
详情请参阅API文档。
分区数据
您可以通过分区数据来加速涉及分区列谓词的查询或DML操作。 在创建Delta表时,可以通过指定分区列来进行数据分区。以下示例按性别进行分区。
-- Create table in the metastore
CREATE TABLE default.people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
USING DELTA
PARTITIONED BY (gender)
df.write.format("delta").partitionBy("gender").saveAsTable("default.people10m")
DeltaTable.create(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.partitionedBy("gender") \
.execute()
df.write.format("delta").partitionBy("gender").saveAsTable("default.people10m")
DeltaTable.createOrReplace(spark)
.tableName("default.people10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.partitionedBy("gender")
.execute()
要判断表是否包含特定分区,可以使用语句SELECT COUNT(*) > 0 FROM 。如果分区存在,将返回true。例如:
SELECT COUNT(*) > 0 AS `Partition exists` FROM default.people10m WHERE gender = "M"
display(spark.sql("SELECT COUNT(*) > 0 AS `Partition exists` FROM default.people10m WHERE gender = 'M'"))
display(spark.sql("SELECT COUNT(*) > 0 AS `Partition exists` FROM default.people10m WHERE gender = 'M'"))
控制数据位置
对于在元存储中定义的表,您可以选择将LOCATION指定为路径。使用指定LOCATION创建的表被视为不受元存储管理。与未指定路径的托管表不同,当您DROP非托管表时,其文件不会被删除。
当你使用CREATE TABLE命令并指定一个已经包含Delta Lake存储数据的LOCATION时,Delta Lake会执行以下操作:
如果仅指定表名和位置,例如:
CREATE TABLE default.people10m USING DELTA LOCATION '/tmp/delta/people10m'
元存储中的表会自动继承现有数据的模式、分区和表属性。此功能可用于将数据"导入"到元存储中。
如果指定了任何配置(模式、分区或表属性),Delta Lake会验证该规范是否与现有数据的配置完全匹配。
重要提示
如果指定的配置与数据的配置不完全匹配,Delta Lake会抛出一个描述差异的异常。
注意
元存储并非Delta表最新信息的真实来源。实际上,元存储中的表定义可能不包含诸如模式和属性等所有元数据。它仅包含表的位置信息,而该位置的事务日志才是真实来源。如果从不了解Delta特定定制的系统查询元存储,可能会看到不完整或过时的表信息。
使用生成的列
注意
此功能是新增的,目前处于预览阶段。
Delta Lake支持生成列,这是一种特殊类型的列,其值会根据用户指定的函数基于Delta表中其他列自动生成。当您向带有生成列的表中写入数据且未明确提供这些列的值时,Delta Lake会自动计算这些值。例如,您可以从时间戳列自动生成日期列(用于按日期对表进行分区);任何向表中写入数据的操作只需指定时间戳列的数据即可。但是,如果您明确为这些列提供值,则这些值必须满足约束 (,否则写入操作将失败并报错。
重要
使用生成列创建的表具有比默认值更高的表写入协议版本。请参阅Delta Lake如何管理功能兼容性?以了解表协议版本控制以及拥有更高版本表协议版本的含义。
以下示例展示如何创建一个包含生成列的表:
DeltaTable.create(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("dateOfBirth", DateType(), generatedAlwaysAs="CAST(birthDate AS DATE)") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.partitionedBy("gender") \
.execute()
DeltaTable.create(spark)
.tableName("default.people10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn(
DeltaTable.columnBuilder("dateOfBirth")
.dataType(DateType)
.generatedAlwaysAs("CAST(dateOfBirth AS DATE)")
.build())
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.partitionedBy("gender")
.execute()
生成的列会像普通列一样存储。也就是说,它们会占用存储空间。
以下限制适用于生成列:
生成表达式可以使用Spark中任何在给定相同参数值时始终返回相同结果的SQL函数,除了以下类型的函数:
用户自定义函数。
聚合函数。
窗口函数。
返回多行的函数。
对于Delta Lake 1.1.0及以上版本,当您将
spark.databricks.delta.schema.autoMerge.enabled设置为true时,MERGE操作支持生成列。
Delta Lake 可以在分区列由以下任一表达式定义时,为查询生成分区过滤器:
CAST(col AS DATE)且col的类型是TIMESTAMP。YEAR(col)且col的类型为TIMESTAMP。由
YEAR(col), MONTH(col)定义的两个分区列,且col的类型为TIMESTAMP。由
YEAR(col), MONTH(col), DAY(col)定义的三个分区列,且col的类型为TIMESTAMP。由
YEAR(col), MONTH(col), DAY(col), HOUR(col)定义的四个分区列,且col的类型为TIMESTAMP。SUBSTRING(col, pos, len)且col的类型为STRINGDATE_FORMAT(col, format)且col的类型为TIMESTAMP。DATE_TRUNC(format, col) and the type of thecol` 的类型是TIMESTAMP或DATE。TRUNC(col, format)且col的类型为TIMESTAMP或DATE。
如果分区列由上述表达式之一定义,并且查询使用生成表达式的基础列来过滤数据,Delta Lake会检查基础列与生成列之间的关系,并在可能的情况下基于生成的分区列填充分区过滤器。例如,给定以下表:
DeltaTable.create(spark) \
.tableName("default.events") \
.addColumn("eventId", "BIGINT") \
.addColumn("data", "STRING") \
.addColumn("eventType", "STRING") \
.addColumn("eventTime", "TIMESTAMP") \
.addColumn("eventDate", "DATE", generatedAlwaysAs="CAST(eventTime AS DATE)") \
.partitionedBy("eventType", "eventDate") \
.execute()
如果您随后运行以下查询:
spark.sql('SELECT * FROM default.events WHERE eventTime >= "2020-10-01 00:00:00" <= "2020-10-01 12:00:00"')
Delta Lake 会自动生成分区过滤器,因此即使未指定分区过滤器,上述查询也只会读取分区 date=2020-10-01 中的数据。
再举一个例子,给定以下表格:
DeltaTable.create(spark) \
.tableName("default.events") \
.addColumn("eventId", "BIGINT") \
.addColumn("data", "STRING") \
.addColumn("eventType", "STRING") \
.addColumn("eventTime", "TIMESTAMP") \
.addColumn("year", "INT", generatedAlwaysAs="YEAR(eventTime)") \
.addColumn("month", "INT", generatedAlwaysAs="MONTH(eventTime)") \
.addColumn("day", "INT", generatedAlwaysAs="DAY(eventTime)") \
.partitionedBy("eventType", "year", "month", "day") \
.execute()
如果随后运行以下查询:
spark.sql('SELECT * FROM default.events WHERE eventTime >= "2020-10-01 00:00:00" <= "2020-10-01 12:00:00"')
Delta Lake 会自动生成分区过滤器,这样即使未指定分区过滤器,上述查询也只会读取分区 year=2020/month=10/day=01 中的数据。
您可以使用EXPLAIN子句并检查提供的执行计划,查看Delta Lake是否自动生成了任何分区过滤器。
使用标识列
重要
在Delta表上声明标识列会禁用并发事务。仅在不需要对目标表进行并发写入的用例中使用标识列。
Delta Lake标识列在Delta Lake 3.3及以上版本中受支持。它们是一种生成列,为插入表中的每条记录分配唯一值。以下示例展示了如何在创建表命令中声明标识列:
from delta.tables import DeltaTable, IdentityGenerator
from pyspark.sql.types import LongType
DeltaTable.create()
.tableName("table_name")
.addColumn("id_col1", dataType=LongType(), generatedAlwaysAs=IdentityGenerator())
.addColumn("id_col2", dataType=LongType(), generatedAlwaysAs=IdentityGenerator(start=-1, step=1))
.addColumn("id_col3", dataType=LongType(), generatedByDefaultAs=IdentityGenerator())
.addColumn("id_col4", dataType=LongType(), generatedByDefaultAs=IdentityGenerator(start=-1, step=1))
.execute()
import io.delta.tables.DeltaTable
import org.apache.spark.sql.types.LongType
DeltaTable.create(spark)
.tableName("table_name")
.addColumn(
DeltaTable.columnBuilder(spark, "id_col1")
.dataType(LongType)
.generatedAlwaysAsIdentity().build())
.addColumn(
DeltaTable.columnBuilder(spark, "id_col2")
.dataType(LongType)
.generatedAlwaysAsIdentity(start = -1L, step = 1L).build())
.addColumn(
DeltaTable.columnBuilder(spark, "id_col3")
.dataType(LongType)
.generatedByDefaultAsIdentity().build())
.addColumn(
DeltaTable.columnBuilder(spark, "id_col4")
.dataType(LongType)
.generatedByDefaultAsIdentity(start = -1L, step = 1L).build())
.execute()
注意
目前尚不支持用于标识列的SQL API。
您可以选择性地指定以下内容:
起始值。
步长,可以是正数或负数。
起始值和步长默认均为1。不能指定步长为0。
由标识列分配的值是唯一的,并按照指定步长的方向和倍数递增,但不能保证是连续的。例如,起始值为0且步长为2时,所有值都是正偶数,但可能会跳过某些偶数。
当标识列被指定为generated by default as identity时,插入操作可以指定标识列的值。将其指定为generated always as identity可覆盖手动设置值的能力。
标识列仅支持LongType类型,如果分配的值超出LongType支持的范围,操作将失败。
您可以使用ALTER TABLE table_name ALTER COLUMN column_name SYNC IDENTITY来同步标识列(identity column)的元数据与实际数据。当您向标识列写入自定义值时,这些值可能与元数据不匹配。该选项会评估当前状态并更新元数据以与实际数据保持一致。执行此命令后,下一个自动分配的标识值将从start + (n + 1) * step开始,其中n是满足start + n * step >= max()的最小值(对于正步长的情况)。
CTAS与标识列
在使用CREATE TABLE table_name AS SELECT (CTAS)语句时,您无法定义模式、标识列约束或任何其他表规范。
要创建一个带有标识列的新表并用现有数据填充它,请执行以下操作:
创建一个具有正确模式的表,包括标识列定义和其他表属性。
执行插入操作。
以下示例将标识列定义为generated by default as identity。如果插入表中的数据包含标识列的有效值,则使用这些值。
from delta.tables import DeltaTable, IdentityGenerator
from pyspark.sql.types import LongType, DateType
DeltaTable.create(spark)
.tableName("new_table")
.addColumn("id", dataType=LongType(), generatedByDefaultAs=IdentityGenerator(start=5, step=1))
.addColumn("event_date", dataType=DateType())
.addColumn("some_value", dataType=LongType())
.execute()
# Insert records including existing IDs
old_table_df = spark.table("old_table").select("id", "event_date", "some_value")
old_table_df.write
.format("delta")
.mode("append")
.saveAsTable("new_table")
# Insert records and generate new IDs
new_records_df = spark.table("new_records").select("event_date", "some_value")
new_records_df.write
.format("delta")
.mode("append")
.saveAsTable("new_table")
import org.apache.spark.sql.types._
import io.delta.tables.DeltaTable
DeltaTable.createOrReplace(spark)
.tableName("new_table")
.addColumn(
DeltaTable.columnBuilder(spark, "id")
.dataType(LongType)
.generatedByDefaultAsIdentity(start = 5L, step = 1L)
.build())
.addColumn(
DeltaTable.columnBuilder(spark, "event_date")
.dataType(DateType)
.nullable(true)
.build())
.addColumn(
DeltaTable.columnBuilder(spark, "some_value")
.dataType(LongType)
.nullable(true)
.build())
.execute()
// Insert records including existing IDs
val oldTableDF = spark.table("old_table").select("id", "event_date", "some_value")
oldTableDF.write
.format("delta")
.mode("append")
.saveAsTable("new_table")
// Insert records and generate new IDs
val newRecordsDF = spark.table("new_records").select("event_date", "some_value")
newRecordsDF.write
.format("delta")
.mode("append")
.saveAsTable("new_table")
为列指定默认值
Delta 支持为 Delta 表中的列指定默认表达式。当用户写入这些表时未明确为某些列提供值,或者当他们显式使用 DEFAULT SQL 关键字时,Delta 会自动为这些列生成默认值。更多信息,请参阅专用文档页面。
在列名中使用特殊字符
默认情况下,表列名不支持特殊字符,如空格以及任何,;{}()\n\t=字符。若要在表的列名中包含这些特殊字符,请启用列映射功能。
默认表属性
在SparkSession中设置的Delta Lake配置会覆盖会话中新建Delta Lake表的默认表属性。 SparkSession中使用的前缀与表属性中的配置项不同。
Delta Lake 配置 |
SparkSession 配置 |
|---|---|
|
|
例如,要为会话中创建的所有新Delta Lake表设置delta.appendOnly = true属性,请设置以下内容:
SET spark.databricks.delta.properties.defaults.appendOnly = true
要修改现有表的表属性,请使用SET TBLPROPERTIES。
读取表
您可以通过指定表名或路径将Delta表加载为DataFrame:
SELECT * FROM default.people10m -- query table in the metastore
SELECT * FROM delta.`/tmp/delta/people10m` -- query table by path
spark.table("default.people10m") # query table in the metastore
spark.read.format("delta").load("/tmp/delta/people10m") # query table by path
spark.table("default.people10m") // query table in the metastore
spark.read.format("delta").load("/tmp/delta/people10m") // create table by path
import io.delta.implicits._
spark.read.delta("/tmp/delta/people10m")
返回的DataFrame会自动读取表的最新快照进行查询;您无需运行REFRESH TABLE。Delta Lake会自动利用分区和统计信息,在查询中存在适用谓词时读取最少量的数据。
查询表的旧快照(时间旅行)
Delta Lake 时间旅行功能允许您查询 Delta 表的旧版本快照。时间旅行有许多应用场景,包括:
重新创建分析、报告或输出(例如机器学习模型的输出)。这对于调试或审计非常有用,特别是在受监管的行业中。
编写复杂的时间查询。
修复数据中的错误。
为快速变化的表提供查询集的快照隔离。
本节介绍查询旧版本表的支持方法、数据保留注意事项,并提供相关示例。
注意
每个版本N的时间戳取决于Delta表日志中对应版本N的日志文件的时间戳。因此,如果您将整个Delta表目录复制到新位置,按时间戳进行时间旅行可能会中断。按版本进行时间旅行则不会受到影响。
语法
本节展示如何查询Delta表的旧版本。
SQL AS OF 语法
SELECT * FROM table_name TIMESTAMP AS OF timestamp_expression
SELECT * FROM table_name VERSION AS OF version
timestamp_expression可以是以下任意一种:'2018-10-18T22:15:12.013Z',即可以转换为时间戳的字符串cast('2018-10-18 13:36:32 CEST' as timestamp)'2018-10-18',即一个日期字符串current_timestamp() - interval 12 hoursdate_sub(current_date(), 1)任何其他可以或已经被转换为时间戳的表达式
version是一个可以从DESCRIBE HISTORY table_spec的输出中获取的长整型值。
timestamp_expression 和 version 都不能是子查询。
DataFrameReader选项
DataFrameReader选项允许您从Delta表中创建一个固定到该表特定版本的DataFrame。
df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/tmp/delta/people10m")
df2 = spark.read.format("delta").option("versionAsOf", version).load("/tmp/delta/people10m")
对于timestamp_string,仅接受日期或时间戳字符串。例如,"2019-01-01"和"2019-01-01T00:00:00.000Z"。
一种常见的模式是在作业执行过程中使用Delta表的最新状态来更新下游应用。
由于Delta表会自动更新,如果底层数据被更新,从Delta表加载的DataFrame在多次调用中可能会返回不同的结果。通过使用时间旅行功能,您可以固定DataFrame在多次调用中返回的数据:
history = spark.sql("DESCRIBE HISTORY delta.`/tmp/delta/people10m`")
latest_version = history.selectExpr("max(version)").collect()
df = spark.read.format("delta").option("versionAsOf", latest_version[0][0]).load("/tmp/delta/people10m")
示例
修复用户
111对表的意外删除操作:
yesterday = spark.sql("SELECT CAST(date_sub(current_date(), 1) AS STRING)").collect()[0][0]
df = spark.read.format("delta").option("timestampAsOf", yesterday).load("/tmp/delta/events")
df.where("userId = 111").write.format("delta").mode("append").save("/tmp/delta/events")
修复对表的意外错误更新:
yesterday = spark.sql("SELECT CAST(date_sub(current_date(), 1) AS STRING)").collect()[0][0]
df = spark.read.format("delta").option("timestampAsOf", yesterday).load("/tmp/delta/events")
df.createOrReplaceTempView("my_table_yesterday")
spark.sql('''
MERGE INTO delta.`/tmp/delta/events` target
USING my_table_yesterday source
ON source.userId = target.userId
WHEN MATCHED THEN UPDATE SET *
''')
查询过去一周新增客户数量。
last_week = spark.sql("SELECT CAST(date_sub(current_date(), 7) AS STRING)").collect()[0][0]
df = spark.read.format("delta").option("timestampAsOf", last_week).load("/tmp/delta/events")
last_week_count = df.select("userId").distinct().count()
count = spark.read.format("delta").load("/tmp/delta/events").select("userId").distinct().count()
new_customers_count = count - last_week_count
数据保留
要回溯到之前的版本,你必须保留该版本的日志和数据文件。
Delta Lake表的数据文件不会自动删除;数据文件仅在运行VACUUM时才会被删除。VACUUM不会删除Delta日志文件;日志文件在检查点写入后会自动清理。
默认情况下,您可以回溯Delta表最多30天的历史记录,除非您具备以下条件:
在您的Delta表上运行
VACUUM命令。使用以下表属性更改数据或日志文件的保留期限:
delta.logRetentionDuration = "interval: 控制表历史记录的保留时长。默认值为" interval 30 days。
每次写入检查点时,Delta会自动清理超过保留间隔的旧日志条目。如果将此项配置设置为足够大的值,则会保留许多日志条目。这不会影响性能,因为针对日志的操作是恒定时间。历史记录操作是并行进行的,但随着日志大小的增加,操作成本会变得更高。
delta.deletedFileRetentionDuration = "interval: 控制文件被删除后必须经过多长时间才能成为" VACUUM的候选对象。默认值为interval 7 days。即使对Delta表执行
VACUUM操作后仍要访问30天的历史数据,请设置delta.deletedFileRetentionDuration = "interval 30 days"。此设置可能会导致存储成本上升。
注意
由于日志条目清理,可能会出现无法时间旅行到早于保留间隔版本的情况。Delta Lake要求自上一个检查点以来的所有连续日志条目才能时间旅行到特定版本。例如,一个表最初包含版本[0,19]的日志条目,并在版本10有一个检查点,如果版本0的日志条目被清理,那么您就无法时间旅行到版本[1,9]。增加表属性delta.logRetentionDuration可以帮助避免这些情况。
提交内时间戳
概述
Delta Lake 3.3 引入了In-Commit Timestamps功能,提供了一种更可靠、更一致的方式来追踪表修改时间戳。这些修改时间戳在多种应用场景中都是必需的,例如回溯到过去的特定时间点。该功能解决了传统依赖文件修改时间戳方法的局限性,特别是在涉及数据迁移或复制的场景中。
功能详情
In-Commit Timestamps(提交内时间戳)将修改时间戳存储在提交本身中,确保无论文件系统操作如何,它们都保持不变。这带来了几个好处:
不可变历史: 时间戳成为表的永久提交历史的一部分
一致的时间旅行: 使用基于时间戳的时间旅行查询即使在表迁移后也能产生可靠的结果
在没有In-Commit Timestamp功能的情况下,Delta Lake使用文件修改时间戳作为提交时间戳。这种方法存在多种限制:
数据迁移问题:当表在存储位置之间移动时,文件修改时间戳会发生变化,可能会破坏历史跟踪
复制场景:在不同环境间复制数据时可能出现时间戳不一致的情况
时间旅行可靠性:这些时间戳变更可能会影响时间旅行查询的准确性和一致性
写入表
追加
要以原子方式向现有的Delta表添加新数据,请使用append模式:
INSERT INTO default.people10m SELECT * FROM morePeople
df.write.format("delta").mode("append").save("/tmp/delta/people10m")
df.write.format("delta").mode("append").saveAsTable("default.people10m")
df.write.format("delta").mode("append").save("/tmp/delta/people10m")
df.write.format("delta").mode("append").saveAsTable("default.people10m")
import io.delta.implicits._
df.write.mode("append").delta("/tmp/delta/people10m")
覆盖写入
要以原子方式替换表中的所有数据,请使用overwrite模式:
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople
df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
df.write.format("delta").mode("overwrite").saveAsTable("default.people10m")
df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
df.write.format("delta").mode("overwrite").saveAsTable("default.people10m")
import io.delta.implicits._
df.write.mode("overwrite").delta("/tmp/delta/people10m")
您可以有选择地仅覆盖与任意表达式匹配的数据。此功能在Delta Lake 1.1.0及以上版本中的DataFrames中可用,并在Delta Lake 2.4.0及以上版本的SQL中受支持。
以下命令原子性地替换目标表中1月份的事件数据,该表按start_date分区,使用replace_data中的数据:
replace_data.write \
.format("delta") \
.mode("overwrite") \
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'") \
.save("/tmp/delta/events")
replace_data.write
.format("delta")
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.save("/tmp/delta/events")
INSERT INTO TABLE events REPLACE WHERE start_data >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data
此示例代码将replace_data中的数据写出,验证其是否全部符合谓词条件,并执行原子替换。如果您想写出不完全符合谓词条件的数据来替换目标表中的匹配行,可以通过将spark.databricks.delta.replaceWhere.constraintCheck.enabled设置为false来禁用约束检查:
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false
在Delta Lake 1.0.0及更早版本中,replaceWhere仅覆盖与分区列谓词匹配的数据。以下命令会原子性地将目标表中按date分区的1月份数据替换为df中的数据:
df.write \
.format("delta") \
.mode("overwrite") \
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'") \
.save("/tmp/delta/people10m")
df.write
.format("delta")
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.save("/tmp/delta/people10m")
在Delta Lake 1.1.0及以上版本中,如果您想回退到旧的行为,可以禁用spark.databricks.delta.replaceWhere.dataColumns.enabled标志:
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false
动态分区覆盖写入
Delta Lake 2.0及以上版本支持对分区表的动态分区覆盖模式。
在动态分区覆盖模式下,我们会覆盖每个逻辑分区中现有的所有数据,写入操作将提交新数据。对于写入操作不包含数据的任何现有逻辑分区,将保持不变。此模式仅适用于以覆盖模式写入数据的情况:在SQL中使用INSERT OVERWRITE,或使用df.write.mode("overwrite")写入DataFrame。
通过将Spark会话配置spark.sql.sources.partitionOverwriteMode设置为dynamic来配置动态分区覆盖模式。您也可以通过将DataFrameWriter选项partitionOverwriteMode设置为dynamic来启用此功能。如果存在查询特定选项,它将覆盖会话配置中定义的模式。partitionOverwriteMode的默认值为static。
SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;
df.write \
.format("delta") \
.mode("overwrite") \
.option("partitionOverwriteMode", "dynamic") \
.saveAsTable("default.people10m")
df.write
.format("delta")
.mode("overwrite")
.option("partitionOverwriteMode", "dynamic")
.saveAsTable("default.people10m")
注意
动态分区覆盖与分区表的replaceWhere选项存在冲突。
如果在Spark会话配置中启用了动态分区覆盖功能,并且将
replaceWhere作为DataFrameWriter选项提供,那么Delta Lake会根据replaceWhere表达式覆盖数据(查询特定选项会覆盖会话配置)。如果
DataFrameWriter选项同时启用了动态分区覆盖和replaceWhere,您将会收到一个错误。
重要
验证使用动态分区覆盖写入的数据仅影响预期分区。错误分区中的单行数据可能导致意外覆盖整个分区。我们建议使用replaceWhere来指定要覆盖的数据。
如果分区被意外覆盖,您可以使用将Delta表恢复到早期状态来撤销更改。
有关Delta Lake对表更新的支持,请参阅表删除、更新和合并。
限制文件中写入的行数
您可以使用SQL会话配置spark.sql.files.maxRecordsPerFile来指定Delta Lake表中单个文件写入的最大记录数。指定零或负值表示无限制。
在使用DataFrame API写入Delta Lake表时,您也可以使用DataFrameWriter选项maxRecordsPerFile。当指定maxRecordsPerFile时,SQL会话配置spark.sql.files.maxRecordsPerFile的值将被忽略。
df.write.format("delta") \
.mode("append") \
.option("maxRecordsPerFile", "10000") \
.save("/tmp/delta/people10m")
df.write.format("delta")
.mode("append")
.option("maxRecordsPerFile", "10000")
.save("/tmp/delta/people10m")
幂等写入
有时,由于各种原因(例如作业遇到故障),将数据写入Delta表的作业会被重新启动。失败的作业在终止前可能已经将数据写入Delta表,也可能没有。在数据已写入Delta表的情况下,重新启动的作业会将相同的数据再次写入Delta表,从而导致数据重复。
为了解决这个问题,Delta表支持以下DataFrameWriter选项以使写入操作具有幂等性:
txnAppId: 一个唯一的字符串,你可以在每次DataFrame写入时传递。例如,这可以是作业的名称。txnVersion: 一个单调递增的数字,作为事务版本号。这个数字对于写入Delta表的数据必须是唯一的。例如,可以是在首次尝试查询时的纪元秒数。同一作业的任何后续重启都需要保持相同的txnVersion值。
上述选项组合对于每次新摄入Delta表的数据必须是唯一的,且txnVersion需要高于最后一次摄入Delta表的数据版本。例如:
最后成功写入的数据包含选项值,如
dailyETL:23423(txnAppId:txnVersion)。下一次数据写入时,
txnAppId = dailyETL和txnVersion至少应为23424(比上次写入数据的txnVersion值大1)。任何尝试使用
txnAppId = dailyETL和txnVersion值为23422或更小来写入数据的操作都会被忽略,因为该txnVersion小于表中最后记录的txnVersion值。尝试使用
txnAppId:txnVersion作为anotherETL:23424写入数据成功,因为与上次摄取数据中的相同选项值相比,它包含不同的txnAppId。
您还可以通过设置Spark会话配置spark.databricks.delta.write.txnAppId和spark.databricks.delta.write.txnVersion来配置幂等写入。此外,您可以将spark.databricks.delta.write.txnVersion.autoReset.enabled设置为true,以便在每次写入后自动重置spark.databricks.delta.write.txnVersion。当写入选项和会话配置都设置时,我们将使用写入选项的值。
警告
该解决方案假设在作业多次重试期间写入Delta表的数据是相同的。如果在Delta表中的一次写入尝试成功,但由于某些下游故障导致第二次写入尝试使用相同的事务选项但不同的数据,那么第二次写入尝试将被忽略。这可能导致意外结果。
示例
app_id = ... # A unique string that is used as an application ID.
version = ... # A monotonically increasing number that acts as transaction version.
dataFrame.write.format(...).option("txnVersion", version).option("txnAppId", app_id).save(...)
val appId = ... // A unique string that is used as an application ID.
version = ... // A monotonically increasing number that acts as transaction version.
dataFrame.write.format(...).option("txnVersion", version).option("txnAppId", appId).save(...)
SET spark.databricks.delta.write.txnAppId = ...;
SET spark.databricks.delta.write.txnVersion = ...;
SET spark.databricks.delta.write.txnVersion.autoReset.enabled = true; -- if set to true, this will reset txnVersion after every write
设置用户自定义的提交元数据
您可以在这些操作生成的提交中指定用户自定义字符串作为元数据,可以通过DataFrameWriter选项userMetadata或SparkSession配置spark.databricks.delta.commitInfo.userMetadata来实现。如果两者都已指定,则优先采用选项中的设置。这些用户自定义的元数据可以通过history操作读取。
SET spark.databricks.delta.commitInfo.userMetadata=overwritten-for-fixing-incorrect-data
INSERT OVERWRITE default.people10m SELECT * FROM morePeople
df.write.format("delta") \
.mode("overwrite") \
.option("userMetadata", "overwritten-for-fixing-incorrect-data") \
.save("/tmp/delta/people10m")
df.write.format("delta")
.mode("overwrite")
.option("userMetadata", "overwritten-for-fixing-incorrect-data")
.save("/tmp/delta/people10m")
模式验证
Delta Lake 会自动验证待写入DataFrame的架构是否与表的架构兼容。Delta Lake 使用以下规则来确定从DataFrame到表的写入是否兼容:
DataFrame中的所有列都必须存在于目标表中。如果DataFrame中有表中不存在的列,则会引发异常。表中存在但DataFrame中不存在的列将被设置为null。
DataFrame列的数据类型必须与目标表中的列数据类型匹配。如果不匹配,则会引发异常。
DataFrame列名不能仅大小写不同。这意味着同一表中不能定义如"Foo"和"foo"这样的列。虽然Spark可以使用区分大小写(默认不区分)模式,但Parquet在存储和返回列信息时是区分大小写的。Delta Lake在存储模式时保留大小写但不区分大小写,并设置此限制以避免潜在错误、数据损坏或丢失问题。
Delta Lake 支持通过 DDL 显式添加新列,并具备自动更新模式的能力。
如果指定了其他选项,例如partitionBy与追加模式结合使用时,Delta Lake会验证它们是否匹配,并在出现不匹配时抛出错误。当partitionBy不存在时,追加操作会自动遵循现有数据的分区方式。
更新表结构
Delta Lake 允许您更新表的模式。支持以下类型的更改:
添加新列(在任意位置)
重新排列现有列
您可以通过显式使用DDL或隐式使用DML来进行这些更改。
重要
当您更新Delta表结构时,从该表读取的流将终止。如果您希望流继续运行,必须重新启动它。
显式更新模式
您可以使用以下DDL显式更改表的模式。
添加列
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
默认情况下,可空性为 true。
要向嵌套字段添加列,请使用:
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
更改列注释或排序
ALTER TABLE table_name ALTER [COLUMN] col_name col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]
要更改嵌套字段中的列,请使用:
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name]
重命名列
注意
此功能在Delta Lake 1.2.0及以上版本中可用。该功能目前处于实验阶段。
要在不重写任何现有列数据的情况下重命名列,必须为表启用列映射。请参阅enable column mapping。
要重命名列:
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name
要重命名嵌套字段:
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field
删除列
注意
此功能在Delta Lake 2.0及以上版本中可用。该功能目前处于实验阶段。
要以仅修改元数据而不重写任何数据文件的方式删除列,必须为表启用列映射功能。请参阅enable column mapping。
重要
从元数据中删除列不会删除文件中该列的基础数据。
要删除一列:
ALTER TABLE table_name DROP COLUMN col_name
要删除多列:
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)
自动模式更新
Delta Lake 可以自动更新表的模式作为DML事务的一部分(无论是追加还是覆盖),并使模式与写入的数据兼容。
替换表结构
默认情况下,覆盖表中的数据不会覆盖表结构。当使用mode("overwrite")覆盖表而不使用replaceWhere时,您可能仍希望覆盖写入数据的表结构。通过将overwriteSchema选项设置为true,您可以替换表的表结构和分区:
df.write.option("overwriteSchema", "true")
表上的视图
Delta Lake支持在Delta表上创建视图,就像对数据源表操作一样。
在使用视图时的核心挑战是解析模式。如果修改了Delta表模式,必须重新创建衍生视图以适应模式中的任何新增内容。例如,如果在Delta表中添加了新列,必须确保该列在基于该基础表构建的相应视图中可用。
表属性
您可以使用TBLPROPERTIES在CREATE和ALTER语句中将自定义元数据存储为表属性。之后可以通过SHOW命令查看这些元数据。例如:
ALTER TABLE default.people10m SET TBLPROPERTIES ('department' = 'accounting', 'delta.appendOnly' = 'true');
-- Show the table's properties.
SHOW TBLPROPERTIES default.people10m;
-- Show just the 'department' table property.
SHOW TBLPROPERTIES default.people10m ('department');
TBLPROPERTIES作为Delta表元数据的一部分存储。如果给定位置已存在Delta表,则无法在CREATE语句中定义新的TBLPROPERTIES。
此外,为了定制行为和性能,Delta Lake支持特定的Delta表属性:
阻止对Delta表的删除和更新操作:
delta.appendOnly=true。配置时间旅行保留属性:
delta.logRetentionDuration=和delta.deletedFileRetentionDuration=。详情请参阅数据保留。配置收集统计信息的列数:
delta.dataSkippingNumIndexedCols=n。该属性指示写入器仅收集表中前n列的统计信息。同时数据跳过代码会忽略超出此列索引的任何列的统计信息。此属性仅对新写入的数据生效。
注意
修改Delta表属性是一个写操作,会与其他并发写操作产生冲突,导致它们失败。我们建议您仅在表上没有并发写操作时修改表属性。
你也可以在首次提交到Delta表时通过Spark配置设置以delta.为前缀的属性。例如,要初始化一个带有属性delta.appendOnly=true的Delta表,可将Spark配置spark.databricks.delta.properties.defaults.appendOnly设为true。示例如下:
spark.sql("SET spark.databricks.delta.properties.defaults.appendOnly = true")
spark.conf.set("spark.databricks.delta.properties.defaults.appendOnly", "true")
spark.conf.set("spark.databricks.delta.properties.defaults.appendOnly", "true")
另请参阅Delta表属性参考。
将表结构和属性同步到Hive元存储
您可以通过将spark.databricks.delta.catalog.update.enabled设置为true来启用表模式和属性到元存储的异步同步。每当Delta客户端检测到这两个属性因更新而发生变化时,它会将变更同步到元存储中。
该模式存储在HMS的表属性中。如果模式较小,它将直接存储在键spark.sql.sources.schema下:
{
"spark.sql.sources.schema": "{'name':'col1','type':'string','nullable':true, 'metadata':{}},{'name':'col2','type':'string','nullable':true,'metadata':{}}"
}
如果Schema很大,它将被分解为多个部分。将这些部分拼接在一起应该能得到正确的Schema。例如:
{
"spark.sql.sources.schema.numParts": "4",
"spark.sql.sources.schema.part.1": "{'name':'col1','type':'string','nullable':tr",
"spark.sql.sources.schema.part.2": "ue, 'metadata':{}},{'name':'co",
"spark.sql.sources.schema.part.3": "l2','type':'string','nullable':true,'meta",
"spark.sql.sources.schema.part.4": "data':{}}"
}
表元数据
Delta Lake 提供了丰富的功能用于探索表元数据。
它支持 SHOW COLUMNS 和 DESCRIBE TABLE。
它还提供以下独特命令:
DESCRIBE DETAIL
提供关于表结构、分区、表大小等信息。详情请参阅Retrieve Delta table details。
DESCRIBE HISTORY
提供来源信息,包括操作、用户等,以及每次对表的写入操作指标。表历史记录保留30天。详情请参阅检索Delta表历史记录。
配置SparkSession
对于许多Delta Lake操作,您可以在创建新的SparkSession时通过设置以下配置来启用与Apache Spark DataSourceV2和Catalog API(自3.0版本起)的集成。
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("...") \
.master("...") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("...")
.master("...")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate()
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession
.builder()
.appName("...")
.master("...")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate();
或者,您可以在提交Spark应用程序时通过spark-submit,或者在启动spark-shell或pyspark时,将它们作为命令行参数来添加配置。
spark-submit --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" ...
pyspark --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
配置存储凭证
Delta Lake 使用 Hadoop 文件系统 API 访问存储系统。存储系统的凭证通常可以通过 Hadoop 配置进行设置。Delta Lake 提供了多种设置 Hadoop 配置的方式,类似于 Apache Spark。
Spark配置
当您在集群上启动Spark应用程序时,可以通过spark.hadoop.*形式的Spark配置来传递自定义Hadoop配置。例如,为spark.hadoop.a.b.c设置值会将该值作为Hadoop配置a.b.c传递,Delta Lake将使用它来访问Hadoop文件系统API。
更多详情请参阅Spark文档。
SQL会话配置
Spark SQL会将所有当前的SQL会话配置传递给Delta Lake,Delta Lake将使用这些配置访问Hadoop FileSystem API。例如,SET a.b.c=x.y.z会告知Delta Lake将值x.y.z作为Hadoop配置a.b.c传递,Delta Lake将使用该配置访问Hadoop FileSystem API。
DataFrame选项
除了通过Spark(集群)配置或SQL会话配置设置Hadoop文件系统配置外,Delta还支持在读取或写入表时,通过使用DataFrameReader.load(path)或DataFrameWriter.save(path),从DataFrameReader和DataFrameWriter选项(即以fs.前缀开头的选项键)中读取Hadoop文件系统配置。
例如,您可以通过DataFrame选项传递存储凭据:
df1 = spark.read.format("delta") \
.option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-1>") \
.read("...")
df2 = spark.read.format("delta") \
.option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-2>") \
.read("...")
df1.union(df2).write.format("delta") \
.mode("overwrite") \
.option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-3>") \
.save("...")
val df1 = spark.read.format("delta")
.option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-1>")
.read("...")
val df2 = spark.read.format("delta")
.option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-2>")
.read("...")
df1.union(df2).write.format("delta")
.mode("overwrite")
.option("fs.azure.account.key.<storage-account-name>.dfs.core.windows.net", "<storage-account-access-key-3>")
.save("...")
您可以在存储配置中找到存储的Hadoop文件系统配置详细信息。