基类: AbstractMessageQueue
用于消息队列的Redis集成。
该类利用Redis的发布/订阅功能实现消息分发。
示例:
from llama_deploy.message_queues.redis import RedisMessageQueue
message_queue = RedisMessageQueue() # uses the default url
Source code in llama_deploy/message_queues/redis.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152 | class RedisMessageQueue(AbstractMessageQueue):
"""Redis integration for message queue.
This class uses Redis Pub/Sub functionality for message distribution.
Examples:
```python
from llama_deploy.message_queues.redis import RedisMessageQueue
message_queue = RedisMessageQueue() # uses the default url
```
"""
def __init__(self, config: RedisMessageQueueConfig | None = None) -> None:
self._config = config or RedisMessageQueueConfig()
self._consumers: dict[str, RedisConsumerMetadata] = {}
try:
from redis.asyncio import Redis
self._redis: Redis = Redis.from_url(self._config.url)
except ImportError:
msg = "Missing redis optional dependency. Please install by running `pip install llama-deploy[redis]`."
raise ValueError(msg)
async def _publish(self, message: QueueMessage, topic: str) -> Any:
"""Publish message to the Redis channel."""
message_json = json.dumps(message.model_dump())
result = await self._redis.publish(topic, message_json)
logger.info(
f"Published message {message.id_} to topic {topic} with {result} subscribers"
)
return result
async def register_consumer(
self, consumer: BaseMessageQueueConsumer, topic: str
) -> StartConsumingCallable:
"""Register a new consumer."""
if consumer.id_ in self._consumers:
logger.debug(
f"Consumer {consumer.id_} already registered for topic {topic}",
)
return self._consumers[consumer.id_].start_consuming_callable
pubsub = self._redis.pubsub()
await pubsub.subscribe(topic)
async def start_consuming_callable() -> None:
"""StartConsumingCallable.
Consumer of this queue should call this in order to start consuming.
"""
try:
while True:
message = await pubsub.get_message(ignore_subscribe_messages=True)
if message:
decoded_message = json.loads(message["data"])
queue_message = QueueMessage.model_validate(decoded_message)
await consumer.process_message(queue_message)
await asyncio.sleep(0.01)
finally:
return
logger.info(
f"Registered consumer {consumer.id_} for topic {topic}",
)
self._consumers[consumer.id_] = RedisConsumerMetadata(
message_type=consumer.message_type,
start_consuming_callable=start_consuming_callable,
pubsub=pubsub,
topic=topic,
)
return start_consuming_callable
async def deregister_consumer(self, consumer: BaseMessageQueueConsumer) -> Any:
"""Deregister a consumer."""
consumer_metadata = self._consumers.pop(consumer.id_, None)
if consumer_metadata is not None:
await consumer_metadata.pubsub.unsubscribe(consumer_metadata.topic)
logger.info(
f"Deregistered consumer {consumer.id_} for topic {consumer_metadata.topic}",
)
async def cleanup(self, *args: Any, **kwargs: dict[str, Any]) -> None:
"""Perform any cleanup before shutting down."""
for consumer_metadata in self._consumers.values():
if consumer_metadata.pubsub:
await consumer_metadata.pubsub.unsubscribe()
await consumer_metadata.pubsub.aclose()
# Clear consumers
self._consumers = {}
# Close main Redis connection
await self._redis.aclose() # type: ignore # mypy doesn't see the async method for some reason
def as_config(self) -> BaseModel:
return self._config
|
register_consumer
async
注册一个新消费者。
Source code in llama_deploy/message_queues/redis.py
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127 | async def register_consumer(
self, consumer: BaseMessageQueueConsumer, topic: str
) -> StartConsumingCallable:
"""Register a new consumer."""
if consumer.id_ in self._consumers:
logger.debug(
f"Consumer {consumer.id_} already registered for topic {topic}",
)
return self._consumers[consumer.id_].start_consuming_callable
pubsub = self._redis.pubsub()
await pubsub.subscribe(topic)
async def start_consuming_callable() -> None:
"""StartConsumingCallable.
Consumer of this queue should call this in order to start consuming.
"""
try:
while True:
message = await pubsub.get_message(ignore_subscribe_messages=True)
if message:
decoded_message = json.loads(message["data"])
queue_message = QueueMessage.model_validate(decoded_message)
await consumer.process_message(queue_message)
await asyncio.sleep(0.01)
finally:
return
logger.info(
f"Registered consumer {consumer.id_} for topic {topic}",
)
self._consumers[consumer.id_] = RedisConsumerMetadata(
message_type=consumer.message_type,
start_consuming_callable=start_consuming_callable,
pubsub=pubsub,
topic=topic,
)
return start_consuming_callable
|
deregister_consumer
async
注销一个消费者。
Source code in llama_deploy/message_queues/redis.py
129
130
131
132
133
134
135
136 | async def deregister_consumer(self, consumer: BaseMessageQueueConsumer) -> Any:
"""Deregister a consumer."""
consumer_metadata = self._consumers.pop(consumer.id_, None)
if consumer_metadata is not None:
await consumer_metadata.pubsub.unsubscribe(consumer_metadata.topic)
logger.info(
f"Deregistered consumer {consumer.id_} for topic {consumer_metadata.topic}",
)
|
清理
async
cleanup(*args: Any, **kwargs: dict[str, Any]) -> None
在关闭前执行任何清理操作。
Source code in llama_deploy/message_queues/redis.py
138
139
140
141
142
143
144
145
146
147
148
149 | async def cleanup(self, *args: Any, **kwargs: dict[str, Any]) -> None:
"""Perform any cleanup before shutting down."""
for consumer_metadata in self._consumers.values():
if consumer_metadata.pubsub:
await consumer_metadata.pubsub.unsubscribe()
await consumer_metadata.pubsub.aclose()
# Clear consumers
self._consumers = {}
# Close main Redis connection
await self._redis.aclose() # type: ignore # mypy doesn't see the async method for some reason
|