Spark Streaming 编程指南
注意
Spark Streaming 是 Spark 流处理引擎的前一代。现在已经不再更新 Spark Streaming,它是一个遗留项目。Spark 中有一个更新且更易于使用的流处理引擎,称为结构化流处理(Structured Streaming)。对于您的流处理应用程序和管道,您应该使用 Spark 结构化流处理。请查看 结构化流处理编程指南 。
概述
Spark Streaming是核心Spark API的一个扩展,能够实现可扩展的、高吞吐量的、容错的实时数据流处理。数据可以从许多来源获取,如Kafka、Kinesis或TCP套接字,并可以使用复杂的算法进行处理,这些算法通过高级函数如
map
、
reduce
、
join
和
window
表达。最后,处理后的数据可以推送到文件系统、数据库和实时仪表板。实际上,您可以在数据流上应用Spark的
机器学习
和
图形处理
算法。
内部工作原理如下。Spark Streaming接收实时输入数据流,并将数据分成批次,然后由Spark引擎处理这些批次,以生成最终的结果流。
Spark Streaming 提供了一种高层次的抽象,称为 离散化流 或 DStream ,它表示连续的数据流。DStreams 可以通过来自 Kafka 和 Kinesis 等源的输入数据流创建,或者通过在其他 DStreams 上应用高层次的操作创建。内部,DStream 被表示为一系列 RDDs 。
本指南向您展示如何使用DStreams开始编写Spark Streaming程序。您可以使用Scala、Java或Python(在Spark 1.2中引入)编写Spark Streaming程序,这些内容在本指南中均有介绍。您会发现本指南中有多个选项卡,可以让您选择不同语言的代码片段。
注意: 有一些API在Python中是不同的或者不可用的。在本指南中,您会发现标签 Python API 用于突出这些差异。
一个简单的例子
在我们详细介绍如何编写自己的 Spark Streaming 程序之前,让我们快速看看一个简单的 Spark Streaming 程序是什么样的。假设我们想要计算从监听 TCP 套接字的数据服务器接收到的文本数据中的单词数量。您只需要做以下几步。
首先,我们导入 StreamingContext ,这是所有流功能的主要入口点。我们创建一个具有两个执行线程和1秒批处理间隔的本地StreamingContext。
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# 创建一个具有两个工作线程和1秒批处理间隔的本地StreamingContext
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
在此上下文中,我们可以创建一个 DStream,表示来自 TCP 源的流数据,指定为主机名(例如
localhost
)和端口(例如
9999
)。
# 创建一个 DStream,连接到 hostname:port,例如 localhost:9999
lines = ssc.socketTextStream("localhost", 9999)
这个
lines
DStream 代表将从数据服务器接收的数据流。该 DStream 中的每条记录是一行文本。接下来,我们想根据空格将这些行分割成单词。
# 将每一行拆分为单词
words = lines.flatMap(lambda line: line.split(" "))
flatMap
是一种一对多的 DStream 操作,它通过从源 DStream 中每个记录生成多个新记录来创建一个新的 DStream。在这种情况下,每行将被拆分为多个单词,而单词流被表示为
words
DStream。接下来,我们想要统计这些单词。
# 计算每个批次中的每个单词
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
# 将此 DStream 中生成的每个 RDD 的前十个元素打印到控制台
wordCounts.pprint()
这个
words
DStream 进一步被映射(一对一转换)到一个
(word, 1)
对应的 DStream,然后被减少以获取每批数据中单词的频率。最后,
wordCounts.pprint()
将每秒打印出一些生成的计数。
请注意,当这些行被执行时,Spark Streaming 仅仅设置了它将在启动时执行的计算,并且尚未开始任何真正的处理。为了在所有转换设置完成后启动处理,我们最终调用
ssc.start() # 开始计算
ssc.awaitTermination() # 等待计算结束
完整的代码可以在 Spark Streaming 示例中找到
NetworkWordCount
。
首先,我们从 StreamingContext 导入 Spark Streaming 类的名称和一些隐式转换,以便为我们需要的其他类(如 DStream)添加有用的方法。 StreamingContext 是所有流处理功能的主要入口点。我们创建一个具有两个执行线程的本地 StreamingContext,以及 1 秒的批处理间隔。
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // 从 Spark 1.3 开始不必要
// 创建一个本地的 StreamingContext, 有两个工作线程和 1 秒的批处理间隔。
// 主节点需要 2 个核心以防止出现饥饿现象。
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
在此上下文中,我们可以创建一个 DStream,表示来自 TCP 源的流数据,指定为主机名(例如
localhost
)和端口(例如
9999
)。
// 创建一个 DStream,它将连接到 hostname:port,例如 localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
这个
lines
DStream 表示从数据服务器接收的数据流。这个 DStream 中的每一条记录是一行文本。接下来,我们想通过空格字符将这些行拆分成单词。
// 将每一行拆分为单词
val words = lines.flatMap(_.split(" "))
flatMap
是一种一对多的 DStream 操作,通过从源 DStream 中的每条记录生成多个新记录来创建新的 DStream。在这种情况下,每行将被拆分成多个单词,单词流表示为
words
DStream。接下来,我们想要统计这些单词。
import org.apache.spark.streaming.StreamingContext._ // 自 Spark 1.3 起不再必要
// 统计每个批次中的每个单词
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// 将此 DStream 中生成的每个 RDD 的前十个元素打印到控制台
wordCounts.print()
这个
words
DStream 进一步被映射(一对一转换)为一个
(word, 1)
对的 DStream,然后进行归约以获取每批数据中单词的频率。最后,
wordCounts.print()
将每秒打印出生成的一些计数。
请注意,当这些代码行被执行时,Spark Streaming 仅设置将在启动时执行的计算,尚未真正开始处理。为了在所有转换设置完成后启动处理,我们最终调用
ssc.start() // 启动计算
ssc.awaitTermination() // 等待计算结束
完整的代码可以在 Spark Streaming 示例中找到
NetworkWordCount
。
首先,我们创建一个 JavaStreamingContext 对象, 这是所有流功能的主要入口点。我们创建一个具有两个执行线程的本地 StreamingContext,批处理间隔为 1 秒。
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
// 创建一个具有两个工作线程和1秒批处理间隔的本地StreamingContext
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
在此上下文中,我们可以创建一个 DStream,表示来自 TCP 源的流数据,指定为主机名(例如
localhost
)和端口(例如
9999
)。
// 创建一个 DStream,将连接到 hostname:port,例如 localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
这个
lines
DStream 代表将从数据服务器接收的数据流。该数据流中的每条记录是一行文本。然后,我们想要通过空格将这些行分割成单词。
// 将每行拆分为单词
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
flatMap
是一个 DStream 操作,它通过从源 DStream 中的每个记录生成多个新记录来创建一个新的 DStream。在这种情况下,每一行将被拆分为多个单词,单词的流表示为
words
DStream。请注意,我们使用
FlatMapFunction
对象定义了转换。正如我们将在过程中发现的那样,Java API 中有许多这样的便利类,有助于定义 DStream 转换。
接下来,我们想要统计这些单词。
// 计算每个批次中的每个单词
JavaPairDStream<String, Integer> pairs =</
将
words
DStream 进一步映射(一对一转换)到
(word,
1)
对,使用
PairFunction
对象。然后,使用
Function2
对象将其归约以获取每批数据中单词的频率。最后,
wordCounts.print()
将打印每秒生成的几个计数。
请注意,当这些代码行执行时,Spark Streaming 只是在启动后设置将要执行的计算,实际处理尚未开始。要在所有转换设置完成后启动处理,我们最终调用
start
方法。
jssc.start(); // 开始计算
jssc.awaitTermination(); // 等待计算结束
完整代码可以在Spark Streaming示例中找到
JavaNetworkWordCount
。
如果您已经 下载 并 构建 了Spark,您可以按照以下方式运行此示例。您首先需要运行Netcat(在大多数类Unix系统中找到的一个小工具)作为数据服务器,使用
$ nc -lk 9999
然后,在另一个终端中,您可以使用以下命令启动示例
$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
然后,在运行netcat服务器的终端中输入的任何行都会被计算并每秒在屏幕上打印一次。它的显示大致如下所示。
|
|
基本概念
接下来,我们将超越简单的例子,详细阐述Spark Streaming的基础知识。
链接
类似于 Spark,Spark Streaming 通过 Maven Central 提供。要编写您自己的 Spark Streaming 程序,您需要将以下依赖项添加到您的 SBT 或 Maven 项目中。
org.apache.spark
spark-streaming_2.12
3.5.3
提供
libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "3.5.3" % "provided"
要从如Kafka和Kinesis等不在Spark Streaming核心API中的源中摄取数据,您需要将相应的工件
spark-streaming-xyz_2.12
添加到依赖项中。例如,一些常见的有如下。
源 | 工件 |
---|---|
Kafka | spark-streaming-kafka-0-10_2.12 |
Kinesis
|
spark-streaming-kinesis-asl_2.12 [亚马逊软件许可] |
有关最新的列表,请参考 Maven 仓库 以获取完整的支持源和工件列表。
初始化 StreamingContext
要初始化一个 Spark Streaming 程序,必须创建一个 StreamingContext 对象,这是所有 Spark Streaming 功能的主要入口点。
可以从一个 SparkContext 对象创建一个 StreamingContext 对象。
来自 pyspark 导入 SparkContext
来自 pyspark.streaming 导入 StreamingContext
sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)
参数
appName
是您希望在集群 UI 上显示的应用程序名称。
master
是一个
Spark、Mesos 或 YARN 集群 URL
,或者一个特殊的
“local[*]”
字符串,用于在本地模式下运行。实际上,当在集群上运行时,您不想在程序中硬编码
master
,而是
使用
spark-submit
启动应用程序
并在那接收它。然而,对于本地测试和单元测试,您可以传递 “local[*]” 以在进程中运行 Spark Streaming(检测本地系统中的核心数量)。
批量间隔必须根据您的应用程序的延迟要求和可用的集群资源来设置。有关更多详细信息,请参见 性能调优 部分。
可以从 SparkConf 对象创建一个 StreamingContext 对象。
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
参数
appName
是您希望在集群 UI 上显示的应用程序名称。
master
是一个
Spark、Mesos、Kubernetes 或 YARN 集群 URL
,或一个特殊的
“local[*]”
字符串,用于在本地模式下运行。实际上,当在集群上运行时,您不会想在程序中硬编码
master
,而是
使用
spark-submit
启动应用程序
并在那里接收它。但是,对于本地测试和单元测试,您可以传递 “local[*]” 来在进程中运行 Spark Streaming(检测本地系统中的核心数量)。请注意,这内部创建了一个
SparkContext
(所有 Spark 功能的起始点),可以通过
ssc.sparkContext
访问。
批处理间隔必须根据您的应用程序的延迟要求和可用的集群资源设置。请参阅 性能调优 部分获取更多详细信息。
一个
StreamingContext
对象也可以从一个现有的
SparkContext
对象中创建。
import org.apache.spark.streaming._
val sc = ... // 现有的 SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
一个 JavaStreamingContext 对象可以从一个 SparkConf 对象中创建。
import org.apache.spark.*;
import org.apache.spark.streaming.api.java.*;
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
参数
appName
是您的应用程序在集群 UI 上显示的名称。
master
是一个
Spark、Mesos 或 YARN 集群 URL
,或一个特殊的
“local[*]”
字符串,用于在本地模式下运行。实际上,当在集群上运行时,您不想在程序中硬编码
master
,而是应该
使用
spark-submit
启动应用程序
并在其中接收它。然而,对于本地测试和单元测试,您可以传递 “local[*]”来在进程中运行 Spark Streaming。请注意,这内部创建了一个
JavaSparkContext
(所有 Spark 功能的起点),可以通过
ssc.sparkContext
访问。
批处理间隔必须根据您的应用程序的延迟要求和可用的集群资源进行设置。有关更多详细信息,请参阅 性能调优 部分。
一个
JavaStreamingContext
对象也可以从现有的
JavaSparkContext
创建。
导入 org.apache.spark.streaming.api.java.*;
JavaSparkContext sc = ... //现有的 JavaSparkContext
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));
在上下文定义后,您需要执行以下操作。
- 通过创建输入 DStreams 定义输入源。
- 通过对 DStreams 应用转换和输出操作定义流计算。
-
使用
streamingContext.start()
开始接收数据并处理它。 -
使用
streamingContext.awaitTermination()
等待处理被停止(手动或因任何错误)。 -
可以使用
streamingContext.stop()
手动停止处理。
注意事项:
- 一旦上下文被启动,就无法设置或添加新的流计算。
- 一旦上下文被停止,便无法重新启动。
- 在一个JVM中只能有一个活动的StreamingContext。
-
对StreamingContext调用stop()也会停止SparkContext。要仅停止StreamingContext,设置
stop()
的可选参数stopSparkContext
为false。 - SparkContext可以被重复使用以创建多个StreamingContexts,只要在创建下一个StreamingContext之前,先停止之前的StreamingContext(而不停止SparkContext)。
离散化流 (DStreams)
离散流 或 DStream 是Spark Streaming提供的基本抽象。 它表示一个连续的数据流,可以是从来源接收到的输入数据流, 也可以是通过变换输入流生成的处理 数据流。在内部, DStream由一系列连续的RDD表示,RDD是Spark对不可变的、 分布式数据集的抽象(有关更多详细信息,请参见 Spark 编程指南 )。DStream中的每个RDD包含来自某个间隔的数据, 如下图所示。
对DStream应用的任何操作都会转化为对基础RDDs的操作。例如,在
之前的例子
中,将一系列行转换为单词的过程,
flatMap
操作被应用于
lines
DStream中的每个RDD,以生成
words
DStream的RDDs。这在下图中显示。
这些底层的RDD转换是由Spark引擎计算的。DStream操作隐藏了大多数这些细节,并为开发者提供了一个更高级别的API以方便使用。这些操作将在后面的章节中详细讨论。
输入 DStreams 和接收器
输入 DStreams 是表示从流媒体源接收的输入数据流的 DStreams。在
快速示例
中,
lines
是一个输入 DStream,因为它代表了从 netcat 服务器接收的数据流。每个输入 DStream(除文件流,在本节稍后讨论)都与一个
接收器
(
Scala 文档
,
Java 文档
) 对象相关联,该对象从源接收数据并将其存储在 Spark 的内存中以进行处理。
Spark Streaming 提供两类内置流源。
- 基本数据源 : 在StreamingContext API中直接可用的数据源。示例:文件系统和套接字连接。
- 高级数据源 : 像Kafka、Kinesis等数据源通过额外的工具类可用。这些需要链接额外的依赖,如 链接 部分所讨论的。
我们将在本节稍后讨论每个类别中存在的一些来源。
请注意,如果您希望在流式应用程序中并行接收多个数据流,您可以创建多个输入 DStreams(在 性能调优 部分中进一步讨论)。这样会创建多个接收器,这些接收器将同时接收多个数据流。但请注意,Spark 工作者/执行者是一个长时间运行的任务,因此它占用了分配给 Spark Streaming 应用程序的一个核心。因此,重要的是记住,Spark Streaming 应用程序需要分配足够的核心(或者,如果在本地运行,则分配足够的线程)来处理接收到的数据,以及运行接收器。
需要记住的要点
-
在本地运行 Spark Streaming 程序时,请不要将“local”或“local[1]”用作主节点 URL。这两种方式都意味着仅使用一个线程在本地运行任务。如果您使用基于接收器的输入 DStream(例如,套接字、Kafka 等),那么单个线程将用于运行接收器,从而没有线程用于处理接收到的数据。因此,在本地运行时,请始终使用“local[ n ]”作为主节点 URL,其中 n > 要运行的接收器数量(有关如何设置主节点的信息,请参见 Spark 属性 )。
-
将逻辑扩展到在集群上运行时,分配给 Spark Streaming 应用程序的核心数量必须大于接收器的数量。否则系统将接收数据,但无法处理它。
基本来源
我们已经查看了
ssc.socketTextStream(...)
在
快速示例
中,它通过一个TCP套接字连接接收的文本数据创建了一个DStream。除了套接字,StreamingContext API还提供了从文件作为输入源创建DStreams的方法。
文件流
要从任何与HDFS API兼容的文件系统(即HDFS、S3、NFS等)读取数据,可以通过
StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]
创建一个DStream。
文件流不需要运行接收器,因此无需为接收文件数据分配任何核心。
对于简单的文本文件,最简单的方法是
StreamingContext.textFileStream(dataDirectory)
。
fileStream
在 Python API 中不可用;只有
textFileStream
可用。
streamingContext.textFileStream(dataDirectory)
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
对于文本文件
streamingContext.textFileStream(dataDirectory)
streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);
对于文本文件
streamingContext.textFileStream(dataDirectory);
目录如何被监控
Spark Streaming 将监视目录
dataDirectory
并处理在该目录中创建的任何文件。
-
可以监控一个简单的目录,例如
"hdfs://namenode:8040/logs/"
. 在该路径下的所有文件将被处理,随着它们的发现。 -
可以提供一个
POSIX glob 模式
,例如
"hdfs://namenode:8040/logs/2017/*"
. 在这里,DStream将由匹配该模式的目录中的所有文件组成。 也就是说:这是一个目录模式,而不是目录中文件的模式。 - 所有文件必须采用相同的数据格式。
- 文件根据其修改时间被认为属于某个时间段,而不是其创建时间。
- 一旦处理,当前窗口内文件的更改将不会导致文件被重新读取。 也就是说: 更新被忽略 。
- 目录下的文件越多,扫描更改所需的时间就越长——即使没有文件被修改。
-
如果使用通配符来识别目录,例如
"hdfs://namenode:8040/logs/2016-*"
, 将整个目录重命名为匹配该路径将把该目录添加到监控目录的列表中。只有修改时间在当前窗口内的目录中的文件将会包含在流中。 -
调用
FileSystem.setTimes()
来修正时间戳是一种在后续窗口中捕获文件的方法,即使其内容没有更改。
将对象存储作为数据源
“完整”文件系统,如HDFS,倾向于在创建输出流后立即设置其文件的修改时间。当一个文件被打开时,即使数据尚未完全写入,它也可能被包含在
DStream
中 - 之后在同一窗口内对文件的更新将被忽略。也就是说:更改可能会被遗漏,数据可能会从流中省略。
为了确保更改在窗口中被检测到,请将文件写入一个不受监控的目录,然后,在输出流关闭后立即将其重命名为目标目录。只要重命名的文件在其创建的窗口期间出现在扫描的目标目录中,就会检测到新数据。
相比之下,像 Amazon S3 和 Azure Storage 这样的对象存储通常具有较慢的重命名操作,因为数据实际上是被复制的。此外,重命名的对象可能会将
rename()
操作的时间作为其修改时间,因此可能不会被视为原始创建时间所暗示的时间窗口的一部分。
需要对目标对象存储进行仔细测试,以验证存储的时间戳行为是否与 Spark Streaming 预期的一致。直接写入目标目录可能是通过所选对象存储流式数据的合适策略。
有关此主题的更多详细信息,请参考 Hadoop 文件系统规范 。
基于自定义接收器的流
DStreams可以通过自定义接收器接收的数据流创建。有关更多详细信息,请参阅 自定义接收器指南 。
作为流的RDD队列
要测试一个使用测试数据的Spark Streaming应用程序,可以基于RDD队列创建DStream,使用
streamingContext.queueStream(queueOfRDDs)
。推送到队列中的每个RDD将被视为DStream中的一批数据,并像流一样进行处理。
有关来自套接字和文件的流的更多详细信息,请参阅相关函数的API文档,在 StreamingContext 中查看 Scala, JavaStreamingContext 用于Java,以及 StreamingContext 用于Python。
高级来源
Python API 截至 Spark 3.5.3, 在这些来源中,Kafka 和 Kinesis 在 Python API 中可用。
这一类源需要与外部非Spark库进行接口,其中一些具有复杂的依赖关系(例如,Kafka)。因此,为了最小化与依赖项版本冲突相关的问题,从这些源创建DStreams的功能已被移至可以在必要时明确 链接 的单独库中。
请注意,这些高级数据源在Spark shell中不可用,因此基于这些高级数据源的应用程序无法在shell中进行测试。如果您确实想在Spark shell中使用它们,您需要下载相应的Maven构件的JAR及其依赖项,并将其添加到类路径中。
这些高级来源包括以下内容。
-
Kafka: Spark Streaming 3.5.3 与 Kafka broker 版本 0.10 或更高版本兼容。有关更多详细信息,请参阅 Kafka 集成指南 。
-
Kinesis: Spark Streaming 3.5.3 与 Kinesis 客户端库 1.2.1 兼容。有关更多详细信息,请参阅 Kinesis 集成指南 。
自定义来源
Python API 这在Python中尚不支持。
输入 DStreams 也可以从自定义数据源创建。您所要做的就是实现一个用户定义的 接收器 (请参见下一节以了解这是什么),该接收器可以从自定义源接收数据并将其推送到 Spark。有关详细信息,请参阅 自定义接收器指南 。
接收器可靠性
根据其 可靠性 ,数据源可以分为两种类型。源(例如 Kafka)允许被传输的数据被确认。如果接收来自这些 可靠 源的数据的系统正确地确认了接收到的数据,就可以确保不会因为任何类型的故障而丢失数据。这导致了两种类型的接收者:
- 可靠接收器 - 一个 可靠接收器 在数据被接收并在 Spark 中存储成功之后,会正确地向可靠源发送确认。
- 不可靠接收器 - 一个 不可靠接收器 不会向源发送确认。这可以用于不支持确认的源,或者在不想或不需要处理确认复杂性时,也可以用于可靠源。
如何编写可靠接收器的详细信息在 自定义接收器指南 中讨论。
DStreams上的转换
与 RDDs 相似,转换允许对输入 DStream 的数据进行修改。DStreams 支持许多可用的普通 Spark RDD 的转换。一些常见的转换如下。
转换 | 含义 |
---|---|
map ( func ) | 返回一个新的 DStream,通过将源 DStream 的每个元素传递给函数 func 。 |
flatMap ( func ) | 与 map 类似,但每个输入项可以映射到 0 或多个输出项。 |
filter ( func ) | 通过仅选择源 DStream 中 func 返回 true 的记录返回一个新的 DStream。 |
repartition ( numPartitions ) | 通过创建更多或更少的分区来改变此 DStream 的并行级别。 |
union ( otherStream ) | 返回一个新的 DStream,包含源 DStream 和 otherDStream 中元素的并集。 |
count () | 通过计算源 DStream 中每个 RDD 的元素数量,返回一个单元素 RDD 的新 DStream。 |
reduce ( func ) | 通过使用函数 func (接受两个参数并返回一个)聚合源 DStream 中每个 RDD 的元素,返回一个单元素 RDD 的新 DStream。该函数应是结合性和交换性,以便可以并行计算。 |
countByValue () | 在类型为 K 的元素的 DStream 上调用时,返回一个新的 (K, Long) 对的 DStream,其中每个键的值是其在源 DStream 每个 RDD 中的频率。 |
reduceByKey ( func , [ numTasks ]) |
在 (K, V) 对的 DStream 上调用时,返回一个新的 (K, V) 对的 DStream,其中每个键的值通过给定的归约函数聚合。
注意:
默认情况下,使用 Spark 的默认并行任务数(本地模式下为 2,在集群模式下由配置属性
spark.default.parallelism
决定)进行分组。您可以传递一个可选的
numTasks
参数来设置不同数量的任务。
|
join ( otherStream , [ numTasks ]) | 在两个 (K, V) 和 (K, W) 对的 DStream 上调用时,返回所有每个键的元素对的 (K, (V, W)) 新 DStream。 |
cogroup ( otherStream , [ numTasks ]) | 在 (K, V) 和 (K, W) 对的 DStream 上调用时,返回一个新的 (K, Seq[V], Seq[W]) 元组的 DStream。 |
transform ( func ) | 通过将 RDD-to-RDD 函数应用于源 DStream 的每个 RDD,返回一个新的 DStream。可以用于对 DStream 进行任意 RDD 操作。 |
updateStateByKey ( func ) | 返回一个新的“状态”DStream,其中每个键的状态通过对该键的先前状态和新值应用给定函数进行更新。这可以用于维护每个键的任意状态数据。 |
这些转换中的一部分值得更详细地讨论。
按键更新状态操作
这个
updateStateByKey
操作允许您在不断更新的新信息的同时维护任意状态。要使用此功能,您需要完成两个步骤。
- 定义状态 - 状态可以是任意数据类型。
- 定义状态更新函数 - 使用函数指定如何根据前一个状态和来自输入流的新值更新状态。
在每个批处理中,Spark将对所有现有键应用状态更新函数,无论它们在批处理中是否有新数据。如果更新函数返回
None
,则该键值对将被消除。
让我们用一个例子来说明这一点。假设您想维护一个文本数据流中每个单词的运行计数。在这里,运行计数是状态,它是一个整数。我们将更新函数定义为:
def updateFunction(newValues, runningCount):
if runningCount is None:
runningCount = 0
return sum(newValues, runningCount) # 将新的值与之前的运行计数相加以获得新的计数
这应用于包含单词的 DStream(例如,
pairs
DStream 包含
(word,
1)
对,在
之前的例子
中)。
runningCounts = pairs.updateStateByKey(updateFunction)
更新函数将对每个单词被调用,
newValues
将拥有一系列的 1(来自
(word, 1)
对),而
runningCount
将拥有之前的计数。有关完整的 Python 代码,请查看示例
stateful_network_wordcount.py
。
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = ... // 将新值与之前的运行计数相加以获得新计数
Some(newCount)
}
这应用于包含单词的 DStream(例如,
pairs
DStream 包含
(word,
1)
对,在
前面的例子
中)。
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
更新函数将为每个单词调用,
newValues
将具有一系列 1(来自
(word, 1)
对),而
runningCount
将具有前一个计数。
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
(values, state) -> {
Integer newSum = ... // 将新值与之前的运行计数相加以获得新计数
return Optional.of(newSum);
};
这适用于包含单词的 DStream(假设是
pairs
DStream,包含
(word, 1)
对的
快速示例
)。
JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);
更新函数将针对每个单词被调用,
newValues
拥有一系列的 1(来自
(word, 1)
对),
runningCount
则拥有之前的计数。有关完整的
Java 代码,请查看示例
JavaStatefulNetworkWordCount.java
。
请注意,使用
updateStateByKey
需要配置检查点目录,这在
检查点
部分进行了详细讨论。
变换操作
这项
transform
操作(以及它的变体如
transformWith
)允许在 DStream 上应用任意的 RDD 到 RDD 函数。它可以用于应用任何在 DStream API 中未暴露的 RDD 操作。
例如,将数据流中的每个批次与另一个数据集连接的功能并未直接在 DStream API 中暴露。然而,您可以轻松地使用
transform
来实现这一点。这使得非常强大的可能性。例如,可以通过将输入数据流与预计算的垃圾邮件信息(可能也是使用 Spark 生成的)结合进行实时数据清理,然后基于此进行过滤。
spamInfoRDD = sc.pickleFile(...) # 包含垃圾邮件信息的RDD
# 将数据流与垃圾邮件信息连接以进行数据清理
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // 包含垃圾邮件信息的RDD
val cleanedDStream = wordCounts.transform { rdd =>
rdd.join(spamInfoRDD).filter(...) // 将数据流与垃圾邮件信息连接以进行数据清理
...
}
import org.apache.spark.streaming.api.java.*;
// 包含垃圾信息的RDD
JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);
JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(rdd -> {
rdd.join(spamInfoRDD).filter(...); // 将数据流与垃圾信息连接以进行数据清洗
...
});
请注意,提供的函数在每个批处理间隔内都会被调用。这使您能够进行时间变化的RDD操作,也就是说,RDD操作、分区数量、广播变量等可以在批处理之间进行更改。
窗口操作
Spark Streaming 还提供了 窗口计算 ,允许您对一段滑动的数据窗口应用转化。下图说明了这个滑动窗口。
如图所示,每次窗口 滑动 经过一个源 DStream 时,落在窗口内的源 RDD 会被结合并进行操作,以产生窗口化 DStream 的 RDD。这种特定情况下,操作应用于最后 3 个时间单位的数据,并滑动 2 个时间单位。这表明任何窗口操作需要指定两个参数。
- 窗口长度 - 窗口的持续时间(图中的3)。
- 滑动间隔 - 执行窗口操作的间隔(图中的2)。
这两个参数必须是源 DStream 批处理间隔(图中的 1)的倍数。
让我们通过一个示例来说明窗口操作。假设您想通过生成最近30秒数据的词频,每10秒钟进行一次扩展
先前示例
。为此,我们必须在最近30秒的数据的
pairs
DStream上应用
reduceByKey
操作,数据为
(word, 1)
对。这是通过操作
reduceByKeyAndWindow
完成的。
# 每10秒减少最近30秒的数据
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
// 每10秒处理最近30秒的数据
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
// 每10秒减少最后30秒的数据
JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2, Durations.seconds(30), Durations.seconds(10));
一些常见的窗口操作如下。所有这些操作都采用上述两个参数 - windowLength 和 slideInterval 。
转换 | 含义 |
---|---|
window ( windowLength , slideInterval ) | 返回一个新DStream,该DStream基于源DStream的窗口批次进行计算。 |
countByWindow ( windowLength , slideInterval ) | 返回流中元素的滑动窗口计数。 |
reduceByWindow ( func , windowLength , slideInterval ) | 返回一个新的单元素流,通过使用 func 对流中的元素在滑动区间内进行聚合创建。该函数应该是结合性和交换性的,以便可以正确并行计算。 |
reduceByKeyAndWindow ( func , windowLength , slideInterval , [ numTasks ]) |
当在(DStream (K, V) 对)上调用时,返回一个新的(DStream (K, V) 对),其中每个键的值通过给定的减少函数
func
在滑动窗口的批次上聚合。
注意:
默认情况下,这使用Spark的默认并行任务数量(本地模式为2,在集群模式中数量由配置属性
spark.default.parallelism
决定)来进行分组。 您可以传递一个可选的
numTasks
参数来设置不同的任务数量。
|
reduceByKeyAndWindow ( func , invFunc , windowLength , slideInterval , [ numTasks ]) |
上面
|
countByValueAndWindow ( windowLength , slideInterval , [ numTasks ]) |
当在(DStream (K, V) 对)上调用时,返回一个新的(DStream (K, Long) 对),其中每个键的值是在滑动窗口内的频率。 如在
reduceByKeyAndWindow
中,减少任务的数量可以通过可选参数进行配置。
|
连接操作
最后,值得强调的是,您可以在Spark Streaming中轻松执行各种类型的连接。
流-流连接
流可以很容易地与其他流连接。
stream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
JavaPairDStream<String, String> stream1 = ...
JavaPairDStream<String, String> stream2 = ...
JavaPairDStream<String, Tuple2<String, String>> joinedStream = stream1.join(stream2);
在这里,在每个批处理间隔中,由
stream1
生成的 RDD 将与由
stream2
生成的 RDD 连接。您还可以执行
leftOuterJoin
、
rightOuterJoin
、
fullOuterJoin
。此外,在流的数据窗口上进行连接通常非常有用。这也很简单。
windowedStream1 = stream1.window(20)
windowedStream2 = stream2.window(60)
joinedStream = windowedStream1.join(windowedStream2)
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20));
JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1));
JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream1.join(windowedStream2);
流数据集连接
这在之前解释
DStream.transform
操作时已经展示过了。这里还有一个将窗口化流与数据集进行连接的示例。
dataset = ... # 一些 RDD
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
JavaPairRDD<String, String> dataset = ...
JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
JavaPairDStream<String, String> joinedStream = windowedStream.transform(rdd -> rdd.join(dataset));
实际上,您还可以动态更改要连接的 dataset。传递给
transform
的函数在每个批处理间隔内进行评估,因此将使用当前
dataset
引用所指向的数据集。
完整的DStream变换列表可在API文档中找到。有关Scala API,请参见 DStream 和 PairDStreamFunctions 。有关Java API,请参见 JavaDStream 和 JavaPairDStream 。有关Python API,请参见 DStream 。
在DStreams上的输出操作
输出操作允许将DStream的数据推送到外部系统,如数据库或文件系统。由于输出操作实际上允许变换后的数据被外部系统消费,它们会触发所有DStream变换的实际执行(类似于RDD的行动)。目前,定义了以下输出操作:
输出操作 | 含义 |
---|---|
print () |
在运行流应用程序的驱动节点上打印每个数据批次的前十个元素。这对开发和调试非常有用。
Python API 在Python API中称为 pprint() 。 |
saveAsTextFiles ( prefix , [ suffix ]) | 将此DStream的内容保存为文本文件。在每个批处理间隔的文件名是基于 prefix 和 suffix 生成的: "prefix-TIME_IN_MS[.suffix]" 。 |
saveAsObjectFiles ( prefix , [ suffix ]) |
将此DStream的内容保存为
SequenceFiles
形式的序列化Java对象。在每个批处理间隔的文件名是基于
prefix
和
suffix
生成的:
"prefix-TIME_IN_MS[.suffix]"
。
Python API 此功能在Python API中不可用。 |
saveAsHadoopFiles ( prefix , [ suffix ]) |
将此DStream的内容保存为Hadoop文件。在每个批处理间隔的文件名是基于
prefix
和
suffix
生成的:
"prefix-TIME_IN_MS[.suffix]"
。
Python API 此功能在Python API中不可用。 |
foreachRDD ( func ) | 最通用的输出操作,将一个函数 func 应用于来自流的每个RDD。该函数应将每个RDD中的数据推送到外部系统,例如将RDD保存到文件中,或通过网络写入数据库。请注意,函数 func 是在运行流应用程序的驱动进程中执行的,通常会包含强制计算流RDD的RDD操作。 |
使用 foreachRDD 的设计模式
dstream.foreachRDD
是一个强大的原语,用于将数据发送到外部系统。 然而,理解如何正确和高效地使用这个原语是很重要的。 一些常见的错误如下所示。
通常,将数据写入外部系统需要创建一个连接对象 (例如,TCP连接到远程服务器),并使用它将数据发送到远程系统。 为此,开发人员可能会不小心在Spark驱动程序中尝试创建一个连接对象, 然后在Spark工作节点中尝试使用它将记录保存到RDD中。 例如(在Scala中),
def sendRecord(rdd):
connection = createNewConnection() # 在驱动程序中执行
rdd.foreach(lambda record: connection.send(record))
connection.close()
dstream.foreachRDD(sendRecord)
dstream.foreachRDD { rdd =>
val connection = createNewConnection() // 在驱动程序中执行
rdd.foreach { record =>
connection.send(record) // 在工作节点中执行
}
}
dstream.foreachRDD(rdd -> {
Connection connection = createNewConnection(); // 在驱动程序中执行
rdd.foreach(record -> {
connection.send(record); // 在工作节点中执行
});
});
这是不正确的,因为这需要将连接对象序列化并从驱动程序发送到工作节点。这种连接对象在机器之间很少可转移。此错误可能表现为序列化错误(连接对象不可序列化)、初始化错误(连接对象需要在工作节点初始化)等。正确的解决方案是在工作节点创建连接对象。
然而,这可能导致另一个常见的错误 - 为每条记录创建一个新连接。 例如,
def sendRecord(record):
connection = createNewConnection()
connection.send(record)
connection.close()
dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}
dstream.foreachRDD(rdd -> {
rdd.foreach(record -> {
Connection connection = createNewConnection();
connection.send(record);
connection.close();
});
});
通常,创建连接对象会有时间和资源开销。因此,为每条记录创建和销毁一个连接对象可能会产生不必要的高开销,并且会显著降低系统的整体吞吐量。一种更好的解决方案是使用
rdd.foreachPartition
- 创建一个连接对象,并使用该连接发送 RDD 分区中的所有记录。
def sendPartition(iter):
connection = createNewConnection()
for record in iter:
connection.send(record)
connection.close()
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
dstream.foreachRDD(rdd -> {
rdd.foreachPartition(partitionOfRecords -> {
Connection connection = createNewConnection();
while (partitionOfRecords.hasNext()) {
connection.send(partitionOfRecords.next());
}
connection.close();
});
});
这将连接创建的开销分摊到多个记录上。
最后,这可以通过在多个RDD/batch之间重用连接对象进一步优化。 可以维护一个静态的连接对象池,以便在多个batch的RDD被推送到外部系统时重复使用,从而进一步减少开销。
def sendPartition(iter):
# ConnectionPool 是一个静态的、延迟初始化的连接池
connection = ConnectionPool.getConnection()
for record in iter:
connection.send(record)
# 返回连接池以供未来重用
ConnectionPool.returnConnection(connection)
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool 是一个静态的、延迟初始化的连接池
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // 返回到池中以供将来重用
}
}
dstream.foreachRDD(rdd -> {
rdd.foreachPartition(partitionOfRecords -> {
// 连接池是一个静态的、延迟初始化的连接池
Connection connection = ConnectionPool.getConnection();
while (partitionOfRecords.hasNext()) {
connection.send(partitionOfRecords.next());
}
ConnectionPool.returnConnection(connection); // 返回连接池以供未来重用
});
});
请注意,连接池中的连接应在需要时懒惰地创建,并且如果一段时间未使用则应超时。这实现了向外部系统发送数据的最高效率。
其他要记住的要点:
-
DStreams通过输出操作进行惰性执行,就像RDD通过RDD操作惰性执行一样。具体而言,DStream输出操作中的RDD操作强制处理接收到的数据。因此,如果您的应用程序没有任何输出操作,或者仅有像
dstream.foreachRDD()
这样的输出操作,但内部没有任何RDD操作,那么将不会执行任何操作。系统将仅接收数据并丢弃它。 -
默认情况下,输出操作是一次执行一个的。它们按照在应用程序中定义的顺序执行。
数据框和SQL操作
您可以轻松地在流数据上使用 DataFrames 和 SQL 操作。您必须使用 StreamingContext 正在使用的 SparkContext 创建一个 SparkSession。此外,这必须以可以在驱动程序故障时重新启动的方式完成。这是通过创建一个惰性实例化的 SparkSession 单例来实现的。在下面的示例中,它修改了之前的 单词计数示例 ,以使用 DataFrames 和 SQL 生成单词计数。每个 RDD 都被转换为 DataFrame,注册为临时表,然后使用 SQL 查询。
# 懒惰实例化的全局 SparkSession 实例
def getSparkSessionInstance(sparkConf):
if ("sparkSessionSingletonInstance" not in globals()):
globals()["sparkSessionSingletonInstance"] = SparkSession \
.builder \
.config(conf=sparkConf) \
.getOrCreate()
return globals()["sparkSessionSingletonInstance"]
...
# 数据帧操作在你的流式程序内部
words = ... # 字符串的 DStream
def process(time, rdd):
print("========= %s =========" % str(time))
try:
# 获取 SparkSession 的单实例
spark = getSparkSessionInstance(rdd.context.getConf())
# 将 RDD[String] 转换为 RDD[Row] 然后到 DataFrame
rowRdd = rdd.map(lambda w: Row(word=w))
wordsDataFrame = spark.createDataFrame(rowRdd)
# 使用 DataFrame 创建一个临时视图
wordsDataFrame.createOrReplaceTempView("words")
# 使用 SQL 对表进行单词计数并打印
wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()
except:
pass
words.foreachRDD(process)
查看完整的 源代码 。
/** 在您的流处理程序中进行 DataFrame 操作 */
val words: DStream[String] = ...
words.foreachRDD { rdd =>
// 获取 SparkSession 的单例实例
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
// 将 RDD[String] 转换为 DataFrame
val wordsDataFrame = rdd.toDF("word")
// 创建一个临时视图
wordsDataFrame.createOrReplaceTempView("words")
// 使用 SQL 对 DataFrame 进行单词计数并打印结果
val wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()
}
查看完整的 source code 。
/** 用于将 RDD 转换为 DataFrame 的 Java Bean 类 */
public class JavaRow implements java.io.Serializable {
private String word;
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
}
...
/** 在您的流处理程序中的 DataFrame 操作 */
JavaDStream<String> words = ...
words.foreachRDD((rdd, time) -> {
// 获取 SparkSession 的单例实例
SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate();
// 将 RDD[String] 转换为 RDD[case class] 以创建 DataFrame
JavaRDD<JavaRow> rowRDD = rdd.map(word -> {
JavaRow record = new JavaRow();
record.setWord(word);
return record;
});
DataFrame wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);
// 使用 DataFrame 创建临时视图
wordsDataFrame.createOrReplaceTempView("words");
// 使用 SQL 对表进行单词计数并打印结果
DataFrame wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word");
wordCountsDataFrame.show();
});
查看完整的 源代码 。
您还可以在来自不同线程的流数据定义的表上运行 SQL 查询(也就是说,与正在运行的 StreamingContext 异步)。只需确保您设置 StreamingContext 以记住足够数量的流数据,以便查询可以运行。否则,StreamingContext 不会意识到任何异步 SQL 查询,因此在查询完成之前,它将删除旧的流数据。例如,如果您想查询最后一批数据,但您的查询可能需要 5 分钟才能运行,则请调用
streamingContext.remember(Minutes(5))
(在 Scala 中或在其他语言中的等效方法)。
请参阅 数据框和SQL 指南以了解有关数据框的更多信息。
MLlib 操作
您还可以轻松使用 MLlib 提供的机器学习算法。首先,有流式机器学习算法(例如 流式线性回归 、 流式K均值 等),这些算法可以同时从流数据中学习并应用模型于流数据。除此之外,对于更大范围的机器学习算法,您可以离线学习一个学习模型(即使用历史数据),然后将该模型在线应用于流数据。有关更多详细信息,请参见 MLlib 指南。
缓存 / 持久性
与RDD类似,DStreams也允许开发人员在内存中持久化数据流的数据。也就是说,使用
persist()
方法在DStream上将自动在内存中持久化该DStream的每个RDD。如果DStream中的数据将被多次计算(例如,在同一数据上进行多次操作),这将非常有用。对于基于窗口的操作,如
reduceByWindow
和
reduceByKeyAndWindow
以及基于状态的操作,如
updateStateByKey
,这是隐式成立的。因此,基于窗口操作生成的DStreams会自动在内存中持久化,而无需开发人员调用
persist()
。
对于通过网络接收数据的输入流(例如,Kafka、套接字等),默认的持久性级别设置为将数据复制到两个节点以实现容错。
请注意,与RDD不同,DStream的默认持久化级别将在内存中保持数据序列化。这在 性能调优 部分中有进一步讨论。有关不同持久化级别的更多信息,可以在 Spark编程指南 中找到。
检查点
流媒体应用程序必须全天候运行,因此必须能够抵御与应用程序逻辑无关的故障(例如,系统故障、JVM崩溃等)。为了实现这一点,Spark Streaming 需要将足够的信息 检查点 到一个容错存储系统,以便可以从故障中恢复。需要进行检查的有两种类型的数据。
-
元数据检查点
- 将定义流计算的信息保存到像 HDFS 这样的容错存储中。这用于从运行流应用程序驱动程序的节点故障中恢复(稍后将详细讨论)。元数据包括:
- 配置 - 用于创建流应用程序的配置。
- DStream 操作 - 定义流应用程序的一组 DStream 操作。
- 不完整批次 - 作业已排队但尚未完成的批次。
- 数据检查点 - 将生成的 RDD 保存到可靠存储中。这在某些 有状态 转换中是必要的,这些转换跨多个批次组合数据。在这种转换中,生成的 RDD 依赖于以前批次的 RDD,这导致依赖链的长度随着时间的推移不断增加。为了避免恢复时间的这种无限制增长(与依赖链成正比),有状态转换的中间 RDD 会定期 检查点 到可靠存储中(例如 HDFS),以切断依赖链。
综上所述,元数据检查点主要用于从驱动程序故障中恢复,而如果使用有状态转换,则即使是基本功能,也需要数据或RDD检查点。
何时启用检查点
必须为具有以下任何要求的应用程序启用检查点:
-
有状态转换的使用
- 如果在应用程序中使用了
updateStateByKey
或reduceByKeyAndWindow
(带有逆函数),则必须提供检查点目录以便进行周期性的 RDD 检查点。 - 从运行应用程序的驱动程序故障中恢复 - 元数据检查点用于恢复进度信息。
请注意,简单的流式应用程序在没有上述有状态转换的情况下可以运行,而无需启用检查点。当发生驱动程序故障时,恢复也将是部分的(一些收到但未处理的数据可能会丢失)。这通常是可以接受的,许多人以这种方式运行 Spark Streaming 应用程序。对非 Hadoop 环境的支持预计将来会有所改善。
如何配置检查点
检查点可以通过在容错的、可靠的文件系统(例如,HDFS、S3等)中设置一个目录来启用,该目录将保存检查点信息。这是通过使用
streamingContext.checkpoint(checkpointDirectory)
来完成的。这将允许你使用上述有状态转换。此外,如果你希望应用程序能够从驱动程序故障中恢复,你应该重新编写你的流应用程序,使其具有以下行为。
- 当程序第一次启动时,它将创建一个新的 StreamingContext,设置所有流,然后调用 start()。
- 当程序在故障后重启时,它将从检查点目录中的检查点数据重新创建一个 StreamingContext。
这种行为通过使用
StreamingContext.getOrCreate
变得简单。它的使用方法如下。
# 函数用于创建和设置新的 StreamingContext
def functionToCreateContext():
sc = SparkContext(...) # 新上下文
ssc = StreamingContext(...)
lines = ssc.socketTextStream(...) # 创建 DStreams
...
ssc.checkpoint(checkpointDirectory) # 设置检查点目录
return ssc
# 从检查点数据中获取 StreamingContext 或创建一个新的
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
# 在上下文上执行额外的设置,无论是启动还是重新启动都需要做
context. ...
# 启动上下文
context.start()
context.awaitTermination()
如果
checkpointDirectory
存在,那么上下文将从检查点数据中重新创建。如果目录不存在(即第一次运行),那么将调用函数
functionToCreateContext
来创建一个新的上下文并设置 DStreams。请参见 Python 示例
recoverable_network_wordcount.py
。该示例将网络数据的词频追加到文件中。
您还可以显式地从检查点数据创建一个
StreamingContext
并使用
StreamingContext.getOrCreate(checkpointDirectory, None)
开始计算。
通过使用
StreamingContext.getOrCreate
,这个行为变得简单。它的用法如下。
// 创建和设置新的 StreamingContext 的函数
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // 新的上下文
val lines = ssc.socketTextStream(...) // 创建 DStreams
...
ssc.checkpoint(checkpointDirectory) // 设置检查点目录
ssc
}
// 从检查点数据获取 StreamingContext 或创建一个新的
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// 在上下文上执行额外的设置,无论是启动还是重新启动都需要执行
context. ...
// 启动上下文
context.start()
context.awaitTermination()
如果
checkpointDirectory
存在,则上下文将从检查点数据中重新创建。 如果目录不存在(即第一次运行),则将调用函数
functionToCreateContext
来创建一个新的上下文并设置 DStreams。 请参阅 Scala 示例
RecoverableNetworkWordCount
。 该示例将网络数据的词频附加到文件中。
这个行为通过使用
JavaStreamingContext.getOrCreate
变得简单。其用法如下。
// 创建一个工厂对象,可以创建和设置新的 JavaStreamingContext
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
@Override public JavaStreamingContext create() {
JavaStreamingContext jssc = new JavaStreamingContext(...); // 新建上下文
JavaDStream<String> lines = jssc.socketTextStream(...); // 创建 DStreams
...
jssc.checkpoint(checkpointDirectory); // 设置检查点目录
return jssc;
}
};
// 从检查点数据获取 JavaStreamingContext 或创建一个新的
JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
// 执行对上下文的额外设置,
// 无论是启动还是重启都需要这样做
context. ...
// 启动上下文
context.start();
context.awaitTermination();
如果
checkpointDirectory
存在,则上下文将从检查点数据中重新创建。如果目录不存在(即第一次运行),则将调用函数
contextFactory
来创建新的上下文并设置 DStreams。请参阅 Java 示例
JavaRecoverableNetworkWordCount
。该示例将网络数据的单词计数追加到一个文件中。
除了使用
getOrCreate
之外,还需要确保驱动程序进程在失败时能够自动重启。这只能通过用于运行应用程序的部署基础设施来实现。这在
部署
部分中有进一步讨论。
请注意,RDD 的检查点会产生保存到可靠存储的成本。这可能会导致那些被检查点的 RDD 所在批次的处理时间增加。因此,检查点的间隔需要谨慎设置。在小批量大小(比如 1 秒)时,每个批次都进行检查点可能会显著降低操作吞吐量。相反,检查点过于频繁导致血缘和任务大小增长,这可能会产生不利影响。对于需要 RDD 检查点的有状态转换,默认间隔是批次间隔的倍数,至少为 10 秒。可以使用
dstream.checkpoint(checkpointInterval)
来设置。通常情况下,5 - 10 个 DStream 的滑动间隔的检查点间隔是一个不错的设置。
累加器、广播变量和检查点
累加器 和 广播变量 不能从 Spark Streaming 的检查点中恢复。如果您启用了检查点并且使用了 累加器 或 广播变量 ,您将需要为 累加器 和 广播变量 创建惰性实例化的单例实例,以便在驱动程序因故障重启后能够重新实例化。这在以下示例中显示。
def getWordExcludeList(sparkContext):
if ("wordExcludeList" not in globals()):
globals()["wordExcludeList"] = sparkContext.broadcast(["a", "b", "c"])
return globals()["wordExcludeList"]
def getDroppedWordsCounter(sparkContext):
if ("droppedWordsCounter" not in globals()):
globals()["droppedWordsCounter"] = sparkContext.accumulator(0)
return globals()["droppedWordsCounter"]
def echo(time, rdd):
# 获取或注册 excludeList 广播
excludeList = getWordExcludeList(rdd.context)
# 获取或注册 droppedWordsCounter 累加器
droppedWordsCounter = getDroppedWordsCounter(rdd.context)
# 使用 excludeList 过滤单词并使用 droppedWordsCounter 计数
def filterFunc(wordCount):
if wordCount[0] in excludeList.value:
droppedWordsCounter.add(wordCount[1])
False
else:
True
counts = "在时间 %s %s 的计数" % (time, rdd.filter(filterFunc).collect())
wordCounts.foreachRDD(echo)
查看完整的 源代码 。
object WordExcludeList {
@volatile private var instance: Broadcast[Seq[String]] = null
def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
if (instance == null) {
synchronized {
if (instance == null) {
val wordExcludeList = Seq("a", "b", "c")
instance = sc.broadcast(wordExcludeList)
}
}
}
instance
}
}
object DroppedWordsCounter {
@volatile private var instance: LongAccumulator = null
def getInstance(sc: SparkContext): LongAccumulator = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.longAccumulator("DroppedWordsCounter")
}
}
}
instance
}
}
wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
// 获取或注册 excludeList 广播
val excludeList = WordExcludeList.getInstance(rdd.sparkContext)
// 获取或注册 droppedWordsCounter 计数器
val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// 使用 excludeList 过滤单词,并使用 droppedWordsCounter 计数
val counts = rdd.filter { case (word, count) =>
if (excludeList.value.contains(word)) {
droppedWordsCounter.add(count)
false
} else {
true
}
}.collect().mkString("[", ", ", "]")
val output = "在时间 " + time + " " + counts
})
查看完整的 源代码 。
class JavaWordExcludeList {
private static volatile Broadcast<ListString>> instance = null;
public static Broadcast<ListStringgetInstance(JavaSparkContext jsc) {
if (instance == null) {
synchronized (JavaWordExcludeList.class) {
if (instance == null) {
List<StringwordExcludeList = Arrays.asList("a", "b", "c");
instance = jsc.broadcast(wordExcludeList);
}
}
}
return instance;
}
}
class JavaDroppedWordsCounter {
private static volatile LongAccumulator instance = null;
public static LongAccumulator getInstance(JavaSparkContext jsc) {
if (instance == null) {
synchronized (JavaDroppedWordsCounter.class) {
if (instance == null) {
instance = jsc.sc().longAccumulator("DroppedWordsCounter");
}
}
}
return instance;
}
}
wordCounts.foreachRDD((rdd, time) -> {
// 获取或注册 excludeList 广播
Broadcast<ListStringexcludeList = JavaWordExcludeList.getInstance(new JavaSparkContext(rdd.context()));
// 获取或注册 droppedWordsCounter 统计器
LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
// 使用 excludeList 去掉单词,并使用 droppedWordsCounter 计数
String counts = rdd.filter(wordCount -> {
if (excludeList.value().contains(wordCount._1())) {
droppedWordsCounter.add(wordCount._2());
return false;
} else {
return true;
}
}).collect().toString();
String output = "Counts at time " + time + " " + counts;
}
查看完整的 源代码 。
部署应用程序
本节讨论部署 Spark Streaming 应用程序的步骤。
要求
要运行Spark Streaming应用程序,您需要具备以下条件。
-
集群管理器中的集群 - 这是任何 Spark 应用程序的一般要求,详细讨论在 部署指南 中。
-
打包应用程序 JAR - 你必须将流应用程序编译成一个 JAR。如果你使用
spark-submit
来启动应用程序,那么你不需要在 JAR 中提供 Spark 和 Spark Streaming。然而,如果你的应用程序使用了 高级源 (例如 Kafka),那么你必须将它们链接到的额外工件和它们的依赖关系打包在用于部署应用程序的 JAR 中。例如,使用KafkaUtils
的应用程序必须在应用程序 JAR 中包含spark-streaming-kafka-0-10_2.12
及其所有传递依赖项。 -
为执行程序配置足够的内存 - 由于接收的数据必须存储在内存中,执行程序必须配置足够的内存来容纳接收到的数据。请注意,如果你正在进行 10 分钟窗口操作,系统必须至少在内存中保留最后 10 分钟的数据。因此,应用程序的内存需求取决于其中使用的操作。
-
配置检查点 - 如果流应用程序需要,它必须在 Hadoop API 兼容的容错存储(例如 HDFS、S3 等)中配置一个目录作为检查点目录,并将流应用程序编写成可以使用检查点信息进行故障恢复的方式。有关更多详细信息,请参阅 检查点 部分。
-
配置应用程序驱动程序的自动重启
- 为了自动恢复驱动程序故障,运行流应用程序的部署基础设施必须监控驱动程序进程,如果其失败则重新启动驱动程序。不同的
集群管理器
有不同的工具来实现这一点。
- Spark 独立模式 - Spark 应用程序驱动程序可以被提交到 Spark 独立集群中运行(请参见 集群部署模式 ),即应用程序驱动程序本身在一个工作节点上运行。此外,独立集群管理器可以被指示 监视 驱动程序,并在驱动程序因非零退出代码或运行驱动程序的节点故障而失败时重新启动它。有关更多详细信息,请参见 Spark 独立指南 中的 集群模式 和 监视 部分。
- YARN - YARN 支持类似的机制来自动重启应用程序。有关更多详细信息,请参阅 YARN 文档。
- Mesos - Marathon 已用于实现这一点与 Mesos。
-
配置写前日志 - 自 Spark 1.2 起,我们引入了 写前日志 以实现强大的容错保证。如果启用,所有从接收器接收到的数据都会写入配置检查点目录中的写前日志。这防止在驱动程序恢复时丢失数据,从而确保零数据丢失(在 容错语义 部分中详细讨论)。通过将 配置参数
spark.streaming.receiver.writeAheadLog.enable
设置为true
可以启用此功能。然而,这些更强的语义可能会影响单个接收器的接收吞吐量。通过运行 更多并行接收器 来增加总吞吐量可以纠正这一点。此外,建议在启用写前日志时禁用 Spark 内部接收数据的复制,因为日志已经存储在一个复制存储系统中。这可以通过将输入流的存储级别设置为StorageLevel.MEMORY_AND_DISK_SER
来完成。在使用 S3(或任何不支持刷新操作的文件系统)作为 写前日志 时,请记得启用spark.streaming.driver.writeAheadLog.closeFileAfterWrite
和spark.streaming.receiver.writeAheadLog.closeFileAfterWrite
。有关更多详细信息,请参见 Spark Streaming 配置 。请注意,当启用 I/O 加密时,Spark 将不加密写入写前日志的数据。如果希望加密写前日志数据,则应将其存储在本地支持加密的文件系统中。 -
设置最大接收速率
- 如果集群资源不足以使流应用程序以接收到的速度处理数据,接收器可以通过设置每秒记录的最大速率进行限速。请参见
配置参数
spark.streaming.receiver.maxRate
用于接收器,spark.streaming.kafka.maxRatePerPartition
用于直接 Kafka 方法。在 Spark 1.5 中,我们引入了一种名为 反压 的特性,它消除了设置此速率限制的需要,因为 Spark Streaming 会自动确定速率限制,并在处理条件变化时动态调整它们。通过将 配置参数spark.streaming.backpressure.enabled
设置为true
可以启用这种反压。
升级应用程序代码
如果一个正在运行的 Spark Streaming 应用程序需要升级新的应用代码,那么有两种可能的机制。
-
升级后的 Spark Streaming 应用程序已启动,并与现有应用并行运行。一旦新的应用程序(接收与旧应用程序相同的数据)已经预热并准备好投入使用,旧的应用程序就可以关闭。请注意,这适用于支持将数据发送到两个目标(即早期和升级后的应用程序)的数据源。
-
现有应用程序被优雅地关闭(参见
StreamingContext.stop(...)
或JavaStreamingContext.stop(...)
以确保在关闭之前已接收到的数据完全被处理。然后可以启动升级后的应用程序,它将从早期应用程序的上一个停留点开始处理。请注意,这仅适用于支持源端缓冲的数据源(如 Kafka),因为在前一个应用程序关闭期间,需要缓冲数据,而升级后的应用程序尚未启动。而且,无法从升级前代码的早期检查点信息重新启动。检查点信息本质上包含序列化的 Scala/Java/Python 对象,尝试使用新修改的类反序列化对象可能导致错误。在这种情况下,要么使用不同的检查点目录启动升级后的应用程序,要么删除之前的检查点目录。
监控应用程序
除了Spark的
监控能力
,还有一些特定于Spark Streaming的附加功能。当使用StreamingContext时,
Spark网页UI
显示一个额外的
Streaming
标签,显示有关运行接收器的统计信息(接收器是否处于活动状态、接收到的记录数量、接收器错误等)以及已完成的批处理(批处理时间、排队延迟等)。这可以用来监控流应用程序的进展。
在网页界面中,以下两个指标特别重要:
- 处理时间 - 每批数据的处理时间。
- 调度延迟 - 一批数据在队列中等待前面批次处理完毕的时间。
如果批处理时间持续超过批间隔和/或排队延迟不断增加,则表明系统无法以生成的速度处理批次,正在落后。在这种情况下,请考虑 减少 批处理时间。
Spark Streaming 程序的进度也可以通过 StreamingListener 接口进行监控, 该接口允许您获取接收器状态和处理时间。请注意,这是一个开发者 API, 未来可能会得到改进(即,报告更多信息)。
性能调优
在集群上获得Spark Streaming应用程序的最佳性能需要进行一些调优。本节解释了一些可以调整的参数和配置,以提高您的应用程序的性能。从高层次来看,您需要考虑两件事:
-
通过有效利用集群资源来减少每批数据的处理时间。
-
设置合适的批处理大小,使得数据批能够尽快处理(即数据处理能够跟上数据摄取的速度)。
减少批处理时间
可以在Spark中进行多项优化,以最小化每个批次的处理时间。这些内容在 调优指南 中已详细讨论。本节强调了一些最重要的优化。
数据接收中的并行级别
通过网络接收数据(如 Kafka、socket 等)需要将数据反序列化并存储在 Spark 中。如果数据接收成为系统中的瓶颈,则考虑对数据接收进行并行化。请注意,每个输入 DStream 创建一个单独的接收器(在工作机器上运行),该接收器接收单个数据流。因此,可以通过创建多个输入 DStreams 并配置它们接收来自源的不同数据流分区来实现接收多个数据流。例如,一个接收两个主题数据的单个 Kafka 输入 DStream 可以分为两个 Kafka 输入流,每个流仅接收一个主题。这将运行两个接收器,从而允许数据并行接收,从而增加整体吞吐量。这些多个 DStreams 可以联合在一起创建一个单一的 DStream。然后,可以将之前在单个输入 DStream 上应用的转换应用到统一流上。操作如下。
numStreams = 5
kafkaStreams = [KafkaUtils.createStream(...) for _ in range (numStreams)]
unifiedStream = streamingContext.union(*kafkaStreams)
unifiedStream.pprint()
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();
另一个需要考虑的参数是接收器的块间隔,这由
配置参数
spark.streaming.blockInterval
决定。对于大多数接收器,接收到的数据会被聚合成数据块,然后存储在Spark的内存中。每个批次中的块数决定了将用于在类似map的转换中处理接收到的数据的任务数。每个接收器每批次的任务数量大约为(批次间隔 / 块间隔)。例如,200毫秒的块间隔将每2秒的批次创建10个任务。如果任务数量太低(即,少于每台机器的核心数量),则效率将不高,因为所有可用的核心不会被用于处理数据。要增加给定批次间隔的任务数量,请减少块间隔。然而,推荐的块间隔最小值约为50毫秒,低于此值时,任务启动开销可能会成为一个问题。
接收多个输入流/接收器数据的另一种替代方法是显式重新分区输入数据流(使用
inputStream.repartition(
)。这会在进一步处理之前,将接收到的数据批次分配到集群中指定数量的机器上。
对于直接流,请参考 Spark Streaming + Kafka 集成指南
数据处理中的并行级别
如果在计算的任何阶段中使用的并行任务数量不足,集群资源可能会被低效利用。例如,对于像
reduceByKey
和
reduceByKeyAndWindow
这样的分布式归约操作,默认的并行任务数量由
spark.default.parallelism
配置属性
控制。您可以将并行度作为参数传递(请参见
PairDStreamFunctions
文档),或者设置
spark.default.parallelism
配置属性
来更改默认值。
数据序列化
通过调整序列化格式,可以减少数据序列化的开销。在流式传输的情况下,有两种类型的数据正在被序列化。
-
输入数据 : 默认情况下,通过接收器接收到的输入数据存储在执行器的内存中,使用 StorageLevel.MEMORY_AND_DISK_SER_2 。也就是说,数据被序列化为字节以减少 GC 开销,并且进行了复制以容忍执行器故障。此外,数据首先保留在内存中,只有在内存不足以容纳所有输入数据以进行流计算时,才会溢出到磁盘。这种序列化显然有开销——接收器必须对接收到的数据进行反序列化并使用 Spark 的序列化格式进行重新序列化。
-
由流操作生成的持久化 RDDs : 通过流计算生成的 RDDs 可以保存在内存中。例如,窗口操作在内存中持久化数据,因为它们将被处理多次。然而,与 Spark Core 的默认设置 StorageLevel.MEMORY_ONLY 不同,通过流计算生成的持久化 RDDs 默认为使用 StorageLevel.MEMORY_ONLY_SER (即序列化)进行持久化,以最小化 GC 开销。
在这两种情况下,使用Kryo序列化可以减少CPU和内存开销。有关更多细节,请参阅 Spark 调优指南 。对于Kryo,考虑注册自定义类,并禁用对象引用跟踪(请参阅 配置指南 中的Kryo相关配置)。
在某些情况下,当需要为流应用保留的数据量不大时,可能可以将数据(两种类型)作为反序列化对象持久化,而不会产生过高的垃圾回收开销。例如,如果您使用的批处理间隔为几秒且没有窗口操作,则可以通过明确设置存储级别来尝试禁用持久化数据的序列化。这将减少由于序列化造成的CPU开销,可能在没有过多垃圾回收开销的情况下提高性能。
任务启动开销
如果每秒启动的任务数量很高(比如每秒 50 个或更多),那么向执行器发送任务的开销可能会很大,从而使得实现亚秒级的延迟变得困难。可以通过以下更改来减少开销:
- 执行模式 : 以独立模式或粗粒度 Mesos 模式运行 Spark 的任务启动时间比细粒度 Mesos 模式更好。请参考 在 Mesos 上运行指南 以获取更多详细信息。
这些变化可能将批处理时间减少到数百毫秒,从而使得亚秒级的批处理规模成为可行。
设置正确的批处理间隔
为了使在集群上运行的Spark Streaming应用程序保持稳定,系统应该能够以接收数据的速度处理数据。换句话说,数据批次应该以生成的速度被处理。可以通过 监控 流式Web UI中的处理时间来判断这一点,其中批处理时间应该小于批间隔。
根据流计算的性质,所使用的批处理间隔可能对应用在固定集群资源上可以维持的数据速率产生显著影响。例如,让我们考虑早期的 WordCountNetwork 示例。对于特定的数据速率,系统可能能够每 2 秒报告一次单词计数(即,批处理间隔为 2 秒),但不能每 500 毫秒报告一次。因此,批处理间隔需要设置为能够在生产中维持预期的数据速率。
确定您的应用程序的合适批处理大小的一个好方法是用保守的批处理间隔(例如,5-10秒)和低数据速率进行测试。要验证系统是否能够跟上数据速率,您可以检查每个处理批次所经历的端到端延迟的值(可以在Spark驱动程序的log4j日志中查找“Total delay”,或使用 StreamingListener 接口)。如果延迟保持在与批处理大小相当的水平,那么系统就是稳定的。否则,如果延迟持续增加,这意味着系统无法跟上,从而不稳定。一旦您对稳定配置有了了解,您可以尝试提高数据速率和/或减少批处理大小。请注意,由于临时数据速率增加而导致的短暂延迟增加可能是可以接受的,只要延迟回落到低值(即,低于批处理大小)。
内存调优
调整Spark应用的内存使用和GC行为在 调优指南 中已被详细讨论。强烈建议您阅读那部分内容。在本节中,我们将讨论一些特别针对Spark Streaming应用的调优参数。
Spark Streaming 应用程序所需的集群内存量在很大程度上取决于所使用的转换类型。例如,如果您想对过去 10 分钟的数据使用窗口操作,则您的集群应该有足够的内存来存储 10 分钟的数据。或者,如果您想使用
updateStateByKey
处理大量键,则所需的内存会很高。相反,如果您想进行简单的映射-过滤-存储操作,则所需的内存会很低。
一般来说,由于通过接收器接收到的数据是以 StorageLevel.MEMORY_AND_DISK_SER_2 存储的,无法存入内存的数据将会溢出到磁盘。这可能会降低流处理应用程序的性能,因此建议根据您的流处理应用程序提供足够的内存。最好是先在小规模上尝试并相应地估计内存使用情况。
内存调优的另一个方面是垃圾回收。对于需要低延迟的流应用程序,因JVM垃圾回收而导致的大暂停是不可取的。
有一些参数可以帮助您调整内存使用和垃圾回收开销:
-
流DStreams的持久性级别 : 如前面在 数据序列化 部分提到的,输入数据和RDD默认以序列化字节的形式持久化。这相比于反序列化持久化,降低了内存使用和GC开销。启用Kryo序列化进一步减少了序列化后的大小和内存使用。通过压缩(参见Spark配置
spark.rdd.compress
)还可以进一步减少内存使用,但代价是CPU时间。 -
清除旧数据 : 默认情况下,通过DStream转换生成的所有输入数据和持久化的RDD会自动清除。Spark Streaming
重要的注意事项:
-
一个DStream与一个单一的接收器相关联。为了实现读取并行性,需要创建多个接收器即多个DStreams。接收器在执行器中运行。它占用一个核心。确保在预订接收器槽后还有足够的核心用于处理,即
spark.cores.max
应考虑接收器槽位。接收器以轮询的方式分配给执行器。 -
当从流源接收到数据时,接收器创建数据块。每个 blockInterval 毫秒会生成一个新的数据块。在 batchInterval 期间会创建 N 个数据块,其中 N = batchInterval/blockInterval。这些块由当前执行器的 BlockManager 分发到其他执行器的块管理器。之后,驱动程序上运行的网络输入跟踪器会被告知块的位置以进行进一步处理。
-
在驱动器上为在 batchInterval 期间创建的块创建了一个 RDD。batchInterval 期间生成的块是 RDD 的分区。每个分区是spark中的一个任务。blockInterval== batchinterval 意味着只创建一个分区,并可能在本地处理。
-
块上的映射任务在执行器中处理(一个接收了块,另一个复制了块)无论块间隔如何,除非启动了非本地调度。更大的 blockInterval 意味着更大的块。较高的
spark.locality.wait
值增加了在本地节点上处理块的机会。需要在这两个参数之间找到一个平衡,以确保更大的块在本地处理。 -
您可以通过调用
inputDstream.repartition(n)
定义分区的数量,而不是依赖于 batchInterval 和 blockInterval。这会随机重新分配 RDD 中的数据,以创建 n 个分区。是的,目的是为了更高的并行性。虽然代价是需要一次洗牌。RDD 的处理由驱动程序的作业调度程序作为作业进行调度。在任何给定时刻,只有一个作业处于活动状态。因此,如果一个作业正在执行,其他作业将被排队。 -
如果您有两个 dstreams,将形成两个 RDD,并且将创建两个作业,这两个作业将一个接一个地调度。为了避免这种情况,您可以对两个 dstreams 进行 union。这将确保为两个 dstreams 的 RDD 形成一个单一的 unionRDD。然后,这个 unionRDD 被视为一个单一的作业。然而,RDD 的分区不会受到影响。
-
如果批处理时间超过 batchinterval,那么显然接收器的内存将开始填满,并最终会抛出异常(很可能是 BlockNotFoundException)。目前,没有办法暂停接收器。通过使用 SparkConf 配置
spark.streaming.receiver.maxRate
,可以限制接收器的速率。
容错语义
在本节中,我们将讨论在发生故障时,Spark Streaming 应用程序的行为。
背景
要理解Spark Streaming提供的语义,我们需要记住Spark的RDD的基本容错语义。
- RDD是一个不可变的、确定性可重计算的分布式数据集。每个RDD记住用于在容错输入数据集上创建它的确定性操作的血统。
- 如果由于工作节点故障而丢失RDD的任何分区,则可以使用操作的血统从原始容错数据集中重新计算该分区。
- 假设所有RDD转换都是确定性的,最终转换的RDD中的数据将始终相同,无论Spark集群中发生何种故障。
Spark 在像 HDFS 或 S3 这样的容错文件系统上操作数据。因此,所有从容错数据生成的 RDD 也都是容错的。然而,对于 Spark Streaming 来说,并非如此,因为在大多数情况下,数据是通过网络接收的(除非使用
fileStream
)。为了实现所有生成的 RDD 的相同容错属性,接收到的数据在集群中的工作节点中的多个 Spark 执行器之间进行复制(默认复制因子为 2)。这导致系统中存在两种在故障发生时需要恢复的数据:
- 接收到并复制的数据 - 这些数据可以承受单个工作节点的故障,因为它的副本存在于其他节点之一上。
- 接收到但缓冲以便复制的数据 - 由于这些数据尚未被复制,恢复这些数据的唯一方法是从源头重新获取。
此外,我们应该关注两种类型的故障:
- 工作节点的故障 - 任何运行执行器的工作节点都可能发生故障,所有在这些节点上的内存数据将丢失。如果任何接收器在故障节点上运行,那么它们的缓冲数据将会丢失。
- 驱动节点的故障 - 如果运行Spark Streaming应用程序的驱动节点发生故障,那么显然SparkContext会丢失,所有执行器及其内存数据也会丢失。
有了这些基本知识,让我们了解Spark Streaming的容错语义。
定义
流媒体系统的语义通常通过系统可以处理每条记录的次数来描述。在所有可能的操作条件下(尽管存在故障等),系统可以提供三种类型的保证。
- 至多一次 : 每条记录要么处理一次,要么根本不处理。
- 至少一次 : 每条记录将被处理一次或多次。这比 至多一次 更强,因为它确保不会丢失任何数据。但可能会有重复。
- 恰好一次 : 每条记录将被处理恰好一次——不会丢失数据,也不会多次处理数据。这显然是三者中最强的保证。
基本语义
在任何流处理系统中,广义上讲,处理数据的步骤有三个。
-
接收数据 : 数据通过接收器或其他方式从源中接收。
-
转换数据 : 接收到的数据通过DStream和RDD转换进行处理。
-
推送数据 : 最终转换的数据被推送到外部系统,如文件系统、数据库、仪表盘等。
如果一个流处理应用程序需要实现端到端的精确一次保证,那么每个步骤都必须提供精确一次的保证。也就是说,每条记录必须被接收一次且仅一次,转换一次且仅一次,并且推送到下游系统一次且仅一次。让我们在Spark Streaming的上下文中理解这些步骤的语义。
-
接收数据 : 不同的输入源提供不同的保证。这在下一个小节中将详细讨论。
-
转换数据 : 所有已接收的数据将被处理 准确一次 ,感谢 RDD 提供的保证。即使发生故障,只要接收到的输入数据可访问,最终转换后的 RDD 内容将始终相同。
-
推送数据 : 默认情况下,输出操作确保 至少一次 的语义,因为其依赖于输出操作的类型(幂等的或非幂等的)以及下游系统的语义(是否支持事务)。但是用户可以实现自己的事务机制以实现 准确一次 的语义。关于这一点将在本节后面详细讨论。
接收数据的语义
不同的输入源提供不同的保证,范围从 至少一次 到 准确一次 。阅读更多详细信息。
包含文件
如果所有的输入数据已经存在于像 HDFS 这样的容错文件系统中,Spark Streaming 总是可以从任何故障中恢复并处理所有数据。这提供了 exactly-once 语义,意味着所有数据无论发生什么故障都会被处理一次且仅一次。
使用基于接收器的源
对于基于接收器的输入源,容错语义依赖于故障场景和接收器的类型。正如我们 之前 讨论的,接收器有两种类型:
- 可靠接收器 - 这些接收器仅在确保接收到的数据已经复制后,才会对可靠源进行确认。如果这样的接收器发生故障,源将不会收到缓冲(未复制)数据的确认。因此,如果接收器被重新启动,源将重新发送数据,并且由于故障不会丢失任何数据。
- 不可靠接收器 - 这样的接收器 不会 发送确认,因此在由于工作或驱动程序故障而发生故障时, 可能 会丢失数据。
根据所使用的接收器类型,我们可以实现以下语义。如果工作节点失败,则使用可靠接收器不会导致数据丢失。使用不可靠接收器时,已接收但未复制的数据可能会丢失。如果驱动节点失败,则除了这些损失之外,所有过去接收并在内存中复制的数据都将丢失。这将影响状态转换的结果。
为了避免过去接收的数据丢失,Spark 1.2 引入了 写前日志 ,将接收到的数据保存到容错存储中。启用 写前日志 和可靠的接收器时,数据不会丢失。从语义上讲,它提供了一种至少一次的保证。
下表总结了故障下的语义:
部署场景 | 工作者故障 | 驱动程序故障 |
---|---|---|
Spark 1.1 或更早版本,
或者
Spark 1.2 或更高版本且未使用写前日志 |
使用不可靠接收器时,缓冲数据丢失
使用可靠接收器时,无数据丢失 至少一次语义 |
使用不可靠接收器时,缓冲数据丢失
所有接收器过去的数据丢失 未定义的语义 |
Spark 1.2 或更高版本且使用写前日志 |
使用可靠接收器时,无数据丢失
至少一次语义 |
使用可靠接收器和文件时,无数据丢失
至少一次语义 |
使用 Kafka 直接 API
在Spark 1.3中,我们引入了一种新的Kafka Direct API,这可以确保所有的Kafka数据被Spark Streaming精确接收一次。与此同时,如果您实现了精确一次的输出操作,您可以实现端到端的精确一次保证。有关此方法的进一步讨论,请参见 Kafka集成指南 。
输出操作的语义
输出操作(如
foreachRDD
)具有
至少一次
语义,即在工作节点失败的情况下,转化后的数据可能会多次写入外部实体。虽然对于使用
saveAs***Files
操作保存到文件系统来说这是可以接受的(因为文件会被相同的数据简单覆盖),但是为了实现准确一次的语义,可能需要额外的努力。有两种方法。
-
幂等更新 :多次尝试始终写入相同的数据。例如,
saveAs***Files
始终将相同的数据写入生成的文件中。 -
事务更新 :所有更新都以事务方式进行,以确保更新正好一次,以原子方式执行。一种实现方法如下。
-
使用批处理时间(在
foreachRDD
中可用)和 RDD 的分区索引来创建标识符。该标识符唯一标识流应用程序中的一个 blob 数据。 -
使用该标识符以事务方式更新外部系统(即,准确一次,以原子方式)。也就是说,如果该标识符尚未提交,则以原子方式提交分区数据和标识符。否则,如果该标识符已经提交,则跳过更新。
dstream.foreachRDD { (rdd, time) => rdd.foreachPartition { partitionIterator => val partitionId = TaskContext.get.partitionId() val uniqueId = generateUniqueId(time.milliseconds, partitionId) // 使用此 uniqueId 以事务方式提交 partitionIterator 中的数据 } }
-
使用批处理时间(在