Skip to main content
Version: 4.0.x

Pulsar 函数 CLI 和 YAML 配置

Pulsar 管理 CLI 用于 Pulsar 函数

Pulsar 管理界面使您能够通过 CLI 创建和管理 Pulsar 函数。有关最新和完整的信息,包括命令、标志和描述,请参阅 Pulsar admin CLI

Pulsar 函数的 YAML 配置

您可以使用预定义的YAML文件来配置函数。下表概述了所需的字段和参数。

字段名称类型相关命令参数描述
runtimeFlagsStringN/A任何你想传递给运行时的标志(仅适用于进程和Kubernetes运行时)。
tenantString--tenant函数的租户。
namespaceString--namespace函数的命名空间。
nameString--name函数的名称。
classNameString--classname函数的类名。
functionTypeString--function-type内置函数类型。
inputsList-i, --inputs函数的输入主题。可以指定多个主题,以逗号分隔的列表形式。
customSerdeInputsMap--custom-serde-inputs从输入主题到SerDe类名称的映射。
topicsPatternString--topics-pattern从命名空间下的主题列表中消费的主题模式。
注意: --input--topic-pattern 是互斥的。对于Java函数,您需要在 --custom-serde-inputs 中添加模式的SerDe类名。
customSchemaInputsMap--custom-schema-inputs从输入主题到模式属性的映射。
customSchemaOutputsMap--custom-schema-outputs从输出主题到模式属性的映射。
inputSpecsMapConsumerConfig>--input-specs从输入到自定义配置的映射。
outputString-o, --output函数的输出主题。如果未指定,则不写入输出。
producerConfigProducerConfig--producer-config生产者的自定义配置。
outputSchemaTypeString-st, --schema-type用于消息输出的内置模式类型或自定义模式类名称。
outputSerdeClassNameString--output-serde-classname用于消息输出的SerDe类。
logTopicString--log-topic函数日志生成的主题。
processingGuaranteesString--processing-guarantees应用于函数的处理保证(交付语义)。可用值:ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE, MANUAL
retainOrderingBoolean--retain-ordering函数是否按顺序消费和处理消息。
retainKeyOrderingBoolean--retain-key-ordering函数是否按键顺序消费和处理消息。
batchBuilderString--batch-builder使用 producerConfig.batchBuilder 代替。
注意: batchBuilder 将在代码中很快被弃用。
forwardSourceMessagePropertyBoolean--forward-source-message-property在处理过程中,输入消息的属性是否转发到输出主题。当值设置为false时,转发被禁用。
userConfigMap--user-config用户定义的配置键/值。
secretsMap--secrets从secretName到对象的映射,这些对象封装了底层秘密提供者如何获取秘密。
runtimeStringN/A函数的运行时。可用值:java,python, go.
autoAckBoolean--auto-ack框架是否自动确认消息。

注意: 此配置将在未来的版本中弃用。如果您指定了交付语义,框架将自动确认消息。如果您不希望框架自动确认消息,请将processingGuarantees设置为MANUAL
maxMessageRetriesInt--max-message-retries在放弃之前处理消息的重试次数。
deadLetterTopicString--dead-letter-topic用于存储未成功处理消息的主题。
subNameString--subs-name如果需要,用于输入主题消费者的Pulsar源订阅的名称。
parallelismInt--parallelism函数的并行度因子,即运行的函数实例数量。
资源ResourcesN/AN/A
fqfnString--fqfn函数的完全限定函数名称(FQFN)。
窗口配置WindowConfigN/AN/A
timeoutMsLong--timeout-ms消息超时时间(以毫秒为单位)。
jarString--jarJAR文件的绝对路径(用Java编写)。它还支持URL路径,工作者可以从这些路径下载包,包括HTTP、HTTPS、文件(假设文件已经存在于工作者主机上的文件协议)和函数(来自包管理服务的包URL)。
pyString--py用于函数(用Python编写)的主Python/Python wheel文件的绝对路径。它还支持工作者可以从中下载包的URL路径,包括HTTP、HTTPS、文件(假设文件已存在于工作者主机上的文件协议)和函数(来自包管理服务的包URL)。
goString--go用于函数的主要Go可执行二进制文件的绝对路径(用Go编写)。它还支持工作者可以从中下载包的URL路径,包括HTTP、HTTPS、文件(假设文件已经存在于工作者主机上的文件协议)和函数(来自包管理服务的包URL)。
cleanupSubscriptionBoolean--cleanup-subscription当函数被删除时,函数创建或使用的订阅是否应该被删除。默认值为 true
customRuntimeOptionsString--custom-runtime-options一个字符串,用于编码自定义运行时的选项。
maxPendingAsyncRequestsInt--max-message-retries每个实例的最大挂起异步请求数,以避免大量并发请求。
exposePulsarAdminClientEnabledBooleanN/A是否将Pulsar管理客户端暴露给函数上下文。默认情况下,它是禁用的。
subscriptionPositionString--subs-positionPulsar 源订阅的位置,用于从指定位置消费消息。默认值为 Latest
skipToLatestBoolean--skip-to-latest消费者是否应在函数实例重新启动后跳转到最新消息。
消费者配置

