表删除、更新与合并

Delta Lake 支持多种语句,便于从 Delta 表中删除数据和更新数据。

从表中删除数据

您可以从Delta表中删除符合条件的数据。例如,在一个名为people10m的表或路径为/tmp/delta/people-10m的位置,要删除birthDate列中值早于1955年的所有人员对应行,可以运行以下命令:

DELETE FROM people10m WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'

查看配置SparkSession了解启用SQL命令支持的步骤。

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
import io.delta.tables.*;
import org.apache.spark.sql.functions;

DeltaTable deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m");

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'");

// Declare the predicate by using Spark SQL functions.
deltaTable.delete(functions.col("birthDate").lt(functions.lit("1955-01-01")));

详情请参阅Delta Lake APIs

重要

delete 会从Delta表的最新版本中删除数据,但在显式执行vacuum清理旧版本之前,不会从物理存储中移除这些数据。详情请参阅vacuum

提示

在可能的情况下,对分区Delta表的分区列提供谓词条件,因为这类谓词可以显著加快操作速度。

更新表

您可以更新Delta表中符合谓词条件的数据。例如,在名为people10m的表或路径为/tmp/delta/people-10m的表中,要将gender列中的缩写从MF更改为MaleFemale,可以运行以下命令:

UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';

UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';

查看配置SparkSession了解启用SQL命令支持的步骤。

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));
import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;

DeltaTable deltaTable = DeltaTable.forPath(spark, "/data/events/");

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  new HashMap<String, String>() {{
    put("gender", "'Female'");
  }}
);

// Declare the predicate by using Spark SQL functions.
deltaTable.update(
  functions.col(gender).eq("M"),
  new HashMap<String, Column>() {{
    put("gender", functions.lit("Male"));
  }}
);

详情请参阅Delta Lake APIs

提示

与删除操作类似,更新操作在分区上使用谓词可以显著提高速度。

使用merge操作对表进行更新插入

您可以通过使用MERGE SQL操作,将源表、视图或DataFrame中的数据更新插入到目标Delta表中。Delta Lake支持在MERGE中进行插入、更新和删除操作,并支持超出SQL标准的扩展语法,以支持高级用例。

假设您有一个名为people10mupdates的源表,或者位于/tmp/delta/people-10m-updates的源路径,其中包含名为people10m的目标表或位于/tmp/delta/people-10m的目标路径的新数据。这些新记录中的部分可能已经存在于目标数据中。为了合并新数据,您希望更新那些id已存在的行,并在没有匹配id时插入新行。您可以运行以下操作:

MERGE INTO people10m
USING people10mupdates
ON people10m.id = people10mupdates.id
WHEN MATCHED THEN
  UPDATE SET
    id = people10mupdates.id,
    firstName = people10mupdates.firstName,
    middleName = people10mupdates.middleName,
    lastName = people10mupdates.lastName,
    gender = people10mupdates.gender,
    birthDate = people10mupdates.birthDate,
    ssn = people10mupdates.ssn,
    salary = people10mupdates.salary
WHEN NOT MATCHED
  THEN INSERT (
    id,
    firstName,
    middleName,
    lastName,
    gender,
    birthDate,
    ssn,
    salary
  )
  VALUES (
    people10mupdates.id,
    people10mupdates.firstName,
    people10mupdates.middleName,
    people10mupdates.lastName,
    people10mupdates.gender,
    people10mupdates.birthDate,
    people10mupdates.ssn,
    people10mupdates.salary
  )

查看配置SparkSession了解启用SQL命令支持的步骤。

from delta.tables import *

deltaTablePeople = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
deltaTablePeopleUpdates = DeltaTable.forPath(spark, '/tmp/delta/people-10m-updates')

dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople.alias('people') \
  .merge(
    dfUpdates.alias('updates'),
    'people.id = updates.id'
  ) \
  .whenMatchedUpdate(set =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "id": "updates.id",
      "firstName": "updates.firstName",
      "middleName": "updates.middleName",
      "lastName": "updates.lastName",
      "gender": "updates.gender",
      "birthDate": "updates.birthDate",
      "ssn": "updates.ssn",
      "salary": "updates.salary"
    }
  ) \
  .execute()
