Spark Streaming + Kafka 集成指南(Kafka 代理版本 0.10.0 或更高版本)

Spark Streaming与Kafka 0.10的集成提供了简单的并行性,Kafka分区与Spark分区之间的1:1对应关系,以及对偏移量和元数据的访问。然而,由于较新的集成使用了 新的Kafka消费者API 而不是简单API,因此在使用上存在显著的差异。

链接

对于使用 SBT/Maven 项目定义的 Scala/Java 应用程序,将您的流应用程序链接到以下工件(有关进一步信息,请参见主编程指南中的 链接部分 )。

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.12
version = 3.5.3

不要 手动添加对 org.apache.kafka 工件的依赖(例如 kafka-clients )。 spark-streaming-kafka-0-10 工件已经包含了适当的传递依赖,不同的版本可能会以难以诊断的方式不兼容。

创建直接流

请注意,导入的命名空间包括版本,org.apache.spark.streaming.kafka010

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "为每个流使用单独的组ID",
"auto.offset.reset" -> "最新",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value))

流中的每个项目都是一个 ConsumerRecord

import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("topicA", "topicB");
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));

有关可能的 kafkaParams,参见 Kafka 消费者配置文档 。如果您的 Spark 批处理持续时间大于默认的 Kafka 心跳会话超时(30 秒),请适当增加 heartbeat.interval.ms 和 session.timeout.ms。对于超过 5 分钟的批处理,这将需要在代理上更改 group.max.session.timeout.ms。请注意,示例将 enable.auto.commit 设置为 false,有关讨论,请参见下面的 存储偏移量

位置策略

新的Kafka消费API将预取消息到缓冲区。因此,出于性能考虑,Spark集成保持在执行器上的缓存消费者(而不是为每个批次重建它们)是很重要的,并且更倾向于在具有适当消费者的主机位置上调度分区。

在大多数情况下,您应该使用 LocationStrategies.PreferConsistent ,如上所示。这将均匀地分配分区到可用的执行器上。如果您的执行器与 Kafka 经纪人在同一主机上,请使用 PreferBrokers ,这将优先在该分区的 Kafka 领导者上调度分区。最后,如果您的分区之间的负载存在显著的偏差,请使用 PreferFixed 。这允许您指定分区与主机的明确映射(任何未指定的分区将使用一致的位置)。

消费者的缓存默认最大大小为64。如果您期望处理的Kafka分区超过(64 * 执行器数量),您可以通过 spark.streaming.kafka.consumer.cache.maxCapacity 更改此设置。

如果您想禁用Kafka消费者的缓存,可以将 spark.streaming.kafka.consumer.cache.enabled 设置为 false

缓存是通过 topicpartition 和 group.id 进行键控的,因此每次调用 createDirectStream 时,请使用一个 单独的 group.id

消费者策略

新的 Kafka 消费者 API 有多种不同的方式来指定主题,其中一些需要相当多的对象实例化后的设置。 ConsumerStrategies 提供了一种抽象,使 Spark 能够在从检查点重新启动后获取正确配置的消费者。

ConsumerStrategies.Subscribe ,如上所示,允许您订阅固定数量的主题。 SubscribePattern 允许您使用正则表达式来指定感兴趣的主题。 请注意,与 0.8 集成不同,使用 Subscribe SubscribePattern 应该能够响应在运行流期间添加的分区。 最后, Assign 允许您指定固定数量的分区。 这三种策略都有重载构造函数,允许您为特定分区指定起始偏移量。

如果您有特定的消费者设置需求,而上述选项无法满足,则可以扩展一个公共类 ConsumerStrategy

创建 RDD

如果您有一个更适合批处理的使用案例,您可以为定义的偏移量范围创建一个 RDD。

// 导入依赖并创建kafka参数,如上面的Create Direct Stream
val offsetRanges = Array(
// 主题,分区,包含的起始偏移量,不包含的结束偏移量
OffsetRange("test", 0, 0, 100),
OffsetRange("test", 1, 0, 100)
)
val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
// 导入依赖并创建 Kafka 参数,如上面创建直接流所示
OffsetRange[] offsetRanges = {
// 主题,分区,包含的起始偏移,排除的结束偏移
OffsetRange.create("test", 0, 0, 100),
OffsetRange.create("test", 1, 0, 100)
};
JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
sparkContext,
kafkaParams,
offsetRanges,
LocationStrategies.PreferConsistent()
);

请注意,您不能使用 PreferBrokers ,因为没有数据流,驱动程序端没有消费者来自动为您查找经纪人元数据。如果需要,请使用 PreferFixed 进行您自己的元数据查找。

获取偏移量

stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter =>
val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
}
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
rdd.foreachPartition(consumerRecords -> {
OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
System.out.println(
o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
});
});

请注意,类型转换为 HasOffsetRanges 仅在对 createDirectStream 的结果进行调用的第一个方法中成功,而不是在方法链的后面。请注意,RDD 分区和 Kafka 分区之间的一对一映射在任何会洗牌或重新分区的方法之后不会保持,例如 reduceByKey() 或 window()。

