跳至内容

redis#

Redis消息队列。

Redis消息队列配置 #

基础类: BaseSettings

Redis消息队列配置。

参数:

名称 类型 描述 默认值
type Literal[str]
'redis'
url str
'redis://localhost:6379'
host str | None
None
port int | None
None
db int | None
None
username str | None
None
password str | None
None
ssl bool | None
None
Source code in llama_deploy/message_queues/redis.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class RedisMessageQueueConfig(BaseSettings):
    """Redis message queue configuration."""

    model_config = SettingsConfigDict(env_prefix="REDIS_")

    type: Literal["redis"] = Field(default="redis", exclude=True)
    url: str = "redis://localhost:6379"
    host: str | None = None
    port: int | None = None
    db: int | None = None
    username: str | None = None
    password: str | None = None
    ssl: bool | None = None

    def model_post_init(self, __context: Any) -> None:
        if self.host and self.port:
            scheme = "rediss" if self.ssl else "redis"
            auth = (
                f"{self.username}:{self.password}@"
                if self.username and self.password
                else ""
            )
            self.url = f"{scheme}://{auth}{self.host}:{self.port}/{self.db or ''}"

Redis消费者元数据 #

基类: BaseModel

参数:

名称 类型 描述 默认值
message_type str
required
start_consuming_callable Callable[..., Coroutine[Any, Any, None]]
required
pubsub Any
None
topic str
required
Source code in llama_deploy/message_queues/redis.py
46
47
48
49
50
class RedisConsumerMetadata(BaseModel):
    message_type: str
    start_consuming_callable: StartConsumingCallable
    pubsub: Any = None
    topic: str

Redis消息队列 #

基类: AbstractMessageQueue

用于消息队列的Redis集成。

该类利用Redis的发布/订阅功能实现消息分发。

示例:

from llama_deploy.message_queues.redis import RedisMessageQueue

message_queue = RedisMessageQueue()  # uses the default url
Source code in llama_deploy/message_queues/redis.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
class RedisMessageQueue(AbstractMessageQueue):
    """Redis integration for message queue.

    This class uses Redis Pub/Sub functionality for message distribution.

    Examples:
        ```python
        from llama_deploy.message_queues.redis import RedisMessageQueue

        message_queue = RedisMessageQueue()  # uses the default url
        ```
    """

    def __init__(self, config: RedisMessageQueueConfig | None = None) -> None:
        self._config = config or RedisMessageQueueConfig()
        self._consumers: dict[str, RedisConsumerMetadata] = {}

        try:
            from redis.asyncio import Redis

            self._redis: Redis = Redis.from_url(self._config.url)
        except ImportError:
            msg = "Missing redis optional dependency. Please install by running `pip install llama-deploy[redis]`."
            raise ValueError(msg)

    async def _publish(self, message: QueueMessage, topic: str) -> Any:
        """Publish message to the Redis channel."""
        message_json = json.dumps(message.model_dump())
        result = await self._redis.publish(topic, message_json)
        logger.info(
            f"Published message {message.id_} to topic {topic} with {result} subscribers"
        )
        return result

    async def register_consumer(
        self, consumer: BaseMessageQueueConsumer, topic: str
    ) -> StartConsumingCallable:
        """Register a new consumer."""
        if consumer.id_ in self._consumers:
            logger.debug(
                f"Consumer {consumer.id_} already registered for topic {topic}",
            )
            return self._consumers[consumer.id_].start_consuming_callable

        pubsub = self._redis.pubsub()
        await pubsub.subscribe(topic)

        async def start_consuming_callable() -> None:
            """StartConsumingCallable.

            Consumer of this queue should call this in order to start consuming.
            """
            try:
                while True:
                    message = await pubsub.get_message(ignore_subscribe_messages=True)
                    if message:
                        decoded_message = json.loads(message["data"])
                        queue_message = QueueMessage.model_validate(decoded_message)
                        await consumer.process_message(queue_message)
                    await asyncio.sleep(0.01)
            finally:
                return

        logger.info(
            f"Registered consumer {consumer.id_} for topic {topic}",
        )

        self._consumers[consumer.id_] = RedisConsumerMetadata(
            message_type=consumer.message_type,
            start_consuming_callable=start_consuming_callable,
            pubsub=pubsub,
            topic=topic,
        )

        return start_consuming_callable

    async def deregister_consumer(self, consumer: BaseMessageQueueConsumer) -> Any:
        """Deregister a consumer."""
        consumer_metadata = self._consumers.pop(consumer.id_, None)
        if consumer_metadata is not None:
            await consumer_metadata.pubsub.unsubscribe(consumer_metadata.topic)
            logger.info(
                f"Deregistered consumer {consumer.id_} for topic {consumer_metadata.topic}",
            )

    async def cleanup(self, *args: Any, **kwargs: dict[str, Any]) -> None:
        """Perform any cleanup before shutting down."""
        for consumer_metadata in self._consumers.values():
            if consumer_metadata.pubsub:
                await consumer_metadata.pubsub.unsubscribe()
                await consumer_metadata.pubsub.aclose()

        # Clear consumers
        self._consumers = {}

        # Close main Redis connection
        await self._redis.aclose()  # type: ignore  # mypy doesn't see the async method for some reason

    def as_config(self) -> BaseModel:
        return self._config

