管理函数
本页面仅显示一些常用操作。有关最新和完整的信息,请参阅下面的参考文档。
| 类别 | 方法 | 如果您想管理函数... |
|---|---|---|
| Pulsar CLI | pulsar-admin,它列出了所有命令、标志、描述等。 | 参见functions命令 |
| Pulsar admin APIs | REST API,列出了所有参数、响应、示例等。 | 参见 /admin/v3/functions 端点 |
| Pulsar admin APIs | Java admin API,列出了所有类、方法、描述等。 | 参见PulsarAdmin对象的functions方法 |
您可以对functions执行以下操作。
创建一个函数
您可以使用Admin CLI、REST API或Java Admin API在集群模式下创建Pulsar函数(将其部署在Pulsar集群上)。
- 管理员命令行界面
- REST API
- Java
使用create子命令。
示例
pulsar-admin functions create \
--tenant public \
--namespace default \
--name (the name of Pulsar Functions) \
--inputs test-input-topic \
--output persistent://public/default/test-output-topic \
--classname org.apache.pulsar.functions.api.examples.ExclamationFunction \
--jar $PWD/examples/api-examples.jar
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespace);
functionConfig.setName(functionName);
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setParallelism(1);
functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
functionConfig.setTopicsPattern(sourceTopicPattern);
functionConfig.setSubName(subscriptionName);
functionConfig.setAutoAck(true);
functionConfig.setOutput(sinkTopic);
admin.functions().createFunction(functionConfig, fileName);
更新一个函数
您可以使用Admin CLI、REST API或Java Admin API更新已部署到Pulsar集群的Pulsar函数。
- 管理员命令行界面
- REST API
- Java
使用update子命令。
示例
pulsar-admin functions update \
--tenant public \
--namespace default \
--name (the name of Pulsar Functions) \
--output persistent://public/default/update-output-topic \
# other options
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespace);
functionConfig.setName(functionName);
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setParallelism(1);
functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
UpdateOptions updateOptions = new UpdateOptions();
updateOptions.setUpdateAuthData(updateAuthData);
admin.functions().updateFunction(functionConfig, userCodeFile, updateOptions);
启动一个函数
你可以启动一个函数的实例或启动一个函数的所有实例。
启动一个函数的实例
您可以使用Admin CLI、REST API或Java Admin API启动一个已停止的函数实例,使用instance-id。
- 管理员命令行界面
- REST API
- Java
使用start子命令。
pulsar-admin functions start \
--tenant public \
--namespace default \
--name (the name of Pulsar Functions) \
--instance-id 1
admin.functions().startFunction(tenant, namespace, functionName, Integer.parseInt(instanceId));
启动一个函数的所有实例
您可以使用Admin CLI、REST API或Java Admin API启动所有已停止的函数实例。
- 管理员命令行界面
- REST API
- Java
使用start子命令。
示例
pulsar-admin functions start \
--tenant public \
--namespace default \
--name (the name of Pulsar Functions) \
admin.functions().startFunction(tenant, namespace, functionName);
停止一个函数
你可以停止一个函数的实例或停止一个函数的所有实例。
停止一个函数的实例
您可以使用Admin CLI、REST API或Java Admin API停止具有instance-id的函数实例。
- 管理员命令行界面
- REST API
- Java
使用stop子命令。
示例
pulsar-admin functions stop \
--tenant public \
--namespace default \
--name (the name of Pulsar Functions) \
--instance-id 1
admin.functions().stopFunction(tenant, namespace, functionName, Integer.parseInt(instanceId));
停止函数的所有实例
您可以使用Admin CLI、REST API或Java Admin API停止所有函数实例。
- 管理员命令行界面
- REST API
- Java
使用stop子命令。
示例
pulsar-admin functions stop \
--tenant public \
--namespace default \
--name (the name of Pulsar Functions)
admin.functions().stopFunction(tenant, namespace, functionName);
重新启动一个函数
你可以重启一个函数的实例或重启一个函数的所有实例。
重启一个函数的实例
使用Admin CLI、REST API或Java Admin API重新启动具有instance-id的函数实例。
- 管理员命令行界面
- REST API
- Java
使用restart子命令。
示例
pulsar-admin functions restart \
--tenant public \
--namespace default \
--name (the name of Pulsar Functions) \
--instance-id 1
admin.functions().restartFunction(tenant, namespace, functionName, Integer.parseInt(instanceId));
重启一个函数的所有实例
您可以使用Admin CLI、REST API或Java admin API重新启动所有函数实例。
- 管理员命令行界面
- REST API
- Java
使用restart子命令。
示例
pulsar-admin functions restart \
--tenant public \
--namespace default \
--name (the name of Pulsar Functions)
admin.functions().restartFunction(tenant, namespace, functionName);
列出所有函数
您可以使用Admin CLI、REST API或Java Admin API列出在特定租户和命名空间下运行的所有Pulsar函数。
- 管理员命令行界面
- REST API
- Java
admin.functions().getFunctions(tenant, namespace);
删除一个函数
您可以使用Admin CLI、REST API或Java Admin API删除在Pulsar集群上运行的Pulsar函数。
- 管理员命令行界面
- REST API
- Java
使用delete子命令。
示例
pulsar-admin functions delete \
--tenant public \
--namespace default \
--name (the name of Pulsar Functions)
admin.functions().deleteFunction(tenant, namespace, functionName);
获取有关函数的信息
您可以使用Admin CLI、REST API或Java Admin API获取当前在集群模式下运行的Pulsar函数的信息。
- 管理员命令行界面
- REST API
- Java
使用get子命令。
示例
pulsar-admin functions get \
--tenant public \
--namespace default \
--name (the name of Pulsar Functions)
admin.functions().getFunction(tenant, namespace, functionName);
获取函数的状态
您可以获取函数实例的状态或获取函数所有实例的状态。
获取函数实例的状态
您可以使用Admin CLI、REST API或Java Admin API通过instance-id获取Pulsar函数实例的当前状态。
- 管理员命令行界面
- REST API
- Java
使用status子命令。
示例
pulsar-admin functions status \
--tenant public \
--namespace default \
--name (the name of Pulsar Functions) \
--instance-id 1
admin.functions().getFunctionStatus(tenant, namespace, functionName, Integer.parseInt(instanceId));
获取函数所有实例的状态
您可以使用Admin CLI、REST API或Java Admin API获取Pulsar函数实例的当前状态。
- 管理员命令行界面
- REST API
- Java
使用status子命令。
示例
pulsar-admin functions status \
--tenant public \
--namespace default \
--name (the name of Pulsar Functions)
admin.functions().getFunctionStatus(tenant, namespace, functionName);
获取函数的统计信息
获取函数实例的统计信息
您可以使用Admin CLI、REST API或Java admin API通过instance-id获取Pulsar Function实例的当前状态。
- 管理员命令行界面
- REST API
- Java
使用stats子命令。
示例
pulsar-admin functions stats \
--tenant public \
--namespace default \
--name (the name of Pulsar Functions) \
--instance-id 1
admin.functions().getFunctionStats(tenant, namespace, functionName, Integer.parseInt(instanceId));
获取函数所有实例的统计信息
您可以使用Admin CLI、REST API或Java admin API获取Pulsar函数的当前状态。
- 管理员命令行界面
- REST API
- Java
使用stats子命令。
示例
pulsar-admin functions stats \
--tenant public \
--namespace default \
--name (the name of Pulsar Functions)
admin.functions().getFunctionStats(tenant, namespace, functionName);
触发一个函数
您可以使用Admin CLI、REST API或Java admin API通过提供的值触发指定的Pulsar函数。
- 管理员命令行界面
- REST API
- Java
使用trigger子命令。
示例
pulsar-admin functions trigger \
--tenant public \
--namespace default \
--name (the name of Pulsar Functions) \
--topic (the name of input topic) \
--trigger-value \"hello pulsar\"
# or --trigger-file (the path of trigger file)
admin.functions().triggerFunction(tenant, namespace, functionName, topic, triggerValue, triggerFile);
将与函数关联的状态放入
您可以使用Admin CLI、REST API或Java admin API来放置与Pulsar函数相关的状态。
- 管理员命令行界面
- REST API
- Java
使用putstate子命令。
示例
pulsar-admin functions putstate \
--tenant public \
--namespace default \
--name (the name of Pulsar Functions) \
--state "{\"key\":\"pulsar\", \"stringValue\":\"hello pulsar\"}"
TypeReference<FunctionState> typeRef = new TypeReference<FunctionState>() {};
FunctionState stateRepr = ObjectMapperFactory.getThreadLocal().readValue(state, typeRef);
admin.functions().putFunctionState(tenant, namespace, functionName, stateRepr);
获取与函数关联的状态
您可以使用Admin CLI、REST API或Java admin API获取与Pulsar函数关联的当前状态。
- 管理员命令行界面
- REST API
- Java
使用querystate子命令。
示例
pulsar-admin functions querystate \
--tenant public \
--namespace default \
--name (the name of Pulsar Functions) \
--key (the key of state)
admin.functions().getFunctionState(tenant, namespace, functionName, key);