并发控制
Delta Lake 在读写操作之间提供ACID事务保证。这意味着:
对于支持的存储系统,跨多个集群的多个写入者可以同时修改表分区,并看到表的一致快照视图,这些写入将按串行顺序执行。
即使表在作业期间被修改,读取器仍能看到Apache Spark作业开始时表的统一快照视图。
在本文中:
乐观并发控制
Delta Lake 采用 乐观并发控制机制来保障写入操作的事务性。该机制下,写入操作分为三个阶段:
读取: 读取(如果需要)表的最新可用版本,以确定需要修改(即重写)的文件。
写入: 通过写入新的数据文件来暂存所有变更。
验证并提交: 在提交更改之前,检查提议的更改是否与自读取快照以来可能已并发提交的任何其他更改发生冲突。如果没有冲突,所有暂存的更改将作为一个新的版本化快照提交,写入操作成功。然而,如果存在冲突,写入操作将因并发修改异常而失败,而不是像在Parquet表上执行写入操作那样导致表损坏。
写入冲突
下表描述了哪些写入操作对可能发生冲突。Compaction指的是使用选项dataChange设置为false执行的文件压缩操作。
插入 |
更新、删除、合并到 |
压缩 |
|
|---|---|---|---|
INSERT |
不会产生冲突 |
||
UPDATE, DELETE, MERGE INTO |
可能冲突 |
可能冲突 |
|
COMPACTION |
不会冲突 |
可能冲突 |
可能冲突 |
通过分区和互斥命令条件避免冲突
在所有标记为"可能冲突"的情况下,两个操作是否会发生冲突取决于它们是否操作同一组文件。您可以通过按照操作条件中使用的相同列对表进行分区,使两组文件互不相交。例如,如果表未按日期分区,UPDATE table WHERE date > '2010-01-01' ...和DELETE table WHERE date < '2010-01-01'这两个命令可能会冲突,因为它们都可能尝试修改相同的文件集。通过date对表进行分区可以避免这种冲突。因此,根据命令中常用的条件对表进行分区可以显著减少冲突。然而,对具有高基数列的表进行分区可能会由于子目录数量过多而导致其他性能问题。
冲突异常
当发生事务冲突时,您将观察到以下异常之一:
ConcurrentAppendException
当并发操作在您的操作读取的同一分区(或未分区表中的任何位置)添加文件时,会发生此异常。文件添加可能由INSERT、DELETE、UPDATE或MERGE操作引起。
此异常通常发生在并发执行DELETE、UPDATE或MERGE操作时。虽然并发操作可能在物理上更新不同的分区目录,但其中一个操作可能会读取另一个操作正在并发更新的相同分区,从而导致冲突。您可以通过在操作条件中明确分隔来避免这种情况。请考虑以下示例。
// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
source.as("s"),
"s.user_id = t.user_id AND s.date = t.date AND s.country = t.country")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
假设您针对不同日期或国家并发运行上述代码。由于每个作业都在目标Delta表的一个独立分区上操作,您预计不会发生任何冲突。然而,该条件不够明确,可能会扫描整个表,并可能与更新其他分区的并发操作产生冲突。相反,您可以重写语句,在合并条件中添加具体的日期和国家,如下例所示。
// Target 'deltaTable' is partitioned by date and country
deltaTable.as("t").merge(
source.as("s"),
"s.user_id = t.user_id AND s.date = t.date AND s.country = t.country AND t.date = '" + <date> + "' AND t.country = '" + <country> + "'")
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
现在可以安全地在不同日期和国家并发运行此操作。
ConcurrentDeleteReadException
当并发操作删除了您的操作所读取的文件时,会发生此异常。常见原因是执行了DELETE、UPDATE或MERGE操作重写了文件。
ConcurrentDeleteDeleteException
当并发操作删除了您的操作也试图删除的文件时,会发生此异常。这可能是由两个并发压缩操作重写相同文件引起的。
MetadataChangedException
当并发事务更新Delta表的元数据时会发生此异常。常见原因包括ALTER TABLE操作或对Delta表的写入操作更新了表结构。
ConcurrentTransactionException
如果多次同时启动使用相同检查点位置的流式查询,并尝试同时写入Delta表。您绝不应该让两个流式查询使用相同的检查点位置并同时运行。
ProtocolChangedException
在以下情况下可能会出现此异常:
当您的Delta表升级到新版本时。为了确保后续操作成功,您可能需要升级您的Delta Lake版本。
当多个写入者同时创建或替换表时。
当多个写入者同时向空路径写入时。