Skip to main content
Version: 4.0.x

Pulsar WebSocket API 提供了一种简单的方式,使用没有官方客户端库的语言与 Pulsar 进行交互。您可以使用任何 WebSocket 客户端库与 Pulsar WebSocket API 进行交互。有关更多详细信息,请参阅Python 和 Node.js 示例

通过WebSocket,您可以发布和消费消息,并使用Client Feature Matrix页面上提供的功能。

运行WebSocket服务

我们推荐用于本地开发的Pulsar独立版本已经启用了WebSocket服务。

在非独立模式下,有两种部署WebSocket服务的方式:

嵌入Pulsar代理

在这种模式下,WebSocket服务将在已经在代理中运行的同一HTTP服务内运行。要启用此模式,请在安装中的conf/broker.conf配置文件中设置webSocketServiceEnabled参数。

webSocketServiceEnabled=true

作为一个独立的组件

在这种模式下,WebSocket服务将作为独立服务从Pulsar broker运行。此模式的配置在conf/websocket.conf配置文件中处理。您需要至少设置以下参数:

这是一个示例:

configurationMetadataStoreUrl=zk1:2181,zk2:2181,zk3:2181
webServicePort=8080
clusterName=my-cluster

安全设置

要在WebSocket服务上启用TLS加密,请在conf/broker.conf文件中配置以下参数。

tlsEnabled=true
tlsAllowInsecureConnection=false
tlsCertificateFilePath=/path/to/client-websocket.cert.pem
tlsKeyFilePath=/path/to/client-websocket.key-pk8.pem
tlsTrustCertsFilePath=/path/to/ca.cert.pem

要在WebSocket服务上启用静态加密,请在类路径中添加CryptoKeyReaderFactory工厂类,该类将为WebSocket创建CryptoKeyReader,并帮助加载生产者/消费者的加密密钥。

cryptoKeyReaderFactoryClassName=org.apache.pulsar.MyCryptoKeyReaderFactoryClassImpl

启动代理

当配置设置完成后,您可以使用pulsar-daemon工具启动服务:

bin/pulsar-daemon start websocket

发布说明

有关Pulsar WebSocket API的变更日志,请参阅发布说明

API参考

Pulsar的WebSocket API提供了三个端点用于生产消费读取消息。

所有通过WebSocket API的交换都使用JSON。

认证

浏览器 JavaScript WebSocket 客户端

使用查询参数 token 来传输认证令牌。

ws://broker-service-url:8080/path?token=token

生产者端点

生产者端点要求您在URL中指定租户、命名空间和主题:

ws://broker-service-url:8080/ws/v2/producer/persistent/:tenant/:namespace/:topic
查询参数
类型是否必需?解释
sendTimeoutMillislongno发送超时(默认:30秒)
batchingEnabledbooleanno启用消息批处理(默认值:false)
batchingMaxMessagesintno批处理中允许的最大消息数(默认值:1000)
maxPendingMessagesintno设置内部队列的最大大小,用于保存消息(默认值:1000)
batchingMaxPublishDelaylongno消息将被批处理的时间段(默认:10ms)
messageRoutingModestringno分片生产者的消息路由模式SinglePartition, RoundRobinPartition
compressionTypestringno压缩 类型: LZ4, ZLIB
producerNamestringno指定生产者的名称。Pulsar 将强制要求同一名称的生产者只能在一个主题上发布消息
initialSequenceIdlongno设置生产者发布消息的序列ID的基线。
hashingSchemestringno在分区主题上发布时使用的哈希函数JavaStringHash, Murmur3_32Hash
tokenstringno认证令牌,用于浏览器JavaScript客户端
encryptionKeysstringno加密密钥,用于加密发布的消息,仅当使用websocket-configuration中的cryptoKeyReaderFactoryClassName配置配置了加密读取器时。

发布消息

