Skip to main content
Version: 4.0.x

Pulsar 函数入门

本实践教程提供了逐步指导和示例,介绍如何在独立Pulsar中创建和验证函数,包括有状态函数和窗口函数。

先决条件

步骤1:启动独立的Pulsar

  1. conf/standalone.conf中启用pulsar函数(如果不存在则添加此字段):

    functionsWorkerEnabled=true
  2. 本地启动Pulsar。

    bin/pulsar standalone

    Pulsar服务的所有组件(包括ZooKeeper、BookKeeper、broker等)按顺序启动。您可以使用bin/pulsar-admin brokers healthcheck命令来确保Pulsar服务已启动并正在运行。

  3. 检查Pulsar二进制协议端口。

    telnet localhost 6650
  4. 检查Pulsar函数集群。

    bin/pulsar-admin functions-worker get-cluster

    输出

    [{"workerId":"c-standalone-fw-localhost-6750","workerHostname":"localhost","port":6750}]
  5. 确保存在一个公共租户。

    bin/pulsar-admin tenants list

    输出

    public
  6. 确保存在一个默认的命名空间。

    bin/pulsar-admin namespaces list public

    输出

    public/default
  7. 确保表服务已成功启用。

    telnet localhost 4181

    输出

    Trying ::1...
    telnet: connect to address ::1: Connection refused
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.

步骤2:为测试创建一个命名空间

  1. 创建一个租户和一个命名空间。

    bin/pulsar-admin tenants create test
    bin/pulsar-admin namespaces create test/test-namespace
  2. 在与步骤1相同的终端窗口中,验证租户和命名空间。

    bin/pulsar-admin namespaces list test

    输出

    此输出显示租户和命名空间均已成功创建。

    "test/test-namespace"

步骤3:启动函数

note

在开始函数之前,您需要启动Pulsar创建一个测试命名空间

  1. 创建一个名为 examples 的函数。

    tip

    你可以在本地机器的Pulsar目录下的examples文件夹中看到example-function-config.yamlapi-examples.jar文件。

    这个示例函数将在每条消息的末尾添加一个!

    bin/pulsar-admin functions create \
    --function-config-file $PWD/examples/example-function-config.yaml \
    --jar $PWD/examples/api-examples.jar

    输出

    Created Successfully

    你可以在examples/example-function-config.yaml中查看此函数的配置:

    tenant: "test"
    namespace: "test-namespace"
    name: "example" # function name
    className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
    inputs: ["test_src"] # this function will read messages from these topics
    output: "test_result" # the return value of this function will be sent to this topic
    autoAck: true # function will acknowledge input messages if set true
    parallelism: 1

    你可以查看ExclamationFunction源代码。 有关yaml配置的更多信息,请参阅参考

  2. 在与步骤1相同的终端窗口中,验证函数的配置。

    bin/pulsar-admin functions get \
    --tenant test \
    --namespace test-namespace \
    --name example

    输出

    {
    "tenant": "test",
    "namespace": "test-namespace",
    "name": "example",
    "className": "org.apache.pulsar.functions.api.examples.ExclamationFunction",
    "inputSpecs": {
    "test_src": {
    "isRegexPattern": false,
    "schemaProperties": {},
    "consumerProperties": {},
    "poolMessages": false
    }
    },
    "output": "test_result",
    "producerConfig": {
    "useThreadLocalProducers": false,
    "batchBuilder": ""
    },
    "processingGuarantees": "ATLEAST_ONCE",
    "retainOrdering": false,
    "retainKeyOrdering": false,
    "forwardSourceMessageProperty": true,
    "userConfig": {},
    "runtime": "JAVA",
    "autoAck": true,
    "parallelism": 1,
    "resources": {
    "cpu": 1.0,
    "ram": 1073741824,
    "disk": 10737418240
    },
    "cleanupSubscription": true,
    "subscriptionPosition": "Latest"
    }
  3. 在与步骤1相同的终端窗口中,验证函数的状态。

    bin/pulsar-admin functions status \
    --tenant test \
    --namespace test-namespace \
    --name example

    输出

    "running": true 表示函数正在运行。

    {
    "numInstances" : 1,
    "numRunning" : 1,
    "instances" : [ {
    "instanceId" : 0,
    "status" : {
    "running" : true,
    "error" : "",
    "numRestarts" : 0,
    "numReceived" : 0,
    "numSuccessfullyProcessed" : 0,
    "numUserExceptions" : 0,
    "latestUserExceptions" : [ ],
    "numSystemExceptions" : 0,
    "latestSystemExceptions" : [ ],
    "averageLatency" : 0.0,
    "lastInvocationTime" : 0,
    "workerId" : "c-standalone-fw-localhost-8080"
    }
    } ]
    }
  4. 在与步骤1相同的终端窗口中,订阅输出主题 test_result

    bin/pulsar-client consume -s test-sub -n 0 test_result
  5. 在一个新的终端窗口中,向输入主题 test_src生成消息。

    bin/pulsar-client produce -m "test-messages-`date`" -n 10 test_src
  6. 在与步骤1相同的终端窗口中,返回了由example函数生成的消息。你可以看到每条消息的末尾都添加了一个!

    输出

    ----- got message -----
    test-messages-Thu Jul 19 11:59:15 PDT 2021!
    ----- got message -----
    test-messages-Thu Jul 19 11:59:15 PDT 2021!
    ----- got message -----
    test-messages-Thu Jul 19 11:59:15 PDT 2021!
    ----- got message -----
    test-messages-Thu Jul 19 11:59:15 PDT 2021!
    ----- got message -----
    test-messages-Thu Jul 19 11:59:15 PDT 2021!
    ----- got message -----
    test-messages-Thu Jul 19 11:59:15 PDT 2021!
    ----- got message -----
    test-messages-Thu Jul 19 11:59:15 PDT 2021!
    ----- got message -----
    test-messages-Thu Jul 19 11:59:15 PDT 2021!
    ----- got message -----
    test-messages-Thu Jul 19 11:59:15 PDT 2021!
    ----- got message -----
    test-messages-Thu Jul 19 11:59:15 PDT 2021!

