Pulsar 二进制协议规范
Pulsar 使用自定义的二进制协议进行生产者和消费者与代理之间的通信。该协议旨在支持所需的功能,如确认和流量控制,同时确保最大的传输和实现效率。
客户端和代理之间交换命令。命令格式化为二进制协议缓冲区(也称为protobuf)消息。protobuf命令的格式在PulsarApi.proto
文件中指定,并在下面的Protobuf接口部分中进行了文档化。
连接共享
不同生产者和消费者的命令可以交错并通过同一连接发送,没有限制。
所有与Pulsar协议相关的命令都包含在一个BaseCommand
protobuf消息中,该消息包括一个Type
枚举,其中包含所有可能的子命令作为可选字段。BaseCommand
消息只能指定一个子命令。
框架
由于protobuf不提供任何类型的消息帧,Pulsar协议中的所有消息都附加了一个4字节的字段,用于指定帧的大小。单个帧的最大允许大小为5 MB。
Pulsar协议允许两种类型的命令:
为了提高效率,消息负载以原始格式而不是protobuf格式传递。
简单命令
简单(无负载)命令具有以下基本结构:
组件 | 描述 | 大小(字节) |
---|---|---|
totalSize | 帧的大小,计算它之后的所有内容(以字节为单位) | 4 |
commandSize | protobuf序列化命令的大小 | 4 |
command | protobuf 序列化的命令 |
消息命令
有效载荷命令具有以下基本结构:
组件 | 必需或可选 | 描述 | 大小(字节) |
---|---|---|---|
totalSize | 必填 | 帧的大小,计算它之后的所有内容(以字节为单位) | 4 |
commandSize | 必填 | protobuf序列化命令的大小 | 4 |
command | 必填 | protobuf 序列化的命令 | |
magicNumberOfBrokerEntryMetadata | 可选 | 一个2字节的字节数组(0x0e02 ),用于标识代理条目元数据 注意: magicNumberOfBrokerEntryMetadata 、brokerEntryMetadataSize 和brokerEntryMetadata 应一起使用。 | 2 |
brokerEntryMetadataSize | 可选 | broker条目元数据的大小 | 4 |
brokerEntryMetadata | 可选 | 作为二进制protobuf消息存储的broker条目元数据 | |
magicNumber | 必填 | 一个2字节的字节数组(0x0e01 ),用于标识当前格式 | 2 |
checksum | 必填 | 其后所有内容的CRC32-C校验和 | 4 |
metadataSize | 必填 | 消息metadata的大小 | 4 |
metadata | 必需 | 消息metadata存储为二进制protobuf消息 | |
payload | 必填 | 帧中剩余的任何内容都被视为有效载荷,可以包括任何字节序列 |
Broker 入口元数据
Broker入口元数据与消息元数据一起存储为序列化的protobuf消息。 它是由broker在消息到达broker时创建的,如果配置了,则原封不动地传递给消费者。
字段 | 必填或可选 | 描述 |
---|---|---|
broker_timestamp | 可选 | 消息到达代理时的时间戳(id est 自1970年1月1日以来的毫秒数,UTC时间) |
index | 可选 | 消息的索引。它由代理分配。 |
如果你想为brokers使用broker入口元数据,请在broker.conf
文件中配置brokerEntryMetadataInterceptors
参数。
如果你想为消费者使用代理条目元数据:
-
使用客户端协议版本 18 或更高版本。
-
配置
brokerEntryMetadataInterceptors
参数,并在broker.conf
文件中将exposingBrokerEntryMetadataToClientEnabled
参数设置为true
。
消息元数据
消息元数据与应用程序指定的有效载荷一起存储为序列化的protobuf消息。元数据由生产者创建,并在不更改的情况下传递给消费者。
字段 | 必填或可选 | 描述 |
---|---|---|
producer_name | 必填 | 发布消息的生产者名称 |
sequence_id | 必填 | 消息的序列ID,由生产者分配 |
publish_time | 必填 | 发布的时间戳,以Unix时间表示(即自1970年1月1日UTC以来的毫秒数) |
properties | 必填 | 一系列键/值对(使用KeyValue 消息)。这些是应用程序定义的键和值,对Pulsar没有特殊意义。 |
replicated_from | 可选 | 表示消息已被复制,并指定消息最初发布的集群名称 |
partition_key | 可选 | 在发布到分区主题时,如果存在键,则使用键的哈希值来确定选择哪个分区。分区键用作消息键。 |
compression | 可选 | 表示有效载荷已被压缩,并指明使用的压缩库 |
uncompressed_size | 可选 | 如果使用了压缩,生产者必须用原始有效载荷大小填充未压缩大小字段 |
num_messages_in_batch | 可选 | 如果此消息实际上是多个条目的批次,则必须将此字段设置为批次中的消息数量 |
批量消息
使用批量消息时,有效载荷将包含一个条目列表,每个条目都有其单独的元数据,由SingleMessageMetadata
对象定义。
对于单个批次,有效载荷格式将如下所示:
字段 | 必填或可选 | 描述 |
---|---|---|
metadataSizeN | Required | 单个消息元数据序列化后的Protobuf大小 |
metadataN | 必填 | 单条消息元数据 |
payloadN | 必填 | 应用程序传递的消息负载 |
每个元数据字段看起来像这样;
字段 | 必填或可选 | 描述 |
---|---|---|
properties | 必填 | 应用程序定义的属性 |
partition key | 可选 | 用于指示哈希到特定分区的键 |
payload_size | 必填 | 批次中单个消息的有效载荷大小 |
当启用压缩时,整个批次将一次性压缩。
交互
连接建立
在打开到代理的TCP连接后,通常在端口6650上,客户端负责启动会话。
在从代理接收到Connected
响应后,客户端可以认为连接已准备好使用。或者,如果代理未验证客户端身份验证,它将回复一个Error
命令并关闭TCP连接。
示例:
message CommandConnect {
"client_version" : "Pulsar-Client-Java-v1.15.2",
"auth_method_name" : "my-authentication-plugin",
"auth_data" : "my-auth-data",
"protocol_version" : 6
}
字段:
client_version
: 基于字符串的标识符。格式不受强制要求。auth_method_name
: (可选) 如果启用了认证插件,则为认证插件的名称。auth_data
: (可选) 插件特定的认证数据。protocol_version
: 表示客户端支持的协议版本。Broker 不会发送协议新版本中引入的命令。Broker 可能会强制执行最低版本。original_principal
: 由代理添加。普通客户端不应提供此值。当设置此值且启用授权时,auth_data
必须映射到 broker.conf 中的某个proxyRoles
。original_auth_method
: 由代理添加。普通客户端不应提供此值。original_auth_data
: 由代理在配置为这样做时添加。普通客户端不应提供此值。proxy_version
: 由代理添加。代理会拒绝带有此字段的Connect
命令。普通客户端不应提供此值。当在代理中启用身份验证和授权时,只有配置的proxyRoles
之一有权提供此字段。为了向后兼容,代理不要求proxyRole
提供此字段。
message CommandConnected {
"server_version" : "Pulsar-Broker-v1.15.2",
"protocol_version" : 6
}
字段:
server_version
: 代理版本的字符串标识符。protocol_version
: 代理支持的协议版本。客户端不得尝试发送在协议新版本中引入的命令。
保持连接
为了识别客户端和代理之间的长时间网络分区或机器崩溃但未中断远程端TCP连接的情况(例如:停电、内核恐慌、硬重启...),我们引入了一种机制来探测远程对等方的可用性状态。
客户端和代理都会定期发送Ping
命令,如果在超时时间内(代理默认使用60秒)没有收到Pong
响应,它们将关闭套接字。
一个有效的Pulsar客户端实现不需要发送Ping
探测,尽管它需要在从代理接收到探测后迅速回复,以防止远程端强制关闭TCP连接。
生产者
为了发送消息,客户端需要建立一个生产者。在创建生产者时,代理将首先验证该特定客户端是否有权在该主题上发布。
一旦客户端收到生产者创建的确认,它就可以开始向代理发布消息,引用之前协商的生产者ID。
如果客户端没有收到指示生产者创建成功或失败的响应, 客户端应首先发送命令关闭原始生产者,然后再发送 命令重新尝试创建生产者。
在创建或连接生产者之前,您需要先执行主题查找。
命令生产者
message CommandProducer {
"topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
"producer_id" : 1,
"request_id" : 1
}
字段:
topic
: 你想要创建生产者的完整主题名称。producer_id
: 客户端生成的生产者标识符。在同一连接内需要唯一。request_id
: 此请求的标识符。用于将响应与原始请求匹配。在同一连接内需要是唯一的。producer_name
: (可选) 如果指定了生产者名称,将使用该名称,否则代理将生成一个唯一的名称。生成的生产者名称保证是全局唯一的。实现期望在生产者最初创建时让代理生成一个新的生产者名称,然后在重新连接后重新创建生产者时重复使用它。
代理将回复ProducerSuccess
或Error
命令。
命令生产者成功
message CommandProducerSuccess {
"request_id" : 1,
"producer_name" : "generated-unique-producer-name"
}
字段:
request_id
:CreateProducer
请求的原始ID。producer_name
: 生成全局唯一的生产者名称或客户端指定的名称(如果有)。
命令发送
命令 Send
用于在已经存在的生产者上下文中发布新消息。如果尚未为连接创建生产者,代理将终止连接。此命令用于包含命令以及消息有效载荷的帧中,其完整格式在消息命令部分中指定。
message CommandSend {
"producer_id" : 1,
"sequence_id" : 0,
"num_messages" : 1
}
字段:
producer_id
: 现有生产者的ID。sequence_id
: 每条消息都有一个相关的序列ID,预期通过从0开始的计数器实现。确认消息有效发布的SendReceipt
将通过其序列ID引用它。num_messages
: (可选) 用于一次性发布一批消息时。
命令 SendReceipt
在消息被持久化到配置的副本数量后,代理将向生产者发送确认回执。
message CommandSendReceipt {
"producer_id" : 1,
"sequence_id" : 0,
"message_id" : {
"ledgerId" : 123,
"entryId" : 456
}
}
字段:
producer_id
: 发起发送请求的生产者的ID。sequence_id
: 发布消息的序列ID。message_id
: 系统分配给发布消息的消息ID 在单个集群内是唯一的。消息ID由2个长整型组成,ledgerId
和entryId
,这反映了这个唯一ID是在追加到BookKeeper账本时分配的。
命令 关闭生产者
此命令可以由生产者或代理发送。
当接收到CloseProducer
命令时,代理将停止接受该生产者的任何更多消息,等待所有待处理的消息被持久化,然后向客户端回复Success
。
如果客户端在超时时间内未收到Producer
命令的响应,
客户端必须首先发送CloseProducer
命令,然后再发送另一个
Producer
命令。客户端在发送下一个Producer
命令之前,
不需要等待CloseProducer
命令的响应。
当代理执行优雅的故障转移时(例如:代理正在重新启动,或者负载均衡器正在卸载主题以转移到不同的代理),代理可以向客户端发送CloseProducer
命令。
当接收到CloseProducer
时,客户端应再次进行服务发现查找并重新创建生产者。TCP连接不受影响。
消费者
消费者用于附加到订阅并从其消费消息。 每次重新连接后,客户端需要订阅主题。如果 订阅尚不存在,将创建一个新的订阅。
在创建或连接消费者之前,您需要先执行主题查找。
如果客户端没有收到表示消费者创建成功或失败的响应, 客户端应首先发送命令关闭原始消费者,然后再发送命令重新尝试创建消费者。
流程控制
消费者准备就绪后,客户端需要给予权限给代理以推送消息。这是通过Flow
命令完成的。
一个Flow
命令提供了额外的许可来向消费者发送消息。
典型的消费者实现将使用一个队列来在应用程序准备好消费之前积累这些消息。
在应用程序从队列中取出一半的消息后,消费者会向代理发送许可以请求更多消息(等于队列中消息的一半)。
例如,如果队列大小为1000,消费者消费了队列中的500条消息。 然后消费者向代理发送许可以请求500条消息。
命令订阅
message CommandSubscribe {
"topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
"subscription" : "my-subscription-name",
"subType" : "Exclusive",
"consumer_id" : 1,
"request_id" : 1
}
字段:
topic
: 您想要在其上创建消费者的完整主题名称。subscription
: 订阅名称。subType
: 订阅类型: 独占, 共享, 故障转移, 键共享.consumer_id
: 客户端生成的消费者标识符。在同一连接内需要是唯一的。request_id
: 此请求的标识符。用于将响应与原始请求匹配。在同一连接内需要是唯一的。consumer_name
: (可选) 客户端可以指定一个消费者名称。这个名称可以用于在统计中跟踪特定的消费者。此外,在故障转移订阅类型中,该名称用于决定哪个消费者被选为主(接收消息的消费者):消费者按其名称排序,第一个被选为主。
命令流程
message CommandFlow {
"consumer_id" : 1,
"messagePermits" : 1000
}
字段:
consumer_id
: 已建立的消费者的ID。messagePermits
: 授予代理的额外许可数量,用于推送更多消息。
命令消息
命令 Message
用于代理在给定的许可范围内将消息推送到现有的消费者。
此命令用于包含消息有效载荷的框架中,其完整格式在消息命令部分中指定。
message CommandMessage {
"consumer_id" : 1,
"message_id" : {
"ledgerId" : 123,
"entryId" : 456
}
}
命令确认
一个Ack
用于向代理发出信号,表明应用程序已成功处理了给定的消息,并且代理可以丢弃该消息。
此外,代理还将根据已确认的消息维护消费者的位置。
message CommandAck {
"consumer_id" : 1,
"ack_type" : "Individual",
"message_id" : {
"ledgerId" : 123,
"entryId" : 456
}
}
字段:
consumer_id
: 已建立的消费者的ID。ack_type
: 确认类型:Individual
或Cumulative
.message_id
: 要确认的消息的ID。validation_error
: (可选) 表示消费者由于以下原因丢弃了消息:UncompressedSizeCorruption
、DecompressionError
、ChecksumMismatch
、BatchDeSerializeError
。properties
: (可选) 保留的配置项。txnid_most_bits
: (可选) 与事务协调器ID相同,txnid_most_bits
和txnid_least_bits
唯一标识一个事务。txnid_least_bits
: (可选) 在事务协调器中打开的事务的ID,txnid_most_bits
和txnid_least_bits
唯一标识一个事务。request_id
: (可选) 用于处理响应和超时的ID。
命令确认响应
一个AckResponse
是代理对客户端发送的请求的确认响应。它包含请求中发送的consumer_id
。
如果使用了事务,它包含请求中发送的事务ID和请求ID。
客户端根据请求ID完成特定请求。
如果设置了error
字段,则表示请求失败。
一个带有重定向的AckResponse
示例:
message CommandAckResponse {
"consumer_id" : 1,
"txnid_least_bits" = 0,
"txnid_most_bits" = 1,
"request_id" = 5
}
命令 CloseConsumer
此命令可以由生产者或代理发送。
此命令的行为与CloseProducer
相同
如果客户端在超时时间内未收到Subscribe
命令的响应,
客户端必须先发送一个CloseConsumer
命令,然后再发送另一个
Subscribe
命令。客户端在发送下一个Subscribe
命令之前,
不需要等待CloseConsumer
命令的响应。
命令 重新发送未确认的消息
消费者可以请求代理重新传递一些或所有推送到该特定消费者但尚未确认的待处理消息。
protobuf 对象接受消费者希望重新传递的消息 ID 列表。如果列表为空,代理将重新传递所有未决消息。
在重新投递时,消息可以发送给同一个消费者,或者在共享订阅的情况下,分散到所有可用的消费者。
命令到达主题末尾
这是由代理发送给特定消费者的,每当主题被“终止”并且订阅上的所有消息都被确认时。
客户端应使用此命令通知应用程序不再有来自消费者的消息。
命令消费者统计
此命令由客户端发送,用于从代理检索订阅者和消费者级别的统计信息。
字段:
request_id
: 请求的ID,用于关联请求和响应。consumer_id
: 已建立的消费者的ID。
命令 ConsumerStatsResponse
这是代理对客户端ConsumerStats请求的响应。
它包含请求中发送的consumer_id
的订阅者和消费者级别的统计信息。
如果设置了error_code
或error_message
字段,则表示请求失败。
命令取消订阅
此命令由客户端发送,以从相关主题取消订阅consumer_id
。
字段:
request_id
: 请求的ID。consumer_id
: 需要取消订阅的已建立消费者的ID。
服务发现
主题查找
每次客户端需要创建或重新连接生产者或消费者时,都需要执行主题查找。查找用于发现哪个特定的代理正在为我们即将使用的主题提供服务。
查找可以通过REST调用来完成,如admin API文档中所述。
自 Pulsar-1.16 起,也可以在二进制协议中执行查找。
为了举例说明,我们假设有一个服务发现组件运行在 pulsar://broker.example.com:6650
各个代理将在 pulsar://broker-1.example.com:6650
,
pulsar://broker-2.example.com:6650
, ... 运行
客户端可以使用与发现服务主机的连接来发出LookupTopic
命令。响应可以是连接的代理主机名,或者是重试查找的代理主机名。
LookupTopic
命令必须在已经通过 Connect
/ Connected
初始握手的连接中使用。
message CommandLookupTopic {
"topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
"request_id" : 1,
"authoritative" : false
}
字段:
topic
: 要查找的主题名称。request_id
: 将与响应一起传递的请求ID。authoritative
: 初始查找请求应使用 false。当跟随重定向响应时,客户端应传递响应中包含的相同值。
查找主题响应
成功查找的响应示例:
message CommandLookupTopicResponse {
"request_id" : 1,
"response" : "Connect",
"brokerServiceUrl" : "pulsar://broker-1.example.com:6650",
"brokerServiceUrlTls" : "pulsar+ssl://broker-1.example.com:6651",
"authoritative" : true
}
这是一个带有重定向的查找响应示例:
message CommandLookupTopicResponse {
"request_id" : 1,
"response" : "Redirect",
"brokerServiceUrl" : "pulsar://broker-2.example.com:6650",
"brokerServiceUrlTls" : "pulsar+ssl://broker-2.example.com:6651",
"authoritative" : true
}
在第二种情况下,我们需要重新发出LookupTopic
命令请求到broker-2.example.com
,这个代理将能够对查找请求给出明确的答案。
分区主题发现
分区主题元数据发现用于确定一个主题是否是“分区主题”以及设置了多少个分区。
如果主题被标记为“分区”,客户端应创建多个生产者或消费者,每个分区一个,使用partition-X
后缀。
此信息仅在首次创建生产者或消费者时需要检索。重新连接后无需执行此操作。
分区主题元数据的发现与主题查找非常相似。客户端向服务发现地址发送请求,响应将包含实际的元数据。
命令 PartitionedTopicMetadata
message CommandPartitionedTopicMetadata {
"topic" : "persistent://my-property/my-cluster/my-namespace/my-topic",
"request_id" : 1
}
字段:
topic
: 要检查分区元数据的主题。request_id
: 将与响应一起传递的请求ID。
命令 PartitionedTopicMetadataResponse
带有元数据的响应示例:
message CommandPartitionedTopicMetadataResponse {
"request_id" : 1,
"response" : "Success",
"partitions" : 32
}
Protobuf 接口
所有Pulsar的Protobuf定义可以在这里找到。