表流式读写

Delta Lake与Spark Structured Streaming通过readStreamwriteStream深度集成。Delta Lake克服了通常与流式系统和文件相关的许多限制,包括:

  • 在多个流(或并发批处理作业)中保持"恰好一次"处理

  • 高效发现哪些文件是新的,当使用文件作为流的数据源时

对于Delta Lake表上的许多操作,您可以通过在创建新的SparkSession时设置配置来启用与Apache Spark DataSourceV2和Catalog API(自3.0起)的集成。请参阅Configure SparkSession

在本文中:

Delta表作为数据源

当您将Delta表作为流数据源加载并在流式查询中使用时,该查询会处理表中现有的所有数据以及流启动后到达的任何新数据。

spark.readStream.format("delta")
  .load("/tmp/delta/events")

import io.delta.implicits._
spark.readStream.delta("/tmp/delta/events")

限制输入速率

以下选项可用于控制微批次:

  • maxFilesPerTrigger: 每个微批次中要考虑的新文件数量。默认值为1000。

  • maxBytesPerTrigger: 每个微批次处理的数据量。该选项设置了一个"软性上限",意味着一个批次大约会处理这么多数据,但在最小输入单元超过此限制的情况下,可能会处理更多数据以使流查询继续推进。如果在流处理中使用Trigger.Once,则会忽略此选项。默认情况下未设置此值。

如果同时使用maxBytesPerTriggermaxFilesPerTrigger,微批处理会在达到maxFilesPerTriggermaxBytesPerTrigger任一限制时停止处理数据。

注意

当源表事务由于logRetentionDuration configuration配置被清理且流处理滞后时,Delta Lake会处理与源表最新可用事务历史记录对应的数据,但不会使流处理失败。这可能导致数据被丢弃。

忽略更新和删除

结构化流处理(Structured Streaming)无法处理非追加类型的输入,如果作为数据源的表格发生任何修改,系统会抛出异常。对于无法自动向下游传播的变更,主要有两种处理策略:

  • 您可以删除输出和检查点,然后从头开始重新启动流。

  • 您可以设置以下两个选项中的任意一个:

    • ignoreDeletes: 忽略在分区边界删除数据的事务。

    • ignoreChanges: 如果源表中的文件由于数据变更操作(如UPDATEMERGE INTODELETE(分区内)或OVERWRITE)需要重写,则重新处理更新。未更改的行可能仍会被发出,因此您的下游消费者应能够处理重复项。删除操作不会传播到下游。ignoreChanges包含了ignoreDeletes的功能。因此,如果您使用ignoreChanges,您的流将不会因源表的删除或更新操作而中断。

示例

例如,假设您有一个表user_events,其中包含dateuser_emailaction列,并按date进行分区。您从user_events表中流式读取数据,并且由于GDPR要求需要从中删除数据。

当您在分区边界删除数据时(即WHERE条件作用于分区列),文件已经按值进行了分段,因此删除操作只需从元数据中移除这些文件。因此,如果您只想删除某些分区的数据,可以使用:

spark.readStream.format("delta")
  .option("ignoreDeletes", "true")
  .load("/tmp/delta/user_events")

然而,如果您必须基于user_email删除数据,那么您需要使用:

spark.readStream.format("delta")
  .option("ignoreChanges", "true")
  .load("/tmp/delta/user_events")

如果您使用UPDATE语句更新user_email,则包含该user_email的文件将被重写。当您使用ignoreChanges时,新记录会与同一文件中所有未更改的记录一起传播到下游。您的逻辑应该能够处理这些传入的重复记录。

指定初始位置

您可以使用以下选项来指定Delta Lake流式数据源的起始点,而无需处理整个表。

  • startingVersion: 指定从哪个Delta Lake版本开始读取。流数据源将读取从该版本(包含)开始的所有表变更。您可以通过DESCRIBE HISTORY命令输出中的version列获取提交版本号。

  • 要仅返回最新更改,请指定latest

  • startingTimestamp: 起始时间戳。流式数据源将读取该时间戳或之后提交的所有表变更(包含该时间戳)。可选参数之一:

    • 一个时间戳字符串。例如,"2019-01-01T00:00:00.000Z"

    • 日期字符串。例如,"2019-01-01"

不能同时设置这两个选项;只能使用其中之一。它们仅在启动新的流式查询时生效。如果流式查询已启动且进度已记录在其检查点中,这些选项将被忽略。

重要

虽然您可以从指定版本或时间戳启动流式数据源,但流式数据源的架构始终是Delta表的最新架构。您必须确保在指定版本或时间戳之后,Delta表没有发生不兼容的架构变更。否则,当使用错误的架构读取数据时,流式数据源可能会返回不正确的结果。

示例

例如,假设您有一个表 user_events。如果您想读取自版本5以来的变更,请使用:

spark.readStream.format("delta")
  .option("startingVersion", "5")
  .load("/tmp/delta/user_events")

如果您想读取自2018年10月18日以来的变更,请使用:

spark.readStream.format("delta")
  .option("startingTimestamp", "2018-10-18")
  .load("/tmp/delta/user_events")

处理初始快照而不丢失数据

当将Delta表用作流数据源时,查询首先会处理表中存在的所有数据。该版本的Delta表称为初始快照。默认情况下,Delta表的数据文件是基于最后修改时间进行处理的。然而,最后修改时间并不一定代表记录事件的时间顺序。

在定义了水印的有状态流查询中,按修改时间处理文件可能导致记录以错误顺序被处理。这可能会使记录因水印机制而被视为迟到事件而丢弃。

您可以通过启用以下选项来避免数据丢失问题:

  • withEventTimeOrder: 初始快照是否应按事件时间顺序处理。