import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTablePeople = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
val deltaTablePeopleUpdates = DeltaTable.forPath(spark, "tmp/delta/people-10m-updates")
val dfUpdates = deltaTablePeopleUpdates.toDF()

deltaTablePeople
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched
  .updateExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .whenNotMatched
  .insertExpr(
    Map(
      "id" -> "updates.id",
      "firstName" -> "updates.firstName",
      "middleName" -> "updates.middleName",
      "lastName" -> "updates.lastName",
      "gender" -> "updates.gender",
      "birthDate" -> "updates.birthDate",
      "ssn" -> "updates.ssn",
      "salary" -> "updates.salary"
    ))
  .execute()
import io.delta.tables.*;
import org.apache.spark.sql.functions;
import java.util.HashMap;

DeltaTable deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
Dataset<Row> dfUpdates = spark.read("delta").load("/tmp/delta/people-10m-updates")

deltaTable
  .as("people")
  .merge(
    dfUpdates.as("updates"),
    "people.id = updates.id")
  .whenMatched()
  .updateExpr(
    new HashMap<String, String>() {{
      put("id", "updates.id");
      put("firstName", "updates.firstName");
      put("middleName", "updates.middleName");
      put("lastName", "updates.lastName");
      put("gender", "updates.gender");
      put("birthDate", "updates.birthDate");
      put("ssn", "updates.ssn");
      put("salary", "updates.salary");
    }})
  .whenNotMatched()
  .insertExpr(
    new HashMap<String, String>() {{
      put("id", "updates.id");
      put("firstName", "updates.firstName");
      put("middleName", "updates.middleName");
      put("lastName", "updates.lastName");
      put("gender", "updates.gender");
      put("birthDate", "updates.birthDate");
      put("ssn", "updates.ssn");
      put("salary", "updates.salary");
    }})
  .execute();

请参阅Delta Lake APIs获取Scala、Java和Python语法详情。

重要

Delta Lake合并操作通常需要对源数据进行两次扫描。如果源数据包含非确定性表达式,多次扫描源数据可能会产生不同的行,从而导致错误结果。非确定性表达式的一些常见例子包括current_datecurrent_timestamp函数。在Delta Lake 2.2及以上版本中,这个问题通过自动将源数据物化为合并命令的一部分得到解决,使得源数据在多次扫描时保持确定性。在Delta Lake 2.1及以下版本中,如果无法避免使用非确定性函数,建议将源数据保存到存储中,例如作为临时Delta表。缓存源数据可能无法解决此问题,因为缓存失效可能导致源数据被部分或完全重新计算(例如当集群在缩容时丢失部分执行器时)。

使用merge修改所有不匹配的行

注意

WHEN NOT MATCHED BY SOURCE 子句在Delta 2.3及以上版本中受到Scala、Python和Java Delta Lake APIs的支持。SQL支持从Delta 2.4及以上版本开始提供。

你可以使用WHEN NOT MATCHED BY SOURCE子句来UPDATEDELETE目标表中与源表不匹配的记录。我们建议添加一个可选的条件子句,以避免完全重写目标表。

以下代码示例展示了使用此功能进行删除的基本语法,用源表的内容覆盖目标表并删除目标表中不匹配的记录。

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)
targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED
  UPDATE SET *
WHEN NOT MATCHED
  INSERT *
WHEN NOT MATCHED BY SOURCE
  DELETE

以下示例向WHEN NOT MATCHED BY SOURCE子句添加条件,并指定要在未匹配目标行中更新的值。

(targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatchedUpdate(
    set = {"target.lastSeen": "source.timestamp"}
  )
  .whenNotMatchedInsert(
    values = {
      "target.key": "source.key",
      "target.lastSeen": "source.timestamp",
      "target.status": "'active'"
    }
  )
  .whenNotMatchedBySourceUpdate(
    condition="target.lastSeen >= (current_date() - INTERVAL '5' DAY)",
    set = {"target.status": "'inactive'"}
  )
  .execute()
)
targetDF
  .merge(sourceDF, "source.key = target.key")
  .whenMatched()
  .updateExpr(Map("target.lastSeen" -> "source.timestamp"))
  .whenNotMatched()
  .insertExpr(Map(
    "target.key" -> "source.key",
    "target.lastSeen" -> "source.timestamp",
    "target.status" -> "'active'",
    )
  )
  .whenNotMatchedBySource("target.lastSeen >= (current_date() - INTERVAL '5' DAY)")
  .updateExpr(Map("target.status" -> "'inactive'"))
  .execute()
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET target.lastSeen = source.timestamp
WHEN NOT MATCHED THEN
  INSERT (key, lastSeen, status) VALUES (source.key,  source.timestamp, 'active')
