Pulsar 函数 CLI 和 YAML 配置
Pulsar 管理 CLI 用于 Pulsar 函数
Pulsar 管理界面使您能够通过 CLI 创建和管理 Pulsar 函数。有关最新和完整的信息,包括命令、标志和描述,请参阅 Pulsar admin CLI。
Pulsar 函数的 YAML 配置
您可以使用预定义的YAML文件来配置函数。下表概述了所需的字段和参数。
字段名称 | 类型 | 相关命令参数 | 描述 |
---|---|---|---|
runtimeFlags | String | N/A | 任何你想传递给运行时的标志(仅适用于进程和Kubernetes运行时)。 |
tenant | String | --tenant | 函数的租户。 |
namespace | String | --namespace | 函数的命名空间。 |
name | String | --name | 函数的名称。 |
className | String | --classname | 函数的类名。 |
functionType | String | --function-type | 内置函数类型。 |
inputs | List | -i , --inputs | 函数的输入主题。可以指定多个主题,以逗号分隔的列表形式。 |
customSerdeInputs | Map | --custom-serde-inputs | 从输入主题到SerDe类名称的映射。 |
topicsPattern | String | --topics-pattern | 从命名空间下的主题列表中消费的主题模式。 注意: --input 和 --topic-pattern 是互斥的。对于Java函数,您需要在 --custom-serde-inputs 中添加模式的SerDe类名。 |
customSchemaInputs | Map | --custom-schema-inputs | 从输入主题到模式属性的映射。 |
customSchemaOutputs | Map | --custom-schema-outputs | 从输出主题到模式属性的映射。 |
inputSpecs | Map | --input-specs | 从输入到自定义配置的映射。 |
output | String | -o , --output | 函数的输出主题。如果未指定,则不写入输出。 |
producerConfig | ProducerConfig | --producer-config | 生产者的自定义配置。 |
outputSchemaType | String | -st , --schema-type | 用于消息输出的内置模式类型或自定义模式类名称。 |
outputSerdeClassName | String | --output-serde-classname | 用于消息输出的SerDe类。 |
logTopic | String | --log-topic | 函数日志生成的主题。 |
processingGuarantees | String | --processing-guarantees | 应用于函数的处理保证(交付语义)。可用值:ATLEAST_ONCE , ATMOST_ONCE , EFFECTIVELY_ONCE , MANUAL 。 |
retainOrdering | Boolean | --retain-ordering | 函数是否按顺序消费和处理消息。 |
retainKeyOrdering | Boolean | --retain-key-ordering | 函数是否按键顺序消费和处理消息。 |
batchBuilder | String | --batch-builder | 使用 producerConfig.batchBuilder 代替。注意: batchBuilder 将在代码中很快被弃用。 |
forwardSourceMessageProperty | Boolean | --forward-source-message-property | 在处理过程中,输入消息的属性是否转发到输出主题。当值设置为false 时,转发被禁用。 |
userConfig | Map | --user-config | 用户定义的配置键/值。 |
secrets | Map | --secrets | 从secretName到对象的映射,这些对象封装了底层秘密提供者如何获取秘密。 |
runtime | String | N/A | 函数的运行时。可用值:java ,python , go . |
autoAck | Boolean | --auto-ack | 框架是否自动确认消息。 注意: 此配置将在未来的版本中弃用。如果您指定了交付语义,框架将自动确认消息。如果您不希望框架自动确认消息,请将 processingGuarantees 设置为MANUAL 。 |
maxMessageRetries | Int | --max-message-retries | 在放弃之前处理消息的重试次数。 |
deadLetterTopic | String | --dead-letter-topic | 用于存储未成功处理消息的主题。 |
subName | String | --subs-name | 如果需要,用于输入主题消费者的Pulsar源订阅的名称。 |
parallelism | Int | --parallelism | 函数的并行度因子,即运行的函数实例数量。 |
资源 | Resources | N/A | N/A |
fqfn | String | --fqfn | 函数的完全限定函数名称(FQFN)。 |
窗口配置 | WindowConfig | N/A | N/A |
timeoutMs | Long | --timeout-ms | 消息超时时间(以毫秒为单位)。 |
jar | String | --jar | JAR文件的绝对路径(用Java编写)。它还支持URL路径,工作者可以从这些路径下载包,包括HTTP、HTTPS、文件(假设文件已经存在于工作者主机上的文件协议)和函数(来自包管理服务的包URL)。 |
py | String | --py | 用于函数(用Python编写)的主Python/Python wheel文件的绝对路径。它还支持工作者可以从中下载包的URL路径,包括HTTP、HTTPS、文件(假设文件已存在于工作者主机上的文件协议)和函数(来自包管理服务的包URL)。 |
go | String | --go | 用于函数的主要Go可执行二进制文件的绝对路径(用Go编写)。它还支持工作者可以从中下载包的URL路径,包括HTTP、HTTPS、文件(假设文件已经存在于工作者主机上的文件协议)和函数(来自包管理服务的包URL)。 |
cleanupSubscription | Boolean | --cleanup-subscription | 当函数被删除时,函数创建或使用的订阅是否应该被删除。默认值为 true |
customRuntimeOptions | String | --custom-runtime-options | 一个字符串,用于编码自定义运行时的选项。 |
maxPendingAsyncRequests | Int | --max-message-retries | 每个实例的最大挂起异步请求数,以避免大量并发请求。 |
exposePulsarAdminClientEnabled | Boolean | N/A | 是否将Pulsar管理客户端暴露给函数上下文。默认情况下,它是禁用的。 |
subscriptionPosition | String | --subs-position | Pulsar 源订阅的位置,用于从指定位置消费消息。默认值为 Latest 。 |
skipToLatest | Boolean | --skip-to-latest | 消费者是否应在函数实例重新启动后跳转到最新消息。 |
消费者配置
下表概述了inputSpecs
字段下的嵌套字段和相关参数。
字段名称 | 类型 | 相关命令参数 | 描述 |
---|---|---|---|
模式类型 | 字符串 | 不适用 | 不适用 |
序列化类名称 | 字符串 | 不适用 | 不适用 |
isRegexPattern | 布尔值 | 不适用 | 不适用 |
schemaProperties | Map | 不适用 | 不适用 |
消费者属性 | Map | 不适用 | 不适用 |
接收者队列大小 | 整型 | 不适用 | 不适用 |
cryptoConfig | CryptoConfig | N/A | 参考代码。 |
poolMessages | 布尔值 | 不适用 | 不适用 |
生产者配置
下表概述了producerConfig
字段下的嵌套字段和相关参数。
字段名称 | 类型 | 相关命令参数 | 描述 |
---|---|---|---|
maxPendingMessages | Int | N/A | 队列的最大大小,该队列持有等待从代理接收确认的消息。 |
maxPendingMessagesAcrossPartitions | Int | N/A | 所有分区中maxPendingMessages 的数量。 |
useThreadLocalProducers | 布尔值 | 不适用 | 不适用 |
cryptoConfig | CryptoConfig | N/A | 请参考代码。 |
batchBuilder | String | --batch-builder | 批处理构建方法的类型。可用值:DEFAULT 和 KEY_BASED 。默认值为 DEFAULT 。 |
compressionType | String | N/A | 生产者使用的消息数据压缩类型。默认值为 LZ4 。可用选项: NONE (无压缩)ZLIB ZSTD SNAPPY |
资源
下表概述了resources
字段下的嵌套字段和相关参数。
字段名称 | 类型 | 相关命令参数 | 描述 |
---|---|---|---|
cpu | double | --cpu | 每个函数实例需要分配的CPU核心数(仅适用于Kubernetes运行时)。 |
ram | Long | --ram | 每个函数实例需要分配的内存大小(以字节为单位)(仅适用于进程/Kubernetes运行时)。 |
disk | Long | --disk | 每个函数实例需要分配的磁盘空间(以字节为单位)(仅适用于Kubernetes运行时)。 |
窗口配置
下表概述了windowConfig
字段下的嵌套字段和相关参数。
字段名称 | 类型 | 相关命令参数 | 描述 |
---|---|---|---|
windowLengthCount | Int | --window-length-count | 每个窗口的消息数量。 |
windowLengthDurationMs | Long | --window-length-duration-ms | 每个窗口的时间长度(以毫秒为单位)。 |
slidingIntervalCount | Int | --sliding-interval-count | 窗口滑动前的消息数量。 |
slidingIntervalDurationMs | Long | --sliding-interval-duration-ms | 窗口滑动的时间间隔。 |
lateDataTopic | 字符串 | 不适用 | 不适用 |
maxLagMs | 长整型 | 不适用 | 不适用 |
水印发射间隔毫秒 | 长整型 | 不适用 | 不适用 |
时间戳提取器类名称 | 字符串 | 不适用 | 不适用 |
actualWindowFunctionClassName | 字符串 | 不适用 | 不适用 |
加密配置
下表概述了cryptoConfig
字段下的嵌套字段和相关参数。
字段名称 | 类型 | 相关命令参数 | 描述 |
---|---|---|---|
cryptoKeyReaderClassName | String | N/A | 参考code。 |
cryptoKeyReaderConfig | Map | 不适用 | 不适用 |
加密密钥 | String[] | N/A | N/A |
producerCryptoFailureAction | ProducerCryptoFailureAction | 不适用 | 不适用 |
consumerCryptoFailureAction | ConsumerCryptoFailureAction | 不适用 | 不适用 |
示例
以下示例展示了如何使用YAML或JSON配置函数。
- YAML
- JSON
tenant: "public"
namespace: "default"
name: "config-file-function"
inputs:
- "persistent://public/default/config-file-function-input-1"
- "persistent://public/default/config-file-function-input-2"
output: "persistent://public/default/config-file-function-output"
jar: "function.jar"
parallelism: 1
resources:
cpu: 8
ram: 8589934592
autoAck: true
userConfig:
foo: "bar"
{
"tenant": "public",
"namespace": "default",
"name": "config-file-function",
"inputs": [
"persistent://public/default/config-file-function-input-1",
"persistent://public/default/config-file-function-input-2"
],
"output": "persistent://public/default/config-file-function-output",
"jar": "function.jar",
"parallelism": 1,
"resources": {
"cpu": 8,
"ram": 8589934592
},
"autoAck": true,
"userConfig": {
"foo": "bar"
}
}