Pulsar 函数入门
本实践教程提供了逐步指导和示例,介绍如何在独立Pulsar中创建和验证函数,包括有状态函数和窗口函数。
先决条件
- JDK 8+。更多详情,请参考Pulsar运行时Java版本推荐。
- 不支持Windows操作系统。
步骤1:启动独立的Pulsar
-
在
conf/standalone.conf
中启用pulsar函数(如果不存在则添加此字段):functionsWorkerEnabled=true
-
本地启动Pulsar。
bin/pulsar standalone
Pulsar服务的所有组件(包括ZooKeeper、BookKeeper、broker等)按顺序启动。您可以使用
bin/pulsar-admin brokers healthcheck
命令来确保Pulsar服务已启动并正在运行。 -
检查Pulsar二进制协议端口。
telnet localhost 6650
-
检查Pulsar函数集群。
bin/pulsar-admin functions-worker get-cluster
输出
[{"workerId":"c-standalone-fw-localhost-6750","workerHostname":"localhost","port":6750}]
-
确保存在一个公共租户。
bin/pulsar-admin tenants list
输出
public
-
确保存在一个默认的命名空间。
bin/pulsar-admin namespaces list public
输出
public/default
-
确保表服务已成功启用。
telnet localhost 4181
输出
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
步骤2:为测试创建一个命名空间
-
创建一个租户和一个命名空间。
bin/pulsar-admin tenants create test
bin/pulsar-admin namespaces create test/test-namespace -
在与步骤1相同的终端窗口中,验证租户和命名空间。
bin/pulsar-admin namespaces list test
输出
此输出显示租户和命名空间均已成功创建。
"test/test-namespace"
步骤3:启动函数
在开始函数之前,您需要启动Pulsar并创建一个测试命名空间。
-
创建一个名为
examples
的函数。tip你可以在本地机器的Pulsar目录下的
examples
文件夹中看到example-function-config.yaml
和api-examples.jar
文件。这个示例函数将在每条消息的末尾添加一个
!
。bin/pulsar-admin functions create \
--function-config-file $PWD/examples/example-function-config.yaml \
--jar $PWD/examples/api-examples.jar输出
Created Successfully
你可以在
examples/example-function-config.yaml
中查看此函数的配置:tenant: "test"
namespace: "test-namespace"
name: "example" # function name
className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
inputs: ["test_src"] # this function will read messages from these topics
output: "test_result" # the return value of this function will be sent to this topic
autoAck: true # function will acknowledge input messages if set true
parallelism: 1 -
在与步骤1相同的终端窗口中,验证函数的配置。
bin/pulsar-admin functions get \
--tenant test \
--namespace test-namespace \
--name example输出
{
"tenant": "test",
"namespace": "test-namespace",
"name": "example",
"className": "org.apache.pulsar.functions.api.examples.ExclamationFunction",
"inputSpecs": {
"test_src": {
"isRegexPattern": false,
"schemaProperties": {},
"consumerProperties": {},
"poolMessages": false
}
},
"output": "test_result",
"producerConfig": {
"useThreadLocalProducers": false,
"batchBuilder": ""
},
"processingGuarantees": "ATLEAST_ONCE",
"retainOrdering": false,
"retainKeyOrdering": false,
"forwardSourceMessageProperty": true,
"userConfig": {},
"runtime": "JAVA",
"autoAck": true,
"parallelism": 1,
"resources": {
"cpu": 1.0,
"ram": 1073741824,
"disk": 10737418240
},
"cleanupSubscription": true,
"subscriptionPosition": "Latest"
} -
在与步骤1相同的终端窗口中,验证函数的状态。
bin/pulsar-admin functions status \
--tenant test \
--namespace test-namespace \
--name example输出
"running": true
表示函数正在运行。{
"numInstances" : 1,
"numRunning" : 1,
"instances" : [ {
"instanceId" : 0,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReceived" : 0,
"numSuccessfullyProcessed" : 0,
"numUserExceptions" : 0,
"latestUserExceptions" : [ ],
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"averageLatency" : 0.0,
"lastInvocationTime" : 0,
"workerId" : "c-standalone-fw-localhost-8080"
}
} ]
} -
在与步骤1相同的终端窗口中,订阅输出主题
test_result
。bin/pulsar-client consume -s test-sub -n 0 test_result
-
在一个新的终端窗口中,向输入主题
test_src
生成消息。bin/pulsar-client produce -m "test-messages-`date`" -n 10 test_src
-
在与步骤1相同的终端窗口中,返回了由
example
函数生成的消息。你可以看到每条消息的末尾都添加了一个!
。输出
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
启动有状态函数
Pulsar的独立模式为有状态函数启用了BookKeeper表服务。有关更多信息,请参阅配置状态存储。
在开始有状态函数之前,你需要启动Pulsar并创建一个测试命名空间。
以下示例提供了启动一个有状态函数以验证计数器函数的说明。
-
使用
examples/example-stateful-function-config.yaml
创建一个函数。bin/pulsar-admin functions create \
--function-config-file $PWD/examples/example-stateful-function-config.yaml \
--jar $PWD/examples/api-examples.jar输出
Created Successfully
你可以在
examples/example-stateful-function-config.yaml
中查看此函数的配置:tenant: "test"
namespace: "test-namespace"
name: "word_count"
className: "org.apache.pulsar.functions.api.examples.WordCountFunction"
inputs: ["test_wordcount_src"] # this function will read messages from these topics
autoAck: true
parallelism: 1你可以查看源代码的
WordCountFunction
。这个函数不会返回任何值,但会将单词的出现次数存储在函数上下文中。因此,你不需要指定输出主题。 有关yaml配置的更多信息,请参阅参考。 -
在与步骤1相同的终端窗口中,获取
word_count
函数的信息。bin/pulsar-admin functions get \
--tenant test \
--namespace test-namespace \
--name word_count输出
{
"tenant": "test",
"namespace": "test-namespace",
"name": "word_count",
"className": "org.apache.pulsar.functions.api.examples.WordCountFunction",
"inputSpecs": {
"test_wordcount_src": {
"isRegexPattern": false,
"schemaProperties": {},
"consumerProperties": {},
"poolMessages": false
}
},
"producerConfig": {
"useThreadLocalProducers": false,
"batchBuilder": ""
},
"processingGuarantees": "ATLEAST_ONCE",
"retainOrdering": false,
"retainKeyOrdering": false,
"forwardSourceMessageProperty": true,
"userConfig": {},
"runtime": "JAVA",
"autoAck": true,
"parallelism": 1,
"resources": {
"cpu": 1.0,
"ram": 1073741824,
"disk": 10737418240
},
"cleanupSubscription": true,
"subscriptionPosition": "Latest"
} -
在与步骤1相同的终端窗口中,获取
word_count
函数的状态。bin/pulsar-admin functions status \
--tenant test \
--namespace test-namespace \
--name word_count输出
{
"numInstances" : 1,
"numRunning" : 1,
"instances" : [ {
"instanceId" : 0,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReceived" : 0,
"numSuccessfullyProcessed" : 0,
"numUserExceptions" : 0,
"latestUserExceptions" : [ ],
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"averageLatency" : 0.0,
"lastInvocationTime" : 0,
"workerId" : "c-standalone-fw-localhost-8080"
}
} ]
} -
在与步骤1相同的终端窗口中,查询状态表中键为
hello
的函数。此操作监视与hello
相关的更改。bin/pulsar-admin functions querystate \
--tenant test \
--namespace test-namespace \
--name word_count -k hello -wtip有关
pulsar-admin functions querystate options
命令的更多信息,包括标志、描述、默认值和简写,请参阅Pulsar admin API。输出
key 'hello' doesn't exist.
key 'hello' doesn't exist.
key 'hello' doesn't exist.
... -
在一个新的终端窗口中,使用以下方法之一向输入主题
test_wordcount_src
发送10条带有hello
的消息。hello
的值已更新为10。
bin/pulsar-client produce -m "hello" -n 10 test_wordcount_src
-
在与步骤1相同的终端窗口中,检查结果。
结果显示,输出主题
test_wordcount_dest
接收到了消息。输出
{
"key": "hello",
"numberValue": 10,
"version": 9
} -
在终端窗口中,按照步骤5,使用
hello
生成另外10条消息。hello
的值更新为20。bin/pulsar-client produce -m "hello" -n 10 test_wordcount_src
-
在与步骤1相同的终端窗口中,检查结果。
结果显示,输出主题
test_wordcount_dest
接收到的值为20。{
"key": "hello",
"numberValue": 20,
"version": 19
}
开始窗口函数
窗口函数是Pulsar函数的一种特殊形式。更多信息,请参见概念。
在开始窗口函数之前,你需要启动Pulsar 并创建一个测试命名空间。
以下示例提供了启动窗口函数以计算窗口中总和的说明。
-
使用
example-window-function-config.yaml
创建一个函数。bin/pulsar-admin functions create \
--function-config-file $PWD/examples/example-window-function-config.yaml \
--jar $PWD/examples/api-examples.jar输出
Created Successfully
你可以在
examples/example-window-function-config.yaml
中查看此函数的配置:tenant: "test"
namespace: "test-namespace"
name: "window-example"
className: "org.apache.pulsar.functions.api.examples.AddWindowFunction"
inputs: ["test_window_src"]
output: "test_window_result"
autoAck: true
parallelism: 1
# every 5 messages, calculate sum of the latest 10 messages
windowConfig:
windowLengthCount: 10
slidingIntervalCount: 5你可以在这里查看
AddWindowFunction
的源代码。 有关yaml配置的更多信息,请参阅参考。 -
在与步骤1相同的终端窗口中,验证函数的配置。
bin/pulsar-admin functions get \
--tenant test \
--namespace test-namespace \
--name window-example输出
{
"tenant": "test",
"namespace": "test-namespace",
"name": "window-example",
"className": "org.apache.pulsar.functions.api.examples.AddWindowFunction",
"inputSpecs": {
"test_window_src": {
"isRegexPattern": false,
"schemaProperties": {},
"consumerProperties": {},
"poolMessages": false
}
},
"output": "test_window_result",
"producerConfig": {
"useThreadLocalProducers": false,
"batchBuilder": ""
},
"processingGuarantees": "ATLEAST_ONCE",
"retainOrdering": false,
"retainKeyOrdering": false,
"forwardSourceMessageProperty": true,
"userConfig": {},
"runtime": "JAVA",
"autoAck": false,
"parallelism": 1,
"resources": {
"cpu": 1.0,
"ram": 1073741824,
"disk": 10737418240
},
"windowConfig": {
"windowLengthCount": 10,
"slidingIntervalCount": 5,
"actualWindowFunctionClassName": "org.apache.pulsar.functions.api.examples.AddWindowFunction",
"processingGuarantees": "ATLEAST_ONCE"
},
"cleanupSubscription": true,
"subscriptionPosition": "Latest"
} -
在与步骤1相同的终端窗口中,验证函数的状态。
bin/pulsar-admin functions status \
--tenant test \
--namespace test-namespace \
--name window-example输出
"running": true
表示函数正在运行。{
"numInstances" : 1,
"numRunning" : 1,
"instances" : [ {
"instanceId" : 0,
"status" : {
"running" : true,
"error" : "",
"numRestarts" : 0,
"numReceived" : 0,
"numSuccessfullyProcessed" : 0,
"numUserExceptions" : 0,
"latestUserExceptions" : [ ],
"numSystemExceptions" : 0,
"latestSystemExceptions" : [ ],
"averageLatency" : 0.0,
"lastInvocationTime" : 0,
"workerId" : "c-standalone-fw-localhost-8080"
}
} ]
} -
在与步骤1相同的终端窗口中,订阅输出主题
test_window_result
。bin/pulsar-client consume -s test-sub -n 0 test_window_result
-
在一个新的终端窗口中,向输入主题
test_window_src
生成消息。bin/pulsar-client produce -m "3" -n 10 test_window_src
-
在与步骤1相同的终端窗口中,窗口函数
window-example
生成的消息被返回。输出
----- got message -----
key:[null], properties:[], content:15
----- got message -----
key:[null], properties:[], content:30