{
"payload": "SGVsbG8gV29ybGQ=",
"properties": {"key1": "value1", "key2": "value2"},
"context": "1"
}
类型是否必需?解释
payloadstringyesBase-64 编码的有效载荷
properties键值对应用程序定义的属性
contextstringno应用程序定义的请求标识符
keystringno对于分区主题,决定使用哪个分区
replicationClustersarrayno将复制限制到指定的集群列表,按名称指定
示例成功响应
{
"result": "ok",
"messageId": "CAAQAw==",
"context": "1"
}
示例失败响应
 {
"result": "send-error:3",
"errorMsg": "Failed to de-serialize from JSON",
"context": "1"
}
类型是否必需?解释
resultstringyesok 如果成功,或者错误信息如果失败
messageIdstringyes分配给发布消息的消息ID
contextstringno应用程序定义的请求标识符

消费者端点

消费者端点要求您在URL中指定租户、命名空间和主题,以及订阅:

ws://broker-service-url:8080/ws/v2/consumer/persistent/:tenant/:namespace/:topic/:subscription
查询参数
类型是否必需?解释
ackTimeoutMillislongno设置未确认消息的超时时间(默认值:0)
subscriptionTypestringno订阅类型: Exclusive, Failover, Shared, Key_Shared
receiverQueueSizeintno消费者接收队列的大小(默认值:1000)
consumerNamestringno消费者名称
priorityLevelintno为消费者定义一个优先级
maxRedeliverCountintno为消费者定义一个maxRedeliverCount(默认值:0)。激活Dead Letter Topic功能。
deadLetterTopicstringno为消费者定义一个deadLetterTopic(默认:{topic}-{subscription}-DLQ)。激活Dead Letter Topic功能。
pullModebooleanno启用拉取模式(默认值:false)。请参阅下面的“流量控制”。
negativeAckRedeliveryDelayintno当消息被否定确认时,消息重新投递前的延迟时间(以毫秒为单位)。默认值为60000。
tokenstringno认证令牌,用于浏览器JavaScript客户端
note

这些参数(除了pullMode)适用于WebSocket服务的内部消费者。 因此,一旦消息进入接收队列,它们将受到重新传递设置的影响, 即使客户端没有在WebSocket上消费。

接收消息

服务器将在WebSocket会话上推送消息:

{
"messageId": "CAMQADAA",
"payload": "hvXcJvHW7kOSrUn17P2q71RA5SdiXwZBqw==",
"properties": {},
"publishTime": "2021-10-29T16:01:38.967-07:00",
"redeliveryCount": 0,
"encryptionContext": {
"keys": {
"client-rsa.pem": {
"keyValue": "jEuwS+PeUzmCo7IfLNxqoj4h7txbLjCQjkwpaw5AWJfZ2xoIdMkOuWDkOsqgFmWwxiecakS6GOZHs94x3sxzKHQx9Oe1jpwBg2e7L4fd26pp+WmAiLm/ArZJo6JotTeFSvKO3u/yQtGTZojDDQxiqFOQ1ZbMdtMZA8DpSMuq+Zx7PqLo43UdW1+krjQfE5WD+y+qE3LJQfwyVDnXxoRtqWLpVsAROlN2LxaMbaftv5HckoejJoB4xpf/dPOUqhnRstwQHf6klKT5iNhjsY4usACt78uILT0pEPd14h8wEBidBz/vAlC/zVMEqiDVzgNS7dqEYS4iHbf7cnWVCn3Hxw==",
"metadata": {}
}
},
"param": "Tfu1PxVm6S9D3+Hk",
"compressionType": "NONE",
"uncompressedMessageSize": 0,
"batchSize": {
"empty": false,
"present": true
}
}
}

以下是WebSocket消费者响应中的参数。

  • 通用参数

    类型是否必需?说明
    messageId字符串消息ID
    payload字符串Base-64编码的有效载荷
    publishTime字符串发布时间戳
    redeliveryCount数字此消息已传递的次数
    properties键值对应用程序定义的属性
    key字符串生产者设置的原始路由键
    encryptionContextEncryptionContext消费者可用于解密接收消息的加密上下文
    param字符串密码的初始化向量(Base64编码)
    batchSize字符串消息中的条目数(如果是批量消息)
    uncompressedMessageSize字符串压缩前的消息大小
    compressionType字符串用于压缩消息有效载荷的算法
  • encryptionContext 相关参数

    类型是否必需?解释
    keyskey-EncryptionKey 对key-EncryptionKey 对中的键是加密密钥名称。key-EncryptionKey 对中的值是加密密钥对象。
  • encryptionKey 相关参数

    类型是否必需?解释
    keyValue字符串加密密钥(Base64编码)
    metadata键值对应用定义的元数据