启用事件时间排序后,初始快照数据的事件时间范围会被划分为时间桶。每个微批次通过筛选时间范围内的数据来处理一个桶。maxFilesPerTrigger和maxBytesPerTrigger配置选项仍适用于控制微批次大小,但由于处理特性,这种方式只是近似控制。

下图展示了这一过程:

Initial Snapshot

关于此功能的注意事项:

  • 数据丢失问题仅在以默认顺序处理有状态流查询的初始Delta快照时发生。

  • 一旦流查询启动且初始快照仍在处理中,您无法更改withEventTimeOrder。若要更改withEventTimeOrder后重新启动,需要删除检查点。

  • 如果在启用withEventTimeOrder的情况下运行流查询,在初始快照处理完成之前,无法将其降级到不支持此功能的Delta版本。如果需要降级,可以等待初始快照完成,或者删除检查点并重新启动查询。

  • 以下不常见场景不支持此功能:

    • 事件时间列是一个生成列,在Delta源和水印之间存在非投影转换。

    • 流查询中存在一个水印,该水印有多个Delta源。

  • 启用事件时间排序后,Delta初始快照处理的性能可能会变慢。

  • 每个微批次会扫描初始快照以筛选对应事件时间范围内的数据。为了加快筛选操作,建议使用Delta源列作为事件时间,这样可以应用数据跳过功能(请查看_了解适用条件)。此外,沿事件时间列进行表分区可以进一步加速处理。您可以通过Spark UI查看特定微批次扫描了多少个delta文件。

示例

假设您有一个包含event_time列的表user_events。您的流式查询是一个聚合查询。如果想确保初始快照处理期间不会丢失数据,可以使用:

spark.readStream.format("delta")
  .option("withEventTimeOrder", "true")
  .load("/tmp/delta/user_events")
  .withWatermark("event_time", "10 seconds")

..注意:您也可以通过集群上的Spark配置启用此功能,这将适用于所有流式查询: spark.databricks.delta.withEventTimeOrder.enabled true

追踪非累加式模式变更

您可以提供一个模式跟踪位置,以便从启用了列映射的Delta表中启用流式传输。这通过允许流以精确模式读取过去的表数据(就像表进行了时间旅行一样),克服了非增量模式变更可能导致流中断的问题。

每次对数据源的流式读取都必须指定自己的schemaTrackingLocation。指定的schemaTrackingLocation必须包含在目标表流式写入的checkpointLocation目录内。

注意

对于需要合并多个源Delta表数据的流式工作负载,您需要在每个源表的checkpointLocation中指定唯一目录。

示例

选项 schemaTrackingLocation 用于指定模式跟踪的路径,如下面的代码示例所示:

checkpoint_path = "/path/to/checkpointLocation"

(spark.readStream
  .option("schemaTrackingLocation", checkpoint_path)
  .table("delta_source_table")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .toTable("output_table")
)

Delta表作为接收端

您还可以使用结构化流(Structured Streaming)将数据写入Delta表。事务日志使Delta Lake能够确保精确一次(exactly-once)处理,即使有其他流或批处理查询同时对该表进行操作。

注意

Delta Lake的VACUUM函数会移除所有未被Delta Lake管理的文件,但会跳过任何以_开头的目录。您可以安全地将检查点与其他数据和元数据一起存储在Delta表中,使用类似/_checkpoints的目录结构。

在本节中:

追加模式

默认情况下,流以追加模式运行,该模式会将新记录添加到表中。

你可以使用路径方法:

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/_checkpoints/")
  .start("/delta/events")
events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .start("/tmp/delta/events")

import io.delta.implicits._
events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .delta("/tmp/delta/events")

或者使用toTable方法(在Spark 3.1及更高版本中)如下:

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")
events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")

Complete mode

你也可以使用结构化流式处理(Structured Streaming)来用每个批次替换整个表。一个示例用例是通过聚合计算摘要:

(spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .start("/tmp/delta/eventsByCustomer")
)
spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .start("/tmp/delta/eventsByCustomer")

前面的示例持续更新一个包含按客户分组的事件聚合数量的表。

对于延迟要求较为宽松的应用程序,您可以使用一次性触发器来节省计算资源。通过设定特定计划来更新汇总聚合表,仅处理自上次更新以来到达的新数据。

foreachBatch中实现幂等的表写入

注意

适用于 Delta Lake 2.0.0 及以上版本。

命令 foreachBatch 允许您指定一个函数,该函数在流查询中任意转换后的每个微批次输出上执行。这使得可以实现一个 foreachBatch 函数,将微批次输出写入一个或多个目标 Delta 表。然而,foreachBatch 并不能保证这些写入是幂等的,因为这些写入尝试缺乏批次是否正在重新执行的信息。例如,重新运行失败的批次可能导致数据重复写入。

为了解决这个问题,Delta表支持以下DataFrameWriter选项以使写入操作具有幂等性:

  • txnAppId: 一个唯一的字符串,你可以在每次DataFrame写入时传递。例如,你可以使用StreamingQuery ID作为txnAppId

  • txnVersion: 一个单调递增的数字,用作事务版本号。

Delta表使用txnAppIdtxnVersion的组合来识别重复写入并忽略它们。

如果批量写入因故障中断,重新运行该批次会使用相同的应用程序和批次ID,这将有助于运行时正确识别重复写入并忽略它们。应用程序ID(txnAppId)可以是任何用户生成的唯一字符串,不需要与流ID相关联。

警告

如果删除流式检查点并使用新的检查点重新启动查询,必须提供一个不同的appId;否则,重新启动的查询中的写入操作将被忽略,因为它将包含相同的txnAppId,且批次ID将从0开始。

同样的DataFrameWriter选项可用于在非流式作业中实现幂等写入。详情请参阅Idempotent writes

示例

app_id = ... # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2
val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}