Spark Streaming + Kinesis 集成

Amazon Kinesis 是一种完全托管的服务,用于大规模实时处理流数据。 Kinesis 接收器使用 Amazon 根据 Amazon 软件许可证 (ASL) 提供的 Kinesis 客户端库 (KCL) 创建输入 DStream。 KCL 基于 Apache 2.0 许可证的 AWS Java SDK 之上,并通过 Workers、Checkpoints 和 Shard Leases 的概念提供负载均衡、容错和检查点功能。 在这里,我们解释如何配置 Spark Streaming 从 Kinesis 接收数据。

配置 Kinesis

可以在一个有效的 Kinesis 端点设置 Kinesis 数据流,每个数据流可以有 1 个或多个分片,具体请参见以下 指南

配置 Spark 流处理应用程序

  1. 链接: 对于使用 SBT/Maven 项目定义的 Scala/Java 应用程序,将您的流应用程序链接到以下工件(有关更多信息,请参阅主编程指南中的 链接部分 )。

     groupId = org.apache.spark
     artifactId = spark-streaming-kinesis-asl_2.12
     version = 3.5.3
    

    对于 Python 应用程序,您在部署应用程序时需要添加此库及其依赖项。请参阅下面的 部署 子章节。 请注意,通过链接到此库,您将包括 ASL 许可的代码在您的应用程序中。

  2. 编程: 在流应用程序代码中,导入 KinesisInputDStream 并创建如下的字节数组输入 DStream:

         from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
    
         kinesisStream = KinesisUtils.createStream(
             streamingContext, [Kinesis 应用名称], [Kinesis 流名称], [端点 URL],
             [区域名称], [初始位置], [检查点间隔], [metricsLevel.DETAILED], StorageLevel.MEMORY_AND_DISK_2)
    

    请参阅 API 文档 示例 。有关运行示例的说明,请参阅 运行示例 子章节。

         import org.apache.spark.storage.StorageLevel
         import org.apache.spark.streaming.kinesis.KinesisInputDStream
         import org.apache.spark.streaming.{Seconds, StreamingContext}
         import org.apache.spark.streaming.kinesis.KinesisInitialPositions
    
         val kinesisStream = KinesisInputDStream.builder
             .streamingContext(streamingContext)
             .endpointUrl([端点 URL])
             .regionName([区域名称])
             .streamName([流名称])
             .initialPosition([初始位置])
             .checkpointAppName([Kinesis 应用名称])
             .checkpointInterval([检查点间隔])
             .metricsLevel([metricsLevel.DETAILED])
             .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
             .build()
    

    请参阅 API 文档 示例 。有关运行示例的说明,请参阅 运行示例 子章节。

         import org.apache.spark.storage.StorageLevel;
         import org.apache.spark.streaming.kinesis.KinesisInputDStream;
         import org.apache.spark.streaming.Seconds;
         import org.apache.spark.streaming.StreamingContext;
         import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
    
         KinesisInputDStream kinesisStream = KinesisInputDStream.builder()
             .streamingContext(streamingContext)
             .endpointUrl([端点 URL])
             .regionName([区域名称])
             .streamName([流名称])
             .initialPosition([初始位置])
             .checkpointAppName([Kinesis 应用名称])
             .checkpointInterval([检查点间隔])
             .metricsLevel([metricsLevel.DETAILED])
             .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
             .build();
    

    请参阅 API 文档 示例 。有关运行示例的说明,请参阅 运行示例 子章节。

    您还可以提供以下设置。目前仅在 Scala 和 Java 中支持。

    • 一个“消息处理函数”,它接受一个 Kinesis Record 并返回一个通用对象 T ,以便您可以使用 Record 中包含的其他数据(如分区键)。
             import collection.JavaConverters._
             import org.apache.spark.storage.StorageLevel
             import org.apache.spark.streaming.kinesis.KinesisInputDStream
             import org.apache.spark.streaming.{Seconds, StreamingContext}
             import org.apache.spark.streaming.kinesis.KinesisInitialPositions
             import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
             import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
    
             val kinesisStream = KinesisInputDStream.builder
                 .streamingContext(streamingContext)
                 .endpointUrl([端点 URL])
                 .regionName([区域名称])
                 .streamName([流名称])
                 .initialPosition([初始位置])
                 .checkpointAppName([Kinesis 应用名称])
                 .checkpointInterval([检查点间隔])
                 .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
                 .metricsLevel(MetricsLevel.DETAILED)
                 .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
                 .buildWithMessageHandler([消息处理程序])
    
             import org.apache.spark.storage.StorageLevel;
             import org.apache.spark.streaming.kinesis.KinesisInputDStream;
             import org.apache.spark.streaming.Seconds;
             import org.apache.spark.streaming.StreamingContext;
             import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
             import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
             import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
             import scala.collection.JavaConverters;
    
             KinesisInputDStream kinesisStream = KinesisInputDStream.builder()
                 .streamingContext(streamingContext)
                 .endpointUrl([端点 URL])
                 .regionName([区域名称])
                 .streamName([流名称])
                 .initialPosition([初始位置])
                 .checkpointAppName([Kinesis 应用名称])
                 .checkpointInterval([检查点间隔])
                 .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
                 .metricsLevel(MetricsLevel.DETAILED)
                 .metricsEnabledDimensions(JavaConverters.asScalaSetConverter(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS).asScala().toSet())
                 .buildWithMessageHandler([消息处理程序]);
    
    • streamingContext :包含一个应用名称的 StreamingContext,Kinesis 使用该名称将此 Kinesis 应用程序与 Kinesis 流关联起来

    • [Kinesis 应用名称] :将用于在 DynamoDB 表中检查 Kinesis 序列号的应用名称。
      • 对于给定帐户和区域,应用名称必须是唯一的。
      • 如果表存在但具有不正确的检查点信息(对于不同的流,或过期的序列号),则可能会出现临时错误。
    • [Kinesis 流名称] :此流应用程序将从中提取数据的 Kinesis 流。

    • [端点 URL] :有效的 Kinesis 端点 URL 可在 此处 找到。

    • [区域名称] :有效的 Kinesis 区域名称可在 此处 找到。

    • [检查点间隔] :Kinesis 客户端库在流中保存其位置的间隔(例如,Duration(2000) = 2 秒)。起初,将其设置为与流应用程序的批处理间隔相同。

    • [初始位置] :可以是 KinesisInitialPositions.TrimHorizon KinesisInitialPositions.Latest KinesisInitialPositions.AtTimestamp (有关更多详细信息,请参见 Kinesis 检查点 部分和 Amazon Kinesis API 文档 )。

    • [消息处理程序] :一个接受 Kinesis Record 并输出通用 T 的函数。

    在 API 的其他版本中,您还可以直接指定 AWS 访问密钥和密钥。

  3. 部署: 与任何 Spark 应用程序一样, spark-submit 用于启动您的应用程序。然而,Scala/Java 应用程序和 Python 应用程序的细节略有不同。

    对于 Scala 和 Java 应用程序,如果您使用 SBT 或 Maven 进行项目管理,则将 spark-streaming-kinesis-asl_2.12 及其依赖项打包到应用程序 JAR 中。确保将 spark-core_2.12 spark-streaming_2.12 标记为 provided 依赖项,因为它们已经包含在 Spark 安装中。然后使用 spark-submit 启动您的应用程序(请参见主编程指南中的 部署部分 )。

    对于缺少 SBT/Maven 项目管理的 Python 应用程序,可以使用 --packages 直接将 spark-streaming-kinesis-asl_2.12 及其依赖项添加到 spark-submit (请参见 应用程序提交指南 )。也就是说,

     ./bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.3 ...
    

    或者,您还可以从 Maven 仓库 下载 Maven 工件 spark-streaming-kinesis-asl-assembly 的 JAR,并将其添加到 spark-submit 中,使用 --jars

    Spark Streaming Kinesis Architecture

    运行时要记住的要点:

    • Kinesis 数据处理是按分区排序的,并且至少对每条消息处理一次。

    • 多个应用程序可以从同一个 Kinesis 流中读取。Kinesis 将在 DynamoDB 中维护特定于应用程序的 shard 和检查点信息。

    • 每个 Kinesis 流 shard 一次只能被一个输入 DStream 处理。

    • 单个 Kinesis 输入 DStream 可以通过创建多个 KinesisRecordProcessor 线程从 Kinesis 流的多个 shards 读取。

    • 在不同的进程/实例中运行的多个输入 DStreams 可以从一个 Kinesis 流中读取。

    • 您永远无需比 Kinesis 流 shard 的数量更多的 Kinesis 输入 DStreams ,因为每个输入 DStream 将创建至少一个 KinesisRecordProcessor 线程来处理单个 shard。

    • 通过添加/删除 Kinesis 输入 DStreams(在单个进程内或跨多个进程/实例)来实现水平扩展 - 直到达到上面提到的 Kinesis 流 shard 的总数。

    • Kinesis 输入 DStream 将在所有 DStreams 之间平衡负载 - 即使跨进程/实例。

    • Kinesis 输入 DStream 将在重新分片事件(合并和拆分)期间根据负载变化平衡负载。

    • 作为最佳实践,建议尽可能过量提供,以避免重新分片抖动。

    • 每个 Kinesis 输入 DStream 保持其自己的检查点信息。有关更多详细信息,请参见 Kinesis 检查点部分。

    • Kinesis 流 shard 的数量与在输入 DStream 处理期间在 Spark 集群中创建的 RDD 分区/shard 的数量之间没有相关性。这是两种独立的分区方案。

运行示例

要运行这个示例,

记录去聚合

当使用 Kinesis Producer Library (KPL) 生成数据时,消息可能会被聚合以节省成本。Spark Streaming将在消费过程中自动去聚合记录。

Kinesis 检查点

Kinesis 重试配置