存储偏移量

在失败情况下,Kafka 的传递语义取决于偏移量的存储方式和时间。 Spark 输出操作是 至少一次 。 因此,如果您想要等同于精确一次语义,您必须在幂等输出之后存储偏移量,或者在输出的原子事务中存储偏移量。通过这种集成,您可以选择 3 种选项,按可靠性(和代码复杂性)递增的顺序来存储偏移量。

检查点

如果您启用 Spark 检查点 ,则偏移量将存储在检查点中。这很容易启用,但也有缺点。您的输出操作必须是幂等的,因为您将获得重复的输出;事务不是一个选项。此外,如果您的应用程序代码发生了变化,您将无法从检查点恢复。对于计划的升级,您可以通过同时运行新代码和旧代码来减轻这个问题(由于输出需要是幂等的,它们不应该冲突)。但是对于需要代码更改的意外故障,除非您有其他方法来识别已知的良好起始偏移量,否则您将丢失数据。

Kafka 本身

Kafka 有一个偏移提交 API,可以将偏移量存储在一个特殊的 Kafka 主题中。默认情况下,新消费者会定期自动提交偏移量。这几乎肯定不是你想要的,因为消费者成功轮询到的消息可能还没有导致 Spark 输出操作,从而导致未定义的语义。这就是为何上面的流示例将“enable.auto.commit”设置为 false。然而,在你确认输出已被存储后,可以使用 commitAsync API 将偏移量提交到 Kafka。与检查点相比,这样的好处在于,无论你的应用程序代码发生何种变化,Kafka 都是一个持久的存储。但是,Kafka 不是事务性的,因此你的输出仍然必须是幂等的。

stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// 一段时间后,在输出完成之后
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

与 HasOffsetRanges 一样,只有在对 createDirectStream 的结果进行调用时,才能成功转换为 CanCommitOffsets,而不是在变换之后。commitAsync 调用是线程安全的,但如果你想要有意义的语义,必须在输出之后进行。

stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// 一段时间后,在输出完成后
((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});

您自己的数据存储

对于支持事务的数据存储,将偏移量与结果保存在同一个事务中可以使两者保持同步,即使在失败情况下也是如此。如果您小心地检测重复或跳过的偏移范围,回滚事务可以防止重复或丢失的消息影响结果。这提供了完全一次语义的等价物。即使对由聚合结果产生的输出(通常很难实现幂等性),也可以使用这种策略。

// 细节取决于您的数据存储,但大致思路如下
// 从提交到数据库的偏移量开始
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)
stream.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val results = yourCalculation(rdd)
// 开始您的事务
// 更新结果
// 更新偏移量,前提是现有偏移量的结束与该批次偏移量的开始匹配
// 确保偏移量正确更新
// 结束您的事务
}
// 具体细节取决于您的数据存储,但一般思路如下
// 从提交到数据库的偏移量开始
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
for (resultSet : selectOffsetsFromYourDatabase)
fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
}
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
Object results = yourCalculation(rdd);
// 开始您的事务
// 更新结果
// 更新偏移量,确保现有偏移量的结束与此批偏移量的开始匹配
// 确保偏移量已正确更新
// 结束您的事务
});

SSL / TLS

新的 Kafka 消费者 支持 SSL 。要启用它,请在传递给 createDirectStream / createRDD 之前适当地设置 kafkaParams。请注意,这仅适用于 Spark 和 Kafka 代理之间的通信;您仍然需要单独 保护 Spark 节点间的通信。

val kafkaParams = Map[String, Object](
// 通常的参数,如果 9092 不是 TLS,请确保更改 bootstrap.servers 中的端口
"security.protocol" -> "SSL",
"ssl.truststore.location" -> "/some-directory/kafka.client.truststore.jks",
"ssl.truststore.password" -> "test1234",
"ssl.keystore.location" -> "/some-directory/kafka.client.keystore.jks",
"ssl.keystore.password" -> "test1234",
"ssl.key.password" -> "test1234"
)
Map<String, Object> kafkaParams = new HashMap<String, Object>();
// 常用参数,如果 9092 不是 TLS,请确保在 bootstrap.servers 中更改端口
kafkaParams.put("security.protocol", "SSL");
kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks");
kafkaParams.put("ssl.truststore.password", "test1234");
kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks");
kafkaParams.put("ssl.keystore.password", "test1234");
kafkaParams.put("ssl.key.password", "test1234");

部署

与任何Spark应用程序一样, spark-submit 用于启动您的应用程序。

对于Scala和Java应用程序,如果您使用SBT或Maven进行项目管理,则将 spark-streaming-kafka-0-10_2.12 及其依赖项打包到应用程序JAR中。确保 spark-core_2.12 spark-streaming_2.12 被标记为 provided 依赖项,因为这些已经存在于Spark安装中。然后使用 spark-submit 启动您的应用程序(请参见主要编程指南中的 部署部分 )。

安全

请查看 结构化流安全

其他注意事项