WHEN NOT MATCHED BY SOURCE AND target.lastSeen >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'

操作语义

以下是关于merge编程操作的详细说明。

  • 可以有任意数量的whenMatchedwhenNotMatched子句。

  • whenMatched 子句在源行根据匹配条件与目标表行匹配时执行。这些子句具有以下语义。

    • whenMatched 子句最多只能有一个 update 和一个 delete 操作。merge 中的 update 操作仅更新匹配目标行的指定列(类似于 update operation)。delete 操作会删除匹配的行。

    • 每个whenMatched子句可以包含一个可选条件。如果存在该子句条件,则只有当子句条件为真时,才会对任何匹配的源-目标行对执行updatedelete操作。

    • 如果存在多个whenMatched子句,则按指定顺序依次评估。除最后一个外,所有whenMatched子句都必须包含条件。

    • 如果对于匹配合并条件的源行和目标行对,没有任何whenMatched条件评估为真,则目标行保持不变。

    • 要使用源数据集中对应的列更新目标Delta表的所有列,可以使用whenMatched(...).updateAll()。这等同于:

      whenMatched(...).updateExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      针对目标Delta表的所有列执行此操作。因此,此操作假设源表具有与目标表相同的列,否则查询将抛出分析错误。

      注意

      当启用自动模式迁移时,此行为会发生变化。详情请参阅Automatic schema evolution

  • whenNotMatched 子句在源行根据匹配条件未找到任何目标行匹配时执行。这些子句具有以下语义。

    • whenNotMatched 子句只能包含 insert 操作。新行是根据指定的列和相应表达式生成的。您不需要指定目标表中的所有列。对于未指定的目标列,将插入 NULL 值。

    • 每个whenNotMatched子句可以包含一个可选条件。如果存在子句条件,则仅当该条件对该行成立时才会插入源行。否则,将忽略源列。

    • 如果有多个whenNotMatched子句,则按它们指定的顺序进行评估。除最后一个外,所有whenNotMatched子句都必须有条件。

    • 要使用源数据集的所有列插入目标Delta表的所有对应列,请使用whenNotMatched(...).insertAll()。这相当于:

      whenNotMatched(...).insertExpr(Map("col1" -> "source.col1", "col2" -> "source.col2", ...))
      

      针对目标Delta表的所有列执行此操作。因此,此操作假设源表具有与目标表相同的列,否则查询将抛出分析错误。

      注意

      当启用自动模式迁移时,此行为会发生变化。详情请参阅Automatic schema evolution

  • whenNotMatchedBySource 子句在目标行根据合并条件不匹配任何源行时执行。这些子句具有以下语义。

    • whenNotMatchedBySource 子句可以指定 deleteupdate 操作。

    • 每个whenNotMatchedBySource子句可以包含一个可选条件。如果存在子句条件,则仅当该条件对该行成立时才会修改目标行。否则,目标行将保持不变。

    • 如果有多个whenNotMatchedBySource子句,则按它们指定的顺序进行评估。除最后一个外,所有whenNotMatchedBySource子句都必须有条件。

    • 根据定义,whenNotMatchedBySource子句没有源行可以提取列值,因此无法引用源列。对于要修改的每个列,您可以指定一个字面量或对目标列执行操作,例如SET target.deleted_count = target.deleted_count + 1

重要

  • 如果源数据集的多行匹配,并且合并操作尝试更新目标Delta表的相同行,merge操作可能会失败。根据合并的SQL语义,这种更新操作是不明确的,因为不清楚应该使用哪一行源数据来更新匹配的目标行。您可以预处理源表以消除多次匹配的可能性。参见变更数据捕获示例——它展示了如何在将变更应用到目标Delta表之前预处理变更数据集(即源数据集),仅保留每个键的最新变更。

  • 你可以在SQL视图上应用MERGE操作,前提是该视图被定义为CREATE VIEW viewName AS SELECT * FROM deltaTable

