结构化流 + Kafka 集成指南(Kafka broker 版本 0.10.0 或更高)

与Kafka 0.10的结构化流功能集成,用于从Kafka读取数据和向Kafka写入数据。

链接

对于使用 SBT/Maven 项目定义的 Scala/Java 应用程序,将您的应用程序与以下工件链接:

groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.12
version = 3.5.3

请注意,使用头功能时,您的Kafka客户端版本应为0.11.0.0或更高版本。

对于Python应用程序,您在部署应用程序时需要添加上述库及其依赖项。请参阅下面的 部署 小节。

在实验 spark-shell 时,您需要在调用 spark-shell 时添加此上面的库及其依赖项。此外,请参见以下 部署 小节。

从Kafka读取数据

为流查询创建Kafka源

# 订阅一个主题
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# 订阅一个主题,带有头部信息
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .option("includeHeaders", "true") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
# 订阅多个主题
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# 订阅一个模式
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// 订阅一个主题
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// 订阅一个主题,带有头信息
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("includeHeaders", "true")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
.as[(String, String, Array[(String, Array[Byte])])]
// 订阅多个主题
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// 订阅一个模式
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// 订阅 1 个主题
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// 订阅 1 个主题,带头信息
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("includeHeaders", "true")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers");
// 订阅多个主题
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// 订阅一个模式
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

为批量查询创建一个Kafka源

如果您有更适合批处理的用例,您可以为定义的偏移范围创建一个 Dataset/DataFrame。

# 订阅 1 个主题默认使用最早和最新的偏移量
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# 订阅多个主题,指定明确的 Kafka 偏移量
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# 订阅一个模式,使用最早和最新的偏移量
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// 订阅 1 个主题,默认到最早和最新偏移量
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// 订阅多个主题,指定明确的 Kafka 偏移量
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// 订阅一个模式,使用最早和最新偏移量
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// 订阅 1 个主题,默认使用最早和最新的偏移量
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// 订阅多个主题,指定显式的 Kafka 偏移量
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
.option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// 根据模式订阅,使用最早和最新的偏移量
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

源中的每一行具有以下结构:

类型
key binary
value binary
topic string
partition int
offset long
timestamp timestamp
timestampType int
headers (可选) array

以下选项必须为Kafka源设置,以支持批处理和流处理查询。

选项 含义
assign json 字符串 {"topicA":[0,1],"topicB":[2,4]} 特定的 TopicPartitions 来消费。 对于 Kafka 源,只能指定 "assign"、"subscribe" 或 "subscribePattern" 中的一个选项。
subscribe 以逗号分隔的主题列表 要订阅的主题列表。 对于 Kafka 源,只能指定 "assign"、"subscribe" 或 "subscribePattern" 中的一个选项。
subscribePattern Java 正则表达式字符串 用于订阅主题的模式。 对于 Kafka 源,只能指定 "assign"、"subscribe" 或 "subscribePattern" 中的一个选项。
kafka.bootstrap.servers 以逗号分隔的 host:port 列表 Kafka "bootstrap.servers" 配置。

以下配置是可选的:

选项 默认 查询类型 含义
startingTimestamp 时间戳字符串,例如 "1000" 无(下一个优先选项是 startingOffsetsByTimestamp 流式和批处理 查询开始时的时间戳起点,字符串指定了订阅主题中所有分区的起始时间戳。请参阅下面有关时间戳偏移选项的详细信息。如果Kafka未返回匹配的偏移量,行为将遵循选项 startingOffsetsByTimestampStrategy 的值

注意1: startingTimestamp 优先于 startingOffsetsByTimestamp startingOffsets .

注意2: 对于流式查询,仅在新查询开始时适用,而恢复将始终从查询结束的地方继续。查询期间新发现的分区将最早开始。
startingOffsetsByTimestamp json字符串 """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ 无(下一个优先选项是 startingOffsets 流式和批处理 查询开始时的时间戳起点,json字符串指定每个TopicPartition的起始时间戳。请参阅下面有关时间戳偏移选项的详细信息。如果Kafka未返回匹配的偏移量,行为将遵循选项 startingOffsetsByTimestampStrategy 的值

注意1: startingOffsetsByTimestamp 优先于 startingOffsets .

注意2: 对于流式查询,仅在新查询开始时适用,而恢复将始终从查询结束的地方继续。查询期间新发现的分区将最早开始。
startingOffsets "earliest", "latest"(仅流式),或 json字符串 """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """ "latest" 对于流式,"earliest" 对于批处理 流式和批处理 查询开始时的起点,可以是 "earliest"(从最早的偏移量开始),"latest"(从最新的偏移量开始),或 json 字符串指定每个 TopicPartition 的起始偏移量。在json中,-2作为偏移量可用于指代最早,-1指代最新。注意: 对于批处理查询,最新(无论是隐式使用还是在json中使用-1)是不允许的。对于流式查询,仅在新查询开始时适用,而恢复将始终从查询结束的地方继续。查询期间新发现的分区将最早开始。
endingTimestamp 时间戳字符串,例如 "1000" 无(下一个优先选项是 endingOffsetsByTimestamp 批处理查询 批处理查询结束时的结束点,json字符串指定所有订阅主题分区的结束时间戳。请参阅下面有关时间戳偏移选项的详细信息。如果Kafka未返回匹配的偏移量,偏移量将设置为最新。

注意: endingTimestamp 优先于 endingOffsetsByTimestamp endingOffsets .

endingOffsetsByTimestamp json字符串 """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ 无(下一个优先选项是 endingOffsets 批处理查询 批处理查询结束时的结束点,json字符串指定每个TopicPartition的结束时间戳。请参阅下面有关时间戳偏移选项的详细信息。如果Kafka未返回匹配的偏移量,偏移量将设置为最新。

注意: endingOffsetsByTimestamp 优先于 endingOffsets .
endingOffsets 最新或 json字符串 {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} 最新 批处理查询 批处理查询结束时的结束点,可以是 "latest"(仅指代最新),或 json 字符串指定每个 TopicPartition 的结束偏移量。在json中,-1作为偏移量可指代最新,而-2(最早)作为偏移量是不允许的。
failOnDataLoss true 或 false true 流式和批处理 当发生可能的数据丢失时(例如,主题被删除或偏移量超出范围),查询是否应该失败。这可能是假警报。您可以在它未按预期工作时禁用它。
kafkaConsumer.pollTimeoutMs 长整型 120000 流式和批处理 从Kafka中轮询数据的超时,以毫秒为单位。当未定义时,它将回退到 spark.network.timeout
fetchOffset.numRetries 整型 3 流式和批处理 在放弃获取Kafka偏移量之前重试的次数。
fetchOffset.retryIntervalMs 长整型 10 流式和批处理 重试获取Kafka偏移量之前等待的毫秒数
maxOffsetsPerTrigger 长整型 流式查询 每次触发间隔处理的最大偏移量的速率限制。指定的总偏移量将在不同数量的topicPartitions中成比例地分配。
minOffsetsPerTrigger 长整型 流式查询 每次触发间隔至少处理的偏移量。指定的总偏移量将在不同数量的topicPartitions中成比例地分配。注意,如果超过maxTriggerDelay,将触发即使可用偏移量未达到minOffsetsPerTrigger。
maxTriggerDelay 带单位的时间 15m 流式查询 在两个触发之间可以延迟的最大时间,前提是源中有一些数据可用。此选项仅在设置了minOffsetsPerTrigger时适用。
minPartitions 整型 流式和批处理 期望从Kafka读取的最小分区数。默认情况下,Spark具有topicPartitions与从Kafka消费的Spark分区的1-1映射。如果将此选项设置为大于topicPartitions的值,Spark将把大的Kafka分区分配为较小的部分。请注意,此配置像是一个 hint :Spark任务的数量将 大约 minPartitions 。根据四舍五入误差或没有接收到新数据的Kafka分区,数量可能更少或更多。
groupIdPrefix 字符串 spark-kafka-source 流式和批处理 由结构化流查询生成的消费组标识符( group.id )的前缀。如果设置了 "kafka.group.id",则此选项将被忽略。
kafka.group.id 字符串 流式和批处理 在从Kafka读取时使用的Kafka组id。使用此选项时要谨慎。默认情况下,每个查询生成一个唯一的组id以读取数据。这确保每个Kafka源都有自己的消费者组,不会受到其他消费者的干扰,因此可以读取其订阅主题的所有分区。在某些场景下(例如,基于Kafka组的授权),可能希望使用特定的授权组id读取数据。您可以选择设置组id。然而,这样做时要极其小心,因为它可能会导致意外的行为。并发运行的查询(批处理和流式)或使用相同组id的源可能会相互干扰,导致每个查询只能读取部分数据。这也可能在快速连续启动/重新启动查询时发生。为了最小化此类问题,将Kafka消费者会话超时(通过设置选项 "kafka.session.timeout.ms")设置得非常小。当设置此选项时,选项 "groupIdPrefix" 将被忽略。
includeHeaders 布尔型 false 流式和批处理 是否在行中包含Kafka头信息。
startingOffsetsByTimestampStrategy "error" 或 "latest" "error" 流式和批处理 当根据时间戳指定的起始偏移(无论是全局的还是每个分区的)与Kafka返回的偏移不匹配时,将使用的策略。以下是策略名称和相应的描述:

"error": 使查询失败,用户必须处理需要手动步骤的变通方案。

"latest": 为这些分区分配最新的偏移量,以便Spark可以在进一步的微批中从这些分区读取更新的记录。

时间戳偏移选项的详细信息

每个分区返回的偏移量是该分区中时间戳大于或等于给定时间戳的最早偏移量。 如果Kafka未返回匹配的偏移量,则行为因选项而异 - 请查看每个选项的描述。

Spark简单地将时间戳信息传递给 KafkaConsumer.offsetsForTimes ,并没有对该值进行解释或推理。有关 KafkaConsumer.offsetsForTimes 的更多细节,请参阅 javadoc 。此外,这里 timestamp 的含义可能会根据Kafka配置 ( log.message.timestamp.type ) 而有所不同:请参阅 Kafka文档 获取进一步的细节。

时间戳偏移选项需要Kafka 0.10.1.0或更高版本。

偏移获取

在Spark 3.0及之前,Spark使用 KafkaConsumer 进行偏移量获取,这可能导致驱动程序中的无限等待。 在Spark 3.1中,新增了一个配置选项 spark.sql.streaming.kafka.useDeprecatedOffsetFetching (默认值: false ), 允许Spark使用新的偏移量获取机制,使用 AdminClient 。(将此设置为 true 以使用旧的偏移量获取方式,使用 KafkaConsumer 。)

当新的机制被使用时,以下适用。

首先,这种新方法支持Kafka代理 0.11.0.0+

在Spark 3.0及之前版本中,从驱动程序的角度来看,安全的Kafka处理需要以下ACL:

自 Spark 3.1 起,可以通过 AdminClient 获取偏移量,而不再使用 KafkaConsumer 。为此,从驱动程序的角度需要以下 ACL:

由于驱动中的 AdminClient 未连接到消费者组,基于 group.id 的授权将不再有效(执行器从未完成基于组的授权)。值得一提的是,执行器端的行为与之前完全相同(组前缀和覆盖功能正常)。

消费者缓存

初始化Kafka消费者是耗时的,尤其是在处理时间是关键因素的流处理场景中。由于这个原因,Spark在执行者上池化Kafka消费者,通过利用Apache Commons Pool。

缓存键由以下信息构成:

以下属性可用于配置消费者池:

属性名称 默认值 含义 版本
spark.kafka.consumer.cache.capacity 64 缓存的消费者的最大数量。请注意,这只是一个软限制。 3.0.0
spark.kafka.consumer.cache.timeout 5m (5分钟) 消费者在池中闲置的最短时间,超过该时间即有资格被驱逐。 3.0.0
spark.kafka.consumer.cache.evictorThreadRunInterval 1m (1分钟) 消费者池中空闲驱逐线程运行的时间间隔。当为非正值时,不会运行任何空闲驱逐线程。 3.0.0
spark.kafka.consumer.cache.jmx.enable false 为使用此配置实例创建的池启用或禁用JMX。池的统计信息可以通过JMX实例获得。 JMX名称的前缀设置为“kafka010-cached-simple-kafka-consumer-pool”。 3.0.0

池的大小受限于 spark.kafka.consumer.cache.capacity , 但它作为“软限制”不会阻塞Spark任务。

闲置驱逐线程定期移除超过给定超时未使用的消费者。如果在借用时达到此阈值,它会尝试移除当前未使用的最少使用条目。

如果无法移除,则池将继续增长。在最坏的情况下,池将增长到执行器中可以运行的最大并发任务数(即任务插槽的数量)。

如果任务因任何原因失败,新任务将使用新创建的Kafka消费者执行,以确保安全。同时,我们会使所有具有相同缓存键的池中的消费者失效,以移除在失败执行中使用的消费者。任何其他任务正在使用的消费者将不会被关闭,但在它们返回到池中时也会被使失效。

与消费者一起,Spark 将从 Kafka 中获取的记录单独进行池化,以使 Kafka 消费者在 Spark 的视角下是无状态的,并最大化池化的效率。它利用与 Kafka 消费者池相同的缓存键。请注意,由于特性上的差异,它不利用 Apache Commons Pool。

以下属性可用于配置获取的数据池:

属性名称 默认 含义 自版本
spark.kafka.consumer.fetchedData.cache.timeout 5m (5分钟) 获取的数据在池中闲置的最短时间,超过此时间后,数据将有资格被驱逐。 3.0.0
spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval 1m (1分钟) 获取的数据池中闲置驱逐线程运行之间的时间间隔。当为非正时,不会运行闲置驱逐线程。 3.0.0

将数据写入Kafka

在这里,我们描述了对Apache Kafka编写流查询和批处理查询的支持。请注意,Apache Kafka仅支持至少一次的写入语义。因此,在向Kafka写入流查询或批处理查询时,某些记录可能会重复;例如,如果Kafka需要重试一个没有被Broker确认的消息,即使该Broker接收并写入了消息记录,也可能发生这种情况。结构化流处理无法防止由于这些Kafka写入语义而发生的重复。然而,如果查询的写入成功,那么您可以假设查询输出至少写入了一次。读取写入数据时消除重复的一个可能解决方案是引入一个主(唯一)键,可以用于在读取时执行去重。

写入Kafka的数据框应该在模式中具有以下列:

类型
键(可选) 字符串或二进制
值(必需) 字符串或二进制
头部(可选) 数组
主题(*可选) 字符串
分区(可选) 整数

* 如果没有指定“主题”配置选项,则主题列是必须的。

值列是唯一必需的选项。如果未指定键列,则会自动添加一个 null 值的键列(请参阅Kafka语义,了解如何处理 null 值的键值)。如果存在主题列,则在将给定行写入Kafka时,其值用作主题,除非设置了“topic”配置选项,即“topic”配置选项会覆盖主题列。如果未指定“partition”列(或其值为 null ),则分区由Kafka生产者计算。在Spark中,可以通过设置 kafka.partitioner.class 选项来指定Kafka分区器。如果未指定,则将使用Kafka默认的分区器。

以下选项必须为Kafka接收器设置,以支持批处理和流处理查询。

选项 含义
kafka.bootstrap.servers 一个以逗号分隔的 host:port 列表 Kafka "bootstrap.servers" 配置。

以下配置是可选的:

选项 默认 查询类型 含义
topic string none streaming and batch 设置所有行将被写入的Kafka主题。此选项将覆盖数据中可能存在的任何主题列。
includeHeaders boolean false streaming and batch 是否在行中包含Kafka头信息。

为流查询创建Kafka接收器

# 将 DataFrame 中的键值数据写入指定的 Kafka 主题
ds = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .start()
# 使用数据中指定的主题将 DataFrame 中的键值数据写入 Kafka
ds = df \
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .start()
// 将 DataFrame 中的键值数据写入指定的 Kafka 主题,该主题通过选项指定
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
// 使用数据中指定的主题将 DataFrame 中的键值数据写入 Kafka
val ds = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start()
// 将键值数据从 DataFrame 写入指定的 Kafka 主题,该主题在选项中指定
StreamingQuery ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start();
// 使用数据中指定的主题将键值数据从 DataFrame 写入 Kafka
StreamingQuery ds = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start();

将批处理查询的输出写入Kafka

# 将DataFrame中的键值数据写入选项中指定的Kafka主题
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .save()
# 使用数据中指定的主题将DataFrame中的键值数据写入Kafka
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .save()
// 将键值数据从DataFrame写入指定的Kafka主题,该主题在选项中指定
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save()
// 使用数据中指定的主题将键值数据从DataFrame写入Kafka
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save()
// 从 DataFrame 中将键值数据写入指定在选项中的特定 Kafka 主题
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save();
// 从 DataFrame 中使用数据中指定的主题将键值数据写入 Kafka
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save();

生产者缓存

给定的Kafka生产者实例旨在保持线程安全,Spark初始化一个Kafka生产者实例,并在相同缓存键的任务之间共同使用。

缓存键由以下信息构成:

这包括授权的配置,当使用委托令牌时,Spark会自动包含这些配置。即使我们考虑到授权,您可以期望在相同的Kafka生产者配置中使用相同的Kafka生产者实例。 当委托令牌被更新时,它将使用不同的Kafka生产者;旧委托令牌的Kafka生产者实例将根据缓存策略被驱逐。

以下属性可用于配置生产者池:

属性名称 默认值 意义 自版本起
spark.kafka.producer.cache.timeout 10m (10分钟) 生产者在池中闲置的最短时间,在此时间后可被驱逐。 2.2.1
spark.kafka.producer.cache.evictorThreadRunInterval 1m (1分钟) 生产者池中闲置驱逐线程的运行间隔时间。当为非正值时,将不会启用闲置驱逐线程。 3.0.0

闲置驱逐线程定期移除超过给定超时未使用的生产者。请注意,生产者是共享并且并发使用的,因此最后使用的时间戳由生产者实例被返回且引用计数为0的时刻决定。

Kafka 特定配置

Kafka 的配置可以通过 DataStreamReader.option kafka. 前缀进行设置,例如, stream.option("kafka.bootstrap.servers", "host:port") 。有关可能的 kafka 参数,请参阅 Kafka 消费者配置文档 ,查阅与读取数据相关的参数,以及 Kafka 生产者配置文档 ,查阅与写入数据相关的参数。

请注意,以下Kafka参数无法设置,Kafka源或接收器将抛出异常:

部署

与任何 Spark 应用程序一样, spark-submit 用于启动您的应用程序。 spark-sql-kafka-0-10_2.12 及其依赖项可以直接使用 --packages 添加到 spark-submit 中,例如,

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3 ...

spark-shell 上进行实验时,您还可以使用 --packages 直接添加 spark-sql-kafka-0-10_2.12 及其依赖项,

./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3 ...

有关提交具有外部依赖项的申请的更多详细信息,请参阅 申请提交指南

安全

Kafka 0.9.0.0 引入了几项增强集群安全性的功能。有关这些可能性的详细说明,请参阅 Kafka 安全文档

值得注意的是,安全性是可选的,并且默认是关闭的。

Spark 支持以下几种方式来验证 Kafka 集群:

委托令牌

通过这种方式,可以通过Spark参数配置应用程序,并且可能不需要JAAS登录配置(Spark可以使用Kafka的动态JAAS配置功能)。有关委托令牌的更多信息,请参见 Kafka委托令牌文档

该过程由Spark的Kafka委托令牌提供者启动。当 spark.kafka.clusters.${cluster}.auth.bootstrap.servers 被设置时,Spark按照优先顺序考虑以下登录选项:

通过将 spark.security.credentials.kafka.enabled 设置为 false (默认: true ) 可以关闭 Kafka 委托令牌提供程序。

Spark可以配置使用以下身份验证协议来获取令牌(必须与Kafka代理配置匹配):

成功获取委托令牌后,Spark 将其分发到各个节点并相应地进行更新。 委托令牌使用 SCRAM 登录模块进行身份验证,因此必须配置合适的 spark.kafka.clusters.${cluster}.sasl.token.mechanism (默认值: SCRAM-SHA-512 )。此外,此参数 必须与 Kafka 代理配置匹配。

当代理令牌在执行器上可用时,Spark考虑以下登录选项,按优先级排序:

当上述都不适用时,假定为不安全连接。

配置

委托令牌可以从多个集群中获取,而 ${cluster} 是一个任意的唯一标识符,有助于对不同的配置进行分组。

属性名称 默认值 含义 自版本起
spark.kafka.clusters.${cluster}.auth.bootstrap.servers 用于建立与Kafka集群的初始连接的逗号分隔的主机/端口对的列表。有关更多详细信息,请参见Kafka文档。仅用于获取委托令牌。 3.0.0
spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex .* 用于与应用程序中源和接收器的 bootstrap.servers 配置进行匹配的正则表达式。 如果服务器地址匹配此正则表达式,将在连接时使用从相应的引导服务器获得的委托令牌。 如果多个集群匹配该地址,则会抛出异常,并且查询将不会启动。 Kafka的安全和非安全监听器绑定到不同的端口。当两者都使用时,安全监听器端口必须是正则表达式的一部分。 3.0.0
spark.kafka.clusters.${cluster}.security.protocol SASL_SSL 与代理通信所使用的协议。有关更多详细信息,请参见Kafka文档。协议应用于所有源和接收器作为默认值,其中 bootstrap.servers 配置匹配(有关更多详细信息,请参见 spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex ), 并且可以通过在源或接收器上设置 kafka.security.protocol 进行覆盖。 3.0.0
spark.kafka.clusters.${cluster}.sasl.kerberos.service.name kafka Kafka运行的Kerberos主名称。可以在Kafka的JAAS配置或Kafka的配置中定义。 有关更多详细信息,请参见Kafka文档。仅用于获取委托令牌。 3.0.0
spark.kafka.clusters.${cluster}.ssl.truststore.type 信任库文件的文件格式。有关更多详细信息,请参见Kafka文档。仅用于获取委托令牌。 3.2.0
spark.kafka.clusters.${cluster}.ssl.truststore.location 信任库文件的位置。有关更多详细信息,请参见Kafka文档。仅用于获取委托令牌。 3.0.0
spark.kafka.clusters.${cluster}.ssl.truststore.password 信任库文件的存储密码。可选,仅在配置了 spark.kafka.clusters.${cluster}.ssl.truststore.location 时需要。 有关更多详细信息,请参见Kafka文档。仅用于获取委托令牌。 3.0.0
spark.kafka.clusters.${cluster}.ssl.keystore.type 密钥库文件的文件格式。对于客户端来说,这是可选的。 有关更多详细信息,请参见Kafka文档。仅用于获取委托令牌。 3.2.0
spark.kafka.clusters.${cluster}.ssl.keystore.location 密钥库文件的位置。这对于客户端是可选的,可以使用双向认证。 有关更多详细信息,请参见Kafka文档。仅用于获取委托令牌。 3.0.0
spark.kafka.clusters.${cluster}.ssl.keystore.password 密钥库文件的存储密码。可选,仅在配置了 spark.kafka.clusters.${cluster}.ssl.keystore.location 时需要。 有关更多详细信息,请参见Kafka文档。仅用于获取委托令牌。 3.0.0
spark.kafka.clusters.${cluster}.ssl.key.password 密钥库文件中私钥的密码。这对于客户端是可选的。 有关更多详细信息,请参见Kafka文档。仅用于获取委托令牌。 3.0.0
spark.kafka.clusters.${cluster}.sasl.token.mechanism SCRAM-SHA-512 用于带有委托令牌的客户端连接的SASL机制。由于用于身份验证的SCRAM登录模块,因此必须在此设置兼容的机制。 有关更多详细信息,请参见Kafka文档( sasl.mechanism )。仅用于与Kafka代理使用委托令牌进行身份验证。 3.0.0

Kafka特定配置

Kafka的自有配置可以用 kafka. 前缀设置,例如 --conf spark.kafka.clusters.${cluster}.kafka.retries=1 。有关可能的Kafka参数,请参见 Kafka adminclient配置文档

注意事项

JAAS 登录配置

JAAS登录配置必须放置在所有Spark尝试访问Kafka集群的节点上。 这提供了应用任何自定义身份验证逻辑的可能性,但维护成本较高。 这可以通过几种方式完成。一种可能性是提供额外的JVM参数,例如,

./bin/spark-submit \
    --driver-java-options "-Djava.security.auth.login.config=/path/to/custom_jaas.conf" \
    --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/custom_jaas.conf \
    ...