确认消息

消费者需要确认消息的成功处理,以便Pulsar代理删除它。

{
"messageId": "CAAQAw=="
}
类型是否必需?解释
messageIdstringyes已处理消息的消息ID

否定确认消息

{
"type": "negativeAcknowledge",
"messageId": "CAAQAw=="
}
类型是否必需?解释
messageIdstringyes已处理消息的消息ID

流程控制

推送模式

默认情况下(pullMode=false),消费者端点将使用receiverQueueSize参数来调整其内部接收队列的大小,并限制传递给WebSocket客户端的未确认消息的数量。 在这种模式下,如果您不发送确认,Pulsar WebSocket服务将在达到receiverQueueSize未确认消息发送给WebSocket客户端后停止发送消息。

拉取模式

如果您将pullMode设置为true,WebSocket客户端将需要发送permit命令以允许Pulsar WebSocket服务发送更多消息。

{
"type": "permit",
"permitMessages": 100
}
类型是否必需?解释
typestringyes命令的类型。必须是 permit
permitMessagesintyes允许的消息数量

在这种模式下,可以在不同的连接中确认消息。

检查是否到达主题的末尾

消费者可以通过发送isEndOfTopic请求来检查是否已到达主题的末尾。

请求

{
"type": "isEndOfTopic"
}
类型是否必需?解释
typestringyes命令的类型。必须是 isEndOfTopic

响应

{
"endOfTopic": "true/false"
}

读取器端点

读取器端点要求您在URL中指定租户、命名空间和主题:

ws://broker-service-url:8080/ws/v2/reader/persistent/:tenant/:namespace/:topic
查询参数
类型是否必需?解释
readerNamestringno读取器名称
receiverQueueSizeintno消费者接收队列的大小(默认值:1000)
messageIdint 或 enum从哪个消息ID开始,earliestlatest(默认:latest
tokenstringno认证令牌,用于浏览器JavaScript客户端
接收消息

服务器将在WebSocket会话上推送消息:

{
"messageId": "CAAQAw==",
"payload": "SGVsbG8gV29ybGQ=",
"properties": {"key1": "value1", "key2": "value2"},
"publishTime": "2016-08-30 16:45:57.785",
"redeliveryCount": 4
}
类型是否必需?解释
messageIdstringyes消息ID
payloadstringyesBase-64 编码的有效载荷
publishTimestringyes发布时间戳
redeliveryCountnumberyes此消息已被传递的次数
properties键值对应用程序定义的属性
keystringno生产者设置的原始路由键

确认消息

在WebSocket中,Reader需要确认消息的成功处理,以便Pulsar WebSocket服务更新待处理消息的数量。 如果不发送确认,Pulsar WebSocket服务在达到pendingMessages限制后将停止发送消息。

{
"messageId": "CAAQAw=="
}
类型是否必需?解释
messageIdstringyes已处理消息的消息ID

检查是否到达主题的末尾

消费者可以通过发送isEndOfTopic请求来检查是否已到达主题的末尾。

请求

{
"type": "isEndOfTopic"
}
类型是否必需?解释
typestringyes命令的类型。必须是 isEndOfTopic

响应

{
"endOfTopic": "true/false"
}

错误代码

如果发生错误,服务器将使用以下错误代码关闭WebSocket会话:

错误代码错误信息
1创建生产者失败
2订阅失败
3从JSON反序列化失败
4序列化为JSON失败
5客户端认证失败
6客户端未授权
7无效的有效载荷编码
8未知错误

应用程序负责在退避期后重新建立一个新的WebSocket会话。

客户端示例

下面你将找到PythonNode.js中Pulsar WebSocket API的代码示例。

这个例子使用了websocket-client包。你可以使用pip来安装它:

pip install websocket-client

你也可以从PyPI下载它。

Python 生产者

这是一个发送简单消息到Pulsar topic的Python生产者示例:

import websocket, base64, json

# If set enableTLS to true, your have to set tlsEnabled to true in conf/websocket.conf.
enable_TLS = False
scheme = 'ws'
if enable_TLS:
scheme = 'wss'

TOPIC = scheme + '://localhost:8080/ws/v2/producer/persistent/public/default/my-topic'

ws = websocket.create_connection(TOPIC)

# encode message
s = "Hello World"
firstEncoded = s.encode("UTF-8")
binaryEncoded = base64.b64encode(firstEncoded)
payloadString = binaryEncoded.decode('UTF-8')

# Send one message as JSON
ws.send(json.dumps({
'payload' : payloadString,
'properties': {
'key1' : 'value1',
'key2' : 'value2'
},
'context' : 5
}))

response = json.loads(ws.recv())
if response['result'] == 'ok':
print( 'Message published successfully')
else:
print('Failed to publish message:', response)
ws.close()

Python 消费者

这是一个Python消费者的示例,它监听Pulsar主题并在消息到达时打印消息ID:

import websocket, base64, json

# If set enableTLS to true, your have to set tlsEnabled to true in conf/websocket.conf.
enable_TLS = False
scheme = 'ws'
if enable_TLS:
scheme = 'wss'

TOPIC = scheme + '://localhost:8080/ws/v2/consumer/persistent/public/default/my-topic/my-sub'

ws = websocket.create_connection(TOPIC)

while True:
msg = json.loads(ws.recv())
if not msg: break

print( "Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload'])))

# Acknowledge successful processing
ws.send(json.dumps({'messageId' : msg['messageId']}))

ws.close()

Python 阅读器

这是一个Python读取器的示例,它监听Pulsar主题并在消息到达时打印消息ID:

import websocket, base64, json

# If set enableTLS to true, your have to set tlsEnabled to true in conf/websocket.conf.
enable_TLS = False
scheme = 'ws'
if enable_TLS:
scheme = 'wss'

TOPIC = scheme + '://localhost:8080/ws/v2/reader/persistent/public/default/my-topic'
ws = websocket.create_connection(TOPIC)

while True:
msg = json.loads(ws.recv())
if not msg: break

print ( "Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload'])))

# Acknowledge successful processing
ws.send(json.dumps({'messageId' : msg['messageId']}))

ws.close()

这个例子使用了ws包。你可以使用npm来安装它:

npm install ws

Node.js 生产者

这是一个发送简单消息到Pulsar主题的Node.js生产者示例:

const WebSocket = require('ws');

// If set enableTLS to true, your have to set tlsEnabled to true in conf/websocket.conf.
const enableTLS = false;
const topic = `${enableTLS ? 'wss' : 'ws'}://localhost:8080/ws/v2/producer/persistent/public/default/my-topic`;
const ws = new WebSocket(topic);

var message = {
"payload" : new Buffer("Hello World").toString('base64'),
"properties": {
"key1" : "value1",
"key2" : "value2"
},
"context" : "1"
};

ws.on('open', function() {
// Send one message
ws.send(JSON.stringify(message));
});

ws.on('message', function(message) {
console.log('received ack: %s', message);
});

Node.js 消费者

这是一个监听与上述生产者相同主题的Node.js消费者示例:

const WebSocket = require('ws');

// If set enableTLS to true, your have to set tlsEnabled to true in conf/websocket.conf.
const enableTLS = false;
const topic = `${enableTLS ? 'wss' : 'ws'}://localhost:8080/ws/v2/consumer/persistent/public/default/my-topic/my-sub`;
const ws = new WebSocket(topic);

ws.on('message', function(message) {
var receiveMsg = JSON.parse(message);
console.log('Received: %s - payload: %s', message, new Buffer(receiveMsg.payload, 'base64').toString());
var ackMsg = {"messageId" : receiveMsg.messageId};
ws.send(JSON.stringify(ackMsg));
});

NodeJS 阅读器

const WebSocket = require('ws');

// If set enableTLS to true, your have to set tlsEnabled to true in conf/websocket.conf.
const enableTLS = false;
const topic = `${enableTLS ? 'wss' : 'ws'}://localhost:8080/ws/v2/reader/persistent/public/default/my-topic`;
const ws = new WebSocket(topic);

ws.on('message', function(message) {
var receiveMsg = JSON.parse(message);
console.log('Received: %s - payload: %s', message, new Buffer(receiveMsg.payload, 'base64').toString());
var ackMsg = {"messageId" : receiveMsg.messageId};
ws.send(JSON.stringify(ackMsg));
});