Spark Streaming + Kinesis 集成
Amazon Kinesis 是一种完全托管的服务,用于大规模实时处理流数据。 Kinesis 接收器使用 Amazon 根据 Amazon 软件许可证 (ASL) 提供的 Kinesis 客户端库 (KCL) 创建输入 DStream。 KCL 基于 Apache 2.0 许可证的 AWS Java SDK 之上,并通过 Workers、Checkpoints 和 Shard Leases 的概念提供负载均衡、容错和检查点功能。 在这里,我们解释如何配置 Spark Streaming 从 Kinesis 接收数据。
配置 Kinesis
可以在一个有效的 Kinesis 端点设置 Kinesis 数据流,每个数据流可以有 1 个或多个分片,具体请参见以下 指南 。
配置 Spark 流处理应用程序
-
链接: 对于使用 SBT/Maven 项目定义的 Scala/Java 应用程序,将您的流应用程序链接到以下工件(有关更多信息,请参阅主编程指南中的 链接部分 )。
groupId = org.apache.spark artifactId = spark-streaming-kinesis-asl_2.12 version = 3.5.3对于 Python 应用程序,您在部署应用程序时需要添加此库及其依赖项。请参阅下面的 部署 子章节。 请注意,通过链接到此库,您将包括 ASL 许可的代码在您的应用程序中。
-
编程: 在流应用程序代码中,导入
KinesisInputDStream并创建如下的字节数组输入 DStream:from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream kinesisStream = KinesisUtils.createStream( streamingContext, [Kinesis 应用名称], [Kinesis 流名称], [端点 URL], [区域名称], [初始位置], [检查点间隔], [metricsLevel.DETAILED], StorageLevel.MEMORY_AND_DISK_2)请参阅 API 文档 和 示例 。有关运行示例的说明,请参阅 运行示例 子章节。
- CloudWatch 指标级别和维度。有关详细信息,请参见 有关监控 KCL 的 AWS 文档 。默认值为 MetricsLevel.DETAILED
import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kinesis.KinesisInputDStream import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kinesis.KinesisInitialPositions val kinesisStream = KinesisInputDStream.builder .streamingContext(streamingContext) .endpointUrl([端点 URL]) .regionName([区域名称]) .streamName([流名称]) .initialPosition([初始位置]) .checkpointAppName([Kinesis 应用名称]) .checkpointInterval([检查点间隔]) .metricsLevel([metricsLevel.DETAILED]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .build()import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.kinesis.KinesisInputDStream; import org.apache.spark.streaming.Seconds; import org.apache.spark.streaming.StreamingContext; import org.apache.spark.streaming.kinesis.KinesisInitialPositions; KinesisInputDStreamkinesisStream = KinesisInputDStream.builder() .streamingContext(streamingContext) .endpointUrl([端点 URL]) .regionName([区域名称]) .streamName([流名称]) .initialPosition([初始位置]) .checkpointAppName([Kinesis 应用名称]) .checkpointInterval([检查点间隔]) .metricsLevel([metricsLevel.DETAILED]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .build(); 您还可以提供以下设置。目前仅在 Scala 和 Java 中支持。
-
一个“消息处理函数”,它接受一个 Kinesis
Record并返回一个通用对象T,以便您可以使用Record中包含的其他数据(如分区键)。
import collection.JavaConverters._ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kinesis.KinesisInputDStream import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kinesis.KinesisInitialPositions import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel val kinesisStream = KinesisInputDStream.builder .streamingContext(streamingContext) .endpointUrl([端点 URL]) .regionName([区域名称]) .streamName([流名称]) .initialPosition([初始位置]) .checkpointAppName([Kinesis 应用名称]) .checkpointInterval([检查点间隔]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet) .buildWithMessageHandler([消息处理程序])import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.kinesis.KinesisInputDStream; import org.apache.spark.streaming.Seconds; import org.apache.spark.streaming.StreamingContext; import org.apache.spark.streaming.kinesis.KinesisInitialPositions; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import scala.collection.JavaConverters; KinesisInputDStreamkinesisStream = KinesisInputDStream.builder() .streamingContext(streamingContext) .endpointUrl([端点 URL]) .regionName([区域名称]) .streamName([流名称]) .initialPosition([初始位置]) .checkpointAppName([Kinesis 应用名称]) .checkpointInterval([检查点间隔]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(JavaConverters.asScalaSetConverter(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS).asScala().toSet()) .buildWithMessageHandler([消息处理程序]); -
streamingContext:包含一个应用名称的 StreamingContext,Kinesis 使用该名称将此 Kinesis 应用程序与 Kinesis 流关联起来 -
[Kinesis 应用名称]:将用于在 DynamoDB 表中检查 Kinesis 序列号的应用名称。- 对于给定帐户和区域,应用名称必须是唯一的。
- 如果表存在但具有不正确的检查点信息(对于不同的流,或过期的序列号),则可能会出现临时错误。
-
[Kinesis 流名称]:此流应用程序将从中提取数据的 Kinesis 流。 -
[端点 URL]:有效的 Kinesis 端点 URL 可在 此处 找到。 -
[区域名称]:有效的 Kinesis 区域名称可在 此处 找到。 -
[检查点间隔]:Kinesis 客户端库在流中保存其位置的间隔(例如,Duration(2000) = 2 秒)。起初,将其设置为与流应用程序的批处理间隔相同。 -
[初始位置]:可以是KinesisInitialPositions.TrimHorizon、KinesisInitialPositions.Latest或KinesisInitialPositions.AtTimestamp(有关更多详细信息,请参见Kinesis 检查点部分和Amazon Kinesis API 文档)。 -
[消息处理程序]:一个接受 KinesisRecord并输出通用T的函数。
在 API 的其他版本中,您还可以直接指定 AWS 访问密钥和密钥。
-
部署: 与任何 Spark 应用程序一样,
spark-submit用于启动您的应用程序。然而,Scala/Java 应用程序和 Python 应用程序的细节略有不同。对于 Scala 和 Java 应用程序,如果您使用 SBT 或 Maven 进行项目管理,则将
spark-streaming-kinesis-asl_2.12及其依赖项打包到应用程序 JAR 中。确保将spark-core_2.12和spark-streaming_2.12标记为provided依赖项,因为它们已经包含在 Spark 安装中。然后使用spark-submit启动您的应用程序(请参见主编程指南中的 部署部分 )。对于缺少 SBT/Maven 项目管理的 Python 应用程序,可以使用
--packages直接将spark-streaming-kinesis-asl_2.12及其依赖项添加到spark-submit(请参见 应用程序提交指南 )。也就是说,./bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.3 ...或者,您还可以从 Maven 仓库 下载 Maven 工件
spark-streaming-kinesis-asl-assembly的 JAR,并将其添加到spark-submit中,使用--jars。
运行时要记住的要点:
-
Kinesis 数据处理是按分区排序的,并且至少对每条消息处理一次。
-
多个应用程序可以从同一个 Kinesis 流中读取。Kinesis 将在 DynamoDB 中维护特定于应用程序的 shard 和检查点信息。
-
每个 Kinesis 流 shard 一次只能被一个输入 DStream 处理。
-
单个 Kinesis 输入 DStream 可以通过创建多个 KinesisRecordProcessor 线程从 Kinesis 流的多个 shards 读取。
-
在不同的进程/实例中运行的多个输入 DStreams 可以从一个 Kinesis 流中读取。
-
您永远无需比 Kinesis 流 shard 的数量更多的 Kinesis 输入 DStreams ,因为每个输入 DStream 将创建至少一个 KinesisRecordProcessor 线程来处理单个 shard。
-
通过添加/删除 Kinesis 输入 DStreams(在单个进程内或跨多个进程/实例)来实现水平扩展 - 直到达到上面提到的 Kinesis 流 shard 的总数。
-
Kinesis 输入 DStream 将在所有 DStreams 之间平衡负载 - 即使跨进程/实例。
-
Kinesis 输入 DStream 将在重新分片事件(合并和拆分)期间根据负载变化平衡负载。
-
作为最佳实践,建议尽可能过量提供,以避免重新分片抖动。
-
每个 Kinesis 输入 DStream 保持其自己的检查点信息。有关更多详细信息,请参见 Kinesis 检查点部分。
-
Kinesis 流 shard 的数量与在输入 DStream 处理期间在 Spark 集群中创建的 RDD 分区/shard 的数量之间没有相关性。这是两种独立的分区方案。
-
运行示例
要运行这个示例,
-
从 下载网站 下载一个Spark二进制文件。
-
在AWS中设置Kinesis流(请参阅前面部分)。注意Kinesis流的名称和与所创建流的区域相对应的端点URL。
-
使用你的AWS凭证设置环境变量
AWS_ACCESS_KEY_ID和AWS_SECRET_ACCESS_KEY。 -
在Spark根目录中,运行示例,如下所示
./bin/spark-submit --jars 'connector/kinesis-asl-assembly/target/spark-streaming-kinesis-asl-assembly_*.jar' \ connector/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \ [Kinesis应用名称] [Kinesis流名称] [端点URL] [区域名称]./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.3 streaming.KinesisWordCountASL [Kinesis应用名称] [Kinesis流名称] [端点URL]./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.3 streaming.JavaKinesisWordCountASL [Kinesis应用名称] [Kinesis流名称] [端点URL]这将等待从Kinesis流接收数据。
-
要生成随机字符串数据以放入Kinesis流,在另一个终端中,运行相关的Kinesis数据生产者。
./bin/run-example streaming.KinesisWordProducerASL [Kinesis流名称] [端点URL] 1000 10这将每秒向Kinesis流推送1000行,每行10个随机数字。然后,这些数据应被运行的示例接收并处理。
记录去聚合
当使用 Kinesis Producer Library (KPL) 生成数据时,消息可能会被聚合以节省成本。Spark Streaming将在消费过程中自动去聚合记录。
Kinesis 检查点
-
每个 Kinesis 输入 DStream 定期将流的当前位置信息存储在后端 DynamoDB 表中。这允许系统在故障后恢复,并继续从 DStream 停止的地方处理。
-
过于频繁的检查点将对 AWS 检查点存储层造成过大的负载,并可能导致 AWS 限流。提供的示例使用随机退避重试策略来处理此限流。
-
如果在输入 DStream 开始时不存在 Kinesis 检查点信息,它将从可用的最旧记录开始(
KinesisInitialPositions.TrimHorizon),或从最新记录开始(KinesisInitialPositions.Latest),或(除了 Python)从提供的 UTC 时间戳所表示的位置开始(KinesisInitialPositions.AtTimestamp(Date timestamp))。这是可配置的。-
KinesisInitialPositions.Latest可能导致在没有输入 DStream 运行时(且未存储检查点信息)添加数据时错过记录。 -
KinesisInitialPositions.TrimHorizon可能导致记录的重复处理,其影响取决于检查点频率和处理的幂等性。
-
Kinesis 重试配置
-
spark.streaming.kinesis.retry.waitTime: Kinesis 重试之间的等待时间,格式为持续时间字符串。当从 Amazon Kinesis 读取时,用户可能会遇到ProvisionedThroughputExceededException,当消费速度超过每秒 5 笔交易或超过每秒 2 MiB 的最大读取速率时。这项配置可以调整,以增加在获取失败时的休眠时间,从而减少这些异常。默认值是“100ms”。 -
spark.streaming.kinesis.retry.maxAttempts: Kinesis 获取的最大重试次数。此配置还可以用于处理上述情况中的 KinesisProvisionedThroughputExceededException。可以增加此值以在 Kinesis 读取时进行更多次重试。默认值是 3。