下表概述了inputSpecs字段下的嵌套字段和相关参数。

字段名称类型相关命令参数描述
模式类型字符串不适用不适用
序列化类名称字符串不适用不适用
isRegexPattern布尔值不适用不适用
schemaPropertiesMap不适用不适用
消费者属性Map不适用不适用
接收者队列大小整型不适用不适用
cryptoConfigCryptoConfigN/A参考代码
poolMessages布尔值不适用不适用
生产者配置

下表概述了producerConfig字段下的嵌套字段和相关参数。

字段名称类型相关命令参数描述
maxPendingMessagesIntN/A队列的最大大小,该队列持有等待从代理接收确认的消息。
maxPendingMessagesAcrossPartitionsIntN/A所有分区中maxPendingMessages的数量。
useThreadLocalProducers布尔值不适用不适用
cryptoConfigCryptoConfigN/A请参考代码
batchBuilderString--batch-builder批处理构建方法的类型。可用值:DEFAULTKEY_BASED。默认值为 DEFAULT
compressionTypeStringN/A生产者使用的消息数据压缩类型。默认值为 LZ4
可用选项:
  • NONE(无压缩)
  • ZLIB
  • ZSTD
  • SNAPPY
  • 资源

    下表概述了resources字段下的嵌套字段和相关参数。

    字段名称类型相关命令参数描述
    cpudouble--cpu每个函数实例需要分配的CPU核心数(仅适用于Kubernetes运行时)。
    ramLong--ram每个函数实例需要分配的内存大小(以字节为单位)(仅适用于进程/Kubernetes运行时)。
    diskLong--disk每个函数实例需要分配的磁盘空间(以字节为单位)(仅适用于Kubernetes运行时)。
    窗口配置

    下表概述了windowConfig字段下的嵌套字段和相关参数。

    字段名称类型相关命令参数描述
    windowLengthCountInt--window-length-count每个窗口的消息数量。
    windowLengthDurationMsLong--window-length-duration-ms每个窗口的时间长度(以毫秒为单位)。
    slidingIntervalCountInt--sliding-interval-count窗口滑动前的消息数量。
    slidingIntervalDurationMsLong--sliding-interval-duration-ms窗口滑动的时间间隔。
    lateDataTopic字符串不适用不适用
    maxLagMs长整型不适用不适用
    水印发射间隔毫秒长整型不适用不适用
    时间戳提取器类名称字符串不适用不适用
    actualWindowFunctionClassName字符串不适用不适用
    加密配置

    下表概述了cryptoConfig字段下的嵌套字段和相关参数。

    字段名称类型相关命令参数描述
    cryptoKeyReaderClassNameStringN/A参考code
    cryptoKeyReaderConfigMap不适用不适用
    加密密钥String[]N/AN/A
    producerCryptoFailureActionProducerCryptoFailureAction不适用不适用
    consumerCryptoFailureActionConsumerCryptoFailureAction不适用不适用

    示例

    以下示例展示了如何使用YAML或JSON配置函数。

    tenant: "public"
    namespace: "default"
    name: "config-file-function"
    inputs:
    - "persistent://public/default/config-file-function-input-1"
    - "persistent://public/default/config-file-function-input-2"
    output: "persistent://public/default/config-file-function-output"
    jar: "function.jar"
    parallelism: 1
    resources:
    cpu: 8
    ram: 8589934592
    autoAck: true
    userConfig:
    foo: "bar"