模式验证

merge 会自动验证由插入和更新表达式生成的数据模式是否与表的模式兼容。它使用以下规则来确定 merge 操作是否兼容:

  • 对于updateinsert操作,指定的目标列必须存在于目标Delta表中。

  • 对于updateAllinsertAll操作,源数据集必须包含目标Delta表的所有列。源数据集可以包含额外的列,这些列将被忽略。

如果您不希望忽略额外的列,而是希望更新目标表模式以包含新列,请参阅自动模式演进

  • 对于所有操作,如果生成目标列的表达式所产生的数据类型与目标Delta表中对应的列不同,merge会尝试将它们转换为表中的类型。

自动模式演进

模式演进允许用户在合并时解决目标表和源表之间的模式不匹配问题。它处理以下两种情况:

  1. 源表中的某个列在目标表中不存在。该新列会被添加到目标表的架构中,并使用源值插入或更新其值。

  2. 目标表中的某个列在源表中不存在。目标表结构保持不变;额外目标列中的值要么保持不变(对于UPDATE操作),要么被设为NULL(对于INSERT操作)。

重要

要使用模式演进功能,必须在运行merge命令前将Spark会话配置`spark.databricks.delta.schema.autoMerge.enabled`设为true

注意

在Delta 2.3及以上版本中,可以通过名称指定源表中存在的列用于插入或更新操作。在Delta 2.2及以下版本中,只有INSERT *UPDATE SET *操作可以用于通过merge实现模式演进。

以下是使用和不使用模式演进的merge操作效果的一些示例。

查询(SQL)

无模式演进时的行为(默认)

带模式演进时的行为

目标列: key, value

源列: key, value, new_value

MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
  THEN UPDATE SET *
WHEN NOT MATCHED
  THEN INSERT *

表结构保持不变;仅更新/插入列keyvalue

表结构已更改为(key, value, new_value)。源数据中匹配的现有记录将使用valuenew_value进行更新。新行将按照(key, value, new_value)的结构插入。

目标列: key, old_value

源列: key, new_value

MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
  THEN UPDATE SET *
WHEN NOT MATCHED
  THEN INSERT *

UPDATEINSERT 操作会抛出错误,因为目标列 old_value 不在源数据中。

表模式已更改为 (key, old_value, new_value)。匹配到的现有记录会使用源中的 new_value 进行更新,同时保持 old_value 不变。新记录会插入指定的 keynew_value,并将 old_value 设为 NULL

目标列: key, old_value

源列: key, new_value

MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN MATCHED
  THEN UPDATE SET new_value = s.new_value

UPDATE 抛出错误,因为目标表中不存在列 new_value

表结构被更改为(key, old_value, new_value)。源数据中匹配的现有记录会更新为new_value而保持old_value不变,未匹配的记录则在new_value字段填入NULL

目标列: key, old_value

源列: key, new_value

MERGE INTO target_table t
USING source_table s
ON t.key = s.key
WHEN NOT MATCHED
  THEN INSERT (key, new_value) VALUES (s.key, s.new_value)

INSERT 抛出错误,因为目标表中不存在列 new_value

表结构已更改为(key, old_value, new_value)。新插入的记录会使用指定的keynew_value,并将old_value设为NULL。现有记录的new_value会被设为NULL,而old_value保持不变。参见注释(1)

包含结构体数组的模式的特殊注意事项

Delta MERGE INTO 支持通过名称解析结构体字段,并支持结构体数组的模式演进。启用模式演进后,目标表的模式将对结构体数组进行演进,这也适用于数组内的任何嵌套结构体。

注意

在Delta 2.3及以上版本中,可以通过名称在插入或更新命令中指定源表中存在的结构体字段。在Delta 2.2及以下版本中,只能使用INSERT *UPDATE SET *命令进行模式演化的合并操作。

以下是针对结构体数组进行合并操作时,使用和不使用模式演化的效果对比示例。