register_consumer async #

register_consumer(consumer: BaseMessageQueueConsumer, topic: str) -> StartConsumingCallable

注册一个新消费者。

Source code in llama_deploy/message_queues/redis.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
async def register_consumer(
    self, consumer: BaseMessageQueueConsumer, topic: str
) -> StartConsumingCallable:
    """Register a new consumer."""
    if consumer.id_ in self._consumers:
        logger.debug(
            f"Consumer {consumer.id_} already registered for topic {topic}",
        )
        return self._consumers[consumer.id_].start_consuming_callable

    pubsub = self._redis.pubsub()
    await pubsub.subscribe(topic)

    async def start_consuming_callable() -> None:
        """StartConsumingCallable.

        Consumer of this queue should call this in order to start consuming.
        """
        try:
            while True:
                message = await pubsub.get_message(ignore_subscribe_messages=True)
                if message:
                    decoded_message = json.loads(message["data"])
                    queue_message = QueueMessage.model_validate(decoded_message)
                    await consumer.process_message(queue_message)
                await asyncio.sleep(0.01)
        finally:
            return

    logger.info(
        f"Registered consumer {consumer.id_} for topic {topic}",
    )

    self._consumers[consumer.id_] = RedisConsumerMetadata(
        message_type=consumer.message_type,
        start_consuming_callable=start_consuming_callable,
        pubsub=pubsub,
        topic=topic,
    )

    return start_consuming_callable

deregister_consumer async #

deregister_consumer(consumer: BaseMessageQueueConsumer) -> Any

注销一个消费者。

Source code in llama_deploy/message_queues/redis.py
129
130
131
132
133
134
135
136
async def deregister_consumer(self, consumer: BaseMessageQueueConsumer) -> Any:
    """Deregister a consumer."""
    consumer_metadata = self._consumers.pop(consumer.id_, None)
    if consumer_metadata is not None:
        await consumer_metadata.pubsub.unsubscribe(consumer_metadata.topic)
        logger.info(
            f"Deregistered consumer {consumer.id_} for topic {consumer_metadata.topic}",
        )

清理 async #

cleanup(*args: Any, **kwargs: dict[str, Any]) -> None

在关闭前执行任何清理操作。

Source code in llama_deploy/message_queues/redis.py
138
139
140
141
142
143
144
145
146
147
148
149
async def cleanup(self, *args: Any, **kwargs: dict[str, Any]) -> None:
    """Perform any cleanup before shutting down."""
    for consumer_metadata in self._consumers.values():
        if consumer_metadata.pubsub:
            await consumer_metadata.pubsub.unsubscribe()
            await consumer_metadata.pubsub.aclose()

    # Clear consumers
    self._consumers = {}

    # Close main Redis connection
    await self._redis.aclose()  # type: ignore  # mypy doesn't see the async method for some reason
优云智算