Skip to main content
Version: 4.0.x

使用Python客户端

创建一个生产者

以下示例为my-topic主题创建了一个Python生产者,并在该主题上发送了10条消息:

import pulsar

client = pulsar.Client('pulsar://localhost:6650')

producer = client.create_producer('my-topic')

for i in range(10):
producer.send(('Hello-%d' % i).encode('utf-8'))

client.close()

创建一个消费者

以下示例在my-topic主题上创建了一个名为my-subscription的消费者,接收传入的消息,打印到达消息的内容和ID,并向Pulsar代理确认每条消息。

import pulsar

client = pulsar.Client('pulsar://localhost:6650')

consumer = client.subscribe('my-topic', 'my-subscription')

while True:
msg = consumer.receive()
try:
print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
# Acknowledge successful processing of the message
consumer.acknowledge(msg)
except Exception:
# Message failed to be processed
consumer.negative_acknowledge(msg)

client.close()

此示例展示了如何配置否定确认。

from pulsar import Client, schema
client = Client('pulsar://localhost:6650')
consumer = client.subscribe('negative_acks','test',schema=schema.StringSchema())
producer = client.create_producer('negative_acks',schema=schema.StringSchema())
for i in range(10):
print('send msg "hello-%d"' % i)
producer.send_async('hello-%d' % i, callback=None)
producer.flush()
for i in range(10):
msg = consumer.receive()
consumer.negative_acknowledge(msg)
print('receive and nack msg "%s"' % msg.data())
for i in range(10):
msg = consumer.receive()
consumer.acknowledge(msg)
print('receive and ack msg "%s"' % msg.data())
try:
# No more messages expected
msg = consumer.receive(100)
except:
print("no more msg")
pass

创建一个阅读器

你可以使用Pulsar Python API来使用Pulsar reader interface。这里有一个示例:

# MessageId taken from a previously fetched message
msg_id = msg.message_id()

reader = client.create_reader('my-topic', msg_id)

while True:
msg = reader.read_next()
print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
# No acknowledgment

使用模式

支持的架构类型

你可以在Pulsar中使用不同的内置模式类型。所有的定义都在pulsar.schema包中。

模式备注
BytesSchema获取原始负载作为bytes对象。不执行序列化/反序列化。这是默认的模式
StringSchema将有效载荷编码/解码为UTF-8字符串。使用str对象
JsonSchema需要记录定义。将记录序列化为标准JSON有效载荷
AvroSchema需要记录定义。以AVRO格式序列化

模式定义参考

模式定义是通过继承自pulsar.schema.Record的类完成的。

这个类有许多字段,这些字段可以是pulsar.schema.Field类型或另一个嵌套的Record。所有字段都在pulsar.schema包中指定。这些字段与AVRO字段类型匹配。

字段类型Python 类型备注
Booleanbool
Integerint
Longint
Floatfloat
Doublefloat
Bytesbytes
Stringstr
Arraylist需要为项目指定记录类型。
Mapdict键始终为String。需要指定值类型。

此外,任何Python Enum类型都可以用作有效的字段类型。

字段参数

添加字段时,您可以在构造函数中使用这些参数。

参数默认值备注
defaultNone为字段设置默认值,例如 a = Integer(default=5)
requiredFalse将字段标记为“必填”。它会在模式中相应地设置。

模式定义示例

简单定义
class Example(Record):
a = String()
b = Integer()
c = Array(String())
i = Map(String())
使用枚举
from enum import Enum

class Color(Enum):
red = 1
green = 2
blue = 3

class Example(Record):
name = String()
color = Color
复杂类型
class MySubRecord(Record):
x = Integer()
y = Long()
z = String()

class Example(Record):
a = String()
sub = MySubRecord()
为Avro模式设置命名空间

使用特殊字段_avro_namespace设置Avro记录模式的命名空间。

class NamespaceDemo(Record):
_avro_namespace = 'xxx.xxx.xxx'
x = String()
y = Integer()

模式定义如下。

{
"name": "NamespaceDemo", "namespace": "xxx.xxx.xxx", "type": "record", "fields": [
{"name": "x", "type": ["null", "string"]},
{"name": "y", "type": ["null", "int"]}
]
}

声明并验证模式

在创建生产者之前,Pulsar broker 会验证现有主题模式是否为正确类型,并且格式是否与类的模式定义兼容。如果主题模式的格式与模式定义不兼容,则在创建生产者时会发生异常。

一旦生产者以某种模式定义创建,它只接受声明模式类实例的对象。

同样,对于消费者或读者来说,消费者返回的是一个对象(这是模式记录类的一个实例),而不是原始字节。

示例

consumer = client.subscribe(
topic='my-topic',
subscription_name='my-subscription',
schema=AvroSchema(Example) )

while True:
msg = consumer.receive()
ex = msg.value()
try:
print("Received message a={} b={} c={}".format(ex.a, ex.b, ex.c))
# Acknowledge successful processing of the message
consumer.acknowledge(msg)
except Exception:
# Message failed to be processed
consumer.negative_acknowledge(msg)

更多代码示例,请参见Schema - 入门