结构化流 + Kafka 集成指南(Kafka broker 版本 0.10.0 或更高)
与Kafka 0.10的结构化流功能集成,用于从Kafka读取数据和向Kafka写入数据。
链接
对于使用 SBT/Maven 项目定义的 Scala/Java 应用程序,将您的应用程序与以下工件链接:
groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.12
version = 3.5.3
请注意,使用头功能时,您的Kafka客户端版本应为0.11.0.0或更高版本。
对于Python应用程序,您在部署应用程序时需要添加上述库及其依赖项。请参阅下面的 部署 小节。
在实验
spark-shell
时,您需要在调用
spark-shell
时添加此上面的库及其依赖项。此外,请参见以下
部署
小节。
从Kafka读取数据
为流查询创建Kafka源
# 订阅一个主题
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# 订阅一个主题,带有头部信息
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.option("includeHeaders", "true") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
# 订阅多个主题
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# 订阅一个模式
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribePattern", "topic.*") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// 订阅一个主题
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// 订阅一个主题,带有头信息
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("includeHeaders", "true")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
.as[(String, String, Array[(String, Array[Byte])])]
// 订阅多个主题
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// 订阅一个模式
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// 订阅 1 个主题
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// 订阅 1 个主题,带头信息
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("includeHeaders", "true")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers");
// 订阅多个主题
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// 订阅一个模式
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
为批量查询创建一个Kafka源
如果您有更适合批处理的用例,您可以为定义的偏移范围创建一个 Dataset/DataFrame。
# 订阅 1 个主题默认使用最早和最新的偏移量
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# 订阅多个主题,指定明确的 Kafka 偏移量
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# 订阅一个模式,使用最早和最新的偏移量
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribePattern", "topic.*") \
.option("startingOffsets", "earliest") \
.option("endingOffsets", "latest") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// 订阅 1 个主题,默认到最早和最新偏移量
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// 订阅多个主题,指定明确的 Kafka 偏移量
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// 订阅一个模式,使用最早和最新偏移量
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// 订阅 1 个主题,默认使用最早和最新的偏移量
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// 订阅多个主题,指定显式的 Kafka 偏移量
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
.option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// 根据模式订阅,使用最早和最新的偏移量
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
源中的每一行具有以下结构:
列 | 类型 |
---|---|
key | binary |
value | binary |
topic | string |
partition | int |
offset | long |
timestamp | timestamp |
timestampType | int |
headers (可选) | array |
以下选项必须为Kafka源设置,以支持批处理和流处理查询。
选项 | 值 | 含义 |
---|---|---|
assign | json 字符串 {"topicA":[0,1],"topicB":[2,4]} | 特定的 TopicPartitions 来消费。 对于 Kafka 源,只能指定 "assign"、"subscribe" 或 "subscribePattern" 中的一个选项。 |
subscribe | 以逗号分隔的主题列表 | 要订阅的主题列表。 对于 Kafka 源,只能指定 "assign"、"subscribe" 或 "subscribePattern" 中的一个选项。 |
subscribePattern | Java 正则表达式字符串 | 用于订阅主题的模式。 对于 Kafka 源,只能指定 "assign"、"subscribe" 或 "subscribePattern" 中的一个选项。 |
kafka.bootstrap.servers | 以逗号分隔的 host:port 列表 | Kafka "bootstrap.servers" 配置。 |
以下配置是可选的:
选项 | 值 | 默认 | 查询类型 | 含义 |
---|---|---|---|---|
startingTimestamp | 时间戳字符串,例如 "1000" |
无(下一个优先选项是
startingOffsetsByTimestamp
)
|
流式和批处理 |
查询开始时的时间戳起点,字符串指定了订阅主题中所有分区的起始时间戳。请参阅下面有关时间戳偏移选项的详细信息。如果Kafka未返回匹配的偏移量,行为将遵循选项
startingOffsetsByTimestampStrategy
的值
注意1:
startingTimestamp
优先于
startingOffsetsByTimestamp
和
startingOffsets
.
注意2: 对于流式查询,仅在新查询开始时适用,而恢复将始终从查询结束的地方继续。查询期间新发现的分区将最早开始。 |
startingOffsetsByTimestamp | json字符串 """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ |
无(下一个优先选项是
startingOffsets
)
|
流式和批处理 |
查询开始时的时间戳起点,json字符串指定每个TopicPartition的起始时间戳。请参阅下面有关时间戳偏移选项的详细信息。如果Kafka未返回匹配的偏移量,行为将遵循选项
startingOffsetsByTimestampStrategy
的值
注意1:
startingOffsetsByTimestamp
优先于
startingOffsets
.
注意2: 对于流式查询,仅在新查询开始时适用,而恢复将始终从查询结束的地方继续。查询期间新发现的分区将最早开始。 |
startingOffsets | "earliest", "latest"(仅流式),或 json字符串 """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """ | "latest" 对于流式,"earliest" 对于批处理 | 流式和批处理 | 查询开始时的起点,可以是 "earliest"(从最早的偏移量开始),"latest"(从最新的偏移量开始),或 json 字符串指定每个 TopicPartition 的起始偏移量。在json中,-2作为偏移量可用于指代最早,-1指代最新。注意: 对于批处理查询,最新(无论是隐式使用还是在json中使用-1)是不允许的。对于流式查询,仅在新查询开始时适用,而恢复将始终从查询结束的地方继续。查询期间新发现的分区将最早开始。 |
endingTimestamp | 时间戳字符串,例如 "1000" |
无(下一个优先选项是
endingOffsetsByTimestamp
)
|
批处理查询 |
批处理查询结束时的结束点,json字符串指定所有订阅主题分区的结束时间戳。请参阅下面有关时间戳偏移选项的详细信息。如果Kafka未返回匹配的偏移量,偏移量将设置为最新。
注意:
endingTimestamp
优先于
endingOffsetsByTimestamp
和
endingOffsets
.
|
endingOffsetsByTimestamp | json字符串 """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ |
无(下一个优先选项是
endingOffsets
)
|
批处理查询 |
批处理查询结束时的结束点,json字符串指定每个TopicPartition的结束时间戳。请参阅下面有关时间戳偏移选项的详细信息。如果Kafka未返回匹配的偏移量,偏移量将设置为最新。
注意:
endingOffsetsByTimestamp
优先于
endingOffsets
.
|
endingOffsets | 最新或 json字符串 {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} | 最新 | 批处理查询 | 批处理查询结束时的结束点,可以是 "latest"(仅指代最新),或 json 字符串指定每个 TopicPartition 的结束偏移量。在json中,-1作为偏移量可指代最新,而-2(最早)作为偏移量是不允许的。 |
failOnDataLoss | true 或 false | true | 流式和批处理 | 当发生可能的数据丢失时(例如,主题被删除或偏移量超出范围),查询是否应该失败。这可能是假警报。您可以在它未按预期工作时禁用它。 |
kafkaConsumer.pollTimeoutMs | 长整型 | 120000 | 流式和批处理 |
从Kafka中轮询数据的超时,以毫秒为单位。当未定义时,它将回退到
spark.network.timeout
。
|
fetchOffset.numRetries | 整型 | 3 | 流式和批处理 | 在放弃获取Kafka偏移量之前重试的次数。 |
fetchOffset.retryIntervalMs | 长整型 | 10 | 流式和批处理 | 重试获取Kafka偏移量之前等待的毫秒数 |
maxOffsetsPerTrigger | 长整型 | 无 | 流式查询 | 每次触发间隔处理的最大偏移量的速率限制。指定的总偏移量将在不同数量的topicPartitions中成比例地分配。 |
minOffsetsPerTrigger | 长整型 | 无 | 流式查询 | 每次触发间隔至少处理的偏移量。指定的总偏移量将在不同数量的topicPartitions中成比例地分配。注意,如果超过maxTriggerDelay,将触发即使可用偏移量未达到minOffsetsPerTrigger。 |
maxTriggerDelay | 带单位的时间 | 15m | 流式查询 | 在两个触发之间可以延迟的最大时间,前提是源中有一些数据可用。此选项仅在设置了minOffsetsPerTrigger时适用。 |
minPartitions | 整型 | 无 | 流式和批处理 |
期望从Kafka读取的最小分区数。默认情况下,Spark具有topicPartitions与从Kafka消费的Spark分区的1-1映射。如果将此选项设置为大于topicPartitions的值,Spark将把大的Kafka分区分配为较小的部分。请注意,此配置像是一个
hint
:Spark任务的数量将
大约
为
minPartitions
。根据四舍五入误差或没有接收到新数据的Kafka分区,数量可能更少或更多。
|
groupIdPrefix | 字符串 | spark-kafka-source | 流式和批处理 |
由结构化流查询生成的消费组标识符(
group.id
)的前缀。如果设置了 "kafka.group.id",则此选项将被忽略。
|
kafka.group.id | 字符串 | 无 | 流式和批处理 | 在从Kafka读取时使用的Kafka组id。使用此选项时要谨慎。默认情况下,每个查询生成一个唯一的组id以读取数据。这确保每个Kafka源都有自己的消费者组,不会受到其他消费者的干扰,因此可以读取其订阅主题的所有分区。在某些场景下(例如,基于Kafka组的授权),可能希望使用特定的授权组id读取数据。您可以选择设置组id。然而,这样做时要极其小心,因为它可能会导致意外的行为。并发运行的查询(批处理和流式)或使用相同组id的源可能会相互干扰,导致每个查询只能读取部分数据。这也可能在快速连续启动/重新启动查询时发生。为了最小化此类问题,将Kafka消费者会话超时(通过设置选项 "kafka.session.timeout.ms")设置得非常小。当设置此选项时,选项 "groupIdPrefix" 将被忽略。 |
includeHeaders | 布尔型 | false | 流式和批处理 | 是否在行中包含Kafka头信息。 |
startingOffsetsByTimestampStrategy | "error" 或 "latest" | "error" | 流式和批处理 |
当根据时间戳指定的起始偏移(无论是全局的还是每个分区的)与Kafka返回的偏移不匹配时,将使用的策略。以下是策略名称和相应的描述:
"error": 使查询失败,用户必须处理需要手动步骤的变通方案。 "latest": 为这些分区分配最新的偏移量,以便Spark可以在进一步的微批中从这些分区读取更新的记录。
|
时间戳偏移选项的详细信息
每个分区返回的偏移量是该分区中时间戳大于或等于给定时间戳的最早偏移量。 如果Kafka未返回匹配的偏移量,则行为因选项而异 - 请查看每个选项的描述。
Spark简单地将时间戳信息传递给
KafkaConsumer.offsetsForTimes
,并没有对该值进行解释或推理。有关
KafkaConsumer.offsetsForTimes
的更多细节,请参阅
javadoc
。此外,这里
timestamp
的含义可能会根据Kafka配置 (
log.message.timestamp.type
) 而有所不同:请参阅
Kafka文档
获取进一步的细节。
时间戳偏移选项需要Kafka 0.10.1.0或更高版本。
偏移获取
在Spark 3.0及之前,Spark使用
KafkaConsumer
进行偏移量获取,这可能导致驱动程序中的无限等待。
在Spark 3.1中,新增了一个配置选项
spark.sql.streaming.kafka.useDeprecatedOffsetFetching
(默认值:
false
),
允许Spark使用新的偏移量获取机制,使用
AdminClient
。(将此设置为
true
以使用旧的偏移量获取方式,使用
KafkaConsumer
。)
当新的机制被使用时,以下适用。
首先,这种新方法支持Kafka代理
0.11.0.0+
。
在Spark 3.0及之前版本中,从驱动程序的角度来看,安全的Kafka处理需要以下ACL:
- 主题资源描述操作
- 主题资源读取操作
- 组资源读取操作
自 Spark 3.1 起,可以通过
AdminClient
获取偏移量,而不再使用
KafkaConsumer
。为此,从驱动程序的角度需要以下 ACL:
- 主题资源描述操作
由于驱动中的
AdminClient
未连接到消费者组,基于
group.id
的授权将不再有效(执行器从未完成基于组的授权)。值得一提的是,执行器端的行为与之前完全相同(组前缀和覆盖功能正常)。
消费者缓存
初始化Kafka消费者是耗时的,尤其是在处理时间是关键因素的流处理场景中。由于这个原因,Spark在执行者上池化Kafka消费者,通过利用Apache Commons Pool。
缓存键由以下信息构成:
- 主题名称
- 主题分区
- 组ID
以下属性可用于配置消费者池:
属性名称 | 默认值 | 含义 | 版本 |
---|---|---|---|
spark.kafka.consumer.cache.capacity | 64 | 缓存的消费者的最大数量。请注意,这只是一个软限制。 | 3.0.0 |
spark.kafka.consumer.cache.timeout | 5m (5分钟) | 消费者在池中闲置的最短时间,超过该时间即有资格被驱逐。 | 3.0.0 |
spark.kafka.consumer.cache.evictorThreadRunInterval | 1m (1分钟) | 消费者池中空闲驱逐线程运行的时间间隔。当为非正值时,不会运行任何空闲驱逐线程。 | 3.0.0 |
spark.kafka.consumer.cache.jmx.enable | false | 为使用此配置实例创建的池启用或禁用JMX。池的统计信息可以通过JMX实例获得。 JMX名称的前缀设置为“kafka010-cached-simple-kafka-consumer-pool”。 | 3.0.0 |
池的大小受限于
spark.kafka.consumer.cache.capacity
, 但它作为“软限制”不会阻塞Spark任务。
闲置驱逐线程定期移除超过给定超时未使用的消费者。如果在借用时达到此阈值,它会尝试移除当前未使用的最少使用条目。
如果无法移除,则池将继续增长。在最坏的情况下,池将增长到执行器中可以运行的最大并发任务数(即任务插槽的数量)。
如果任务因任何原因失败,新任务将使用新创建的Kafka消费者执行,以确保安全。同时,我们会使所有具有相同缓存键的池中的消费者失效,以移除在失败执行中使用的消费者。任何其他任务正在使用的消费者将不会被关闭,但在它们返回到池中时也会被使失效。
与消费者一起,Spark 将从 Kafka 中获取的记录单独进行池化,以使 Kafka 消费者在 Spark 的视角下是无状态的,并最大化池化的效率。它利用与 Kafka 消费者池相同的缓存键。请注意,由于特性上的差异,它不利用 Apache Commons Pool。
以下属性可用于配置获取的数据池:
属性名称 | 默认 | 含义 | 自版本 |
---|---|---|---|
spark.kafka.consumer.fetchedData.cache.timeout | 5m (5分钟) | 获取的数据在池中闲置的最短时间,超过此时间后,数据将有资格被驱逐。 | 3.0.0 |
spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval | 1m (1分钟) | 获取的数据池中闲置驱逐线程运行之间的时间间隔。当为非正时,不会运行闲置驱逐线程。 | 3.0.0 |
将数据写入Kafka
在这里,我们描述了对Apache Kafka编写流查询和批处理查询的支持。请注意,Apache Kafka仅支持至少一次的写入语义。因此,在向Kafka写入流查询或批处理查询时,某些记录可能会重复;例如,如果Kafka需要重试一个没有被Broker确认的消息,即使该Broker接收并写入了消息记录,也可能发生这种情况。结构化流处理无法防止由于这些Kafka写入语义而发生的重复。然而,如果查询的写入成功,那么您可以假设查询输出至少写入了一次。读取写入数据时消除重复的一个可能解决方案是引入一个主(唯一)键,可以用于在读取时执行去重。
写入Kafka的数据框应该在模式中具有以下列:
列 | 类型 |
---|---|
键(可选) | 字符串或二进制 |
值(必需) | 字符串或二进制 |
头部(可选) | 数组 |
主题(*可选) | 字符串 |
分区(可选) | 整数 |
* 如果没有指定“主题”配置选项,则主题列是必须的。
值列是唯一必需的选项。如果未指定键列,则会自动添加一个
null
值的键列(请参阅Kafka语义,了解如何处理
null
值的键值)。如果存在主题列,则在将给定行写入Kafka时,其值用作主题,除非设置了“topic”配置选项,即“topic”配置选项会覆盖主题列。如果未指定“partition”列(或其值为
null
),则分区由Kafka生产者计算。在Spark中,可以通过设置
kafka.partitioner.class
选项来指定Kafka分区器。如果未指定,则将使用Kafka默认的分区器。
以下选项必须为Kafka接收器设置,以支持批处理和流处理查询。
选项 | 值 | 含义 |
---|---|---|
kafka.bootstrap.servers | 一个以逗号分隔的 host:port 列表 | Kafka "bootstrap.servers" 配置。 |
以下配置是可选的:
选项 | 值 | 默认 | 查询类型 | 含义 |
---|---|---|---|---|
topic | string | none | streaming and batch | 设置所有行将被写入的Kafka主题。此选项将覆盖数据中可能存在的任何主题列。 |
includeHeaders | boolean | false | streaming and batch | 是否在行中包含Kafka头信息。 |
为流查询创建Kafka接收器
# 将 DataFrame 中的键值数据写入指定的 Kafka 主题
ds = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.start()
# 使用数据中指定的主题将 DataFrame 中的键值数据写入 Kafka
ds = df \
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.start()
// 将 DataFrame 中的键值数据写入指定的 Kafka 主题,该主题通过选项指定
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
// 使用数据中指定的主题将 DataFrame 中的键值数据写入 Kafka
val ds = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start()
// 将键值数据从 DataFrame 写入指定的 Kafka 主题,该主题在选项中指定
StreamingQuery ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start();
// 使用数据中指定的主题将键值数据从 DataFrame 写入 Kafka
StreamingQuery ds = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start();
将批处理查询的输出写入Kafka
# 将DataFrame中的键值数据写入选项中指定的Kafka主题
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.save()
# 使用数据中指定的主题将DataFrame中的键值数据写入Kafka
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.save()
// 将键值数据从DataFrame写入指定的Kafka主题,该主题在选项中指定
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save()
// 使用数据中指定的主题将键值数据从DataFrame写入Kafka
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save()
// 从 DataFrame 中将键值数据写入指定在选项中的特定 Kafka 主题
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save();
// 从 DataFrame 中使用数据中指定的主题将键值数据写入 Kafka
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save();
生产者缓存
给定的Kafka生产者实例旨在保持线程安全,Spark初始化一个Kafka生产者实例,并在相同缓存键的任务之间共同使用。
缓存键由以下信息构成:
- Kafka 生产者配置
这包括授权的配置,当使用委托令牌时,Spark会自动包含这些配置。即使我们考虑到授权,您可以期望在相同的Kafka生产者配置中使用相同的Kafka生产者实例。 当委托令牌被更新时,它将使用不同的Kafka生产者;旧委托令牌的Kafka生产者实例将根据缓存策略被驱逐。
以下属性可用于配置生产者池:
属性名称 | 默认值 | 意义 | 自版本起 |
---|---|---|---|
spark.kafka.producer.cache.timeout | 10m (10分钟) | 生产者在池中闲置的最短时间,在此时间后可被驱逐。 | 2.2.1 |
spark.kafka.producer.cache.evictorThreadRunInterval | 1m (1分钟) | 生产者池中闲置驱逐线程的运行间隔时间。当为非正值时,将不会启用闲置驱逐线程。 | 3.0.0 |
闲置驱逐线程定期移除超过给定超时未使用的生产者。请注意,生产者是共享并且并发使用的,因此最后使用的时间戳由生产者实例被返回且引用计数为0的时刻决定。
Kafka 特定配置
Kafka 的配置可以通过
DataStreamReader.option
以
kafka.
前缀进行设置,例如,
stream.option("kafka.bootstrap.servers", "host:port")
。有关可能的 kafka 参数,请参阅
Kafka 消费者配置文档
,查阅与读取数据相关的参数,以及
Kafka 生产者配置文档
,查阅与写入数据相关的参数。
请注意,以下Kafka参数无法设置,Kafka源或接收器将抛出异常:
-
group.id
:Kafka 源将为每个查询自动创建一个唯一的组 ID。用户可以通过可选的源选项
groupIdPrefix
设置自动生成的 group.id 的前缀,默认值为“spark-kafka-source”。您还可以设置“kafka.group.id”以强制 Spark 使用特定的组 ID,但请阅读此选项的警告并谨慎使用。 -
auto.offset.reset
:设置源选项
startingOffsets
以指定从哪里开始。结构化流管理内部消费的偏移量,而不是依赖 kafka Consumer 来处理。这将确保在动态订阅新主题/分区时不遗漏任何数据。请注意,startingOffsets
仅在启动新流查询时适用,而恢复将始终从查询中断的位置继续。请注意,当流应用程序消费的偏移量在 Kafka 中不再存在(例如,主题被删除、偏移量超出范围或偏移量在保留期后被删除)时,偏移量将不会重置,流应用程序将会出现数据丢失。在极端情况下,例如流应用程序的吞吐量无法赶上 Kafka 的保留速度,批次的输入行可能会逐渐减少直到为零,当批次的偏移范围完全不在 Kafka 中时。启用failOnDataLoss
选项可以要求结构化流在这种情况下使查询失败。 - key.deserializer :键始终使用 ByteArrayDeserializer 反序列化为字节数组。使用 DataFrame 操作显式反序列化键。
- value.deserializer :值始终使用 ByteArrayDeserializer 反序列化为字节数组。使用 DataFrame 操作显式反序列化值。
- key.serializer :键始终使用 ByteArraySerializer 或 StringSerializer 序列化。使用 DataFrame 操作将键显式序列化为字符串或字节数组。
- value.serializer :值始终使用 ByteArraySerializer 或 StringSerializer 序列化。使用 DataFrame 操作将值显式序列化为字符串或字节数组。
- enable.auto.commit :Kafka 源不提交任何偏移量。
- interceptor.classes :Kafka 源始终将键和值作为字节数组读取。使用 ConsumerInterceptor 是不安全的,因为它可能会破坏查询。
部署
与任何 Spark 应用程序一样,
spark-submit
用于启动您的应用程序。
spark-sql-kafka-0-10_2.12
及其依赖项可以直接使用
--packages
添加到
spark-submit
中,例如,
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3 ...
在
spark-shell
上进行实验时,您还可以使用
--packages
直接添加
spark-sql-kafka-0-10_2.12
及其依赖项,
./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3 ...
有关提交具有外部依赖项的申请的更多详细信息,请参阅 申请提交指南 。
安全
Kafka 0.9.0.0 引入了几项增强集群安全性的功能。有关这些可能性的详细说明,请参阅 Kafka 安全文档 。
值得注意的是,安全性是可选的,并且默认是关闭的。
Spark 支持以下几种方式来验证 Kafka 集群:
- 委托令牌(在Kafka代理1.1.0中引入)
- JAAS登录配置
委托令牌
通过这种方式,可以通过Spark参数配置应用程序,并且可能不需要JAAS登录配置(Spark可以使用Kafka的动态JAAS配置功能)。有关委托令牌的更多信息,请参见 Kafka委托令牌文档 。
该过程由Spark的Kafka委托令牌提供者启动。当
spark.kafka.clusters.${cluster}.auth.bootstrap.servers
被设置时,Spark按照优先顺序考虑以下登录选项:
- JAAS登录配置 ,请参见下面的示例。
-
Keytab文件 ,例如,
./bin/spark-submit \ --keytab
\ --principal \ --conf spark.kafka.clusters.${cluster}.auth.bootstrap.servers= \ ... -
Kerberos凭证缓存 ,例如,
./bin/spark-submit \ --conf spark.kafka.clusters.${cluster}.auth.bootstrap.servers=
\ ...
通过将
spark.security.credentials.kafka.enabled
设置为
false
(默认:
true
) 可以关闭 Kafka 委托令牌提供程序。
Spark可以配置使用以下身份验证协议来获取令牌(必须与Kafka代理配置匹配):
- SASL SSL (默认)
- SSL
- SASL 明文 (用于测试)
成功获取委托令牌后,Spark 将其分发到各个节点并相应地进行更新。
委托令牌使用
SCRAM
登录模块进行身份验证,因此必须配置合适的
spark.kafka.clusters.${cluster}.sasl.token.mechanism
(默认值:
SCRAM-SHA-512
)。此外,此参数
必须与 Kafka 代理配置匹配。
当代理令牌在执行器上可用时,Spark考虑以下登录选项,按优先级排序:
- JAAS登录配置 ,请参见下面的示例。
-
委托令牌
,请查看
spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex
参数以获取更多细节。
当上述都不适用时,假定为不安全连接。
配置
委托令牌可以从多个集群中获取,而
${cluster}
是一个任意的唯一标识符,有助于对不同的配置进行分组。
属性名称 | 默认值 | 含义 | 自版本起 |
---|---|---|---|
spark.kafka.clusters.${cluster}.auth.bootstrap.servers
|
无 | 用于建立与Kafka集群的初始连接的逗号分隔的主机/端口对的列表。有关更多详细信息,请参见Kafka文档。仅用于获取委托令牌。 | 3.0.0 |
spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex
|
.* |
用于与应用程序中源和接收器的
bootstrap.servers
配置进行匹配的正则表达式。
如果服务器地址匹配此正则表达式,将在连接时使用从相应的引导服务器获得的委托令牌。
如果多个集群匹配该地址,则会抛出异常,并且查询将不会启动。
Kafka的安全和非安全监听器绑定到不同的端口。当两者都使用时,安全监听器端口必须是正则表达式的一部分。
|
3.0.0 |
spark.kafka.clusters.${cluster}.security.protocol
|
SASL_SSL |
与代理通信所使用的协议。有关更多详细信息,请参见Kafka文档。协议应用于所有源和接收器作为默认值,其中
bootstrap.servers
配置匹配(有关更多详细信息,请参见
spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex
),
并且可以通过在源或接收器上设置
kafka.security.protocol
进行覆盖。
|
3.0.0 |
spark.kafka.clusters.${cluster}.sasl.kerberos.service.name
|
kafka | Kafka运行的Kerberos主名称。可以在Kafka的JAAS配置或Kafka的配置中定义。 有关更多详细信息,请参见Kafka文档。仅用于获取委托令牌。 | 3.0.0 |
spark.kafka.clusters.${cluster}.ssl.truststore.type
|
无 | 信任库文件的文件格式。有关更多详细信息,请参见Kafka文档。仅用于获取委托令牌。 | 3.2.0 |
spark.kafka.clusters.${cluster}.ssl.truststore.location
|
无 | 信任库文件的位置。有关更多详细信息,请参见Kafka文档。仅用于获取委托令牌。 | 3.0.0 |
spark.kafka.clusters.${cluster}.ssl.truststore.password
|
无 |
信任库文件的存储密码。可选,仅在配置了
spark.kafka.clusters.${cluster}.ssl.truststore.location
时需要。
有关更多详细信息,请参见Kafka文档。仅用于获取委托令牌。
|
3.0.0 |
spark.kafka.clusters.${cluster}.ssl.keystore.type
|
无 | 密钥库文件的文件格式。对于客户端来说,这是可选的。 有关更多详细信息,请参见Kafka文档。仅用于获取委托令牌。 | 3.2.0 |
spark.kafka.clusters.${cluster}.ssl.keystore.location
|
无 | 密钥库文件的位置。这对于客户端是可选的,可以使用双向认证。 有关更多详细信息,请参见Kafka文档。仅用于获取委托令牌。 | 3.0.0 |
spark.kafka.clusters.${cluster}.ssl.keystore.password
|
无 |
密钥库文件的存储密码。可选,仅在配置了
spark.kafka.clusters.${cluster}.ssl.keystore.location
时需要。
有关更多详细信息,请参见Kafka文档。仅用于获取委托令牌。
|
3.0.0 |
spark.kafka.clusters.${cluster}.ssl.key.password
|
无 | 密钥库文件中私钥的密码。这对于客户端是可选的。 有关更多详细信息,请参见Kafka文档。仅用于获取委托令牌。 | 3.0.0 |
spark.kafka.clusters.${cluster}.sasl.token.mechanism
|
SCRAM-SHA-512 |
用于带有委托令牌的客户端连接的SASL机制。由于用于身份验证的SCRAM登录模块,因此必须在此设置兼容的机制。
有关更多详细信息,请参见Kafka文档(
sasl.mechanism
)。仅用于与Kafka代理使用委托令牌进行身份验证。
|
3.0.0 |
Kafka特定配置
Kafka的自有配置可以用
kafka.
前缀设置,例如
--conf spark.kafka.clusters.${cluster}.kafka.retries=1
。有关可能的Kafka参数,请参见
Kafka adminclient配置文档
。
注意事项
- 获取代理用户的委托令牌尚不支持 ( KAFKA-6945 )。
JAAS 登录配置
JAAS登录配置必须放置在所有Spark尝试访问Kafka集群的节点上。 这提供了应用任何自定义身份验证逻辑的可能性,但维护成本较高。 这可以通过几种方式完成。一种可能性是提供额外的JVM参数,例如,
./bin/spark-submit \
--driver-java-options "-Djava.security.auth.login.config=/path/to/custom_jaas.conf" \
--conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/custom_jaas.conf \
...