使用Python客户端
创建一个生产者
以下示例为my-topic主题创建了一个Python生产者,并在该主题上发送了10条消息:
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('my-topic')
for i in range(10):
producer.send(('Hello-%d' % i).encode('utf-8'))
client.close()
创建一个消费者
以下示例在my-topic主题上创建了一个名为my-subscription的消费者,接收传入的消息,打印到达消息的内容和ID,并向Pulsar代理确认每条消息。
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic', 'my-subscription')
while True:
msg = consumer.receive()
try:
print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
# Acknowledge successful processing of the message
consumer.acknowledge(msg)
except Exception:
# Message failed to be processed
consumer.negative_acknowledge(msg)
client.close()
此示例展示了如何配置否定确认。
from pulsar import Client, schema
client = Client('pulsar://localhost:6650')
consumer = client.subscribe('negative_acks','test',schema=schema.StringSchema())
producer = client.create_producer('negative_acks',schema=schema.StringSchema())
for i in range(10):
print('send msg "hello-%d"' % i)
producer.send_async('hello-%d' % i, callback=None)
producer.flush()
for i in range(10):
msg = consumer.receive()
consumer.negative_acknowledge(msg)
print('receive and nack msg "%s"' % msg.data())
for i in range(10):
msg = consumer.receive()
consumer.acknowledge(msg)
print('receive and ack msg "%s"' % msg.data())
try:
# No more messages expected
msg = consumer.receive(100)
except:
print("no more msg")
pass
创建一个阅读器
你可以使用Pulsar Python API来使用Pulsar reader interface。这里有一个示例:
# MessageId taken from a previously fetched message
msg_id = msg.message_id()
reader = client.create_reader('my-topic', msg_id)
while True:
msg = reader.read_next()
print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
# No acknowledgment
使用模式
支持的架构类型
你可以在Pulsar中使用不同的内置模式类型。所有的定义都在pulsar.schema包中。
| 模式 | 备注 |
|---|---|
BytesSchema | 获取原始负载作为bytes对象。不执行序列化/反序列化。这是默认的模式 |
StringSchema | 将有效载荷编码/解码为UTF-8字符串。使用str对象 |
JsonSchema | 需要记录定义。将记录序列化为标准JSON有效载荷 |
AvroSchema | 需要记录定义。以AVRO格式序列化 |
模式定义参考
模式定义是通过继承自pulsar.schema.Record的类完成的。
这个类有许多字段,这些字段可以是pulsar.schema.Field类型或另一个嵌套的Record。所有字段都在pulsar.schema包中指定。这些字段与AVRO字段类型匹配。
| 字段类型 | Python 类型 | 备注 |
|---|---|---|
Boolean | bool | |
Integer | int | |
Long | int | |
Float | float | |
Double | float | |
Bytes | bytes | |
String | str | |
Array | list | 需要为项目指定记录类型。 |
Map | dict | 键始终为String。需要指定值类型。 |
此外,任何Python Enum类型都可以用作有效的字段类型。
字段参数
添加字段时,您可以在构造函数中使用这些参数。
| 参数 | 默认值 | 备注 |
|---|---|---|
default | None | 为字段设置默认值,例如 a = Integer(default=5)。 |
required | False | 将字段标记为“必填”。它会在模式中相应地设置。 |
模式定义示例
简单定义
class Example(Record):
a = String()
b = Integer()
c = Array(String())
i = Map(String())
使用枚举
from enum import Enum
class Color(Enum):
red = 1
green = 2
blue = 3
class Example(Record):
name = String()
color = Color
复杂类型
class MySubRecord(Record):
x = Integer()
y = Long()
z = String()
class Example(Record):
a = String()
sub = MySubRecord()
为Avro模式设置命名空间
使用特殊字段_avro_namespace设置Avro记录模式的命名空间。
class NamespaceDemo(Record):
_avro_namespace = 'xxx.xxx.xxx'
x = String()
y = Integer()
模式定义如下。
{
"name": "NamespaceDemo", "namespace": "xxx.xxx.xxx", "type": "record", "fields": [
{"name": "x", "type": ["null", "string"]},
{"name": "y", "type": ["null", "int"]}
]
}
声明并验证模式
在创建生产者之前,Pulsar broker 会验证现有主题模式是否为正确类型,并且格式是否与类的模式定义兼容。如果主题模式的格式与模式定义不兼容,则在创建生产者时会发生异常。
一旦生产者以某种模式定义创建,它只接受声明模式类实例的对象。
同样,对于消费者或读者来说,消费者返回的是一个对象(这是模式记录类的一个实例),而不是原始字节。
示例
consumer = client.subscribe(
topic='my-topic',
subscription_name='my-subscription',
schema=AvroSchema(Example) )
while True:
msg = consumer.receive()
ex = msg.value()
try:
print("Received message a={} b={} c={}".format(ex.a, ex.b, ex.c))
# Acknowledge successful processing of the message
consumer.acknowledge(msg)
except Exception:
# Message failed to be processed
consumer.negative_acknowledge(msg)
更多代码示例,请参见Schema - 入门。