源模式

目标模式

无模式演进的行为(默认)

带模式演进的行为

array>

array>

表结构保持不变。列将通过名称解析并进行更新或插入。

表结构保持不变。列将通过名称解析并进行更新或插入。

array>

array>

updateinsert 会抛出错误,因为目标表中不存在 cd

表结构已更改为array>。对于目标表中的现有条目,cd被插入为NULLupdateinsert操作将源表中的条目填充为a转换为字符串类型,而b则为NULL

array>>

array>>

updateinsert 会抛出错误,因为目标表中不存在 d

目标表的模式已更改为array>>。对于目标表中的现有条目,d被插入为NULL

性能调优

您可以通过以下方法减少合并所需的时间:

  • 缩小匹配搜索范围:默认情况下,merge操作会搜索整个Delta表来寻找源表中的匹配项。加速merge的一种方法是通过在匹配条件中添加已知约束来缩小搜索范围。例如,假设您有一个按countrydate分区的表,并且您想使用merge来更新最近一天和特定国家/地区的信息。添加条件

    events.date = current_date() AND events.country = 'USA'
    

    将使查询更快,因为它只在相关分区中查找匹配项。此外,这还将减少与其他并发操作发生冲突的可能性。有关更多详细信息,请参阅Concurrency control

  • 压缩文件: 如果数据存储在多个小文件中,读取数据以搜索匹配项可能会变慢。您可以将小文件压缩成较大的文件以提高读取吞吐量。详情请参阅压缩文件

  • 控制写入时的shuffle分区数: merge操作会多次shuffle数据以计算并写入更新后的数据。用于shuffle的任务数由Spark会话配置spark.sql.shuffle.partitions控制。设置此参数不仅控制并行度,还决定输出文件的数量。增加该值会提高并行度,但也会生成更多小数据文件。

  • 写入前重新分区输出数据: 对于分区表,merge操作可能会产生比shuffle分区数量多得多的细小文件。这是因为每个shuffle任务都可以在多个分区中写入多个文件,这可能成为性能瓶颈。在许多情况下,在写入前按表的分区列对输出数据进行重新分区会有所帮助。您可以通过将Spark会话配置spark.databricks.delta.merge.repartitionBeforeWrite.enabled设置为true来启用此功能。

合并示例

以下是几个在不同场景中使用merge的示例。

写入Delta表时的数据去重

一个常见的ETL用例是通过将日志追加到表中来收集到Delta表中。然而,数据源经常可能生成重复的日志记录,下游需要去重步骤来处理它们。使用merge可以避免插入重复记录。

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId
WHEN NOT MATCHED
  THEN INSERT *
deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId") \
  .whenNotMatchedInsertAll() \
  .execute()
deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute()
deltaTable
  .as("logs")
  .merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId")
  .whenNotMatched()
  .insertAll()
  .execute();

注意

包含新日志的数据集需要在其内部进行去重。根据merge的SQL语义,它会将新数据与表中的现有数据进行匹配和去重,但如果新数据集中存在重复数据,则会插入这些重复项。因此,在合并到表之前,需要先对新数据进行去重处理。

如果您知道可能仅在几天内出现重复记录,可以通过按日期对表进行分区,然后指定目标表的日期范围进行匹配,从而进一步优化查询。

MERGE INTO logs
USING newDedupedLogs
ON logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS
WHEN NOT MATCHED AND newDedupedLogs.date > current_date() - INTERVAL 7 DAYS
  THEN INSERT *
