开始
本指南将引导您快速入门,了解如何使用以下方法来管理主题。
- pulsar-admin
- REST API
- Java
pulsar-admin CLI 是一个命令行工具,可在您的 Pulsar 安装的 bin 文件夹中找到。
REST API 属于HTTP调用,这些调用是针对由brokers提供的管理API进行的。此外,Java管理API和pulsar-admin CLI都使用REST API。
Java admin API 是一个用Java编写的可编程接口。
查看下面的详细步骤。
- pulsar-admin
- REST API
- Java
要使用pulsar-admin CLI管理主题,请完成以下步骤。
-
设置服务URL。
-
创建一个分区主题。
-
更新分区的数量。
-
向主题生成消息。
-
检查主题的统计信息。
-
删除主题。
先决条件
- 安装并启动Pulsar独立模式。本教程以运行Pulsar 2.11为例。
步骤
第一步: 设置服务URL以指向client.conf中的代理服务。
webServiceUrl=http://localhost:8080/
brokerServiceUrl=pulsar://localhost:6650/
步骤 2: 创建一个名为 test-topic-1 的持久主题,包含 6 个分区。
输入
bin/pulsar-admin topics create-partitioned-topic \
persistent://public/default/test-topic-1 \
--partitions 6
输出
没有输出。您可以在步骤5中检查主题的状态。
步骤 3: 将分区数量更新为 8。
输入
bin/pulsar-admin topics update-partitioned-topic \
persistent://public/default/test-topic-1 \
--partitions 8
输出
没有输出。您可以在步骤5中检查分区数量。
步骤 4: 向分区主题 test-topic-1 生成一些消息。
输入
bin/pulsar-perf produce -u pulsar://localhost:6650 -r 1000 -i 1000 persistent://public/default/test-topic-1
输出
2023-03-07T15:33:56,832+0800 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Starting Pulsar perf producer with config: {
"confFile" : "/Users/yu/apache-pulsar-2.11.0/conf/client.conf",
"serviceURL" : "pulsar://localhost:6650",
"authPluginClassName" : "",
"authParams" : "",
"tlsTrustCertsFilePath" : "",
"tlsAllowInsecureConnection" : false,
"tlsHostnameVerificationEnable" : false,
"maxConnections" : 1,
"statsIntervalSeconds" : 1000,
"ioThreads" : 1,
"enableBusyWait" : false,
"listenerName" : null,
"listenerThreads" : 1,
"maxLookupRequest" : 50000,
"topics" : [ "persistent://public/default/test-topic-1" ],
"numTestThreads" : 1,
"msgRate" : 1000,
"msgSize" : 1024,
"numTopics" : 1,
"numProducers" : 1,
"separator" : "-",
"sendTimeout" : 0,
"producerName" : null,
"adminURL" : "http://localhost:8080/",
...
2023-03-07T15:35:03,769+0800 [Thread-0] INFO org.apache.pulsar.testclient.PerformanceProducer - Aggregated latency stats --- Latency: mean: 8.931 ms - med: 3.775 - 95pct: 32.144 - 99pct: 98.432 - 99.9pct: 216.088 - 99.99pct: 304.807 - 99.999pct: 349.391 - Max: 351.235
步骤 5: 检查分区主题 test-topic-1 的内部统计信息。
输入
bin/pulsar-admin topics partitioned-stats-internal \
persistent://public/default/test-topic-1
输出
以下是输出的一部分。有关主题统计的详细解释,请参见Pulsar统计。
{
"metadata" : {
"partitions" : 8
},
"partitions" : {
"persistent://public/default/test-topic-1-partition-1" : {
"entriesAddedCounter" : 4213,
"numberOfEntries" : 4213,
"totalSize" : 8817693,
"currentLedgerEntries" : 4212,
"currentLedgerSize" : 8806289,
"lastLedgerCreatedTimestamp" : "2023-03-07T15:33:59.367+08:00",
"waitingCursorsCount" : 0,
"pendingAddEntriesCount" : 0,
"lastConfirmedEntry" : "65:4211",
"state" : "LedgerOpened",
"ledgers" : [ {
"ledgerId" : 49,
"entries" : 1,
"size" : 11404,
"offloaded" : false,
"underReplicated" : false
}, {
"ledgerId" : 65,
"entries" : 0,
"size" : 0,
"offloaded" : false,
"underReplicated" : false
} ],
"cursors" : {
"test-subscriptio-1" : {
"markDeletePosition" : "49:-1",
"readPosition" : "49:0",
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 0,
"cursorLedger" : -1,
"cursorLedgerLastEntry" : -1,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2023-03-06T16:41:32.801+08:00",
"state" : "NoLedger",
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
"subscriptionHavePendingRead" : false,
"subscriptionHavePendingReplayRead" : false,
"properties" : { }
},
"test-subscription-1" : {
"markDeletePosition" : "49:-1",
"readPosition" : "49:0",
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 0,
"cursorLedger" : -1,
"cursorLedgerLastEntry" : -1,
"individuallyDeletedMessages" : "[]",
"lastLedgerSwitchTimestamp" : "2023-03-06T16:41:32.801+08:00",
"state" : "NoLedger",
"numberOfEntriesSinceFirstNotAckedMessage" : 1,
"totalNonContiguousDeletedMessagesRange" : 0,
"subscriptionHavePendingRead" : false,
"subscriptionHavePendingReplayRead" : false,
"properties" : { }
}
},
"schemaLedgers" : [ ],
"compactedLedger" : {
"ledgerId" : -1,
"entries" : -1,
"size" : -1,
"offloaded" : false,
"underReplicated" : false
}
},
...
步骤 6: 删除主题 test-topic-1。
输入
bin/pulsar-admin topics delete-partitioned-topic persistent://public/default/test-topic-1
输出
没有输出。您可以使用以下命令验证test-topic-1是否存在。
输入
列出public/default命名空间中的主题。
bin/pulsar-admin topics list public/default
要使用REST API管理主题,请完成以下步骤。
-
创建一个分区主题
-
更新分区的数量。
-
向主题生成消息。
-
检查主题的统计信息。
-
删除主题。
先决条件
- 安装并启动Pulsar独立模式。本教程以运行Pulsar 2.11为例。
步骤
步骤 1: 创建一个名为 test-topic-2 的持久主题,包含 4 个分区。
输入
curl -X PUT http://localhost:8080/admin/v2/persistent/public/default/test-topic-2/partitions -H 'Content-Type: application/json' -d "4"
输出
没有输出。你可以在步骤4中查看主题。
步骤 2: 将分区数量更新为 5。
输入
curl -X POST http://localhost:8080/admin/v2/persistent/public/default/test-topic-2/partitions -H 'Content-Type: application/json' -d "5"
输出
没有输出。您可以在步骤4中检查主题的状态。
步骤 3: 向分区主题 test-topic-2 生成一些消息。
输入
bin/pulsar-perf produce -u pulsar://localhost:6650 -r 1000 -i 1000 persistent://public/default/test-topic-2
输出
2023-03-08T15:47:06,268+0800 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Starting Pulsar perf producer with config: {
"confFile" : "/Users/yu/apache-pulsar-2.11.0/conf/client.conf",
"serviceURL" : "pulsar://localhost:6650",
"authPluginClassName" : "",
"authParams" : "",
"tlsTrustCertsFilePath" : "",
"tlsAllowInsecureConnection" : false,
"tlsHostnameVerificationEnable" : false,
"maxConnections" : 1,
"statsIntervalSeconds" : 1000,
"ioThreads" : 1,
"enableBusyWait" : false,
"listenerName" : null,
"listenerThreads" : 1,
"maxLookupRequest" : 50000,
"topics" : [ "persistent://public/default/test-topic-2" ],
"numTestThreads" : 1,
"msgRate" : 1000,
"msgSize" : 1024,
"numTopics" : 1,
"numProducers" : 1,
"separator" : "-",
"sendTimeout" : 0,
"producerName" : null,
"adminURL" : "http://localhost:8080/",
"deprecatedAuthPluginClassName" : null,
"maxOutstanding" : 0,
"maxPendingMessagesAcrossPartitions" : 0,
"partitions" : null,
"numMessages" : 0,
"compression" : "NONE",
"payloadFilename" : null,
"payloadDelimiter" : "\\n",
"batchTimeMillis" : 1.0,
"batchMaxMessages" : 1000,
"batchMaxBytes" : 4194304,
"testTime" : 0,
"warmupTimeSeconds" : 1.0,
"encKeyName" : null,
"encKeyFile" : null,
"delay" : 0,
"exitOnFailure" : false,
"messageKeyGenerationMode" : null,
"producerAccessMode" : "Shared",
"formatPayload" : false,
"formatterClass" : "org.apache.pulsar.testclient.DefaultMessageFormatter",
"transactionTimeout" : 10,
"numMessagesPerTransaction" : 50,
"isEnableTransaction" : false,
"isAbortTransaction" : false,
"histogramFile" : null
}
...
2023-03-08T15:53:28,178+0800 [Thread-0] INFO org.apache.pulsar.testclient.PerformanceProducer - Aggregated latency stats --- Latency: mean: 4.481 ms - med: 2.918 - 95pct: 10.710 - 99pct: 38.928 - 99.9pct: 112.689 - 99.99pct: 154.241 - 99.999pct: 193.249 - Max: 241.717
步骤 4: 检查主题 test-topic-2 的内部统计信息。
输入
curl -X GET http://localhost:8080/admin/v2/persistent/public/default/test-topic-2/partitioned-internalStats
输出
有关主题统计的详细解释,请参阅Pulsar统计。
{"metadata":{"partitions":5},"partitions":{"persistent://public/default/test-topic-2-partition-3":{"entriesAddedCounter":47087,"numberOfEntries":47087,"totalSize":80406959,"currentLedgerEntries":47087,"currentLedgerSize":80406959,"lastLedgerCreatedTimestamp":"2023-03-08T15:47:07.273+08:00","waitingCursorsCount":0,"pendingAddEntriesCount":0,"lastConfirmedEntry":"117:47086","state":"LedgerOpened","ledgers":[{"ledgerId":117,"entries":0,"size":0,"offloaded":false,"underReplicated":false}],"cursors":{},"schemaLedgers":[],"compactedLedger":{"ledgerId":-1,"entries":-1,"size":-1,"offloaded":false,"underReplicated":false}},"persistent://public/default/test-topic-2-partition-2":{"entriesAddedCounter":46995,"numberOfEntries":46995,"totalSize":80445417,"currentLedgerEntries":46995,"currentLedgerSize":80445417,"lastLedgerCreatedTimestamp":"2023-03-08T15:47:07.43+08:00","waitingCursorsCount":0,"pendingAddEntriesCount":0,"lastConfirmedEntry":"118:46994","state":"LedgerOpened","ledgers":[{"ledgerId":118,"entries":0,"size":0,"offloaded":false,"underReplicated":false}],...
步骤 5: 删除主题 test-topic-2。
输入
curl -X DELETE http://localhost:8080/admin/v2/persistent/public/default/test-topic-2/partitions
输出
没有输出。您可以使用以下命令验证test-topic-2是否存在。
输入
列出public/default命名空间中的主题。
curl -X GET http://localhost:8080/admin/v2/persistent/public/default
要使用Java管理API管理主题,请完成以下步骤。
-
启动一个Pulsar Java客户端。
-
创建一个分区主题
-
更新分区的数量。
-
向主题生成消息。
-
检查主题的统计信息。
-
删除主题。
先决条件
-
准备一个Java项目,并将以下依赖项添加到您的POM文件中。
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-admin</artifactId>
<version>4.0.1</version>
</dependency>
步骤
第一步: 在你的Java项目中启动一个Pulsar Java客户端。
输入
String url = "http://localhost:8080";
PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl(url)
.build();
步骤 2: 创建一个分区主题 test-topic-1,包含 4 个分区。
输入
admin.topics().createPartitionedTopic("persistent://public/default/test-topic-1", 4);
步骤 3: 将分区数量更新为 5。
输入
admin.topics().updatePartitionedTopic("test-topic-1", 5);
步骤 4: 向主题 test-topic-1 生成一些消息。
输入
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("test-topic-1")
.blockIfQueueFull(true)
.create();
for (int i = 0; i < 100; ++i) {
producer.newMessage().value("test").send();
}
producer.close();
client.close();
步骤 5: 检查主题 test-topic-1 的统计信息。
输入
PartitionedTopicStats stats = admin.topics().getPartitionedStats("persistent://public/default/test-topic-1",false);
System.out.println(stats.getMsgInCounter());
输出
100
步骤 6: 删除主题 test-topic-1。
输入
admin.topics().deletePartitionedTopic("test-topic-1");
相关主题
-
要了解基础知识,请参阅 Pulsar admin API - Overview
-
要了解使用场景,请参阅 Pulsar admin API - Use cases。
-
要学习常见的行政任务,请参阅 Pulsar admin API - Features。
-
要执行管理操作,请参阅 Pulsar admin API - Tools。
-
要查看详细用法,请参阅以下参考资料。
-
Pulsar 管理 API