管理命名空间
本页面仅显示一些常用操作。
-
有关
Pulsar admin的最新和完整信息,包括命令、标志、描述和更多信息,请参阅Pulsar admin docs。 -
有关
REST API的最新和完整信息,包括参数、响应、示例等,请参阅REST API文档。 -
有关
Java admin API的最新和完整信息,包括类、方法、描述等,请参阅Java admin API doc。
Pulsar namespaces 是 topics 的逻辑分组。
命名空间可以通过以下方式管理:
pulsar-admin工具的namespaces命令- 管理REST API的
/admin/v2/namespaces端点 PulsarAdmin对象中的namespaces方法在 Java API 中
命名空间资源
创建命名空间
您可以在给定的tenant下创建新的命名空间。
- 脉冲星管理员
- REST API
- Java
使用create子命令并按名称指定命名空间:
pulsar-admin namespaces create test-tenant/test-namespace
admin.namespaces().createNamespace(namespace);
获取策略
您可以随时获取与命名空间关联的当前策略。
- 脉冲管理器
- REST API
- Java
使用policies子命令并指定命名空间:
pulsar-admin namespaces policies test-tenant/test-namespace
示例输出:
{
"auth_policies": {
"namespace_auth": {},
"destination_auth": {}
},
"replication_clusters": [],
"bundles_activated": true,
"bundles": {
"boundaries": [
"0x00000000",
"0xffffffff"
],
"numBundles": 1
},
"backlog_quota_map": {},
"persistence": null,
"latency_stats_sample_rate": {},
"message_ttl_in_seconds": 0,
"retention_policies": null,
"deleted": false
}
admin.namespaces().getPolicies(namespace);
列出命名空间
您可以列出给定Pulsar tenant中的所有命名空间。
- 脉冲星管理
- REST API
- Java
使用list子命令并指定租户:
pulsar-admin namespaces list test-tenant
示例输出:
test-tenant/namespace1
test-tenant/namespace2
admin.namespaces().getNamespaces(tenant);
删除命名空间
您可以从租户中删除现有的命名空间。
- 脉冲管理员
- REST API
- Java
使用delete子命令并指定命名空间:
pulsar-admin namespaces delete test-tenant/namespace1
admin.namespaces().deleteNamespace(namespace);
配置复制集群
设置复制集群
您可以为命名空间设置复制集群,以使Pulsar能够在内部将发布的消息从一个共置设施复制到另一个共置设施。
- 脉冲星管理员
- REST API
- Java
pulsar-admin namespaces set-clusters test-tenant/namespace1 --clusters cl1
admin.namespaces().setNamespaceReplicationClusters(namespace, clusters);
获取复制集群
您可以获取给定命名空间的复制集群列表。
- 脉冲星管理员
- REST API
- Java
pulsar-admin namespaces get-clusters test-tenant/cluster1/namespace1
示例输出:
cluster2
admin.namespaces().getNamespaceReplicationClusters(namespace)
配置积压配额策略
设置积压配额策略
积压配额帮助代理在命名空间达到某个阈值限制时限制其带宽/存储。管理员可以设置限制,并在达到限制后采取相应的行动。
-
producer_request_hold: 生产者持有消息并重试,直到超过客户端配置的
sendTimeoutMs -
producer_exception: 生产者在尝试发送消息时抛出异常
-
consumer_backlog_eviction: 代理开始丢弃积压的消息
可以通过定义 backlog-quota-type: destination_storage 的限制来处理积压配额限制。
- 脉冲星管理
- REST API
- Java
pulsar-admin namespaces set-backlog-quota --limit 10G \
--limitTime 36000 \
--policy producer_request_hold \
test-tenant/namespace1
admin.namespaces().setBacklogQuota(namespace, new BacklogQuota(limit, limitTime, policy))
获取积压配额策略
您可以获取给定命名空间的配置积压配额。
- 脉冲管理器
- REST API
- Java
pulsar-admin namespaces get-backlog-quotas test-tenant/namespace1
示例输出:
destination_storage BacklogQuotaImpl(limit=10737418240, limitSize=10737418240, limitTime=36000, policy=producer_request_hold)
admin.namespaces().getBacklogQuotaMap(namespace);
移除积压配额策略
您可以删除给定命名空间的积压配额策略。
- 脉冲管理器
- REST API
- Java
pulsar-admin namespaces remove-backlog-quota test-tenant/namespace1
admin.namespaces().removeBacklogQuota(namespace, backlogQuotaType)
配置持久化策略
设置持久化策略
持久化策略允许用户为给定命名空间下的所有主题消息配置持久化级别。
-
Bookkeeper-ack-quorum: 每个条目等待的确认数(保证的副本数),默认值:2
-
Bookkeeper-ensemble: 用于主题的bookies数量,默认值:2
-
Bookkeeper-write-quorum: 每个条目需要多少次写入,默认值:2
-
Ml-mark-delete-max-rate: 标记删除操作的限制速率(0表示无限制),默认值:0
- 脉冲管理员
- REST API
- Java
pulsar-admin namespaces set-persistence \
--bookkeeper-ack-quorum 2 --bookkeeper-ensemble 3 \
--bookkeeper-write-quorum 2 --ml-mark-delete-max-rate 0 \
test-tenant/namespace1
admin.namespaces().setPersistence(namespace,new PersistencePolicies(bookkeeperEnsemble, bookkeeperWriteQuorum,bookkeeperAckQuorum,managedLedgerMaxMarkDeleteRate))
获取持久化策略
您可以获取给定命名空间的配置持久化策略。
- 脉冲管理员
- REST API
- Java
pulsar-admin namespaces get-persistence test-tenant/namespace1
示例输出:
{
"bookkeeperEnsemble": 3,
"bookkeeperWriteQuorum": 2,
"bookkeeperAckQuorum": 2,
"managedLedgerMaxMarkDeleteRate": 0
}
admin.namespaces().getPersistence(namespace)
配置命名空间包
卸载命名空间包
命名空间包是属于同一命名空间的虚拟主题组。如果代理因包的数量过多而过载,此命令可以帮助从该代理卸载一个包,以便可以由其他负载较轻的代理提供服务。命名空间包的ID范围从0x00000000到0xffffffff。
- 脉冲管理员
- REST API
- Java
pulsar-admin namespaces unload --bundle 0x00000000_0xffffffff test-tenant/namespace1
pulsar-admin namespaces unload --bundle 0x00000000_0xffffffff test-tenant/namespace1
pulsar-admin namespaces unload --bundle 0x00000000_0xffffffff --destinationBroker broker1.use.org.com:8080 test-tenant/namespace1
admin.namespaces().unloadNamespaceBundle(namespace, bundle)
拆分命名空间包
一个命名空间包可以包含多个主题,但只能由一个broker提供服务。如果单个包在broker上创建了过多的负载,管理员可以使用下面的命令拆分包,允许卸载一个或多个新包,从而在broker之间平衡负载。
- 脉冲管理员
- REST API
- Java
pulsar-admin namespaces split-bundle --bundle 0x00000000_0xffffffff test-tenant/namespace1
admin.namespaces().splitNamespaceBundle(namespace, bundle, unloadSplitBundles, splitAlgorithmName)
配置消息TTL
设置消息TTL
您可以配置消息的生存时间(以秒为单位)。在下面的示例中,message-ttl 设置为 100 秒。
- 脉冲管理员
- REST API
- Java
pulsar-admin namespaces set-message-ttl --messageTTL 100 test-tenant/namespace1
admin.namespaces().setNamespaceMessageTTL(namespace, messageTTL)
获取消息-ttl
当为命名空间设置了消息TTL时,您可以使用以下命令获取配置的值。此示例继续了命令set message-ttl的示例,因此返回的值为100秒。
- 脉冲管理员
- REST API
- Java
pulsar-admin namespaces get-message-ttl test-tenant/namespace1
示例输出:
100
admin.namespaces().getNamespaceMessageTTL(namespace)
100
移除消息-ttl
移除配置命名空间的消息TTL。
- 脉冲管理员
- REST API
- Java
pulsar-admin namespaces remove-message-ttl test-tenant/namespace1
admin.namespaces().removeNamespaceMessageTTL(namespace)
清除积压
清除命名空间积压
它清除属于特定命名空间的所有主题的所有消息积压。你也可以清除特定订阅的积压。
- 脉冲管理员
- REST API
- Java
pulsar-admin namespaces clear-backlog --sub my-subscription test-tenant/namespace1
admin.namespaces().clearNamespaceBacklogForSubscription(namespace, subscription)
清除捆绑积压
它清除属于特定NamespaceBundle的所有主题的所有消息积压。你也可以清除特定订阅的积压。
- 脉冲管理员
- REST API
- Java
pulsar-admin namespaces clear-backlog \
--bundle 0x00000000_0xffffffff \
--sub my-subscription \
test-tenant/namespace1
admin.namespaces().clearNamespaceBundleBacklogForSubscription(namespace, bundle, subscription)
配置保留
设置保留
每个命名空间包含多个主题,每个主题的保留大小(存储大小)不应超过特定阈值,或者应存储一定的时间。此命令帮助配置给定命名空间中主题的保留大小和时间。
- 脉冲管理员
- REST API
- Java
pulsar-admin namespaces set-retention --size 100M --time 10m test-tenant/namespace1
admin.namespaces().setRetention(namespace, new RetentionPolicies(retentionTimeInMin, retentionSizeInMB))
获取保留
它显示给定命名空间的保留信息。
- 脉冲管理员
- REST API
- Java
pulsar-admin namespaces get-retention test-tenant/namespace1
{
"retentionTimeInMinutes": 10,
"retentionSizeInMB": 100
}
admin.namespaces().getRetention(namespace)
为主题配置调度节流
为主题设置调度节流
它为给定命名空间下的所有主题设置消息分发速率。分发速率可以通过每X秒的消息数量(msg-dispatch-rate)或每X秒的消息字节数(byte-dispatch-rate)来限制。分发速率以秒为单位,可以通过dispatch-rate-period进行配置。msg-dispatch-rate和byte-dispatch-rate的默认值为-1,这表示禁用限流。
- 如果既没有配置
clusterDispatchRate也没有配置topicDispatchRate,则调度限流被禁用。 - 如果未配置
topicDispatchRate,则clusterDispatchRate生效。 - 如果配置了
topicDispatchRate,则topicDispatchRate生效。
- 脉冲管理员
- REST API
- Java
pulsar-admin namespaces set-dispatch-rate test-tenant/namespace1 \
--msg-dispatch-rate 1000 \
--byte-dispatch-rate 1048576 \
--dispatch-rate-period 1
admin.namespaces().setDispatchRate(namespace, new DispatchRate(1000, 1048576, 1))
获取主题的配置消息速率
它显示了为命名空间配置的消息速率(此命名空间下的主题每秒可以调度这么多消息)
- 脉冲管理员
- REST API
- Java
pulsar-admin namespaces get-dispatch-rate test-tenant/namespace1
示例输出:
{
"dispatchThrottlingRateInMsg" : 1000,
"dispatchThrottlingRateInByte" : 1048576,
"relativeToPublishRate" : false,
"ratePeriodInSecond" : 1
}
admin.namespaces().getDispatchRate(namespace)
配置订阅的调度限制
为订阅设置调度限制
它为给定命名空间下的所有主题订阅设置消息分发速率。分发速率可以通过每X秒的消息数量(msg-dispatch-rate)或每X秒的消息字节数(byte-dispatch-rate)来限制。分发速率以秒为单位,可以通过dispatch-rate-period进行配置。msg-dispatch-rate和byte-dispatch-rate的默认值为-1,这表示禁用限流。
- 脉冲管理员
- REST API
- Java
pulsar-admin namespaces set-subscription-dispatch-rate test-tenant/namespace1 \
--msg-dispatch-rate 1000 \
--byte-dispatch-rate 1048576 \
--dispatch-rate-period 1
admin.namespaces().setSubscriptionDispatchRate(namespace, new DispatchRate(1000, 1048576, 1))
获取订阅的配置消息速率
它显示了为命名空间配置的消息速率(此命名空间下的主题每秒可以调度这么多消息)。
- 脉冲管理员
- REST API
- Java
pulsar-admin namespaces get-subscription-dispatch-rate test-tenant/namespace1
示例输出:
{
"dispatchThrottlingRateInMsg" : 1000,
"dispatchThrottlingRateInByte" : 1048576,
"relativeToPublishRate" : false,
"ratePeriodInSecond" : 1
}
admin.namespaces().getSubscriptionDispatchRate(namespace)
配置复制器的调度限制
为复制器设置调度节流
它为给定命名空间下的所有复制集群之间的复制器设置消息分发速率。分发速率可以通过每X秒的消息数量(msg-dispatch-rate)或每X秒的消息字节数(byte-dispatch-rate)来限制。分发速率以秒为单位,可以通过dispatch-rate-period进行配置。msg-dispatch-rate和byte-dispatch-rate的默认值为-1,表示禁用限流。
- 脉冲管理员
- REST API
- Java
pulsar-admin namespaces set-replicator-dispatch-rate test-tenant/namespace1 \
--msg-dispatch-rate 1000 \
--byte-dispatch-rate 1048576 \
--dispatch-rate-period 1
admin.namespaces().setReplicatorDispatchRate(namespace, new DispatchRate(1000, 1048576, 1))
获取配置的复制器消息速率
它显示了为命名空间配置的消息速率(此命名空间下的主题每秒可以调度这么多消息)
- 脉冲管理员
- REST API
- Java
pulsar-admin namespaces get-replicator-dispatch-rate test-tenant/namespace1
{
"dispatchThrottlingRatePerTopicInMsg" : 1000,
"dispatchThrottlingRatePerTopicInByte" : 1048576,
"ratePeriodInSecond" : 1
}
admin.namespaces().getReplicatorDispatchRate(namespace)
配置去重快照间隔
获取去重快照间隔
它显示了为命名空间配置的deduplicationSnapshotInterval(命名空间下的每个主题将根据此间隔拍摄去重快照)
- 脉冲管理员
- REST API
- Java
pulsar-admin namespaces get-deduplication-snapshot-interval test-tenant/namespace1
admin.namespaces().getDeduplicationSnapshotInterval(namespace)
设置去重快照间隔
为命名空间设置配置的deduplicationSnapshotInterval。命名空间下的每个主题将根据此间隔进行去重快照。必须将brokerDeduplicationEnabled设置为true,此属性才能生效。
- 脉冲管理员
- REST API
- Java
pulsar-admin namespaces set-deduplication-snapshot-interval test-tenant/namespace1 --interval 1000
admin.namespaces().setDeduplicationSnapshotInterval(namespace, 1000)
移除去重快照间隔
移除配置的deduplicationSnapshotInterval命名空间(命名空间下的每个主题将根据此间隔进行去重快照)。
- 脉冲管理员
- REST API
- Java
pulsar-admin namespaces remove-deduplication-snapshot-interval test-tenant/namespace1
admin.namespaces().removeDeduplicationSnapshotInterval(namespace)
命名空间隔离
您可以使用Pulsar隔离策略为命名空间分配资源(broker和bookie)。
从代理卸载命名空间
你可以从当前负责的Pulsar broker中卸载一个命名空间,或者一个命名空间包。
脉冲管理员
使用namespaces命令的unload子命令。
- 脉冲管理员
- REST API
- Java
pulsar-admin namespaces unload my-tenant/my-ns
admin.namespaces().unload(namespace)
配置入口过滤器策略
设置条目过滤策略
入口过滤器有助于在服务器端过滤消息。
- 脉冲管理员
- REST API
- Java
pulsar-admin namespaces set-entry-filters \
--desc "The description of the entry filter to be used for user help." \
--entry-filters-name "The class name for the entry filter." \
--entry-filters-dir "The directory for all the entry filter implementations." \
test-tenant/namespace1
admin.namespaces().setEntryFilters(namespace, new EntryFilters("desc", "classes name", "class files localtion"))
获取条目过滤策略
你可以为给定的命名空间获取一个配置好的入口过滤器。
- 脉冲管理员
- REST API
- Java
pulsar-admin namespaces get-entry-filters test-tenant/namespace1
admin.namespaces().getEntryFilters(namespace);
移除条目过滤策略
您可以删除给定命名空间的条目过滤策略。
- 脉冲管理员
- REST API
- Java
pulsar-admin namespaces remove-entry-filters test-tenant/namespace1
admin.namespaces().removeEntryFilters(namespace)