启动有状态函数

Pulsar的独立模式为有状态函数启用了BookKeeper表服务。有关更多信息,请参阅配置状态存储

note

在开始有状态函数之前,你需要启动Pulsar创建一个测试命名空间

以下示例提供了启动一个有状态函数以验证计数器函数的说明。

  1. 使用examples/example-stateful-function-config.yaml创建一个函数。

    bin/pulsar-admin functions create \
    --function-config-file $PWD/examples/example-stateful-function-config.yaml \
    --jar $PWD/examples/api-examples.jar

    输出

    Created Successfully

    你可以在examples/example-stateful-function-config.yaml中查看此函数的配置:

    tenant: "test"
    namespace: "test-namespace"
    name: "word_count"
    className: "org.apache.pulsar.functions.api.examples.WordCountFunction"
    inputs: ["test_wordcount_src"] # this function will read messages from these topics
    autoAck: true
    parallelism: 1

    你可以查看源代码WordCountFunction。这个函数不会返回任何值,但会将单词的出现次数存储在函数上下文中。因此,你不需要指定输出主题。 有关yaml配置的更多信息,请参阅参考

  2. 在与步骤1相同的终端窗口中,获取word_count函数的信息。

    bin/pulsar-admin functions get \
    --tenant test \
    --namespace test-namespace \
    --name word_count

    输出

    {
    "tenant": "test",
    "namespace": "test-namespace",
    "name": "word_count",
    "className": "org.apache.pulsar.functions.api.examples.WordCountFunction",
    "inputSpecs": {
    "test_wordcount_src": {
    "isRegexPattern": false,
    "schemaProperties": {},
    "consumerProperties": {},
    "poolMessages": false
    }
    },
    "producerConfig": {
    "useThreadLocalProducers": false,
    "batchBuilder": ""
    },
    "processingGuarantees": "ATLEAST_ONCE",
    "retainOrdering": false,
    "retainKeyOrdering": false,
    "forwardSourceMessageProperty": true,
    "userConfig": {},
    "runtime": "JAVA",
    "autoAck": true,
    "parallelism": 1,
    "resources": {
    "cpu": 1.0,
    "ram": 1073741824,
    "disk": 10737418240
    },
    "cleanupSubscription": true,
    "subscriptionPosition": "Latest"
    }
  3. 在与步骤1相同的终端窗口中,获取word_count函数的状态。

    bin/pulsar-admin functions status \
    --tenant test \
    --namespace test-namespace \
    --name word_count

    输出

    {
    "numInstances" : 1,
    "numRunning" : 1,
    "instances" : [ {
    "instanceId" : 0,
    "status" : {
    "running" : true,
    "error" : "",
    "numRestarts" : 0,
    "numReceived" : 0,
    "numSuccessfullyProcessed" : 0,
    "numUserExceptions" : 0,
    "latestUserExceptions" : [ ],
    "numSystemExceptions" : 0,
    "latestSystemExceptions" : [ ],
    "averageLatency" : 0.0,
    "lastInvocationTime" : 0,
    "workerId" : "c-standalone-fw-localhost-8080"
    }
    } ]
    }
  4. 在与步骤1相同的终端窗口中,查询状态表中键为hello的函数。此操作监视与hello相关的更改。

    bin/pulsar-admin functions querystate \
    --tenant test \
    --namespace test-namespace \
    --name word_count -k hello -w
    tip

    有关pulsar-admin functions querystate options命令的更多信息,包括标志、描述、默认值和简写,请参阅Pulsar admin API

    输出

    key 'hello' doesn't exist.
    key 'hello' doesn't exist.
    key 'hello' doesn't exist.
    ...
  5. 在一个新的终端窗口中,使用以下方法之一向输入主题test_wordcount_src发送10条带有hello的消息。hello的值已更新为10。

