Pulsar 函数概念
完全限定函数名称
每个函数都有一个完全限定函数名称(FQFN),其中包含指定的租户、命名空间和函数名称。使用FQFN,您可以在不同的命名空间中创建具有相同函数名称的多个函数。
一个FQFN看起来像这样:
tenant/namespace/name
函数实例
函数实例是函数执行框架的核心元素,由以下元素组成:
- 一组消费者从不同的输入主题消费消息。
- 一个调用函数的执行器。
- 一个生产者,将函数的结果发送到一个输出主题。
下图展示了一个函数实例的内部工作流程。
一个函数可以有多个实例,每个实例执行函数的一个副本。您可以在配置文件中指定实例的数量。
函数实例内的消费者使用FQFN作为订阅者名称,以基于订阅类型在多个实例之间实现负载均衡。订阅类型可以在函数级别指定。
每个函数都有一个带有FQFN的独立状态存储。您可以指定一个状态接口以在BookKeeper中持久化中间结果。其他用户可以查询函数的状态并提取这些结果。
函数 worker
Function worker 是一个逻辑组件,用于在 Pulsar Functions 的 cluster-mode 部署中监控、编排和执行单个函数。
在函数工作者中,每个函数实例可以作为线程或进程执行,具体取决于所选的配置。另外,如果有可用的Kubernetes集群,函数可以作为Kubernetes中的StatefulSets启动。有关更多详细信息,请参阅设置函数工作者。
下图展示了函数工作者的内部架构和工作流程。
函数工作者形成一个工作者节点集群,工作流程描述如下。
- 用户向REST服务器发送请求以执行函数实例。
- REST服务器响应请求并将请求传递给函数元数据管理器。
- 函数元数据管理器将请求更新写入函数元数据主题。它还跟踪所有与元数据相关的消息,并使用函数元数据主题来持久化函数的状态更新。
- 函数元数据管理器从函数元数据主题读取更新,并触发调度管理器计算分配。
- 日程管理器将作业更新写入作业主题。
- 函数运行时管理器监听分配主题,读取分配更新,并更新其内部状态,该状态包含所有工作者的所有分配的全局视图。如果更新更改了工作者上的分配,则函数运行时管理器通过启动或停止函数实例的执行来实现新的分配。
- 会员管理器请求协调主题以选举一位首席工作者。所有工作者以故障转移订阅的方式订阅协调主题,但活跃的工作者会成为领导者并执行分配,确保该主题只有一个活跃的消费者。
- 会员管理器从协调主题读取更新。
函数运行时间
一个函数实例在运行时内被调用,并且可以并行运行多个实例。Pulsar支持三种类型的函数运行时,具有不同的成本和隔离保证,以最大化部署灵活性。您可以根据需要使用其中一种来运行函数。有关更多详细信息,请参阅配置函数运行时。
下表概述了三种类型的函数运行时。
类型 | 描述 |
---|---|
线程运行时 | 每个实例都作为线程运行。 由于线程模式的代码是用Java编写的,因此它仅适用于Java实例。当函数在线程模式下运行时,它与函数工作器在同一个Java虚拟机(JVM)上运行。 |
进程运行时 | 每个实例作为一个进程运行。 当函数在进程模式下运行时,它运行在与函数工作器相同的机器上。 |
Kubernetes 运行时 | 函数由工作节点作为 Kubernetes StatefulSet 提交,每个函数实例作为一个 pod 运行。Pulsar 支持在启动函数时为 Kubernetes StatefulSets 和服务添加标签,这有助于选择目标 Kubernetes 对象。 |
处理保证和订阅类型
Pulsar 提供了三种不同的消息传递语义,您可以将其应用于函数。不同的传递语义实现是根据确认时间节点来确定的。
交付语义 | 描述 | 采用的订阅类型 |
---|---|---|
最多一次 交付 | 发送到函数的每条消息都会尽力处理。不能保证消息是否会被处理。 当您选择此语义时, autoAck 配置必须设置为 true ,否则启动将失败(autoAck 配置将在未来的版本中弃用)。确认时间节点:在函数处理之前。 | 共享 |
至少一次 交付(默认) | 发送到函数的每条消息可能会被处理多次(在出现处理失败或重新交付的情况下)。 如果您在创建函数时未指定 --processing-guarantees 标志,则该函数提供 至少一次 交付保证。确认时间节点:在发送消息到输出后。 | 共享 |
Effectively-once 交付 | 发送到函数的每条消息可能会被处理多次,但只有一个输出。重复的消息会被忽略。Effectively once 是在 at-least-once 处理的基础上实现的,并保证了服务器端的去重。这意味着状态更新可能会发生两次,但相同的状态更新只会应用一次,其他重复的状态更新会在服务器端被丢弃。Ack 时间节点: 在发送消息到输出之后。 | 故障转移 |
手动 交付 | 当您选择此语义时,框架不会执行任何确认操作,您需要在函数内部调用方法 context.getCurrentRecord().ack() 来手动执行确认操作。确认时间节点: 在函数方法内用户自定义。 | 共享 |
- 默认情况下,Pulsar Functions 提供
at-least-once
交付保证。如果您在创建函数时没有为--processingGuarantees
标志提供值,则该函数将提供at-least-once
保证。 Exclusive
订阅类型在 Pulsar Functions 中不可用,原因是:- 如果只有一个实例,
exclusive
等同于failover
。 - 如果有多个实例,
exclusive
在函数重启时可能会崩溃并重新启动。在这种情况下,exclusive
不等于failover
。因为当主消费者断开连接时,所有未确认和后续的消息都会传递给下一个消费者。
- 如果只有一个实例,
- 要将订阅类型从
shared
更改为key_shared
,您可以在pulsar-admin
中使用—retain-key-ordering
选项。
您可以在创建函数时设置处理保证。以下命令创建一个应用了恰好一次保证的函数。
bin/pulsar-admin functions create \
--name my-effectively-once-function \
--processing-guarantees EFFECTIVELY_ONCE \
# Other function configs
您可以使用update
命令更改应用于函数的处理保证。
bin/pulsar-admin functions update \
--processing-guarantees ATMOST_ONCE \
# Other function configs
上下文
Java、Python 和 Go SDK 提供了对函数可以使用的上下文对象的访问。这个上下文对象为函数提供了广泛的信息和功能,包括:
- 函数的名称和ID。
- 消息的ID。每条消息都会自动分配一个ID。
- 消息的键、事件时间、属性和分区键。
- 消息发送到的主题名称。
- 所有输入主题的名称以及与函数相关的输出主题。
- 用于SerDe的类的名称。
- 与函数关联的租户和命名空间。
- 运行函数的函数实例的ID。
- 函数的版本。
- 函数使用的logger对象,用于创建日志消息。
- 访问通过CLI提供的任意用户配置值。
- 用于记录metrics的接口。
- 用于在状态存储中存储和检索状态的接口。
- 一个用于将新消息发布到任意主题的函数。
- 一个用于确认正在处理的消息的函数(如果自动确认被禁用)。
- (Java) 一个用于获取Pulsar管理客户端的函数。
- (Java) 一个函数,用于创建一条记录,返回从上下文和输入记录中获取的默认值。
函数消息类型
Pulsar 函数以字节数组作为输入,并输出字节数组。您可以通过以下任一方式编写类型化函数并将消息绑定到类型:
窗口函数
目前,窗口函数仅在Java中可用,并且不支持MANUAL
和Effectively-once
的交付语义。
窗口函数是一种在数据窗口上执行计算的函数,即事件流的有限子集。如下图所示,流被分割成“桶”,可以在这些桶上应用函数。
函数的定义涉及两个策略:
- 驱逐策略:控制窗口中收集的数据量。
- 触发策略:控制函数何时触发并执行,以根据驱逐策略处理窗口中收集的所有数据。
触发策略和驱逐策略都是由时间或计数驱动的。
支持处理时间和事件时间。
- 处理时间是根据函数实例构建和处理窗口时的挂钟时间定义的。窗口完整性的判断是直接的,您不必担心数据到达的混乱。
- 事件时间是根据事件记录附带的时间戳定义的。它保证了事件时间的正确性,同时也提供了更多的数据缓冲和有限的数据完整性保证。
窗口类型
根据两个相邻窗口是否可以共享共同事件,窗口可以分为以下两种类型:
滚动窗口
滚动窗口将元素分配到指定时间长度的窗口中。滚动窗口的驱逐策略总是基于窗口已满。因此,您只需要指定触发策略,无论是基于计数还是基于时间。
在具有基于计数的触发策略的滚动窗口中,如下例所示,触发策略设置为2。每当窗口中有两个项目时,每个函数都会被触发并执行,无论时间如何。
相比之下,如下例所示,滚动窗口的窗口长度为10秒,这意味着无论窗口中有多少事件,当10秒的时间间隔过去时,函数都会被触发。
滑动窗口
滑动窗口方法通过设置驱逐策略来定义固定的窗口长度,以限制保留用于处理的数据量,并通过设置滑动间隔来设置触发策略。如果滑动间隔小于窗口长度,则存在数据重叠,这意味着同时落入相邻窗口的数据会被多次用于计算。
如下例所示,窗口长度为2秒,这意味着任何超过2秒的数据将被移除,不会用于计算。滑动间隔配置为1秒,这意味着每秒执行一次函数以处理整个窗口长度内的数据。