deltaTable.alias("logs").merge(
    newDedupedLogs.alias("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS") \
  .whenNotMatchedInsertAll("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS") \
  .execute()
deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute()
deltaTable.as("logs").merge(
    newDedupedLogs.as("newDedupedLogs"),
    "logs.uniqueId = newDedupedLogs.uniqueId AND logs.date > current_date() - INTERVAL 7 DAYS")
  .whenNotMatched("newDedupedLogs.date > current_date() - INTERVAL 7 DAYS")
  .insertAll()
  .execute();

这比之前的命令更高效,因为它只在最近7天的日志中查找重复项,而不是整个表。此外,您可以将这种仅插入的合并与结构化流式处理一起使用,以持续对日志进行去重。

  • 在流式查询中,您可以在foreachBatch中使用合并操作,将任何流数据持续写入具有去重功能的Delta表。有关foreachBatch的更多信息,请参阅以下streaming example

  • 在另一个流式查询中,您可以持续从此Delta表中读取去重后的数据。这是可行的,因为仅插入合并操作只会向Delta表追加新数据。

将缓慢变化数据(SCD)类型2操作应用到Delta表

另一个常见操作是SCD类型2,它维护维度表中每个键的所有变更历史。这类操作需要更新现有行以将键的旧值标记为过期,同时插入新行作为最新值。给定包含更新的源表和存储维度数据的目标表,SCD类型2可以通过merge操作实现。

以下是一个具体示例,展示了如何维护客户地址历史记录以及每个地址的有效日期范围。当需要更新客户地址时,必须将旧地址标记为非当前地址,更新其有效日期范围,并将新地址添加为当前地址。

val customersTable: DeltaTable = ...   // table with schema (customerId, address, current, effectiveDate, endDate)

val updatesDF: DataFrame = ...          // DataFrame with schema (customerId, address, effectiveDate)

// Rows to INSERT new addresses of existing customers
val newAddressesToInsert = updatesDF
  .as("updates")
  .join(customersTable.toDF.as("customers"), "customerid")
  .where("customers.current = true AND updates.address <> customers.address")

// Stage the update by unioning two sets of rows
// 1. Rows that will be inserted in the whenNotMatched clause
// 2. Rows that will either update the current addresses of existing customers or insert the new addresses of new customers
val stagedUpdates = newAddressesToInsert
  .selectExpr("NULL as mergeKey", "updates.*")   // Rows for 1.
  .union(
    updatesDF.selectExpr("updates.customerId as mergeKey", "*")  // Rows for 2.
  )

// Apply SCD Type 2 operation using merge
customersTable
  .as("customers")
  .merge(
    stagedUpdates.as("staged_updates"),
    "customers.customerId = mergeKey")
  .whenMatched("customers.current = true AND customers.address <> staged_updates.address")
  .updateExpr(Map(                                      // Set current to false and endDate to source's effective date.
    "current" -> "false",
    "endDate" -> "staged_updates.effectiveDate"))
  .whenNotMatched()
  .insertExpr(Map(
    "customerid" -> "staged_updates.customerId",
    "address" -> "staged_updates.address",
    "current" -> "true",
    "effectiveDate" -> "staged_updates.effectiveDate",  // Set current to true along with the new address and its effective date.
    "endDate" -> "null"))
  .execute()
customersTable = ...  # DeltaTable with schema (customerId, address, current, effectiveDate, endDate)

updatesDF = ...       # DataFrame with schema (customerId, address, effectiveDate)

# Rows to INSERT new addresses of existing customers
newAddressesToInsert = updatesDF \
  .alias("updates") \
  .join(customersTable.toDF().alias("customers"), "customerid") \
  .where("customers.current = true AND updates.address <> customers.address")

# Stage the update by unioning two sets of rows
# 1. Rows that will be inserted in the whenNotMatched clause
# 2. Rows that will either update the current addresses of existing customers or insert the new addresses of new customers
stagedUpdates = (
  newAddressesToInsert
  .selectExpr("NULL as mergeKey", "updates.*")   # Rows for 1
  .union(updatesDF.selectExpr("updates.customerId as mergeKey", "*"))  # Rows for 2.
)

# Apply SCD Type 2 operation using merge
customersTable.alias("customers").merge(
  stagedUpdates.alias("staged_updates"),
  "customers.customerId = mergeKey") \
.whenMatchedUpdate(
  condition = "customers.current = true AND customers.address <> staged_updates.address",
  set = {                                      # Set current to false and endDate to source's effective date.
    "current": "false",
    "endDate": "staged_updates.effectiveDate"
  }
).whenNotMatchedInsert(
  values = {
    "customerid": "staged_updates.customerId",
    "address": "staged_updates.address",
    "current": "true",
    "effectiveDate": "staged_updates.effectiveDate",  # Set current to true along with the new address and its effective date.
    "endDate": "null"
  }
).execute()

将变更数据写入Delta表

与SCD类似,另一个常见用例(通常称为变更数据捕获(CDC))是将外部数据库生成的所有数据变更应用到Delta表中。换句话说,需要将应用于外部表的一系列更新、删除和插入操作同步到Delta表中。您可以通过如下方式使用merge来实现这一功能。

val deltaTable: DeltaTable = ... // DeltaTable with schema (key, value)

// DataFrame with changes having following columns
// - key: key of the change
// - time: time of change for ordering between changes (can replaced by other ordering id)
// - newValue: updated or inserted value if key was not deleted
// - deleted: true if the key was deleted, false if the key was inserted or updated
val changesDF: DataFrame = ...

// Find the latest change for each key based on the timestamp
// Note: For nested structs, max on struct is computed as
// max on first struct field, if equal fall back to second fields, and so on.
val latestChangeForEachKey = changesDF
  .selectExpr("key", "struct(time, newValue, deleted) as otherCols" )
  .groupBy("key")
  .agg(max("otherCols").as("latest"))
  .selectExpr("key", "latest.*")

deltaTable.as("t")
  .merge(
    latestChangeForEachKey.as("s"),
    "s.key = t.key")
  .whenMatched("s.deleted = true")
  .delete()
  .whenMatched()
  .updateExpr(Map("key" -> "s.key", "value" -> "s.newValue"))
  .whenNotMatched("s.deleted = false")
  .insertExpr(Map("key" -> "s.key", "value" -> "s.newValue"))
  .execute()
deltaTable = ... # DeltaTable with schema (key, value)

# DataFrame with changes having following columns
# - key: key of the change
# - time: time of change for ordering between changes (can replaced by other ordering id)
# - newValue: updated or inserted value if key was not deleted
# - deleted: true if the key was deleted, false if the key was inserted or updated
changesDF = spark.table("changes")

# Find the latest change for each key based on the timestamp
# Note: For nested structs, max on struct is computed as
# max on first struct field, if equal fall back to second fields, and so on.
latestChangeForEachKey = changesDF \
  .selectExpr("key", "struct(time, newValue, deleted) as otherCols") \
  .groupBy("key") \
  .agg(max("otherCols").alias("latest")) \
  .select("key", "latest.*") \

deltaTable.alias("t").merge(
    latestChangeForEachKey.alias("s"),
    "s.key = t.key") \
  .whenMatchedDelete(condition = "s.deleted = true") \
  .whenMatchedUpdate(set = {
    "key": "s.key",
    "value": "s.newValue"
  }) \
  .whenNotMatchedInsert(
    condition = "s.deleted = false",
    values = {
      "key": "s.key",
      "value": "s.newValue"
    }
  ).execute()

使用foreachBatch从流式查询中进行更新插入

你可以结合使用mergeforeachBatch(更多信息请参阅foreachbatch),将流式查询中的复杂upsert操作写入Delta表。例如:

  • 以更新模式写入流式聚合:这比完全模式高效得多。

import io.delta.tables.*

val deltaTable = DeltaTable.forPath(spark, "/data/aggregates")

// Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  deltaTable.as("t")
    .merge(
      microBatchOutputDF.as("s"),
      "s.key = t.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/data/aggregates")

# Function to upsert microBatchOutputDF into Delta table using merge
def upsertToDelta(microBatchOutputDF, batchId):
  deltaTable.alias("t").merge(
      microBatchOutputDF.alias("s"),
      "s.key = t.key") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
}

# Write the output of a streaming aggregation query into Delta table
streamingAggregatesDF.writeStream \
  .format("delta") \
  .foreachBatch(upsertToDelta) \
  .outputMode("update") \
  .start()

注意

  • 确保在foreachBatch中的merge语句是幂等的,因为流查询的重启可能会对同一批数据多次应用该操作。

  • 当在foreachBatch中使用merge时,流式查询的输入数据速率(通过StreamingQueryProgress报告并在笔记本速率图中可见)可能会显示为数据源实际生成速率的倍数。这是因为merge会多次读取输入数据,导致输入指标被放大。如果这成为性能瓶颈,您可以在merge之前缓存批处理DataFrame,然后在merge之后取消缓存。