Pulsar WebSocket API 提供了一种简单的方式,使用没有官方客户端库的语言与 Pulsar 进行交互。您可以使用任何 WebSocket 客户端库与 Pulsar WebSocket API 进行交互。有关更多详细信息,请参阅Python 和 Node.js 示例。
通过WebSocket,您可以发布和消费消息,并使用Client Feature Matrix页面上提供的功能。
运行WebSocket服务
我们推荐用于本地开发的Pulsar独立版本已经启用了WebSocket服务。
在非独立模式下,有两种部署WebSocket服务的方式:
嵌入Pulsar代理
在这种模式下,WebSocket服务将在已经在代理中运行的同一HTTP服务内运行。要启用此模式,请在安装中的conf/broker.conf配置文件中设置webSocketServiceEnabled参数。
webSocketServiceEnabled=true
作为一个独立的组件
在这种模式下,WebSocket服务将作为独立服务从Pulsar broker运行。此模式的配置在conf/websocket.conf配置文件中处理。您需要至少设置以下参数:
这是一个示例:
configurationMetadataStoreUrl=zk1:2181,zk2:2181,zk3:2181
webServicePort=8080
clusterName=my-cluster
安全设置
要在WebSocket服务上启用TLS加密,请在conf/broker.conf文件中配置以下参数。
tlsEnabled=true
tlsAllowInsecureConnection=false
tlsCertificateFilePath=/path/to/client-websocket.cert.pem
tlsKeyFilePath=/path/to/client-websocket.key-pk8.pem
tlsTrustCertsFilePath=/path/to/ca.cert.pem
要在WebSocket服务上启用静态加密,请在类路径中添加CryptoKeyReaderFactory工厂类,该类将为WebSocket创建CryptoKeyReader,并帮助加载生产者/消费者的加密密钥。
cryptoKeyReaderFactoryClassName=org.apache.pulsar.MyCryptoKeyReaderFactoryClassImpl
启动代理
当配置设置完成后,您可以使用pulsar-daemon工具启动服务:
bin/pulsar-daemon start websocket
发布说明
有关Pulsar WebSocket API的变更日志,请参阅发布说明。
API参考
Pulsar的WebSocket API提供了三个端点用于生产、消费和读取消息。
所有通过WebSocket API的交换都使用JSON。
认证
浏览器 JavaScript WebSocket 客户端
使用查询参数 token 来传输认证令牌。
ws://broker-service-url:8080/path?token=token
生产者端点
生产者端点要求您在URL中指定租户、命名空间和主题:
ws://broker-service-url:8080/ws/v2/producer/persistent/:tenant/:namespace/:topic
查询参数
| 键 | 类型 | 是否必需? | 解释 |
|---|---|---|---|
sendTimeoutMillis | long | no | 发送超时(默认:30秒) |
batchingEnabled | boolean | no | 启用消息批处理(默认值:false) |
batchingMaxMessages | int | no | 批处理中允许的最大消息数(默认值:1000) |
maxPendingMessages | int | no | 设置内部队列的最大大小,用于保存消息(默认值:1000) |
batchingMaxPublishDelay | long | no | 消息将被批处理的时间段(默认:10ms) |
messageRoutingMode | string | no | 分片生产者的消息路由模式:SinglePartition, RoundRobinPartition |
compressionType | string | no | 压缩 类型: LZ4, ZLIB |
producerName | string | no | 指定生产者的名称。Pulsar 将强制要求同一名称的生产者只能在一个主题上发布消息 |
initialSequenceId | long | no | 设置生产者发布消息的序列ID的基线。 |
hashingScheme | string | no | 在分区主题上发布时使用的哈希函数:JavaStringHash, Murmur3_32Hash |
token | string | no | 认证令牌,用于浏览器JavaScript客户端 |
encryptionKeys | string | no | 加密密钥,用于加密发布的消息,仅当使用websocket-configuration中的cryptoKeyReaderFactoryClassName配置配置了加密读取器时。 |
发布消息
{
"payload": "SGVsbG8gV29ybGQ=",
"properties": {"key1": "value1", "key2": "value2"},
"context": "1"
}
| 键 | 类型 | 是否必需? | 解释 |
|---|---|---|---|
payload | string | yes | Base-64 编码的有效载荷 |
properties | 键值对 | 否 | 应用程序定义的属性 |
context | string | no | 应用程序定义的请求标识符 |
key | string | no | 对于分区主题,决定使用哪个分区 |
replicationClusters | array | no | 将复制限制到指定的集群列表,按名称指定 |
示例成功响应
{
"result": "ok",
"messageId": "CAAQAw==",
"context": "1"
}
示例失败响应
{
"result": "send-error:3",
"errorMsg": "Failed to de-serialize from JSON",
"context": "1"
}
| 键 | 类型 | 是否必需? | 解释 |
|---|---|---|---|
result | string | yes | ok 如果成功,或者错误信息如果失败 |
messageId | string | yes | 分配给发布消息的消息ID |
context | string | no | 应用程序定义的请求标识符 |
消费者端点
消费者端点要求您在URL中指定租户、命名空间和主题,以及订阅:
ws://broker-service-url:8080/ws/v2/consumer/persistent/:tenant/:namespace/:topic/:subscription
查询参数
| 键 | 类型 | 是否必需? | 解释 |
|---|---|---|---|
ackTimeoutMillis | long | no | 设置未确认消息的超时时间(默认值:0) |
subscriptionType | string | no | 订阅类型: Exclusive, Failover, Shared, Key_Shared |
receiverQueueSize | int | no | 消费者接收队列的大小(默认值:1000) |
consumerName | string | no | 消费者名称 |
priorityLevel | int | no | 为消费者定义一个优先级 |
maxRedeliverCount | int | no | 为消费者定义一个maxRedeliverCount(默认值:0)。激活Dead Letter Topic功能。 |
deadLetterTopic | string | no | 为消费者定义一个deadLetterTopic(默认:{topic}-{subscription}-DLQ)。激活Dead Letter Topic功能。 |
pullMode | boolean | no | 启用拉取模式(默认值:false)。请参阅下面的“流量控制”。 |
negativeAckRedeliveryDelay | int | no | 当消息被否定确认时,消息重新投递前的延迟时间(以毫秒为单位)。默认值为60000。 |
token | string | no | 认证令牌,用于浏览器JavaScript客户端 |
这些参数(除了pullMode)适用于WebSocket服务的内部消费者。
因此,一旦消息进入接收队列,它们将受到重新传递设置的影响,
即使客户端没有在WebSocket上消费。
接收消息
服务器将在WebSocket会话上推送消息:
{
"messageId": "CAMQADAA",
"payload": "hvXcJvHW7kOSrUn17P2q71RA5SdiXwZBqw==",
"properties": {},
"publishTime": "2021-10-29T16:01:38.967-07:00",
"redeliveryCount": 0,
"encryptionContext": {
"keys": {
"client-rsa.pem": {
"keyValue": "jEuwS+PeUzmCo7IfLNxqoj4h7txbLjCQjkwpaw5AWJfZ2xoIdMkOuWDkOsqgFmWwxiecakS6GOZHs94x3sxzKHQx9Oe1jpwBg2e7L4fd26pp+WmAiLm/ArZJo6JotTeFSvKO3u/yQtGTZojDDQxiqFOQ1ZbMdtMZA8DpSMuq+Zx7PqLo43UdW1+krjQfE5WD+y+qE3LJQfwyVDnXxoRtqWLpVsAROlN2LxaMbaftv5HckoejJoB4xpf/dPOUqhnRstwQHf6klKT5iNhjsY4usACt78uILT0pEPd14h8wEBidBz/vAlC/zVMEqiDVzgNS7dqEYS4iHbf7cnWVCn3Hxw==",
"metadata": {}
}
},
"param": "Tfu1PxVm6S9D3+Hk",
"compressionType": "NONE",
"uncompressedMessageSize": 0,
"batchSize": {
"empty": false,
"present": true
}
}
}
以下是WebSocket消费者响应中的参数。
-
通用参数
键 类型 是否必需? 说明 messageId字符串 是 消息ID payload字符串 是 Base-64编码的有效载荷 publishTime字符串 是 发布时间戳 redeliveryCount数字 是 此消息已传递的次数 properties键值对 否 应用程序定义的属性 key字符串 否 生产者设置的原始路由键 encryptionContextEncryptionContext 否 消费者可用于解密接收消息的加密上下文 param字符串 否 密码的初始化向量(Base64编码) batchSize字符串 否 消息中的条目数(如果是批量消息) uncompressedMessageSize字符串 否 压缩前的消息大小 compressionType字符串 否 用于压缩消息有效载荷的算法 -
encryptionContext相关参数键 类型 是否必需? 解释 keyskey-EncryptionKey 对 是 key-EncryptionKey对中的键是加密密钥名称。key-EncryptionKey对中的值是加密密钥对象。 -
encryptionKey相关参数键 类型 是否必需? 解释 keyValue字符串 是 加密密钥(Base64编码) metadata键值对 否 应用定义的元数据
确认消息
消费者需要确认消息的成功处理,以便Pulsar代理删除它。
{
"messageId": "CAAQAw=="
}
| 键 | 类型 | 是否必需? | 解释 |
|---|---|---|---|
messageId | string | yes | 已处理消息的消息ID |
否定确认消息
{
"type": "negativeAcknowledge",
"messageId": "CAAQAw=="
}
| 键 | 类型 | 是否必需? | 解释 |
|---|---|---|---|
messageId | string | yes | 已处理消息的消息ID |
流程控制
推送模式
默认情况下(pullMode=false),消费者端点将使用receiverQueueSize参数来调整其内部接收队列的大小,并限制传递给WebSocket客户端的未确认消息的数量。
在这种模式下,如果您不发送确认,Pulsar WebSocket服务将在达到receiverQueueSize未确认消息发送给WebSocket客户端后停止发送消息。
拉取模式
如果您将pullMode设置为true,WebSocket客户端将需要发送permit命令以允许Pulsar WebSocket服务发送更多消息。
{
"type": "permit",
"permitMessages": 100
}
| 键 | 类型 | 是否必需? | 解释 |
|---|---|---|---|
type | string | yes | 命令的类型。必须是 permit |
permitMessages | int | yes | 允许的消息数量 |
在这种模式下,可以在不同的连接中确认消息。
检查是否到达主题的末尾
消费者可以通过发送isEndOfTopic请求来检查是否已到达主题的末尾。
请求
{
"type": "isEndOfTopic"
}
| 键 | 类型 | 是否必需? | 解释 |
|---|---|---|---|
type | string | yes | 命令的类型。必须是 isEndOfTopic |
响应
{
"endOfTopic": "true/false"
}
读取器端点
读取器端点要求您在URL中指定租户、命名空间和主题:
ws://broker-service-url:8080/ws/v2/reader/persistent/:tenant/:namespace/:topic
查询参数
| 键 | 类型 | 是否必需? | 解释 |
|---|---|---|---|
readerName | string | no | 读取器名称 |
receiverQueueSize | int | no | 消费者接收队列的大小(默认值:1000) |
messageId | int 或 enum | 否 | 从哪个消息ID开始,earliest 或 latest(默认:latest) |
token | string | no | 认证令牌,用于浏览器JavaScript客户端 |
接收消息
服务器将在WebSocket会话上推送消息:
{
"messageId": "CAAQAw==",
"payload": "SGVsbG8gV29ybGQ=",
"properties": {"key1": "value1", "key2": "value2"},
"publishTime": "2016-08-30 16:45:57.785",
"redeliveryCount": 4
}
| 键 | 类型 | 是否必需? | 解释 |
|---|---|---|---|
messageId | string | yes | 消息ID |
payload | string | yes | Base-64 编码的有效载荷 |
publishTime | string | yes | 发布时间戳 |
redeliveryCount | number | yes | 此消息已被传递的次数 |
properties | 键值对 | 否 | 应用程序定义的属性 |
key | string | no | 生产者设置的原始路由键 |
确认消息
在WebSocket中,Reader需要确认消息的成功处理,以便Pulsar WebSocket服务更新待处理消息的数量。
如果不发送确认,Pulsar WebSocket服务在达到pendingMessages限制后将停止发送消息。
{
"messageId": "CAAQAw=="
}
| 键 | 类型 | 是否必需? | 解释 |
|---|---|---|---|
messageId | string | yes | 已处理消息的消息ID |
检查是否到达主题的末尾
消费者可以通过发送isEndOfTopic请求来检查是否已到达主题的末尾。
请求
{
"type": "isEndOfTopic"
}
| 键 | 类型 | 是否必需? | 解释 |
|---|---|---|---|
type | string | yes | 命令的类型。必须是 isEndOfTopic |
响应
{
"endOfTopic": "true/false"
}
错误代码
如果发生错误,服务器将使用以下错误代码关闭WebSocket会话:
| 错误代码 | 错误信息 |
|---|---|
| 1 | 创建生产者失败 |
| 2 | 订阅失败 |
| 3 | 从JSON反序列化失败 |
| 4 | 序列化为JSON失败 |
| 5 | 客户端认证失败 |
| 6 | 客户端未授权 |
| 7 | 无效的有效载荷编码 |
| 8 | 未知错误 |
应用程序负责在退避期后重新建立一个新的WebSocket会话。
客户端示例
下面你将找到Python和Node.js中Pulsar WebSocket API的代码示例。
这个例子使用了websocket-client包。你可以使用pip来安装它:
pip install websocket-client
你也可以从PyPI下载它。
Python 生产者
这是一个发送简单消息到Pulsar topic的Python生产者示例:
import websocket, base64, json
# If set enableTLS to true, your have to set tlsEnabled to true in conf/websocket.conf.
enable_TLS = False
scheme = 'ws'
if enable_TLS:
scheme = 'wss'
TOPIC = scheme + '://localhost:8080/ws/v2/producer/persistent/public/default/my-topic'
ws = websocket.create_connection(TOPIC)
# encode message
s = "Hello World"
firstEncoded = s.encode("UTF-8")
binaryEncoded = base64.b64encode(firstEncoded)
payloadString = binaryEncoded.decode('UTF-8')
# Send one message as JSON
ws.send(json.dumps({
'payload' : payloadString,
'properties': {
'key1' : 'value1',
'key2' : 'value2'
},
'context' : 5
}))
response = json.loads(ws.recv())
if response['result'] == 'ok':
print( 'Message published successfully')
else:
print('Failed to publish message:', response)
ws.close()
Python 消费者
这是一个Python消费者的示例,它监听Pulsar主题并在消息到达时打印消息ID:
import websocket, base64, json
# If set enableTLS to true, your have to set tlsEnabled to true in conf/websocket.conf.
enable_TLS = False
scheme = 'ws'
if enable_TLS:
scheme = 'wss'
TOPIC = scheme + '://localhost:8080/ws/v2/consumer/persistent/public/default/my-topic/my-sub'
ws = websocket.create_connection(TOPIC)
while True:
msg = json.loads(ws.recv())
if not msg: break
print( "Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload'])))
# Acknowledge successful processing
ws.send(json.dumps({'messageId' : msg['messageId']}))
ws.close()
Python 阅读器
这是一个Python读取器的示例,它监听Pulsar主题并在消息到达时打印消息ID:
import websocket, base64, json
# If set enableTLS to true, your have to set tlsEnabled to true in conf/websocket.conf.
enable_TLS = False
scheme = 'ws'
if enable_TLS:
scheme = 'wss'
TOPIC = scheme + '://localhost:8080/ws/v2/reader/persistent/public/default/my-topic'
ws = websocket.create_connection(TOPIC)
while True:
msg = json.loads(ws.recv())
if not msg: break
print ( "Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload'])))
# Acknowledge successful processing
ws.send(json.dumps({'messageId' : msg['messageId']}))
ws.close()
npm install ws
Node.js 生产者
这是一个发送简单消息到Pulsar主题的Node.js生产者示例:
const WebSocket = require('ws');
// If set enableTLS to true, your have to set tlsEnabled to true in conf/websocket.conf.
const enableTLS = false;
const topic = `${enableTLS ? 'wss' : 'ws'}://localhost:8080/ws/v2/producer/persistent/public/default/my-topic`;
const ws = new WebSocket(topic);
var message = {
"payload" : new Buffer("Hello World").toString('base64'),
"properties": {
"key1" : "value1",
"key2" : "value2"
},
"context" : "1"
};
ws.on('open', function() {
// Send one message
ws.send(JSON.stringify(message));
});
ws.on('message', function(message) {
console.log('received ack: %s', message);
});
Node.js 消费者
这是一个监听与上述生产者相同主题的Node.js消费者示例:
const WebSocket = require('ws');
// If set enableTLS to true, your have to set tlsEnabled to true in conf/websocket.conf.
const enableTLS = false;
const topic = `${enableTLS ? 'wss' : 'ws'}://localhost:8080/ws/v2/consumer/persistent/public/default/my-topic/my-sub`;
const ws = new WebSocket(topic);
ws.on('message', function(message) {
var receiveMsg = JSON.parse(message);
console.log('Received: %s - payload: %s', message, new Buffer(receiveMsg.payload, 'base64').toString());
var ackMsg = {"messageId" : receiveMsg.messageId};
ws.send(JSON.stringify(ackMsg));
});
NodeJS 阅读器
const WebSocket = require('ws');
// If set enableTLS to true, your have to set tlsEnabled to true in conf/websocket.conf.
const enableTLS = false;
const topic = `${enableTLS ? 'wss' : 'ws'}://localhost:8080/ws/v2/reader/persistent/public/default/my-topic`;
const ws = new WebSocket(topic);
ws.on('message', function(message) {
var receiveMsg = JSON.parse(message);
console.log('Received: %s - payload: %s', message, new Buffer(receiveMsg.payload, 'base64').toString());
var ackMsg = {"messageId" : receiveMsg.messageId};
ws.send(JSON.stringify(ackMsg));
});