bin/pulsar-client produce -m "hello" -n 10 test_wordcount_src
  1. 在与步骤1相同的终端窗口中,检查结果。

    结果显示,输出主题 test_wordcount_dest 接收到了消息。

    输出

    {
    "key": "hello",
    "numberValue": 10,
    "version": 9
    }
  2. 在终端窗口中,按照步骤5,使用hello生成另外10条消息。hello的值更新为20。

    bin/pulsar-client produce -m "hello" -n 10 test_wordcount_src
  3. 在与步骤1相同的终端窗口中,检查结果。

    结果显示,输出主题 test_wordcount_dest 接收到的值为20。

    {
    "key": "hello",
    "numberValue": 20,
    "version": 19
    }

开始窗口函数

窗口函数是Pulsar函数的一种特殊形式。更多信息,请参见概念

note

在开始窗口函数之前,你需要启动Pulsar创建一个测试命名空间

以下示例提供了启动窗口函数以计算窗口中总和的说明。

  1. 使用example-window-function-config.yaml创建一个函数。

    bin/pulsar-admin functions create \
    --function-config-file $PWD/examples/example-window-function-config.yaml \
    --jar $PWD/examples/api-examples.jar

    输出

    Created Successfully

    你可以在examples/example-window-function-config.yaml中查看此函数的配置:

    tenant: "test"
    namespace: "test-namespace"
    name: "window-example"
    className: "org.apache.pulsar.functions.api.examples.AddWindowFunction"
    inputs: ["test_window_src"]
    output: "test_window_result"
    autoAck: true
    parallelism: 1

    # every 5 messages, calculate sum of the latest 10 messages
    windowConfig:
    windowLengthCount: 10
    slidingIntervalCount: 5

    你可以在这里查看AddWindowFunction的源代码。 有关yaml配置的更多信息,请参阅参考

  2. 在与步骤1相同的终端窗口中,验证函数的配置。

    bin/pulsar-admin functions get \
    --tenant test \
    --namespace test-namespace \
    --name window-example

    输出

    {
    "tenant": "test",
    "namespace": "test-namespace",
    "name": "window-example",
    "className": "org.apache.pulsar.functions.api.examples.AddWindowFunction",
    "inputSpecs": {
    "test_window_src": {
    "isRegexPattern": false,
    "schemaProperties": {},
    "consumerProperties": {},
    "poolMessages": false
    }
    },
    "output": "test_window_result",
    "producerConfig": {
    "useThreadLocalProducers": false,
    "batchBuilder": ""
    },
    "processingGuarantees": "ATLEAST_ONCE",
    "retainOrdering": false,
    "retainKeyOrdering": false,
    "forwardSourceMessageProperty": true,
    "userConfig": {},
    "runtime": "JAVA",
    "autoAck": false,
    "parallelism": 1,
    "resources": {
    "cpu": 1.0,
    "ram": 1073741824,
    "disk": 10737418240
    },
    "windowConfig": {
    "windowLengthCount": 10,
    "slidingIntervalCount": 5,
    "actualWindowFunctionClassName": "org.apache.pulsar.functions.api.examples.AddWindowFunction",
    "processingGuarantees": "ATLEAST_ONCE"
    },
    "cleanupSubscription": true,
    "subscriptionPosition": "Latest"
    }
  3. 在与步骤1相同的终端窗口中,验证函数的状态。

    bin/pulsar-admin functions status \
    --tenant test \
    --namespace test-namespace \
    --name window-example

    输出

    "running": true 表示函数正在运行。

    {
    "numInstances" : 1,
    "numRunning" : 1,
    "instances" : [ {
    "instanceId" : 0,
    "status" : {
    "running" : true,
    "error" : "",
    "numRestarts" : 0,
    "numReceived" : 0,
    "numSuccessfullyProcessed" : 0,
    "numUserExceptions" : 0,
    "latestUserExceptions" : [ ],
    "numSystemExceptions" : 0,
    "latestSystemExceptions" : [ ],
    "averageLatency" : 0.0,
    "lastInvocationTime" : 0,
    "workerId" : "c-standalone-fw-localhost-8080"
    }
    } ]
    }
  4. 在与步骤1相同的终端窗口中,订阅输出主题 test_window_result

    bin/pulsar-client consume -s test-sub -n 0 test_window_result
  5. 在一个新的终端窗口中,向输入主题 test_window_src生成消息。

    bin/pulsar-client produce -m "3" -n 10 test_window_src
  6. 在与步骤1相同的终端窗口中,窗口函数window-example生成的消息被返回。

    输出

    ----- got message -----
    key:[null], properties:[], content:15
    ----- got message -----
    key:[null], properties:[], content:30