结构化流处理编程指南
- 概述
- 快速示例
- 编程模型
- 使用数据集和数据框的API
- 异步进度跟踪
- 连续处理
- 附加信息
- 迁移指南
概述
结构化流处理是建立在Spark SQL引擎上的可扩展和容错的流处理引擎。您可以以与在静态数据上表达批处理计算相同的方式来表达流计算。Spark SQL引擎将负责增量和持续地运行它,并在流数据持续到达时更新最终结果。您可以使用 Dataset/DataFrame API 在Scala、Java、Python或R中表达流聚合、事件时间窗口、流到批处理连接等。计算在相同的优化的Spark SQL引擎上执行。最后,系统通过检查点和写前日志确保端到端的准确一次容错保证。简而言之, 结构化流处理提供快速、可扩展、容错、端到端准确一次的流处理,而用户无需考虑流处理。
在内部,默认情况下,结构化流查询使用一个 微批处理 引擎处理,这个引擎将数据流处理为一系列小批量作业,从而实现低至100毫秒的端到端延迟和一次性容错保证。然而,自从Spark 2.3以来,我们引入了一种新的低延迟处理模式,称为 连续处理 ,该模式可以实现低至1毫秒的端到端延迟,并且具备至少一次的保证。在不改变查询中的Dataset/DataFrame操作的情况下,您将能够根据应用程序需求选择模式。
在本指南中,我们将引导您了解编程模型和API。我们将主要使用默认的微批处理模型来解释这些概念,然后 稍后 讨论连续处理模型。首先,让我们从一个结构化流处理查询的简单示例开始——流式单词计数。
快速示例
假设你想要维护从监听TCP套接字的数据服务器接收到的文本数据的实时字数统计。让我们看看如何使用结构化流来表达这个。你可以在 Scala / Java / Python / R 中查看完整代码。如果你 下载Spark ,你可以直接 运行示例 。无论如何,让我们一步一步地走过这个例子,理解它是如何工作的。首先,我们必须导入必要的类并创建一个本地SparkSession,这是与Spark相关的所有功能的起点。
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
spark = SparkSession \
.builder \
.appName("结构化网络单词计数") \
.getOrCreate()
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder
.appName("StructuredNetworkWordCount")
.getOrCreate()
import spark.implicits._
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import java.util.Arrays;
import java.util.Iterator;
SparkSession spark = SparkSession
.builder()
.appName("JavaStructuredNetworkWordCount")
.getOrCreate();
sparkR.session(appName = "结构化网络单词计数")
接下来,让我们创建一个表示来自监听在localhost:9999的服务器接收的文本数据的流式DataFrame,并转换DataFrame以计算词频。
# 创建一个DataFrame,代表来自 localhost:9999 的输入行流
lines = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
# 将行分割成单词
words = lines.select(
explode(
split(lines.value, " ")
).alias("word")
)
# 生成实时单词计数
wordCounts = words.groupBy("word").count()
这个
lines
DataFrame 表示一个包含流文本数据的无限表格。这个表格包含一列字符串,名称为“value”,并且流文本数据中的每一行成为表中的一行。请注意,目前没有接收任何数据,因为我们只是在设置转换,还没有开始。接下来,我们使用了两个内置的 SQL 函数 - split 和 explode,将每一行拆分为多个包含一个单词的行。此外,我们使用
alias
函数将新列命名为“word”。最后,我们通过对数据集中唯一值的分组和计数定义了
wordCounts
DataFrame。请注意,这是一个流 DataFrame,表示流的运行单词计数。
// 创建一个 DataFrame,表示从连接到 localhost:9999 的输入行流
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// 将行分割成单词
val words = lines.as[String].flatMap(_.split(" "))
// 生成运行中的单词计数
val wordCounts = words.groupBy("value").count()
这个
lines
数据框表示一个无限制的表,包含流式文本数据。这个表包含一列字符串,名为“value”,流式文本数据中的每一行都成为表中的一行。请注意,目前并未接收任何数据,因为我们只是在设置转换,尚未开始。接下来,我们使用
.as[String]
将数据框转换为字符串的数据集,以便我们可以应用
flatMap
操作,将每一行拆分成多个单词。生成的
words
数据集包含所有单词。最后,我们通过对数据集中唯一值进行分组并计数,定义了
wordCounts
数据框。请注意,这是一种流数据框,表示流的实时单词计数。
// 创建一个表示从连接到localhost:9999的输入行流的DataFrame
Dataset<Row> lines = spark
.readStream()
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load();
// 将行分割成单词
Dataset<String> words = lines
.as(Encoders.STRING())
.flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());
// 生成正在运行的单词计数
Dataset<Row> wordCounts = words.groupBy("value").count();
这个
lines
DataFrame 表示一个包含流文本数据的无限制表。这个表包含一个名为“value”的字符串列,流文本数据中的每一行成为表中的一行。请注意,目前并没有接收任何数据,因为我们只是在设置转换,并尚未开始它。接下来,我们使用
.as(Encoders.STRING())
将 DataFrame 转换为字符串的数据集,这样我们就可以应用
flatMap
操作,将每一行拆分为多个单词。结果
words
数据集包含所有单词。最后,我们通过对数据集中唯一值进行分组并计数来定义
wordCounts
DataFrame。请注意,这是一个流式 DataFrame,表示流的实时单词计数。
# 创建一个DataFrame,表示来自localhost:9999连接的输入行流
lines <- read.stream("socket", host = "localhost", port = 9999)
# 将行拆分成单词
words <- selectExpr(lines, "explode(split(value, ' ')) as word")
# 生成运行单词计数
wordCounts <- count(group_by(words, "word"))
这个
lines
SparkDataFrame 表示一个包含流文本数据的无界表。该表包含一个名为“value”的字符串列,流文本数据中的每一行都成为表中的一行。注意,目前还没有接收任何数据,因为我们只是设置转换,还没有开始它。接下来,我们有一个 SQL 表达式,包含两个 SQL 函数 - split 和 explode,用于将每一行拆分为多个包含一个单词的行。此外,我们将新列命名为“word”。最后,我们通过按 SparkDataFrame 中的唯一值分组并计数来定义
wordCounts
SparkDataFrame。注意,这是一个流式 SparkDataFrame,表示流的实时单词计数。
我们现在已经设置好了对流数据的查询。剩下的就是实际开始接收数据并计算计数。为此,我们设置它在每次更新时将完整的计数集(由
outputMode("complete")
指定)打印到控制台。然后使用
start()
开始流计算。
# 开始运行查询,将运行计数打印到控制台
query = wordCounts \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
// 开始运行查询,将运行计数打印到控制台
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
// 开始运行查询,将运行计数打印到控制台
StreamingQuery query = wordCounts.writeStream()
.outputMode("complete")
.format("console")
.start();
query.awaitTermination();
# 开始运行查询,将运行计数打印到控制台
query <- write.stream(wordCounts, "console", outputMode = "complete")
awaitTermination(query)
在这段代码执行之后,流式计算将在后台开始。
query
对象是对那个正在进行的流式查询的句柄,我们决定使用
awaitTermination()
来等待查询的结束,以防止在查询仍然活动时进程退出。
要实际执行此示例代码,您可以在自己的 Spark 应用程序 中编译代码,或者简单地在下载 Spark 后 运行示例 。我们展示的是后者。您首先需要通过使用
$ nc -lk 9999
然后,在另一个终端中,您可以使用以下命令启动示例
$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999
$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999
$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999
$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999
然后,在运行netcat服务器的终端中输入的任何行都将被计数并每秒在屏幕上打印一次。它将看起来类似于以下内容。
|
|
编程模型
结构化流处理的关键思想是将实时数据流视为一个不断被追加的表。这导致了一种新的流处理模型,该模型与批处理模型非常相似。您将把流计算表达为对静态表的标准批处理查询,而Spark将其作为对 无限 输入表的 增量 查询运行。让我们更详细地了解这一模型。
基本概念
将输入数据流视为“输入表”。每个到达流的数据项就像是新的行被附加到输入表中。
对输入的查询将生成“结果表”。每个触发间隔(例如,每秒1次),新的行会附加到输入表中,最终更新结果表。每当结果表被更新时,我们希望将更改的结果行写入外部接收器。
“输出”被定义为写入外部存储的内容。输出可以以不同的模式定义:
-
完整模式 - 整个更新后的结果表将被写入外部存储。如何处理整个表的写入取决于存储连接器。
-
追加模式 - 仅将自上次触发以来附加到结果表的新行写入外部存储。这仅适用于结果表中现有行不预期发生变化的查询。
-
更新模式 - 自上次触发以来仅将结果表中更新的行写入外部存储(自Spark 2.1.1起可用)。请注意,这与完整模式不同,因为此模式仅输出自上次触发以来已更改的行。如果查询不包含聚合,则将等同于追加模式。
请注意,每种模式适用于某些类型的查询。这个将在 后面 详细讨论。
为了说明这个模型的使用,首先让我们在上述的
快速示例
中理解这个模型。第一个
lines
数据框是输入表,而最终的
wordCounts
数据框是结果表。请注意,对流式
lines
数据框生成
wordCounts
的查询与静态数据框的查询
完全相同
。然而,当这个查询启动时,Spark 将持续检查来自套接字连接的新数据。如果有新数据,Spark 将运行一个“增量”查询,将之前的运行计数与新数据结合起来以计算更新后的计数,如下所示。
请注意,结构化流处理不会物化整个表 。它从流数据源读取最新的可用数据,进行增量处理以更新结果,然后丢弃源数据。它仅保留最小的中间 状态 数据,以便更新结果(例如,在之前的示例中进行的中间计数)。
该模型与许多其他流处理引擎显著不同。许多流处理系统要求用户自己维护运行中的聚合,因此需要考虑容错和数据一致性(至少一次、一致至多一次或完全一致)。在这个模型中,当有新数据时,Spark 负责更新结果表,从而使用户不必考虑这些问题。作为一个例子,让我们看看这个模型如何处理基于事件时间的处理和迟到的数据。
处理事件时间和迟到数据
事件时间是嵌入在数据本身中的时间。对于许多应用程序,您可能希望基于此事件时间进行操作。例如,如果您想获取每分钟由物联网设备生成的事件数量,那么您可能想使用数据生成时的时间(即,数据中的事件时间),而不是Spark接收它们的时间。这种事件时间在这个模型中非常自然地表达 - 来自设备的每个事件都是表中的一行,而事件时间是该行中的一个列值。这允许基于窗口的聚合(例如每分钟的事件数量)仅仅是一种特殊类型的对事件时间列的分组和聚合 - 每个时间窗口都是一个组,每行可以属于多个窗口/组。因此,这种基于事件时间窗口的聚合查询可以在静态数据集(例如来自已收集设备事件日志)和数据流上一致地定义,从而使用户的生活更加轻松。
此外,该模型自然处理基于事件时间晚于预期到达的数据。由于Spark正在更新结果表,它完全控制在有晚到数据时更新旧聚合,以及清理旧聚合以限制中间状态数据的大小。自Spark 2.1以来,我们支持水印功能,允许用户指定晚到数据的阈值,并允许引擎相应地清理旧状态。有关详细信息将在 窗口操作 部分中进一步解释。
容错语义
提供端到端的准确一次语义是结构化流设计的关键目标之一。为了实现这一目标,我们设计了结构化流的源、汇和执行引擎,以可靠地跟踪处理的确切进度,以便能够通过重新启动和/或重新处理来处理任何类型的故障。每个流源都假设具有偏移量(类似于Kafka偏移量或Kinesis序列号)来跟踪流中的读取位置。引擎使用检查点和提前写入日志来记录每次触发时处理的数据的偏移量范围。流汇被设计为幂等,以便处理重新处理。通过使用可重放的源和幂等汇,结构化流可以在任何故障下确保 端到端的准确一次语义 。
使用数据集和数据框的API
自 Spark 2.0 起,DataFrames 和 Datasets 可以表示静态、有界数据,以及流式、无界数据。与静态 Datasets/DataFrames 类似,您可以使用常见的入口点
SparkSession
(
Scala
/
Java
/
Python
/
R
文档)
从流式源创建流式 DataFrames/Datasets,并对它们应用与静态 DataFrames/Datasets 相同的操作。如果您对 Datasets/DataFrames 不熟悉,强烈建议您通过使用
DataFrame/Dataset 编程指南
来熟悉它们。
创建流式数据框和流式数据集
可以通过
DataStreamReader
接口创建流数据框
(
Scala
/
Java
/
Python
文档)
由
SparkSession.readStream()
返回。在
R
中,使用
read.stream()
方法。与创建静态数据框的读取接口类似,您可以指定源的详细信息 – 数据格式、模式、选项等。
输入源
有一些内置的数据源。
-
文件源
- 作为数据流读取目录中写入的文件。文件将按照文件修改时间的顺序进行处理。如果设置了
latestFirst
,顺序将会被反转。支持的文件格式有文本、CSV、JSON、ORC、Parquet。有关最新列表和每种文件格式支持的选项,请参阅 DataStreamReader 接口的文档。请注意,文件必须原子性地放置在给定目录中,这在大多数文件系统中可以通过文件移动操作实现。 -
Kafka源 - 从Kafka读取数据。它与Kafka代理版本0.10.0或更高版本兼容。有关更多详情,请参阅 Kafka集成指南 。
-
Socket源(用于测试) - 从socket连接读取UTF8文本数据。监听服务器socket位于驱动程序处。请注意,这只能用于测试,因为这不提供端到端的容错保障。
-
速率源(用于测试) - 以每秒指定的行数生成数据,每个输出行包含一个
timestamp
和value
。其中timestamp
是一个Timestamp
类型,包含消息调度的时间,而value
是一个Long
类型,包含消息计数,从0开始作为第一行。此源旨在用于测试和基准测试。 -
每个微批次的速率源(用于测试)
- 以每个微批次指定的行数生成数据,每个输出行包含一个
timestamp
和value
。其中timestamp
是一个Timestamp
类型,包含消息调度的时间,而value
是一个Long
类型,包含消息计数,从0开始作为第一行。与rate
数据源不同,此数据源提供每个微批次的一组一致输入行,无论查询执行(触发器配置、查询滞后等),例如,批次0将生成0~999,批次1将生成1000~1999,以此类推。生成时间也适用此情况。此源旨在用于测试和基准测试。
一些源不是容错的,因为它们不能保证在故障后可以使用检查点偏移量重放数据。请参阅前面的 容错语义 部分。以下是Spark中所有源的详细信息。
源 | 选项 | 容错 | 备注 |
---|---|---|---|
文件源 |
path
: 输入目录的路径,适用于所有文件格式。
maxFilesPerTrigger
: 每次触发时最大考虑的新文件数量(默认:无最大限制)
latestFirst
: 是否优先处理最新的新文件,当有大量文件积压时很有用(默认:假)
fileNameOnly
: 是否仅根据文件名而不是完整路径检查新文件(默认:假)。将此设置为 `true` 时,以下文件将被视为同一文件,因为它们的文件名“dataset.txt”是相同的:
"file:///dataset.txt" "s3://a/dataset.txt" "s3n://a/b/dataset.txt" "s3a://a/b/c/dataset.txt"
maxFileAge
: 可在该目录中找到的文件的最大年龄,超过该年龄将被忽略。对于第一批,所有文件将被视为有效。如果
latestFirst
设置为 `true` 且
maxFilesPerTrigger
被设置,则该参数将被忽略,因为有效且应被处理的旧文件可能会被忽略。最大年龄是相对于最新文件的时间戳指定的,而不是当前系统的时间戳(默认:1周)。
cleanSource
: 处理后清理已完成文件的选项。
可用选项包括 "archive"、"delete"、"off"。如果未提供选项,则默认值为 "off"。 提供 "archive" 时,必须提供额外选项
sourceArchiveDir
。 "sourceArchiveDir" 的值在深度上不得与源模式匹配(从根目录起的目录数),其中深度为两个路径的最小深度。这将确保被归档的文件永远不会作为新源文件包含在内。
例如,假设您提供 '/hello?/spark/*' 作为源模式,'/hello1/spark/archive/dir' 不能用作 "sourceArchiveDir" 的值,因为 '/hello?/spark/*' 和 '/hello1/spark/archive' 将被匹配。'/hello1/spark' 也不能用作 "sourceArchiveDir" 的值,因为 '/hello?/spark' 和 '/hello1/spark' 将被匹配。'/archived/here' 是可以的,因为它不匹配。 Spark 将移动源文件,尊重其自身路径。例如,如果源文件的路径是
/a/b/dataset.txt
,而归档目录的路径是
/archived/here
,文件将被移动到
/archived/here/a/b/dataset.txt
。
注意:归档(通过移动)或删除已完成文件将引入开销(减慢速度,即使它发生在单独线程中),因此您需要在启用此选项之前了解每个操作在您的文件系统中的成本。另一方面,启用此选项将减少列出源文件的成本,这可能是一个昂贵的操作。 用于完成文件清理器的线程数量可以通过
spark.sql.streaming.fileSource.cleaner.numThreads
配置(默认:1)。
注意 2:启用此选项时,源路径不应被多个源或查询使用。同样,您必须确保源路径与文件流接收器的输出目录中的任何文件不匹配。 注意 3:删除和移动操作都是尽力而为。删除或移动文件失败不会使流查询失败。Spark 在某些情况下可能不会清理某些源文件 - 例如,应用程序没有正常关闭,排队等待清理的文件过多。 有关特定于文件格式的选项,请参见
DataStreamReader
中的相关方法
(
Scala
/
Java
/
Python
/
R
).
例如,对于 "parquet" 格式选项,请参见
DataStreamReader.parquet()
。
此外,还有会影响某些文件格式的会话配置。有关更多详细信息,请参见 SQL Programming Guide 。例如,针对 "parquet",请参见 Parquet 配置 部分。 |
是 | 支持通配符路径,但不支持多个以逗号分隔的路径/通配符。 |
Socket 源 |
host
: 要连接的主机,必须指定
port
: 要连接的端口,必须指定
|
否 | |
速率源 |
rowsPerSecond
(例如 100,默认:1):每秒应生成多少行。
rampUpTime
(例如 5s,默认:0s):生成速度达到
rowsPerSecond
之前的提升时间。使用比秒更细的粒度将被截断为整数秒。
numPartitions
(例如 10,默认:Spark 的默认并行度):生成行的分区数量。
源将尽力达到
rowsPerSecond
,但查询可能会受到资源限制,
numPartitions
可以调整以帮助达到预期速度。
|
是 | |
每个微批量的速率源 (格式: rate-micro-batch ) |
rowsPerBatch
(例如 100):每个微批量应生成多少行。
numPartitions
(例如 10,默认:Spark 的默认并行度):生成行的分区数量。
startTimestamp
(例如 1000,默认:0):生成时间的起始值。
advanceMillisPerBatch
(例如 1000,默认:1000):每个微批量中生成时间的推进量。
|
是 | |
Kafka 源 | 请参见 Kafka 集成指南 。 | 是 | |
这里有一些示例。
spark = SparkSession. ...
# 从套接字读取文本
socketDF = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
socketDF.isStreaming() # 对于具有流媒体源的DataFrame返回True
socketDF.printSchema()
# 读取目录中原子写入的所有csv文件
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
.readStream \
.option("sep", ";") \
.schema(userSchema) \
.csv("/path/to/directory") # 等同于 format("csv").load("/path/to/directory")
val spark: SparkSession = ...
// 从套接字读取文本
val socketDF = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
socketDF.isStreaming // 对于具有流式源的DataFrames返回True
socketDF.printSchema
// 读取原子方式写入目录的所有csv文件
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
.readStream
.option("sep", ";")
.schema(userSchema) // 指定csv文件的模式
.csv("/path/to/directory") // 等同于 format("csv").load("/path/to/directory")
SparkSession spark = ...
// 从套接字读取文本
Dataset<Row> socketDF = spark
.readStream()
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load();
socketDF.isStreaming(); // 对于具有流数据源的DataFrame返回True
socketDF.printSchema();
// 读取目录中以原子方式写入的所有csv文件
StructType userSchema = new StructType().add("name", "string").add("age", "integer");
Dataset<Row> csvDF = spark
.readStream()
.option("sep", ";")
.schema(userSchema) // 指定csv文件的模式
.csv("/path/to/directory"); // 等价于 format("csv").load("/path/to/directory")
sparkR.session(...)
# 从套接字读取文本
socketDF <- read.stream("socket", host = hostname, port = port)
isStreaming(socketDF) # 对于具有流源的SparkDataFrames返回TRUE
printSchema(socketDF)
# 读取在目录中原子性写入的所有csv文件
schema <- structType(structField("name", "string"),
structField("age", "integer"))
csvDF <- read.stream("csv", path = "/path/to/directory", schema = schema, sep = ";")
这些示例生成了未类型化的流式 DataFrame,这意味着 DataFrame 的模式在编译时不会被检查,只在查询提交时的运行时被检查。一些操作如
map
、
flatMap
等需要在编译时知道类型。要做到这一点,您可以使用与静态 DataFrame 相同的方法将这些未类型化的流式 DataFrame 转换为类型化的流式 Dataset。有关更多细节,请参见
SQL 编程指南
。此外,关于支持的流式源的更多细节将在文档后面讨论。
自 Spark 3.1 以来,您还可以通过
DataStreamReader.table()
从表中创建流式 DataFrame。有关更多详细信息,请参阅
Streaming Table APIs
。
流式 DataFrames/Datasets 的模式推断和分区
默认情况下,基于文件的结构化流需要您指定模式,而不是依赖 Spark 自动推断。这一限制确保在流查询中将使用一致的模式,即使在发生故障的情况下。对于临时使用,可以通过将
spark.sql.streaming.schemaInference
设置为
true
来重新启用模式推断。
当存在命名为
/key=value/
的子目录时,会发生分区发现,并且列出目录时将自动递归这些目录。如果这些列出现在用户提供的模式中,它们将由Spark根据正在读取的文件的路径填充。构成分区方案的目录必须在查询开始时存在,并且必须保持静态。例如,当
/data/year=2015/
存在时,可以添加
/data/year=2016/
,但是通过创建目录
/data/date=2016-04-17/
来更改分区列是无效的。
对流数据框/数据集的操作
您可以对流数据框/DataSets 执行各种操作——从无类型的、类 SQL 操作(例如
select
、
where
、
groupBy
),到有类型的类 RDD 操作(例如
map
、
filter
、
flatMap
)。有关更多详细信息,请参见
SQL 编程指南
。让我们来看看一些您可以使用的示例操作。
基本操作 - 选择、投影、聚合
对DataFrame/Dataset的大多数常见操作支持流处理。少数不支持的操作 将在本节后面讨论 。
df = ... # 带有IOT设备数据的流数据框,模式为 { device: string, deviceType: string, signal: double, time: DateType }
# 选择信号大于10的设备
df.select("device").where("signal > 10")
# 计算每种设备类型的更新次数
df.groupBy("deviceType").count()
case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)
val df: DataFrame = ... // 包含 IOT 设备数据的流式 DataFrame,模式为 { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData] // 包含 IOT 设备数据的流式 Dataset
// 选择信号大于 10 的设备
df.select("device").where("signal > 10") // 使用未类型化的 API
ds.filter(_.signal > 10).map(_.device) // 使用类型化的 API
// 统计每种设备类型的更新数量
df.groupBy("deviceType").count() // 使用未类型化的 API
// 计算每种设备类型的平均信号
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal)) // 使用类型化的 API
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.javalang.typed;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
public class DeviceData {
private String device;
private String deviceType;
private Double signal;
private java.sql.Date time;
...
// 每个字段的getter和setter方法
}
Dataset<Row> df = ...; // 带有IOT设备数据的流式DataFrame,模式为{ device: string, type: string, signal: double, time: DateType }
Dataset<DeviceData> ds = df.as(ExpressionEncoder.javaBean(DeviceData.class)); // 带有IOT设备数据的流式Dataset
// 选择信号大于10的设备
df.select("device").where("signal > 10"); // 使用无类型API
ds.filter((FilterFunction<DeviceData>) value -> value.getSignal() > 10)
.map((MapFunction<DeviceData, String>) value -> value.getDevice(), Encoders.STRING());
// 每种设备类型更新次数的运行计数
df.groupBy("deviceType").count(); // 使用无类型API
// 每种设备类型的运行平均信号
ds.groupByKey((MapFunction<DeviceData, String>) value -> value.getDeviceType(), Encoders.STRING())
.agg(typed.avg((MapFunction<DeviceData, Double>) value -> value.getSignal()));
df <- ... # 流数据框,包含IOT设备数据,模式为 { device: string, deviceType: string, signal: double, time: DateType }
# 选择信号大于10的设备
select(where(df, "signal > 10"), "device")
# 对每种设备类型的更新数量进行计数
count(groupBy(df, "deviceType"))
你也可以将流式 DataFrame/Dataset 注册为临时视图,然后在其上应用 SQL 命令。
df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates") # 返回另一个流式数据框
df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates") // 返回另一个流式数据框
df.createOrReplaceTempView("updates");
spark.sql("select count(*) from updates"); // 返回另一个流式 DataFrame
createOrReplaceTempView(df, "updates")
sql("select count(*) from updates")
注意,您可以通过使用
df.isStreaming
来识别一个 DataFrame/Dataset 是否具有流数据。
df.isStreaming()
df.isStreaming
df.isStreaming()
isStreaming(df)
您可能想查看查询的查询计划,因为Spark在对流数据集解释SQL语句时可能会注入有状态操作。在查询计划中注入有状态操作后,您可能需要考虑有状态操作来检查您的查询。(例如:输出模式、水印、状态存储大小维护等)
基于事件时间的窗口操作
基于滑动事件时间窗口的聚合在结构化流处理中是直接的,并且与分组聚合非常相似。在分组聚合中,为用户指定的分组列中的每个唯一值维护聚合值(例如计数)。在基于窗口的聚合中,为事件时间落入的每个窗口维护聚合值。让我们通过一个例子来理解这一点。
想象一下我们的 快速示例 被修改,现在流中包含生成该行时的时间。我们不再进行单词计数,而是希望在10分钟的时间窗口内计数单词,每5分钟更新一次。也就是说,在时间窗口12:00 - 12:10、12:05 - 12:15、12:10 - 12:20等之间接收到的单词计数。请注意,12:00 - 12:10意味着在12:00之后但在12:10之前到达的数据。现在,考虑一个在12:07接收到的单词。这个单词应该增加与两个窗口12:00 - 12:10和12:05 - 12:15相应的计数。因此,计数将根据组键(即单词)和窗口(可以从事件时间计算)进行索引。
结果表看起来类似于以下内容。
由于这种窗口化类似于分组,在代码中,您可以使用
groupBy()
和
window()
操作来表示窗口聚合。您可以在以下链接中查看完整的代码示例
Scala
/
Java
/
Python
.
words = ... # 流式 DataFrame 的模式 { timestamp: 时间戳, word: 字符串 }
# 按窗口和单词对数据进行分组,并计算每个组的计数
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
import spark.implicits._
val words = ... // 流数据框的模式 { timestamp: Timestamp, word: String }
// 按窗口和单词对数据进行分组,并计算每组的计数
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
Dataset<Row> words = ... // 流式数据框架,模式为 { timestamp: Timestamp, word: String }
// 按窗口和单词对数据进行分组,并计算每个组的数量
Dataset<Row> windowedCounts = words.groupBy(
functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
words.col("word")
).count();
words <- ... # 流式 DataFrame 的模式 { timestamp: Timestamp, word: String }
# 按窗口和单词分组数据并计算每组的计数
windowedCounts <- count(
groupBy(
words,
window(wordstimestamp, "10 minutes", "5 minutes"),
wordsword))
处理延迟数据和水印
现在考虑如果其中一个事件迟到应用会发生什么。
比如,假设在12:04生成的一个词(即事件时间)可能会在12:11被应用接收。应用应该使用12:04而不是12:11
来更新窗口
12:00 - 12:10
的旧计数。这种情况在我们的基于窗口的分组中自然发生– 结构化流可以维护
部分聚合的中间状态很长一段时间,以便迟到数据可以正确更新旧窗口的聚合,如下所示。
然而,为了让这个查询运行几天,系统必须限制它所积累的中间内存状态的数量。这意味着系统需要知道何时可以从内存状态中删除旧的聚合,因为应用程序将不再接收该聚合的迟到数据。为了实现这一点,在Spark 2.1中,我们引入了
水印
,它允许引擎自动跟踪数据中的当前事件时间,并尝试相应地清理旧状态。您可以通过指定事件时间列和关于数据在事件时间方面预计会迟到多少的阈值来定义查询的水印。对于在时间
T
结束的特定窗口,引擎将维护状态并允许迟到的数据更新状态,直到
(引擎看到的最大事件时间 - 迟到阈值 > T)
。换句话说,阈值内的迟到数据将被聚合,但超过阈值的数据将开始被删除(请参见
后面
部分以获取确切的保证)。让我们通过一个例子来理解这一点。我们可以使用
withWatermark()
轻松地在前面的例子中定义水印,如下所示。
words = ... # 流式 DataFrame 的模式 { timestamp: Timestamp, word: String }
# 按窗口和单词对数据进行分组,并计算每个组的计数
windowedCounts = words \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word) \
.count()
import spark.implicits._
val words = ... // 流式 DataFrame 的模式 { timestamp: 时间戳, word: 字符串 }
// 按窗口和单词对数据进行分组,并计算每组的计数
val windowedCounts = words
.withWatermark("timestamp", "10 minutes")
.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word")
.count()
Dataset<Row> words = ... // 流数据的 DataFrame,模式为 { timestamp: Timestamp, word: String }
// 按照窗口和单词对数据进行分组,并计算每组的计数
Dataset<Row> windowedCounts = words
.withWatermark("timestamp", "10 minutes")
.groupBy(
window(col("timestamp"), "10 minutes", "5 minutes"),
col("word"))
.count();
words <- ... # 流数据帧,模式为 { timestamp: Timestamp, word: String }
# 按窗口和单词对数据进行分组,并计算每个组的计数
words <- withWatermark(words, "timestamp", "10 minutes")
windowedCounts <- count(
groupBy(
words,
window(words$timestamp, "10 minutes", "5 minutes"),
words$word))
在这个例子中,我们正在根据“timestamp”列的值定义查询的水印,并且还将“10分钟”定义为数据允许的最晚延迟时间。如果此查询在更新输出模式下运行(将在 输出模式 部分中讨论),引擎将继续更新结果表中窗口的计数,直到窗口的时间比水印更旧,而水印比“timestamp”列中的当前事件时间滞后10分钟。这里是一幅插图。
如插图所示,引擎跟踪的最大事件时间是
蓝色虚线
,而在每次触发开始时设置的水印为
(max event time - '10 mins')
是红线。例如,当引擎观察到数据
(12:14, dog)
时,它将下一次触发的水印设置为
12:04
。这个水印允许引擎保持中间状态额外10分钟,以便延迟数据能够被计算。例如,数据
(12:09, cat)
是乱序和延迟的,并且它落在窗口
12:00 - 12:10
和
12:05 - 12:15
中。由于它仍然领先于触发器中的水印
12:04
,引擎仍然保持中间计数作为状态,并正确更新相关窗口的计数。然而,当水印更新为
12:11
时,窗口
(12:00 - 12:10)
的中间状态被清除,所有后续数据(例如
(12:04, donkey)
)被视为“太迟”,因此被忽略。请注意,在每次触发后,更新的计数(即紫色行)被写入接收器作为触发输出,按照更新模式的规定。
某些接收器(例如文件)可能不支持更新模式所需的细粒度更新。为了与它们一起工作,我们还支持追加模式,在该模式下,只有 最终计数 被写入接收器。下面进行了说明。
注意,在非流处理数据集上使用
withWatermark
是无操作的。因为水印不应该以任何方式影响任何批处理查询,所以我们将直接忽略它。
与之前的更新模式类似,引擎为每个窗口维护中间计数。然而,部分计数不会更新到结果表中,也不会写入接收器。引擎会等待“10分钟”以便计算延迟数据,然后丢弃小于水印的窗口的中间状态,并将最终计数附加到结果表/接收器。例如,窗口
12:00 - 12:10
的最终计数仅在水印更新到
12:11
后才附加到结果表。
时间窗口的类型
Spark支持三种类型的时间窗口:固定的(tumbling)、滑动的(sliding)和会话的(session)。
滚动窗口是一系列固定大小、非重叠和连续的时间间隔。一个输入只能绑定到一个窗口。
滑动窗口类似于翻滚窗口,因为它们都是“固定大小”的,但是如果滑动的持续时间小于窗口的持续时间,则窗口可以重叠,在这种情况下,输入可以绑定到多个窗口。
翻转和滑动窗口使用
window
函数,已经在上面的例子中描述过。
会话窗口与前两种类型具有不同的特征。会话窗口的窗口长度大小是动态的,取决于输入。会话窗口以一个输入开始,如果在间隔持续时间内接收到后续输入,它会自行扩展。对于静态间隔持续时间,会话窗口在接收到最新输入后,如果在间隔持续时间内没有接收到输入,则关闭。
会话窗口使用
session_window
函数。该函数的使用方式类似于
window
函数。
events = ... # 流式数据框架,模式为 { timestamp: Timestamp, userId: String }
# 按会话窗口和用户ID对数据进行分组,并计算每个组的计数
sessionizedCounts = events \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
session_window(events.timestamp, "5 minutes"),
events.userId) \
.count()
import spark.implicits._
val events = ... // 流式数据框,模式为 { timestamp: Timestamp, userId: String }
// 按会话窗口和userId分组数据,并计算每个组的计数
val sessionizedCounts = events
.withWatermark("timestamp", "10 minutes")
.groupBy(
session_window($"timestamp", "5 minutes"),
$"userId")
.count()
Dataset<Row> events = ... // 流式 DataFrame 的 schema { timestamp: Timestamp, userId: String }
// 按会话窗口和 userId 分组数据,并计算每个组的计数
Dataset<Row> sessionizedCounts = events
.withWatermark("timestamp", "10 minutes")
.groupBy(
session_window(col("timestamp"), "5 minutes"),
col("userId"))
.count();
我们还可以提供一个表达式,根据输入行动态指定间隔持续时间,而不是静态值。请注意,持续时间为负或零的行将从聚合中被过滤掉。
使用动态间隔持续时间,会话窗口的关闭不再依赖于最新的输入。会话窗口的范围是所有事件范围的并集,这些范围由事件开始时间和查询执行期间评估的间隔持续时间决定。
from pyspark.sql import functions as sf
events = ... # 流数据框的模式 { timestamp: 时间戳, userId: 用户ID }
session_window = session_window(events.timestamp, \
sf.when(events.userId == "user1", "5 seconds") \
.when(events.userId == "user2", "20 seconds").otherwise("5 minutes"))
# 按会话窗口和用户ID对数据进行分组,并计算每个组的数量
sessionizedCounts = events \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
session_window,
events.userId) \
.count()
import spark.implicits._
val events = ... // 流式 DataFrame,模式为 { timestamp: 时间戳, userId: 字符串 }
val sessionWindow = session_window($"timestamp", when($"userId" === "user1", "5 seconds")
.when($"userId" === "user2", "20 seconds")
.otherwise("5 minutes"))
// 根据会话窗口和 userId 对数据进行分组,并计算每个组的计数
val sessionizedCounts = events
.withWatermark("timestamp", "10 minutes")
.groupBy(
Column(sessionWindow),
$"userId")
.count()
Dataset<Row> events = ... // 流式 DataFrame,模式为 { timestamp: Timestamp, userId: String }
SessionWindow sessionWindow = session_window(col("timestamp"), when(col("userId").equalTo("user1"), "5 seconds")
.when(col("userId").equalTo("user2"), "20 seconds")
.otherwise("5 minutes"))
// 按会话窗口和用户ID对数据进行分组,并计算每组的计数
Dataset<Row> sessionizedCounts = events
.withWatermark("timestamp", "10 minutes")
.groupBy(
new Column(sessionWindow),
col("userId"))
.count();
请注意,在使用流查询中的会话窗口时,有一些限制,如下所示:
- 不支持将“更新模式”作为输出模式。
-
在分组键中除了
session_window
之外,应该至少有一列。
对于批量查询,支持全局窗口(分组键中仅有
session_window
)。
默认情况下,Spark不会对会话窗口聚合执行部分聚合,因为在分组之前需要在本地分区中进行额外的排序。当每个本地分区中只有少量输入行具有相同的组键时,它的效果更好,但对于在本地分区中具有相同组键的输入行数量众多的情况,尽管需要额外的排序,执行部分聚合仍然可以显著提高性能。
您可以启用
spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition
来指示 Spark 执行部分聚合。
时间窗口的时间表示
在某些使用场景中,有必要提取时间窗口的时间表示,以便对时间窗口数据应用需要时间戳的操作。一个例子是链式时间窗口聚合,其中用户希望在时间窗口上定义另一个时间窗口。比如,有人想将5分钟的时间窗口聚合为1小时的滚动时间窗口。
有两种方法可以实现这一点,如下所示:
-
使用
window_time
SQL 函数,时间窗口列作为参数 -
使用
window
SQL 函数,时间窗口列作为参数
window_time
函数将生成一个时间戳,表示时间窗口的时间。用户可以将结果传递给
window
函数的参数(或者任何需要时间戳的地方)以执行与需要时间戳的时间窗口相关的操作。
words = ... # 流式数据框架,模式为 { timestamp: 时间戳, word: 字符串 }
# 按窗口和单词对数据分组,并计算每个组的计数
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# 通过另一个窗口和单词对已分组数据进行分组,并计算每个组的计数
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
import spark.implicits._
val words = ... // 流式 DataFrame 具有架构 { timestamp: Timestamp, word: String }
// 根据窗口和单词对数据进行分组,并计算每个组的计数
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// 根据另一个窗口和单词对窗口化的数据进行分组,并计算每个组的计数
val anotherWindowedCounts = windowedCounts.groupBy(
window(window_time($"window"), "1 hour"),
$"word"
).count()
Dataset<Row> words = ... // 流数据框架,模式为 { timestamp: 时间戳, word: 字符串 }
// 按窗口和单词对数据进行分组,并计算每个组的计数
Dataset<Row> windowedCounts = words.groupBy(
functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
words.col("word")
).count();
// 按另一个窗口和单词对窗口数据进行分组,并计算每个组的计数
Dataset<Row> anotherWindowedCounts = windowedCounts.groupBy(
functions.window(functions.window_time("window"), "1 hour"),
windowedCounts.col("word")
).count();
window
函数不仅需要时间戳列,还需要时间窗口列。这在用户想要应用链式时间窗口聚合的情况下特别有用。
words = ... # 流式DataFrame,模式为 { timestamp: 时间戳, word: 字符串 }
# 按窗口和单词对数据进行分组,并计算每组的计数
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# 按另一个窗口和单词对窗口数据进行分组,并计算每组的计数
anotherWindowedCounts = windowedCounts.groupBy(
window(windowedCounts.window, "1 hour"),
windowedCounts.word
).count()
import spark.implicits._
val words = ... // 流数据框架,模式 { timestamp: Timestamp, word: String }
// 按窗口和单词对数据进行分组,并计算每个组的计数
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// 按另一个窗口和单词对窗口数据进行分组,并计算每个组的计数
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
Dataset<Row> words = ... // 流式 DataFrame,模式为 { timestamp: Timestamp, word: String }
// 按窗口和单词对数据进行分组,并计算每个组的计数
Dataset<Row> windowedCounts = words.groupBy(
functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
words.col("word")
).count();
// 按另一个窗口和单词对窗口化数据进行分组,并计算每个组的计数
Dataset<Row> anotherWindowedCounts = windowedCounts.groupBy(
functions.window("window", "1 hour"),
windowedCounts.col("word")
).count();
用于清理聚合状态的水印条件
需要注意的是,以下条件必须满足,以便水印能够清理聚合查询中的状态 (截至Spark 2.1.1,未来可能会有变更) 。
-
输出模式必须是添加或更新。 完整模式要求保留所有聚合数据,因此不能使用水印来丢弃中间状态。有关每种输出模式语义的详细说明,请参见 输出模式 部分。
-
聚合必须具有事件时间列,或者在事件时间列上有一个
window
。 -
withWatermark
必须在与用于聚合的时间戳列相同的列上调用。例如,df.withWatermark("time", "1 min").groupBy("time2").count()
在添加输出模式下无效,因为水印是在与聚合列不同的列上定义的。 -
withWatermark
必须在聚合之前调用,以便使用水印详细信息。例如,df.groupBy("time").count().withWatermark("time", "1 min")
在添加输出模式下无效。
带水印的汇聚的语义保证
-
水印延迟(通过
withWatermark
设置)为“2小时”保证引擎绝不会丢弃任何延迟少于2小时的数据。换句话说,任何比最新处理的数据(以事件时间计)延迟少于2小时的数据都保证会被聚合。 -
然而,这一保证仅在一个方向上是严格的。延迟超过2小时的数据不保证会被丢弃;它可能会被聚合,也可能不会。数据越延迟,引擎处理它的可能性就越小。
连接操作
结构化流支持将流数据集/数据框与静态数据集/数据框连接,也支持与另一个流数据集/数据框的连接。流连接的结果是增量生成的,类似于上一节中流聚合的结果。在本节中,我们将探讨在上述情况下支持的连接类型(即内连接、外连接、半连接等)。请注意,在所有支持的连接类型中,与流数据集/数据框的连接结果将与与包含流中相同数据的静态数据集/数据框的连接结果完全相同。
流静态连接
自从在Spark 2.0引入以来,结构化流处理支持对流式和静态DataFrame/Dataset之间的连接(内部连接和某些类型的外部连接)。这里是一个简单的例子。
staticDf = spark.read. ...
streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type") # 与静态数据框的内部等联接
streamingDf.join(staticDf, "type", "left_outer") # 与静态数据框的左外联接
val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type") // 与静态数据帧进行内连接
streamingDf.join(staticDf, "type", "left_outer") // 与静态数据帧进行左外连接
Dataset<Row> staticDf = spark.read(). ...;
Dataset<Row> streamingDf = spark.readStream(). ...;
streamingDf.join(staticDf, "type"); // 与静态DF进行内部等值连接
streamingDf.join(staticDf, "type", "left_outer"); // 与静态DF进行左外连接
staticDf <- read.df(...)
streamingDf <- read.stream(...)
joined <- merge(streamingDf, staticDf, sort = FALSE) # 与一个静态数据框进行内连接
joined <- join(
streamingDf,
staticDf,
streamingDf$value == staticDf$value,
"left_outer") # 与一个静态数据框进行左外连接
请注意,流静态连接不是有状态的,因此不需要状态管理。然而,几种类型的流静态外连接尚不支持。这些在 本连接部分的末尾 列出。
流-流连接
在 Spark 2.3 中,我们添加了对流-流连接的支持,也就是说,您可以连接两个流式 Datasets/DataFrames。生成两个数据流之间连接结果的挑战在于,在任何时刻,数据集的视图对于连接的两侧都是不完整的,这使得在输入之间找到匹配项变得更加困难。来自一个输入流的任何行都可以与尚未接收的来自另一个输入流的任何未来行匹配。因此,对于两个输入流,我们将过去的输入缓冲作为流状态,以便我们可以将每个未来的输入与过去的输入匹配,并相应地生成连接结果。此外,类似于流式聚合,我们自动处理延迟的、无序的数据,并可以使用水印限制状态。让我们讨论一下支持的不同类型的流-流连接以及如何使用它们。
具有可选水印的内部连接
支持在任何类型的列上进行内部连接,以及任何类型的连接条件。但是,随着流的运行,流状态的大小将不断增长,因为 所有 过去的输入都必须被保存,因为任何新的输入都可能与过去的任何输入匹配。为了避免无界状态,您必须定义额外的连接条件,以便无限期的旧输入不能与未来的输入匹配,因此可以从状态中清除。换句话说,您将需要在连接中执行以下额外步骤。
-
定义两输入的水印延迟,以便引擎知道输入可能延迟的程度(类似于流聚合)
-
在两个输入之间定义事件时间的约束,以便引擎能够判断何时某个输入的旧行不再需要(即,不满足时间约束)与另一个输入进行匹配。这个约束可以通过以下两种方式之一定义。
-
时间范围连接条件(例如
...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR
), -
在事件时间窗口上连接(例如
...JOIN ON leftTimeWindow = rightTimeWindow
)。
-
我们通过一个例子来理解这点。
假设我们想要将一系列广告展示(广告被展示时)与另一系列用户点击广告的行为进行联接,以关联展示何时导致可变现的点击。为了允许在这个流-流连接中进行状态清理,您需要按如下方式指定水印延迟和时间约束。
-
水印延迟:例如,展示和相应的点击在事件时间上最多可以延迟/乱序2小时和3小时。
-
事件时间范围条件:例如,点击可以在相应展示后的0秒到1小时的时间范围内发生。
代码将如下所示。
from pyspark.sql.functions import expr
impressions = spark.readStream. ...
clicks = spark.readStream. ...
# 对事件时间列应用水印
impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
# 使用事件时间约束进行连接
impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
""")
)
import org.apache.spark.sql.functions.expr
val impressions = spark.readStream. ...
val clicks = spark.readStream. ...
// 在事件时间列上应用水印
val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
// 按事件时间约束进行连接
impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
""")
)
import static org.apache.spark.sql.functions.expr
Dataset<Row> impressions = spark.readStream(). ...
Dataset<Row> clicks = spark.readStream(). ...
// 在事件时间列上应用水印
Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");
// 与事件时间约束进行连接
impressionsWithWatermark.join(
clicksWithWatermark,
expr(
"clickAdId = impressionAdId AND " +
"clickTime >= impressionTime AND " +
"clickTime <= impressionTime + interval 1 hour ")
);
impressions <- read.stream(...)
clicks <- read.stream(...)
# 在事件时间列上应用水印
impressionsWithWatermark <- withWatermark(impressions, "impressionTime", "2 hours")
clicksWithWatermark <- withWatermark(clicks, "clickTime", "3 hours")
# 使用事件时间约束进行连接
joined <- join(
impressionsWithWatermark,
clicksWithWatermark,
expr(
paste(
"clickAdId = impressionAdId AND",
"clickTime >= impressionTime AND",
"clickTime <= impressionTime + interval 1 hour"
)))
带有水印的流-流内连接的语义保证
这与 水印在聚合中的保证 相似。水印延迟为“2小时”保证引擎永远不会丢弃任何延迟少于2小时的数据。但延迟超过2小时的数据可能会被处理,也可能不会被处理。
带水印的外连接
虽然水印 + 事件时间约束对内部连接是可选的,但对外部连接必须指定。这是因为在外部连接中生成 NULL 结果时,引擎必须知道某个输入行将来不会与任何内容匹配。因此,必须指定水印 + 事件时间约束以生成正确的结果。因此,带有外部连接的查询将看起来与之前的广告货币化示例相似,只是会有一个额外的参数来指定它为外部连接。
impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # 可以是 "inner","leftOuter","rightOuter","fullOuter","leftSemi"
)
impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // 可以是 "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
impressionsWithWatermark.join(
clicksWithWatermark,
expr(
"clickAdId = impressionAdId AND " +
"clickTime >= impressionTime AND " +
"clickTime <= impressionTime + interval 1 hour "),
"leftOuter" // 可以是 "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
);
joined <- join(
impressionsWithWatermark,
clicksWithWatermark,
expr(
paste(
"clickAdId = impressionAdId AND",
"clickTime >= impressionTime AND",
"clickTime <= impressionTime + interval 1 hour"),
"left_outer" # 可以是 "inner", "left_outer", "right_outer", "full_outer", "left_semi"
))
带水印的流-流外连接的语义保证
外部连接在水印延迟和数据是否会丢失方面具有与 内部连接 相同的保证。
警告
关于外部结果生成,有几个重要特性需要注意。
-
生成外部NULL结果的延迟取决于指定的水印延迟和时间范围条件。 这是因为引擎必须等待这么长时间以确保没有匹配,并且将来也不会有匹配。
-
在当前的微批处理引擎实现中,水印在微批处理结束时推进,下一个微批处理使用更新后的水印来清理状态并输出外部结果。由于我们仅在有新的数据需要处理时触发微批处理,如果在流中没有接收到新的数据,外部结果的生成可能会延迟。 简而言之,如果任何两个被连接的输入流在一段时间内没有接收数据,外部(无论是左侧还是右侧)输出可能会延迟。
带水印的半连接
半连接返回与右侧匹配的关系左侧的值。它也被称为左半连接。与外连接类似,必须为半连接指定水印 + 事件时间约束。这是为了移除左侧未匹配的输入行,引擎必须知道左侧的输入行将来不会与右侧的任何内容匹配。
带水印的流-流半连接的语义保证
半连接在水印延迟以及数据是否会被丢弃方面,具有与 内连接 相同的保证。
流查询中连接的支持矩阵
左输入 | 右输入 | 连接类型 | |
---|---|---|---|
静态 | 静态 | 所有类型 | 支持,因为它不在流数据中,即使它 可能出现在流查询中 |
流 | 静态 | 内部 | 支持,不是有状态的 |
左外部 | 支持,不是有状态的 | ||
右外部 | 不支持 | ||
全外部 | 不支持 | ||
左半 | 支持,不是有状态的 | ||
静态 | 流 | 内部 | 支持,不是有状态的 |
左外部 | 不支持 | ||
右外部 | 支持,不是有状态的 | ||
全外部 | 不支持 | ||
左半 | 不支持 | ||
流 | 流 | 内部 | 支持,选择性地在两侧指定水印 + 状态清理的时间约束 |
左外部 | 条件支持,必须在右侧指定水印 + 正确 结果的时间约束,选择性地在左侧指定水印以进行所有状态清理 | ||
右外部 | 条件支持,必须在左侧指定水印 + 正确 结果的时间约束,选择性地在右侧指定水印以进行所有状态清理 | ||
全外部 | 条件支持,必须在一侧指定水印 + 正确 结果的时间约束,选择性地在另一侧指定水印以进行所有状态清理 | ||
左半 | 条件支持,必须在右侧指定水印 + 正确 结果的时间约束,选择性地在左侧指定水印以进行所有状态清理 | ||
关于支持的连接的更多细节:
-
连接可以级联,也就是说,你可以做
df1.join(df2, ...).join(df3, ...).join(df4, ....)
. -
从 Spark 2.4 开始,只有在查询处于附加输出模式时才可以使用连接。其他输出模式尚不支持。
-
在连接之前和之后,你无法使用 mapGroupsWithState 和 flatMapGroupsWithState。
在追加输出模式下,您可以构造具有非映射操作的查询,例如聚合、去重、流-流连接在连接之前/之后。
例如,这是在两个流中进行时间窗口聚合的示例,随后是带有事件时间窗口的流间连接:
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
Dataset<Row> clicksWindow = clicksWithWatermark
.groupBy(functions.window(clicksWithWatermark.col("clickTime"), "1 hour"))
.count();
Dataset<Row> impressionsWindow = impressionsWithWatermark
.groupBy(functions.window(impressionsWithWatermark.col("impressionTime"), "1 hour"))
.count();
clicksWindow.join(impressionsWindow, "window", "inner");
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
这是另一个使用时间范围连接条件进行流-流连接的例子,随后进行时间窗口聚合:
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // 可以是 "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
Dataset<Row> joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr(
"clickAdId = impressionAdId AND " +
"clickTime >= impressionTime AND " +
"clickTime <= impressionTime + interval 1 hour "),
"leftOuter" // 可以是 "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
);
joined
.groupBy(joined.col("clickAdId"), functions.window(joined.col("clickTime"), "1 hour"))
.count();
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # 可以是 "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
流媒体去重
您可以使用事件中的唯一标识符来对数据流中的记录进行去重。这与使用唯一标识符列在静态数据上进行去重是完全相同的。查询将存储来自先前记录所需的数据量,以便能够过滤重复记录。与聚合类似,您可以选择使用或不使用水印进行去重。
-
带水印 - 如果对重复记录到达的时间有上限限制,则可以在事件时间列上定义水印,并使用 guid 和事件时间列进行去重。查询将使用水印从过去记录中删除不再预计会有重复的旧状态数据。这样就限制了查询必须维护的状态量。
-
不带水印 - 由于对重复记录到达时间没有限制,查询将所有过去记录的数据存储为状态。
streamingDf = spark.readStream. ...
# 不使用水印,使用 guid 列
streamingDf.dropDuplicates("guid")
# 使用水印,使用 guid 和 eventTime 列
streamingDf \
.withWatermark("eventTime", "10 seconds") \
.dropDuplicates("guid", "eventTime")
val streamingDf = spark.readStream. ... // 列:guid, eventTime, ...
// 使用guid列而不带水印
streamingDf.dropDuplicates("guid")
// 使用guid和eventTime列并带水印
streamingDf
.withWatermark("eventTime", "10 seconds")
.dropDuplicates("guid", "eventTime")
Dataset<Row> streamingDf = spark.readStream(). ...; // 列: guid, eventTime, ...
// 不使用水印,使用 guid 列
streamingDf.dropDuplicates("guid");
// 使用水印,使用 guid 和 eventTime 列
streamingDf
.withWatermark("eventTime", "10 seconds")
.dropDuplicates("guid", "eventTime");
streamingDf <- read.stream(...)
# 使用 guid 列而不带水印
streamingDf <- dropDuplicates(streamingDf, "guid")
# 使用 guid 和 eventTime 列带水印
streamingDf <- withWatermark(streamingDf, "eventTime", "10 seconds")
streamingDf <- dropDuplicates(streamingDf, "guid", "eventTime")
专门针对流处理,您可以使用事件中的唯一标识符在数据流中去重记录,时间范围为水印。 例如,如果您将水印的延迟阈值设置为“1小时”,则在1小时内发生的重复事件可以正确去重。 (有关更多详细信息,请参阅 dropDuplicatesWithinWatermark 的API文档。)
这可以用于处理事件时间列无法成为唯一标识符的一部分的用例,主要是因为事件时间在相同记录中以某种方式不同的情况。(例如,非幂等写入者在写入时发生事件时间)
鼓励用户将水印的延迟阈值设置为大于重复事件之间最大时间戳差异的值。
此功能需要在流式数据框/数据集中设置带延迟阈值的水印。
streamingDf = spark.readStream. ...
# 使用基于事件时间列的水印对 guid 列进行去重
streamingDf \
.withWatermark("eventTime", "10 hours") \
.dropDuplicatesWithinWatermark("guid")
val streamingDf = spark.readStream. ... // 列:guid, eventTime, ...
// 使用基于 eventTime 列的 watermark,利用 guid 列去重
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark("guid")
Dataset<Row> streamingDf = spark.readStream(). ...; // 列:guid, eventTime, ...
// 使用基于eventTime列的水印的guid列进行去重
streamingDf
.withWatermark("eventTime", "10小时")
.dropDuplicatesWithinWatermark("guid");
处理多个水印的政策
一个流查询可以有多个输入流,这些流可以被联合或连接在一起。每个输入流可以有不同的延迟数据容忍阈值,用于有状态操作。你可以使用
withWatermarks("eventTime", delay)
在每个输入流上指定这些阈值。例如,考虑一个在
inputStream1
和
inputStream2
之间的流-流连接的查询。
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
在执行查询时,结构化流分别跟踪每个输入流中看到的最大事件时间,基于相应的延迟计算水印,并选择一个全局水印用于有状态操作。默认情况下,选择最小值作为全局水印,因为它确保不会意外丢弃数据,以免因某个流落后于其他流而被视为过晚(例如,某个流由于上游故障停止接收数据)。换句话说,全局水印将安全地以最慢流的速度移动,查询输出将相应地延迟。
然而,在某些情况下,即使这意味着丢弃来自最慢流的数据,您可能仍希望获得更快的结果。自Spark 2.4以来,您可以通过将SQL配置
spark.sql.streaming.multipleWatermarkPolicy
设置为
max
(默认值为
min
)来设置多个水印策略,以选择最大值作为全局水印。这使得全局水印可以跟随最快流的步伐移动。然而,作为副作用,来自慢速流的数据将被积极丢弃。因此,请谨慎使用此配置。
任意状态操作
许多用例需要比聚合更高级的有状态操作。例如,在许多用例中,您必须跟踪来自事件数据流的会话。为了进行这样的会话化,您需要将任意类型的数据作为状态保存,并使用每个触发器中的数据流事件对状态执行任意操作。自Spark 2.2以来,可以使用操作
mapGroupsWithState
和更强大的操作
flatMapGroupsWithState
来实现。这两种操作允许您在分组数据集上应用用户定义的代码,以更新用户定义的状态。有关更具体的细节,请查看API文档(
Scala
/
Java
)和示例(
Scala
/
Java
)。
尽管Spark无法检查并强制执行,但状态函数应根据输出模式的语义实现。例如,在更新模式下,Spark不期望状态函数会发出比当前水印加上允许的迟到记录延迟更早的行,而在追加模式下,状态函数可以发出这些行。
不支持的操作
某些DataFrame/Dataset操作不支持流式DataFrames/Datasets。它们如下所示。
-
流数据集不支持限制并获取前N行。
-
流数据集不支持去重操作。
-
流数据集的排序操作仅在聚合之后并且在完整输出模式下支持。
-
流数据集的某些类型的外连接不被支持。详细信息请参见 连接操作部分中的支持矩阵 。
-
在流数据集上链接多个有状态操作在更新和完整模式下不被支持。
- 此外,在附加模式下,mapGroupsWithState/flatMapGroupsWithState 操作后跟其他有状态操作是不被支持的。
- 一个已知的解决方法是将流查询拆分为多个查询,每个查询中只包含一个有状态操作,确保每个查询的端到端精确一次。确保最后一个查询的端到端精确一次是可选的。
此外,有一些数据集方法在流数据集上无法使用。它们是会立即运行查询并返回结果的操作,这在流数据集上没有意义。相反,这些功能可以通过明确启动一个流查询来完成(请参见下一节有关内容)。
-
count()
- 不能从流式数据集返回单一计数。相反,使用ds.groupBy().count()
,它返回一个包含运行计数的流式数据集。 -
foreach()
- 相反,使用ds.writeStream.foreach(...)
(见下一节)。 -
show()
- 相反,使用控制台接收器(见下一节)。
如果您尝试任何这些操作,您将看到一个
AnalysisException
,例如“操作 XYZ 在流式 DataFrames/Datasets 中不被支持”。
虽然其中一些可能在未来的 Spark 版本中受到支持,
但还有其他一些在流式数据上有效地实现是基本困难的。
例如,输入流上的排序不被支持,因为这需要保持
跟踪所有在流中接收的数据。因此,这在执行上是基本困难的。
状态存储
状态存储是一个版本化的键值存储,提供读取和写入操作。在结构化流处理中,我们使用状态存储提供者来处理跨批次的有状态操作。有两个内置的状态存储提供者实现。最终用户也可以通过扩展 StateStoreProvider 接口来实现自己的状态存储提供者。
HDFS 状态存储提供者
HDFS 后端状态存储提供程序是 [[StateStoreProvider]] 和 [[StateStore]] 的默认实现,其中所有数据在第一阶段存储在内存映射中,然后由 HDFS 兼容文件系统中的文件备份。所有对存储的更新必须以集合的形式进行事务处理,每组更新都会递增存储的版本。这些版本可以用于在存储的正确版本上重新执行更新(通过 RDD 操作中的重试),并重新生成存储版本。
RocksDB 状态存储实现
自 Spark 3.2 起,我们添加了一个新的内置状态存储实现,RocksDB 状态存储提供者。
如果您的流式查询中有状态操作(例如,流式聚合、流式去重、流-流连接、使用状态的映射组或使用状态的扁平映射组),并且您希望在状态中维护数百万个键,那么您可能会面临与大型JVM垃圾收集(GC)暂停相关的问题,这会导致微批处理时间的高度波动。这是因为,通过HDFSBackedStateStore的实现,状态数据是保存在执行器的JVM内存中,大量的状态对象给JVM带来了内存压力,从而导致较长的GC暂停。
在这种情况下,您可以选择使用基于 RocksDB 的更优化的状态管理解决方案。这个解决方案使用RocksDB有效地管理本地内存和本地磁盘中的状态,而不是将状态保存在JVM内存中。此外,对该状态的任何更改都会自动由结构化流处理保存到您提供的检查点位置,从而提供完全的容错保证(与默认状态管理相同)。
要启用新的内置状态存储实现,将
spark.sql.streaming.stateStore.providerClass
设置为
org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
。
以下是关于状态存储提供者的RocksDB实例的配置:
配置名称 | 描述 | 默认值 |
---|---|---|
spark.sql.streaming.stateStore.rocksdb.compactOnCommit | 在提交操作时是否对RocksDB实例执行范围压缩 | False |
spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled | 在RocksDB StateStore提交期间,是否上传变更日志而不是快照 | False |
spark.sql.streaming.stateStore.rocksdb.blockSizeKB | RocksDB BlockBasedTable中每个块打包的用户数据的近似大小(以KB为单位),这是RocksDB的默认SST文件格式。 | 4 |
spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB | 块缓存的大小容量(以MB为单位)。 | 8 |
spark.sql.streaming.stateStore.rocksdb.lockAcquireTimeoutMs | 在RocksDB实例的加载操作中获取锁的等待时间(以毫秒为单位)。 | 60000 |
spark.sql.streaming.stateStore.rocksdb.maxOpenFiles | RocksDB实例可以使用的打开文件的数量。值为-1表示打开的文件始终保持打开状态。如果达到打开文件的限制,RocksDB将从打开的文件缓存中驱逐条目并关闭那些文件描述符,并从缓存中移除条目。 | -1 |
spark.sql.streaming.stateStore.rocksdb.resetStatsOnLoad | 我们是否在加载时重置RocksDB的所有计数器和直方图统计信息。 | True |
spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows | 我们是否跟踪状态存储中的总行数。请参阅 性能方面的考虑 中的详细信息。 | True |
spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB | RocksDB中MemTable的最大大小。值为-1表示将使用RocksDB内部的默认值 | -1 |
spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber | RocksDB中MemTable的最大数量,包括活动和不可变。值为-1表示将使用RocksDB内部的默认值 | -1 |
spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage | 单个节点上的RocksDB状态存储实例的总内存使用是否有限制。 | false |
spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB | 单个节点上RocksDB状态存储实例的总内存限制(以MB为单位)。 | 500 |
spark.sql.streaming.stateStore.rocksdb.writeBufferCacheRatio | 使用maxMemoryUsageMB通过单个节点上所有RocksDB实例分配的内存,写入缓冲区占用的总内存作为内存的一个分数。 | 0.5 |
spark.sql.streaming.stateStore.rocksdb.highPriorityPoolRatio | 使用maxMemoryUsageMB通过单个节点上所有RocksDB实例分配的内存,高优先级池中占用块的总内存作为内存的一个分数。 | 0.1 |
RocksDB 状态存储内存管理
RocksDB 为不同对象分配内存,如 memtables、块缓存和过滤/索引块。如果不进行限制,RocksDB 在多个实例中的内存使用可能会无限增长,从而可能导致 OOM(内存不足)问题。 RocksDB 提供了一种方法,通过使用写缓冲区管理器功能来限制在单个节点上运行的所有 DB 实例的内存使用。如果您想在 Spark 结构化流处理部署中限制 RocksDB 的内存使用,可以通过将
spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage
配置设置为
true
来启用此功能。您还可以通过将
spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB
的值设置为一个静态数字或节点上可用物理内存的一个比例来确定 RocksDB 实例的最大允许内存。可以通过将
spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB
和
spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber
设置为所需值来配置单个 RocksDB 实例的限制。默认情况下,将使用 RocksDB 的内部默认值作为这些设置。
RocksDB 状态存储变更日志检查点
在较新的Spark版本中,引入了变更日志检查点机制用于RocksDB状态存储。传统的RocksDB状态存储检查点机制是增量快照检查点,其中RocksDB实例的清单文件和新生成的RocksDB SST文件被上传到持久存储。 变更日志检查点不是上传RocksDB实例的数据文件,而是上传自上次检查点以来对状态所做的更改以确保持久性。 快照定期在后台持久化,以便进行可预测的故障恢复和变更日志修剪。 变更日志检查点避免了捕获和上传RocksDB实例快照的成本,并显著减少了流查询延迟。
更改日志检查点默认是禁用的。您可以通过将
spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled
配置设置为
true
来启用 RocksDB 状态存储更改日志检查点。
更改日志检查点设计为与传统检查点机制向后兼容。
RocksDB 状态存储提供者在两个检查点机制之间提供无缝支持,允许双向转换。这使您可以利用更改日志检查点的性能优势,而不必丢弃旧的状态检查点。
在支持更改日志检查点的 Spark 版本中,您可以通过在 Spark 会话中启用更改日志检查点,将流式查询从较旧版本的 Spark 迁移到更改日志检查点。
反之,您可以在新版本的 Spark 中安全地禁用更改日志检查点,然后任何已经使用更改日志检查点运行的查询将切换回传统检查点。
您需要重新启动流式查询,才能应用检查点机制的更改,但在此过程中您不会观察到任何性能下降。
性能方面的考虑
- 您可能想要禁用对总行数的跟踪,以提高RocksDB状态存储的性能。
跟踪行数会增加写操作的额外查找 - 我们鼓励您尝试关闭在调整RocksDB状态存储时的配置,尤其是状态操作符的度量值很大 -
numRowsUpdated
,
numRowsRemoved
。
您可以在重新启动查询时更改配置,这使您能够更改“可观测性与性能”之间的权衡决策。
如果配置被禁用,状态中的行数(
numTotalStateRows
)将报告为0。
状态存储与任务局部性
有状态操作在执行器的状态存储中存储事件的状态。状态存储占用资源,如内存和磁盘空间来存储状态。 因此,在不同的流批次中将状态存储提供者保持在同一执行器中运行是更加高效的。 更改状态存储提供者的位置需要额外的开销来加载检查点状态。从检查点加载状态的开销取决于 外部存储和状态的大小,这往往会影响微批运行的延迟。对于某些用例,例如处理非常大的状态数据, 从检查点状态加载新的状态存储提供者可能非常耗时且效率低下。
结构化流处理查询中的状态操作依赖于Spark RDD的首选位置功能,以便在同一执行器上运行状态存储提供者。 如果在下一个批处理中,相应的状态存储提供者再次被安排在此执行器上,它可以重用之前的状态,从而节省加载检查点状态的时间。
然而,通常首选位置并不是一个严格的要求,Spark仍然可能将任务调度到其他执行器而不是首选的执行器。在这种情况下,Spark将从检查点状态加载状态存储提供程序到新的执行器。上一批次中运行的状态存储提供程序不会立即被卸载。Spark运行一个维护任务,该任务检查并卸载在执行器上不活跃的状态存储提供程序。
通过更改与任务调度相关的Spark配置,例如
spark.locality.wait
,用户可以配置Spark等待多长时间来启动数据本地任务。
对于结构化流中的有状态操作,它可以用来让状态存储提供者在跨批次的相同执行器上运行。
专门针对内置的 HDFS 状态存储提供者,用户可以检查状态存储指标,例如
loadedMapCacheHitCount
和
loadedMapCacheMissCount
。理想情况下,最好将缓存缺失计数最小化,这意味着 Spark 不会浪费太多时间加载检查点状态。用户可以增加 Spark 本地性等待配置,以避免在不同的执行器之间跨批次加载状态存储提供者。
开始流式查询
一旦您定义了最终的结果 DataFrame/Dataset,您要做的就是开始流计算。为此,您必须使用
DataStreamWriter
(
Scala
/
Java
/
Python
文档)
通过
Dataset.writeStream()
返回。您必须在此接口中指定以下一个或多个内容。
-
输出接收器的详细信息: 数据格式、位置等。
-
输出模式: 指定写入输出接收器的内容。
-
查询名称: 可选,指定查询的唯一名称以便于识别。
-
触发间隔: 可选,指定触发间隔。如果未指定,系统将在前一处理完成后立即检查新数据的可用性。如果因前一处理未完成而错过了触发时间,则系统将立即触发处理。
-
检查点位置: 对于某些能够保证端到端容错的输出接收器,指定系统将写入所有检查点信息的位置。该位置应为 HDFS 兼容的容错文件系统中的一个目录。检查点的语义将在下一节中更详细地讨论。
输出模式
有几种输出模式。
-
追加模式(默认) - 这是默认模式,只有自上次触发以来添加到结果表的新行将被输出到接收器。仅支持那些添加到结果表的行永远不会更改的查询。因此,此模式保证每一行只会输出一次(假设接收器是容错的)。例如,只有
select
、where
、map
、flatMap
、filter
、join
等的查询将支持追加模式。 -
完整模式 - 每次触发后,整个结果表将被输出到接收器。这适用于聚合查询。
-
更新模式 - ( 自Spark 2.1.1起可用 )只有自上次触发以来在结果表中更新的行将被输出到接收器。更多信息将在未来的版本中添加。
不同类型的流查询支持不同的输出模式。
这是兼容性矩阵。
查询类型 | 支持的输出模式 | 备注 | |
---|---|---|---|
带有聚合的查询 | 基于事件时间的聚合,使用水印 | 追加、更新、完整 |
追加模式使用水印来丢弃旧的聚合状态。但是,窗口聚合的输出会延迟
在
withWatermark()
中指定的晚到阈值,因为根据模式语义,
行只能在最终确定后(即在水印跨越后)添加到结果表中一次。有关更多详细信息,请参见
晚到数据
部分。
更新模式使用水印来丢弃旧的聚合状态。 完整模式不会丢弃旧的聚合状态,因为根据定义,该模式会保留结果表中的所有数据。 |
其他聚合 | 完整、更新 |
由于未定义水印(仅在其他类别中定义),
不会丢弃旧的聚合状态。
追加模式不支持,因为聚合可以更新,从而违反该模式的语义。 |
|
带有
mapGroupsWithState
的查询
|
更新 |
在带有
mapGroupsWithState
的查询中不允许聚合。
|
|
带有
flatMapGroupsWithState
的查询
|
追加操作模式 | 追加 |
在
flatMapGroupsWithState
之后允许聚合。
|
更新操作模式 | 更新 |
在带有
flatMapGroupsWithState
的查询中不允许聚合。
|
|
带有
joins
的查询
|
追加 | 目前不支持更新和完整模式。有关哪些类型的连接被支持的更多详细信息,请查看 连接操作部分的支持矩阵 。 | |
其他查询 | 追加、更新 | 不支持完整模式,因为在结果表中保留所有未聚合的数据是不可行的。 | |
输出接收器
内置输出接收器有几种类型。
- 文件接收器 - 将输出存储到一个目录。
writeStream
.format("parquet") // 可以是 "orc", "json", "csv" 等。
.option("path", "path/to/destination/dir")
.start()
- Kafka sink - 将输出存储到一个或多个Kafka主题中。
writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "更新")
.start()
- Foreach sink - 对输出中的记录执行任意计算。有关更多详细信息,请参见本节后面的内容。
writeStream
.foreach(...)
.start()
- 控制台接收器(用于调试) - 每次触发时将输出打印到控制台/stdout。支持追加和完整输出模式。这应该用于低数据量的调试,因为在每次触发后,整个输出都会被收集并存储在驱动程序的内存中。
writeStream
.format("console")
.start()
- 内存接收器(用于调试) - 输出作为内存表存储在内存中。 支持追加和完整输出模式。这应该用于低数据量的调试目的,因为整个输出被收集并存储在驱动程序的内存中。 因此,请谨慎使用。
writeStream
.format("memory")
.queryName("tableName")
.start()
某些接收器不是容错的,因为它们不保证输出的持久性,仅用于调试目的。请参阅前面关于 容错语义 的部分。 以下是Spark中所有接收器的详细信息。
接收器 | 支持的输出模式 | 选项 | 容错 | 备注 |
---|---|---|---|---|
文件接收器 | 追加 |
path
: 输出目录的路径,必须指定。
retention
: 输出文件的存活时间 (TTL)。已提交的输出文件如果超过TTL会在元数据日志中最终被排除。这意味着读取接收器的输出目录的查询可能无法处理它们。您可以提供字符串格式的时间值。(例如 "12h", "7d" 等)默认情况下是禁用的。
有关文件格式特定选项,请参见 DataFrameWriter 中的相关方法 ( Scala / Java / Python / R ). 例如,对于 "parquet" 格式选项,请参见
DataFrameWriter.parquet()
|
是(准确一次) | 支持写入分区表。按时间分区可能是有用的。 |
Kafka 接收器 | 追加,更新,完整 | 请参阅 Kafka 集成指南 | 是(至少一次) | 更多详细信息请参阅 Kafka 集成指南 |
Foreach 接收器 | 追加,更新,完整 | 无 | 是(至少一次) | 更多详细信息请参阅 下一节 |
ForeachBatch 接收器 | 追加,更新,完整 | 无 | 取决于实现 | 更多详细信息请参阅 下一节 |
控制台接收器 | 追加,更新,完整 |
numRows
: 每次触发时打印的行数(默认:20)
truncate
: 当输出过长时是否截断输出(默认:true)
|
否 | |
内存接收器 | 追加,完整 | 无 | 否。但是在完整模式下,重新启动的查询将重新创建整个表。 | 表名是查询名称。 |
请注意,您必须调用
start()
才能真正开始查询的执行。此操作返回一个 StreamingQuery 对象,它是对持续运行的执行的句柄。您可以使用此对象来管理查询,我们将在下一小节中讨论此内容。现在,让我们通过几个示例来理解这些内容。
# ========== 无聚合的 DF ==========
noAggDF = deviceDataDf.select("device").where("signal > 10")
# 将新数据打印到控制台
noAggDF \
.writeStream \
.format("console") \
.start()
# 将新数据写入 Parquet 文件
noAggDF \
.writeStream \
.format("parquet") \
.option("checkpointLocation", "path/to/checkpoint/dir") \
.option("path", "path/to/destination/dir") \
.start()
# ========== 带聚合的 DF ==========
aggDF = df.groupBy("device").count()
# 将更新的聚合打印到控制台
aggDF \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
# 在内存表中保留所有聚合。查询名称将成为表名
aggDF \
.writeStream \
.queryName("aggregates") \
.outputMode("complete") \
.format("memory") \
.start()
spark.sql("select * from aggregates").show() # 交互查询内存表
// ========== 无聚合的DF ==========
val noAggDF = deviceDataDf.select("device").where("signal > 10")
// 将新数据输出到控制台
noAggDF
.writeStream
.format("console")
.start()
// 将新数据写入Parquet文件
noAggDF
.writeStream
.format("parquet")
.option("checkpointLocation", "path/to/checkpoint/dir")
.option("path", "path/to/destination/dir")
.start()
// ========== 带聚合的DF ==========
val aggDF = df.groupBy("device").count()
// 将更新的聚合输出到控制台
aggDF
.writeStream
.outputMode("complete")
.format("console")
.start()
// 在内存表中保存所有聚合
aggDF
.writeStream
.queryName("aggregates") // 该查询名称将成为表名
.outputMode("complete")
.format("memory")
.start()
spark.sql("select * from aggregates").show() // 交互查询内存表
// ========== 无聚合的DF ==========
Dataset<Row> noAggDF = deviceDataDf.select("device").where("signal > 10");
// 将新数据打印到控制台
noAggDF
.writeStream()
.format("console")
.start();
// 将新数据写入Parquet文件
noAggDF
.writeStream()
.format("parquet")
.option("checkpointLocation", "path/to/checkpoint/dir")
.option("path", "path/to/destination/dir")
.start();
// ========== 有聚合的DF ==========
Dataset<Row> aggDF = df.groupBy("device").count();
// 将更新后的聚合打印到控制台
aggDF
.writeStream()
.outputMode("complete")
.format("console")
.start();
// 在内存表中保存所有聚合
aggDF
.writeStream()
.queryName("aggregates") // 这个查询名称将作为表名称
.outputMode("complete")
.format("memory")
.start();
spark.sql("select * from aggregates").show(); // 交互式查询内存表
# ========== 无聚合的 DF ==========
noAggDF <- select(where(deviceDataDf, "signal > 10"), "device")
# 将新数据打印到控制台
write.stream(noAggDF, "console")
# 将新数据写入 Parquet 文件
write.stream(noAggDF,
"parquet",
path = "path/to/destination/dir",
checkpointLocation = "path/to/checkpoint/dir")
# ========== 有聚合的 DF ==========
aggDF <- count(groupBy(df, "device"))
# 将更新后的聚合打印到控制台
write.stream(aggDF, "console", outputMode = "complete")
# 在内存表中保存所有聚合。查询名称将是表名
write.stream(aggDF, "memory", queryName = "aggregates", outputMode = "complete")
# 交互查询内存表
head(sql("select * from aggregates"))
使用 Foreach 和 ForeachBatch
foreach
和
foreachBatch
操作允许您对流查询的输出应用任意操作和编写逻辑。它们的使用案例略有不同 -
foreach
允许对每一行进行自定义写入逻辑,而
foreachBatch
允许对每个微批次的输出进行任意操作和自定义逻辑。让我们更详细地了解它们的用法。
遍历每批次
foreachBatch(...)
允许您指定一个函数,该函数在每个微批次的流查询的输出数据上执行。自 Spark 2.4 起,Scala、Java 和 Python 都支持此功能。它接受两个参数:一个包含微批次输出数据的 DataFrame 或 Dataset,以及微批次的唯一 ID。
def foreach_batch_function(df, epoch_id):
# 转换并写入 batchDF
pass
streamingDF.writeStream.foreachBatch(foreach_batch_function).start()
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
// 转换并写入 batchDF
}.start()
streamingDatasetOfString.writeStream().foreachBatch(
new VoidFunction2<Dataset<String>, Long>() {
public void call(Dataset<String> dataset, Long batchId) {
// 转换并写入 batchDF
}
}
).start();
R 还不被支持。
使用
foreachBatch
,您可以执行以下操作。
-
重用现有的批处理数据源
- 对于许多存储系统,可能还没有可用的流处理接收器,但可能已经存在用于批处理查询的数据写入器。使用
foreachBatch
,您可以在每个微批次的输出上使用批处理数据写入器。 - 写入多个位置 - 如果您想将流处理查询的输出写入多个位置,那么您可以简单地多次写入输出的 DataFrame/Dataset。然而,每次写入尝试都可能导致输出数据被重新计算(包括可能重新读取输入数据)。为了避免重新计算,您应该缓存输出的 DataFrame/Dataset,将其写入多个位置,然后取消缓存。这是一个大致的框架。
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.write.format(...).save(...) // 位置 1
batchDF.write.format(...).save(...) // 位置 2
batchDF.unpersist()
}
-
应用额外的 DataFrame 操作
- 许多 DataFrame 和 Dataset 操作在流处理 DataFrame 中不被支持,因为在这些情况下 Spark 不支持生成增量计划。使用
foreachBatch
,您可以对每个微批处理输出应用其中一些操作。然而,您必须自己推理执行该操作的端到端语义。
注意:
-
默认情况下,
foreachBatch
只提供至少一次的写入保证。然而,您可以使用提供给函数的 batchId 作为删除重复输出的方式,以获得准确一次的保证。 -
foreachBatch
不适用于连续处理模式,因为它从根本上依赖于流查询的微批处理执行。如果您在连续模式下写入数据,请改用foreach
。
遍历
如果
foreachBatch
不是一个选项(例如,相应的批处理数据写入器不存在,或者连续处理模式),那么您可以使用
foreach
表达您的自定义写入逻辑。具体而言,您可以通过将数据写入逻辑划分为三个方法来表达它:
open
、
process
和
close
。自 Spark 2.4 起,
foreach
可以在 Scala、Java 和 Python 中使用。
在Python中,你可以通过两种方式调用foreach:在函数中或在对象中。 函数提供了一种简单的方法来表达你的处理逻辑,但不允许你在某些输入数据因失败而重新处理时去重生成的数据。 在这种情况下,你必须在对象中指定处理逻辑。
- 首先,函数接受一行作为输入。
def process_row(row):
# 将行写入存储
pass
query = streamingDF.writeStream.foreach(process_row).start()
- 其次,对象有一个处理方法和可选的打开和关闭方法:
class ForeachWriter:
def open(self, partition_id, epoch_id):
# 打开连接。这个方法在Python中是可选的。
pass
def process(self, row):
# 将行写入连接。这个方法在Python中是必需的。
pass
def close(self, error):
# 关闭连接。这个方法在Python中是可选的。
pass
query = streamingDF.writeStream.foreach(ForeachWriter()).start()
在Scala中,您必须扩展类
ForeachWriter
(
文档
)。
streamingDatasetOfString.writeStream.foreach(
new ForeachWriter[String] {
def open(partitionId: Long, version: Long): Boolean = {
// 打开连接
}
def process(record: String): Unit = {
// 将字符串写入连接
}
def close(errorOrNull: Throwable): Unit = {
// 关闭连接
}
}
).start()
在Java中,您必须扩展类
ForeachWriter
(
文档
)。
streamingDatasetOfString.writeStream().foreach(
new ForeachWriter<String>() {
@Override public boolean open(long partitionId, long version) {
// 打开连接
}
@Override public void process(String record) {
// 将字符串写入连接
}
@Override public void close(Throwable errorOrNull) {
// 关闭连接
}
}
).start();
R 还不被支持。
执行语义 当流查询启动时,Spark以以下方式调用函数或对象的方法:
-
这个对象的单个副本负责查询中单个任务生成的所有数据。换句话说,一个实例负责以分布式方式处理生成数据的一个分区。
-
这个对象必须是可序列化的,因为每个任务将获得提供对象的新序列化-反序列化副本。因此,强烈建议任何写入数据的初始化(例如,打开连接或开始事务)是在调用open()方法之后进行的,这标志着任务准备好生成数据。
-
方法的生命周期如下:
-
对于每个分区,使用partition_id:
-
对于每个批次/流数据的epoch,使用epoch_id:
-
调用方法 open(partitionId, epochId)。
-
如果 open(…) 返回 true,针对分区和批次/流的每一行,调用方法 process(row)。
-
在处理行时,调用方法 close(error),其中 error 为处理时遇到的错误(如果有的话)。
-
-
-
-
如果存在 open() 方法且成功返回 (与返回值无关),则调用 close() 方法(如果存在),除非 JVM 或 Python 进程在中间崩溃。
-
注意: Spark 不保证 (partitionId, epochId) 的输出相同,因此无法通过 (partitionId, epochId) 实现去重。比如,某些原因导致源提供不同数量的分区,Spark 优化改变了分区的数量等。有关详细信息,请参见 SPARK-28650 。如果您需要输出的去重,可以尝试使用
foreachBatch
。
流式表 API
自Spark 3.1以来,您还可以使用
DataStreamReader.table()
读取表作为流式DataFrame,并使用
DataStreamWriter.toTable()
将流式DataFrame写入表:
spark = ... # spark 会话
# 创建一个流式 DataFrame
df = spark.readStream \
.format("rate") \
.option("rowsPerSecond", 10) \
.load()
# 将流式 DataFrame 写入表
df.writeStream \
.option("checkpointLocation", "path/to/checkpoint/dir") \
.toTable("myTable")
# 检查表格结果
spark.read.table("myTable").show()
# 转换源数据集并写入新表
spark.readStream \
.table("myTable") \
.select("value") \
.writeStream \
.option("checkpointLocation", "path/to/checkpoint/dir") \
.format("parquet") \
.toTable("newTable")
# 检查新表结果
spark.read.table("newTable").show()
val spark: SparkSession = ...
// 创建一个流式 DataFrame
val df = spark.readStream
.format("rate")
.option("rowsPerSecond", 10)
.load()
// 将流式 DataFrame 写入表
df.writeStream
.option("checkpointLocation", "path/to/checkpoint/dir")
.toTable("myTable")
// 检查表结果
spark.read.table("myTable").show()
// 转换源数据集并写入新表
spark.readStream
.table("myTable")
.select("value")
.writeStream
.option("checkpointLocation", "path/to/checkpoint/dir")
.format("parquet")
.toTable("newTable")
// 检查新表结果
spark.read.table("newTable").show()
SparkSession spark = ...
// 创建一个流式 DataFrame
Dataset<Row> df = spark.readStream()
.format("rate")
.option("rowsPerSecond", 10)
.load();
// 将流式 DataFrame 写入表
df.writeStream()
.option("checkpointLocation", "path/to/checkpoint/dir")
.toTable("myTable");
// 检查表的结果
spark.read().table("myTable").show();
// 转换源数据集并写入新表
spark.readStream()
.table("myTable")
.select("value")
.writeStream()
.option("checkpointLocation", "path/to/checkpoint/dir")
.format("parquet")
.toTable("newTable");
// 检查新表的结果
spark.read().table("newTable").show();
在R中不可用。
有关更多详细信息,请查看 DataStreamReader ( Scala / Java / Python 文档) 和 DataStreamWriter ( Scala / Java / Python 文档)。
触发器
流查询的触发器设置定义了流数据处理的时机,查询是作为具有固定批次间隔的微批处理查询执行,还是作为连续处理查询执行。这里是支持的不同类型的触发器。
触发器类型 | 描述 |
---|---|
未指定(默认) | 如果没有明确指定触发器设置,则默认情况下,查询将以微批处理模式执行, 微批将在前一个微批完成处理后立即生成。 |
固定间隔微批处理 |
查询将在微批处理模式下执行,微批将在用户指定的间隔内启动。
|
一次性微批处理 (已弃用) | 查询将仅执行 一个 微批来处理所有可用数据,然后 自动停止。这在您希望定期启动集群的场景中很有用, 处理自上一个周期以来所有可用的数据,然后关闭集群。在某些情况下,这可能导致显著的成本节省。 请注意,此触发器已弃用,建议用户迁移到 可用微批处理 , 因为它提供了更好的处理保证、更细粒度的批处理规模以及更好的渐进处理 水印进展,包括无数据批处理。 |
可用微批处理 |
类似于一次性微批触发器,查询将处理所有可用数据,然后
自动停止。不同之处在于,它将基于源选项(例如
maxFilesPerTrigger
用于文件源)
以(可能)多个微批的方式处理数据,这将导致更好的查询可扩展性。
|
带有固定检查点间隔的连续处理
(实验性) |
查询将以新的低延迟、连续处理模式执行。有关此内容的更多信息, 请参阅下面的 连续处理部分 。 |
这里有一些代码示例。
# 默认触发器(尽快运行微批处理)
df.writeStream \
.format("console") \
.start()
# 处理时间触发器,微批间隔为两秒
df.writeStream \
.format("console") \
.trigger(processingTime='2 seconds') \
.start()
# 单次触发器(已有废弃,建议使用现可用触发器)
df.writeStream \
.format("console") \
.trigger(once=True) \
.start()
# 现可用触发器
df.writeStream \
.format("console") \
.trigger(availableNow=True) \
.start()
# 连续触发器,检查点间隔为一秒
df.writeStream
.format("console")
.trigger(continuous='1 second')
.start()
import org.apache.spark.sql.streaming.Trigger
// 默认触发器(尽快运行微批处理)
df.writeStream
.format("console")
.start()
// 处理时间触发器,带有两秒微批间隔
df.writeStream
.format("console")
.trigger(Trigger.ProcessingTime.("2 seconds"))
.start()
// 一次性触发器(已弃用,鼓励使用可用现在触发器)
df.writeStream
.format("console")
.trigger(Trigger.Once())
.start()
// 可用现在触发器
df.writeStream
.format("console")
.trigger(Trigger.AvailableNow())
.start()
// 连续触发器,带有一秒的检查点间隔
df.writeStream
.format("console")
.trigger(Trigger.Continuous.("1 second"))
.start()
import org.apache.spark.sql.streaming.Trigger
// 默认触发器(尽快运行微批处理)
df.writeStream
.format("console")
.start();
// 处理时间触发器,微批处理间隔为两秒
df.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("2 seconds"))
.start();
// 一次性触发器(已弃用,建议使用可用现在触发器)
df.writeStream
.format("console")
.trigger(Trigger.Once())
.start();
// 可用现在触发器
df.writeStream
.format("console")
.trigger(Trigger.AvailableNow())
.start();
// 连续触发器,检查点间隔为一秒
df.writeStream
.format("console")
.trigger(Trigger.Continuous("1 second"))
.start();
# 默认触发器(尽快运行微批处理)
write.stream(df, "console")
# 处理时间触发器,微批间隔为两秒
write.stream(df, "console", trigger.processingTime = "2 seconds")
# 一次性触发器
write.stream(df, "console", trigger.once = TRUE)
# 目前不支持连续触发器
管理流查询
当查询开始时创建的
StreamingQuery
对象可以用于监控和管理查询。
query = df.writeStream.format("console").start() # 获取查询对象
query.id() # 获取在重启时从检查点数据中持久存在的运行查询的唯一标识符
query.runId() # 获取此查询运行的唯一 ID,该 ID 在每次启动/重启时生成
query.name() # 获取自动生成或用户指定的名称
query.explain() # 打印查询的详细解释
query.stop() # 停止查询
query.awaitTermination() # 阻塞直到查询终止,使用 stop() 或发生错误
query.exception() # 如果查询因错误而终止,则为异常
query.recentProgress # 此查询的最近进度更新列表
query.lastProgress # 此流查询的最新进度更新
val query = df.writeStream.format("console").start() // 获取查询对象
query.id // 获取在从检查点数据重启时持久化的运行查询的唯一标识符
query.runId // 获取此查询运行的唯一 ID,该 ID 将在每次启动/重启时生成
query.name // 获取自动生成或用户指定的名称
query.explain() // 打印查询的详细解释
query.stop() // 停止查询
query.awaitTermination() // 阻塞直到查询终止,使用stop()或错误
query.exception // 如果查询因错误而终止,则为异常
query.recentProgress // 此查询最近的进度更新数组
query.lastProgress // 此流查询的最新进度更新
StreamingQuery query = df.writeStream().format("console").start(); // 获取查询对象
query.id(); // 获取持久化从检查点数据恢复的运行查询的唯一标识符
query.runId(); // 获取此查询运行的唯一 ID,该 ID 在每次开始/重启时生成
query.name(); // 获取自动生成或用户指定名称的名称
query.explain(); // 打印查询的详细说明
query.stop(); // 停止查询
query.awaitTermination(); // 阻塞直到查询被终止,可以因为 stop() 或错误
query.exception(); // 如果查询因错误被终止,则返回异常
query.recentProgress(); // 此查询的最近进度更新数组
query.lastProgress(); // 此流查询的最新进度更新
query <- write.stream(df, "console") # 获取查询对象
queryName(query) # 获取自动生成或用户指定名称
explain(query) # 打印查询的详细解释
stopQuery(query) # 停止查询
awaitTermination(query) # 阻塞,直到查询终止,使用stop()或出现错误
lastProgress(query) # 此流查询的最新进度更新
您可以在一个 SparkSession 中启动任意数量的查询。它们将同时运行,共享集群资源。您可以使用
sparkSession.streams()
来获取
StreamingQueryManager
(
Scala
/
Java
/
Python
文档)
可用于管理当前活动的查询。
spark = ... # spark 会话
spark.streams.active # 获取当前活跃的流查询列表
spark.streams.get(id) # 根据唯一 id 获取查询对象
spark.streams.awaitAnyTermination() # 阻塞直到其中任意一个终止
val spark: SparkSession = ...
spark.streams.active // 获取当前活动流查询的列表
spark.streams.get(id) // 通过唯一ID获取查询对象
spark.streams.awaitAnyTermination() // 阻塞直到其中一个终止
SparkSession spark = ...
spark.streams().active(); // 获取当前活动流查询的列表
spark.streams().get(id); // 通过唯一 id 获取查询对象
spark.streams().awaitAnyTermination(); // 阻塞直到其中任何一个终止
在R中不可用。
监控流查询
监控活动流查询有多种方法。您可以使用Spark的Dropwizard Metrics支持将指标推送到外部系统,或以编程方式访问它们。
交互式阅读指标
您可以直接使用
streamingQuery.lastProgress()
和
streamingQuery.status()
获取活跃查询的当前状态和指标。
lastProgress()
返回一个
StreamingQueryProgress
对象
在
Scala
和
Java
以及一个具有相同字段的字典在Python中。它包含关于流的最后一个触发器中所做进展的所有信息 - 处理了什么数据,
处理速度、延迟等是什么。还有
streamingQuery.recentProgress
,它返回最近几次进展的数组。
此外,
streamingQuery.status()
返回一个
StreamingQueryStatus
对象
在
Scala
和
Java
以及在Python中具有相同字段的字典。它提供有关
查询当前正在做什么的信息 - 触发器是否处于活动状态,数据是否正在处理,等等。
这里有一些例子。
query = ... # 一个 StreamingQuery
print(query.lastProgress)
'''
将打印类似以下内容。
{u'stateOperators': [], u'eventTime': {u'watermark': u'2016-12-14T18:45:24.873Z'}, u'name': u'MyQuery', u'timestamp': u'2016-12-14T18:45:24.873Z', u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'sources': [{u'description': u'KafkaSource[Subscribe[topic-0]]', u'endOffset': {u'topic-0': {u'1': 134, u'0': 534, u'3': 21, u'2': 0, u'4': 115}}, u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'startOffset': {u'topic-0': {u'1': 1, u'0': 1, u'3': 1, u'2': 0, u'4': 1}}}], u'durationMs': {u'getOffset': 2, u'triggerExecution': 3}, u'runId': u'88e2ff94-ede0-45a8-b687-6316fbef529a', u'id': u'ce011fdc-8762-4dcb-84eb-a77333e28109', u'sink': {u'description': u'MemorySink'}}
'''
print(query.status)
'''
将打印类似以下内容。
{u'message': u'等待数据到达', u'isTriggerActive': False, u'isDataAvailable': False}
'''
val query: StreamingQuery = ...
println(query.lastProgress)
/* 将会打印类似以下内容。
{
"id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
"runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
"name" : "MyQuery",
"timestamp" : "2016-12-14T18:45:24.873Z",
"numInputRows" : 10,
"inputRowsPerSecond" : 120.0,
"processedRowsPerSecond" : 200.0,
"durationMs" : {
"triggerExecution" : 3,
"getOffset" : 2
},
"eventTime" : {
"watermark" : "2016-12-14T18:45:24.873Z"
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[topic-0]]",
"startOffset" : {
"topic-0" : {
"2" : 0,
"4" : 1,
"1" : 1,
"3" : 1,
"0" : 1
}
},
"endOffset" : {
"topic-0" : {
"2" : 0,
"4" : 115,
"1" : 134,
"3" : 21,
"0" : 534
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 120.0,
"processedRowsPerSecond" : 200.0
} ],
"sink" : {
"description" : "MemorySink"
}
}
*/
println(query.status)
/* 将会打印类似以下内容。
{
"message" : "等待数据到达",
"isDataAvailable" : false,
"isTriggerActive" : false
}
*/
StreamingQuery query = ...
System.out.println(query.lastProgress());
/* 将打印如下内容。
{
"id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
"runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
"name" : "MyQuery",
"timestamp" : "2016-12-14T18:45:24.873Z",
"numInputRows" : 10,
"inputRowsPerSecond" : 120.0,
"processedRowsPerSecond" : 200.0,
"durationMs" : {
"triggerExecution" : 3,
"getOffset" : 2
},
"eventTime" : {
"watermark" : "2016-12-14T18:45:24.873Z"
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[topic-0]]",
"startOffset" : {
"topic-0" : {
"2" : 0,
"4" : 1,
"1" : 1,
"3" : 1,
"0" : 1
}
},
"endOffset" : {
"topic-0" : {
"2" : 0,
"4" : 115,
"1" : 134,
"3" : 21,
"0" : 534
}
},
"numInputRows" : 10,
"inputRowsPerSecond" : 120.0,
"processedRowsPerSecond" : 200.0
} ],
"sink" : {
"description" : "MemorySink"
}
}
*/
System.out.println(query.status());
/* 将打印如下内容。
{
"message" : "等待数据到达",
"isDataAvailable" : false,
"isTriggerActive" : false
}
*/
query <- ... # 一个 StreamingQuery
lastProgress(query)
'''
将打印类似以下内容。
{
"id" : "8c57e1ec-94b5-4c99-b100-f694162df0b9",
"runId" : "ae505c5a-a64e-4896-8c28-c7cbaf926f16",
"name" : null,
"timestamp" : "2017-04-26T08:27:28.835Z",
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"getOffset" : 0,
"triggerExecution" : 1
},
"stateOperators" : [ {
"numRowsTotal" : 4,
"numRowsUpdated" : 0
} ],
"sources" : [ {
"description" : "TextSocketSource[host: localhost, port: 9999]",
"startOffset" : 1,
"endOffset" : 1,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@76b37531"
}
}
'''
status(query)
'''
将打印类似以下内容。
{
"message" : "等待数据到达",
"isDataAvailable" : false,
"isTriggerActive" : false
}
'''
使用异步API以编程方式报告指标
您还可以通过附加一个
StreamingQueryListener
(
Scala
/
Java
/
Python
文档)异步监控与
SparkSession
关联的所有查询。 一旦您使用
sparkSession.streams.addListener()
附加了自定义的
StreamingQueryListener
对象,您将在查询开始和停止时以及在活动查询中取得进展时获得回调。以下是一个示例,
spark = ...
class Listener(StreamingQueryListener):
def onQueryStarted(self, event):
print("查询已开始: " + queryStarted.id)
def onQueryProgress(self, event):
print("查询进度更新: " + queryProgress.progress)
def onQueryTerminated(self, event):
print("查询已结束: " + queryTerminated.id)
spark.streams.addListener(Listener())
val spark: SparkSession = ...
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
println("查询开始: " + queryStarted.id)
}
override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
println("查询结束: " + queryTerminated.id)
}
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
println("查询进度更新: " + queryProgress.progress)
}
})
SparkSession spark = ...
spark.streams().addListener(new StreamingQueryListener() {
@Override
public void onQueryStarted(QueryStartedEvent queryStarted) {
System.out.println("查询开始: " + queryStarted.id());
}
@Override
public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
System.out.println("查询结束: " + queryTerminated.id());
}
@Override
public void onQueryProgress(QueryProgressEvent queryProgress) {
System.out.println("查询进度: " + queryProgress.progress());
}
});
在R中不可用。
使用Dropwizard报告指标
Spark支持使用
Dropwizard库
报告指标。为了使结构化流查询的指标也能被报告,您必须在SparkSession中显式启用配置
spark.sql.streaming.metricsEnabled
。
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
# 或者
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
// 或者
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
spark.conf().set("spark.sql.streaming.metricsEnabled", "true");
// 或者
spark.sql("SET spark.sql.streaming.metricsEnabled=true");
sql("设置 spark.sql.streaming.metricsEnabled=true")
启用此配置后,在SparkSession中启动的所有查询将通过Dropwizard报告指标到已配置的 接收器 (例如,Ganglia、Graphite、JMX等)。
使用检查点从故障中恢复
在发生故障或故意关闭的情况下,您可以恢复先前查询的进度和状态,并从暂停的地方继续。这个过程是通过检查点和预写日志来实现的。您可以为查询配置一个检查点位置,查询将保存所有进度信息(即每个触发器中处理的偏移量范围)和运行中的聚合(例如 快速示例 中的单词计数)到检查点位置。这个检查点位置必须是一个HDFS兼容文件系统中的路径,并且可以在 启动查询 时在DataStreamWriter中设置为选项。
aggDF \
.writeStream \
.outputMode("complete") \
.option("checkpointLocation", "path/to/HDFS/dir") \
.format("memory") \
.start()
aggDF
.writeStream
.outputMode("complete")
.option("checkpointLocation", "path/to/HDFS/dir")
.format("memory")
.start()
aggDF
.writeStream()
.outputMode("complete")
.option("checkpointLocation", "path/to/HDFS/dir")
.format("memory")
.start();
write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "path/to/HDFS/dir")
流查询中变更后的恢复语义
在从相同检查点位置重新启动时,对流查询允许的更改有一些限制。以下是几种不允许的更改,或者更改的效果没有明确定义。对于所有这些:
-
术语 allowed 意味着您可以进行指定的更改,但其效果的语义是否明确定义取决于查询和更改。
-
术语 not allowed 意味着您不应进行指定的更改,因为重新启动的查询可能会因不可预测的错误而失败。
sdf
代表一个通过 sparkSession.readStream 生成的流式 DataFrame/Dataset。
变更类型
-
输入源的数量或类型(即不同的源)变化 :这是不允许的。
-
输入源参数的变化 :这是否被允许,以及变化的语义是否明确定义,取决于源和查询。以下是一些示例。
-
允许增加/删除/修改速率限制:
spark.readStream.format("kafka").option("subscribe", "topic")
到spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)
-
对订阅主题/文件的修改通常是不允许的,因为结果是不确定的:
spark.readStream.format("kafka").option("subscribe", "topic")
到spark.readStream.format("kafka").option("subscribe", "newTopic")
-
-
输出接收器类型的变化 :在特定的接收器组合之间的变化是允许的。这需要根据具体情况进行验证。以下是一些示例。
-
文件接收器到Kafka接收器是允许的。Kafka将只看到新数据。
-
Kafka接收器到文件接收器是不允许的。
-
Kafka接收器更改为foreach,或相反是允许的。
-
-
输出接收器参数的变化 :这是否被允许,以及变化的语义是否明确定义,取决于接收器和查询。以下是一些示例。
-
文件接收器的输出目录的变化是不允许的:
sdf.writeStream.format("parquet").option("path", "/somePath")
到sdf.writeStream.format("parquet").option("path", "/anotherPath")
-
输出主题的变化是允许的:
sdf.writeStream.format("kafka").option("topic", "someTopic")
到sdf.writeStream.format("kafka").option("topic", "anotherTopic")
-
用户定义的foreach接收器(即
ForeachWriter
代码)的变化是允许的,但变化的语义取决于代码。
-
-
投影/过滤/类似映射操作的变化 :某些情况是允许的。例如:
-
允许添加/删除过滤器:
sdf.selectExpr("a")
到sdf.where(...).selectExpr("a").filter(...)
。 -
具有相同输出模式的投影的变化是允许的:
sdf.selectExpr("stringColumn AS json").writeStream
到sdf.selectExpr("anotherStringColumn AS json").writeStream
-
具有不同输出模式的投影的变化在有条件情况下是允许的:
sdf.selectExpr("a").writeStream
到sdf.selectExpr("b").writeStream
只有在输出接收器允许从"a"
到"b"
的模式变化时才被允许。
-
-
有状态操作的变化 :某些流查询中的操作需要保持状态数据,以便持续更新结果。结构化流处理自动将状态数据检查点保存到容错存储中(例如,HDFS,AWS S3,Azure Blob存储),并在重新启动后恢复。然而,这假定状态数据的模式在重启时保持不变。这意味着 在重启之间,流查询的有状态操作的任何更改(即添加、删除或模式修改)都不允许 。以下是有状态操作的列表,它们的模式在重启之间不应更改,以确保状态恢复:
-
流聚合 :例如,
sdf.groupBy("a").agg(...)
。聚合键或聚合类型的任何变化都是不允许的。 -
流去重 :例如,
sdf.dropDuplicates("a")
。去重列的数量或类型的任何变化都是不允许的。 -
流-流连接 :例如,
sdf1.join(sdf2, ...)
(即两个输入都是通过sparkSession.readStream
生成的)。模式或等值连接列的变化是不允许的。连接类型(外部或内部)的变化是不允许的。连接条件的其他变化是不明确的。 -
任意有状态操作 :例如,
sdf.groupByKey(...).mapGroupsWithState(...)
或sdf.groupByKey(...).flatMapGroupsWithState(...)
。用户定义状态的任何模式变化和超时类型的变化都是不允许的。用户定义的状态映射函数中的任何变化都是允许的,但变化的语义效果取决于用户定义的逻辑。如果您真的想支持状态模式变化,则可以使用支持模式迁移的编码/解码方案,显式地将您的复杂状态数据结构编码/解码为字节。例如,如果您将状态保存为Avro编码的字节,则在查询重启之间,您可以自由更改Avro状态模式,因为二进制状态将始终成功恢复。
-
异步进度追踪
它是什么?
异步进度跟踪允许流式查询在微批处理内异步并行地检查点进度,从而减少与维护偏移量日志和提交日志相关的延迟。
它是如何工作的?
结构化流依赖于持久化和管理偏移量作为查询处理的进度指示器。偏移量管理操作直接影响处理延迟,因为在这些操作完成之前无法进行数据处理。异步进度跟踪使流查询能够在不受这些偏移量管理操作影响的情况下检查点进度。
如何使用它?
以下代码片段提供了如何使用此功能的示例:
val stream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "in")
.load()
val query = stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.start()
下面的表格描述了此功能的配置及其相关的默认值。
选项 | 值 | 默认值 | 描述 |
---|---|---|---|
asyncProgressTrackingEnabled | true/false | false | 启用或禁用异步进度跟踪 |
asyncProgressTrackingCheckpointIntervalMs | 毫秒 | 1000 | 我们提交偏移量和完成提交的时间间隔 |
限制
该功能的初始版本具有以下限制:
- 异步进度跟踪仅在使用Kafka Sink的无状态查询中受到支持
- 由于批次的偏移范围在发生故障的情况下可能会更改,因此该异步进度跟踪将不支持恰好一次的端到端处理。尽管许多接收器,如Kafka接收器,本身并不支持恰好一次的写入。
关闭设置
关闭异步进度跟踪可能会导致以下异常被抛出
java.lang.IllegalStateException: 批处理 x 不存在
此外,以下错误信息可能会打印在驱动程序日志中:
批次 x 的偏移日志不存在,这对于从偏移日志的最新批次 x 重新启动查询是必需的。请确保通过手动删除偏移文件,有两个后续偏移日志可用于最新的批次。请确保提交日志的最新批次等于或比偏移日志的最新批次早一个批次。
这是由于当启用异步进度跟踪时,框架不会对每个批次进行进度检查。这与没有使用异步进度跟踪时的情况不同。要解决此问题,只需重新启用“asyncProgressTrackingEnabled”,并将“asyncProgressTrackingCheckpointIntervalMs”设置为0,然后运行流式查询,直到至少处理了两个微批次。现在可以安全地禁用异步进度跟踪,并且重新启动查询应该正常进行。
连续处理
[实验性的]
连续处理 是在 Spark 2.3 中引入的一种新的实验性流式执行模式,它能够以低延迟(约 1 毫秒)和至少一次的容错保证来运行。与默认的 微批处理 引擎相比,后者能够实现准确一次的保证,但最佳延迟约为 100 毫秒。对于某些类型的查询(下面讨论),您可以选择以哪种模式执行它们,而无需修改应用程序逻辑(即无需更改 DataFrame/Dataset 操作)。
要在连续处理模式下运行支持的查询,您只需指定一个 连续触发器 和所需的检查点间隔作为参数。例如,
spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load() \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.trigger(continuous="1 second") \ # 仅在查询中变化
.start()
import org.apache.spark.sql.streaming.Trigger
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.trigger(Trigger.Continuous("1 second")) // 仅在查询中更改
.start()
import org.apache.spark.sql.streaming.Trigger;
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.trigger(Trigger.Continuous("1 second")) // 仅在查询变化时
.start();
1秒的检查点间隔意味着连续处理引擎将每秒记录一次查询的进度。生成的检查点格式与微批处理引擎兼容,因此任何查询都可以使用任何触发器重新启动。例如,使用微批处理模式启动的支持查询可以在连续模式下重新启动,反之亦然。请注意,每次切换到连续模式时,您将获得至少一次的容错保证。
支持的查询
从Spark 2.4开始,仅支持以下类型的查询在连续处理模式中。
-
操作
: 仅支持类似映射的 Dataset/DataFrame 操作在连续模式下,即仅支持投影 (
select
,map
,flatMap
,mapPartitions
等) 和选择 (where
,filter
等).-
所有 SQL 函数都受支持,除了聚合函数(由于尚不支持聚合),
current_timestamp()
和current_date()
(使用时间的确定性计算很具挑战性)。
-
所有 SQL 函数都受支持,除了聚合函数(由于尚不支持聚合),
-
sources
:
- Kafka 源: 所有选项均受支持。
-
速率源: 适合测试。仅支持连续模式下的选项是
numPartitions
和rowsPerSecond
。
-
接收器
:
- Kafka 接收器: 所有选项均受支持。
- 内存接收器: 适合调试。
- 控制台接收器: 适合调试。所有选项均受支持。请注意,控制台将打印您在连续触发器中指定的每个检查点间隔。
有关它们的更多详细信息,请参见 输入源 和 输出汇 部分。虽然控制台汇适合测试,但使用Kafka作为源和汇可以最好地观察端到端低延迟处理,因为这允许引擎在输入数据可用的毫秒内处理数据并使结果在输出主题中可用。
注意事项
- 连续处理引擎启动多个长时间运行的任务,这些任务不断地从数据源读取数据,处理数据并持续写入数据接收端。查询所需的任务数量取决于查询可以并行从数据源读取的分区数量。因此,在启动连续处理查询之前,必须确保集群中有足够的核心来支持所有任务的并行运行。例如,如果您从一个有10个分区的Kafka主题读取数据,那么集群必须至少有10个核心,以便查询能够进行。
- 停止一个连续处理流可能会产生虚假任务终止警告。这些可以安全忽略。
- 目前对于失败的任务没有自动重试机制。任何故障都将导致查询停止,并需要从检查点手动重新启动。
附加信息
注意事项
-
查询运行后,几个配置是不可修改的。要更改它们,请放弃检查点并开始一个新的查询。这些配置包括:
-
spark.sql.shuffle.partitions
- 这是由于状态的物理分区:状态是通过对键应用哈希函数进行分区的,因此状态的分区数应保持不变。
-
如果您想要为有状态操作运行更少的任务,
coalesce
可以帮助避免不必要的重新分区。-
在
coalesce
之后,任务数(减少后)将保持不变,除非发生另一次洗牌。
-
在
-
spark.sql.streaming.stateStore.providerClass
: 为了正确读取查询的先前状态,状态存储提供者的类应保持不变。 -
spark.sql.streaming.multipleWatermarkPolicy
: 修改此项会导致查询包含多个水印时水印值不一致,因此该策略应保持不变。
-
进一步阅读
-
查看并运行
Scala
/
Java
/
Python
/
R
示例。
- 如何运行 Spark 示例 的说明
- 在 结构化流处理 Kafka 集成指南 中阅读有关与 Kafka 集成的内容
- 在 Spark SQL 编程指南 中阅读有关使用 DataFrames/Datasets 的更多详细信息
- 第三方博客文章
演讲
-
Spark Summit Europe 2017
- 使用Apache Spark中的结构化流处理实现简单、可扩展且容错的流处理 - 第1部分幻灯片/视频 , 第2部分幻灯片/视频
- 深入探讨结构化流处理中的有状态流处理 - 幻灯片/视频
-
Spark Summit 2016
- 深入探讨结构化流处理 - 幻灯片/视频
迁移指南
迁移指南现在已归档 在此页面 。