XREADGROUP
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
- Available since:
- 5.0.0
- Time complexity:
- For each stream mentioned: O(M) with M being the number of elements returned. If M is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1). On the other side when XREADGROUP blocks, XADD will pay the O(N) time in order to serve the N clients blocked on the stream getting new data.
- ACL categories:
-
@write
,@stream
,@slow
,@blocking
,
XREADGROUP
命令是 XREAD
命令的一个特殊版本,支持消费者组。在阅读本页之前,您可能需要先理解 XREAD
命令。
此外,如果您是流的新手,我们建议您阅读我们的 Redis流介绍。 确保在介绍中理解消费者组的概念, 这样理解这个命令的工作原理会更简单。
30秒了解消费者组
此命令与普通的XREAD
之间的区别在于,此命令支持消费者组。
如果没有消费者组,仅使用XREAD
,所有客户端都会接收到流中的所有条目。而使用XREADGROUP
的消费者组,可以创建消费不同部分消息的客户端组。例如,如果流中接收到新条目A、B和C,并且有两个消费者通过消费者组读取,一个客户端可能会接收到消息A和C,而另一个客户端则接收到消息B,以此类推。
在消费者组中,给定的消费者(即仅从流中消费消息的客户端)必须使用唯一的消费者名称来标识。这只是一个字符串。
消费者组的一个保证是,给定的消费者只能看到传递给它的消息历史,因此一条消息只有一个所有者。然而,有一个称为消息认领的特殊功能,允许其他消费者在某些消费者发生不可恢复的故障时认领消息。为了实现这种语义,消费者组需要通过XACK
命令明确确认消费者成功处理的消息。这是必要的,因为流将为每个消费者组跟踪谁在处理什么消息。
这是如何理解你是否想要使用消费者组:
- 如果你有一个流和多个客户端,并且你希望所有客户端都能接收到所有消息,那么你不需要消费者组。
- 如果你有一个流和多个客户端,并且你希望流在客户端之间进行分区或分片,以便每个客户端都能获取到流中到达消息的一个子集,那么你需要一个消费者组。
XREAD 和 XREADGROUP 之间的区别
从语法的角度来看,命令几乎相同,然而XREADGROUP
需要一个特殊且强制的选项:
GROUP <group-name> <consumer-name>
组名只是与流相关联的消费者组的名称。
该组是使用XGROUP
命令创建的。消费者名称是
客户端用于在组内标识自己的字符串。
消费者在第一次被看到时会在消费者组内自动创建。
不同的客户端应选择不同的消费者名称。
当你使用XREADGROUP
读取时,服务器会记住某个消息已经传递给你:该消息将被存储在消费者组内的待处理条目列表(PEL)中,这是一个已传递但尚未确认的消息ID列表。
客户端必须使用XACK
来确认消息处理,以便将待处理条目从PEL中移除。可以使用XPENDING
命令来检查PEL。
NOACK
子命令可以在不需要可靠性且偶尔的消息丢失是可接受的情况下使用,以避免将消息添加到PEL中。这相当于在读取消息时确认消息。
在使用XREADGROUP
时,STREAMS选项中指定的ID可以是以下两种之一:
- 特殊的
>
ID,表示消费者只想接收从未被任何其他消费者接收过的消息。这仅仅意味着,给我新的消息。 - 任何其他ID,即0或任何其他有效ID或不完整的ID(仅毫秒时间部分),将具有返回待处理条目的效果,这些条目是待消费者发送的命令,其ID大于提供的ID。因此,基本上如果ID不是
>
,那么该命令将仅让客户端访问其待处理条目:已传递给它但尚未确认的消息。请注意,在这种情况下,BLOCK
和NOACK
都被忽略。
与XREAD
类似,XREADGROUP
命令也可以以阻塞方式使用。在这方面没有区别。
消息传递给消费者时会发生什么?
两件事:
- 如果消息从未被传递给任何人,也就是说,如果我们谈论的是一个新消息,那么会创建一个PEL(待处理条目列表)。
- 如果消息已经传递给这个消费者,并且只是再次重新获取相同的消息,那么最后传递计数器将更新为当前时间,并且传递次数将增加一次。您可以使用
XPENDING
命令访问这些消息属性。
使用示例
通常你使用这样的命令来获取新消息并处理它们。用伪代码表示:
WHILE true
entries = XREADGROUP GROUP $GroupName $ConsumerName BLOCK 2000 COUNT 10 STREAMS mystream >
if entries == nil
puts "Timeout... try again"
CONTINUE
end
FOREACH entries AS stream_entries
FOREACH stream_entries as message
process_message(message.id,message.fields)
# ACK the message as processed
XACK mystream $GroupName message.id
END
END
END
通过这种方式,示例消费者代码将仅获取新消息,处理它们,并通过XACK
进行确认。然而,上面的示例代码并不完整,因为它没有处理崩溃后的恢复。如果我们在处理消息的过程中崩溃,会发生什么情况呢?我们的消息将保留在待处理条目列表中,因此我们可以通过最初给XREADGROUP
一个ID为0来访问我们的历史记录,并执行相同的循环。一旦提供一个ID为0的回复是一个空的消息集,我们就知道我们已经处理并确认了所有待处理的消息:我们可以开始使用>
作为ID,以获取新消息并重新加入正在处理新事物的消费者。
要查看命令的实际回复,请查看XREAD
命令页面。
当待处理的消息被删除时会发生什么?
由于修剪或随时显式调用XDEL
,可能会从流中删除条目。
根据设计,Redis不会阻止删除流中PELs中的条目。
当这种情况发生时,PELs会保留已删除条目的ID,但实际的条目内容不再可用。
因此,当读取此类PEL条目时,Redis将返回空值代替它们各自的数据。
示例:
> XADD mystream 1 myfield mydata
"1-0"
> XGROUP CREATE mystream mygroup 0
OK
> XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
1) 1) "mystream"
2) 1) 1) "1-0"
2) 1) "myfield"
2) "mydata"
> XDEL mystream 1-0
(integer) 1
> XREADGROUP GROUP mygroup myconsumer STREAMS mystream 0
1) 1) "mystream"
2) 1) 1) "1-0"
2) (nil)
强烈建议阅读Redis Streams介绍,以便更好地理解流的整体行为和语义。
RESP2 回复
以下之一:
- Array reply: 一个数组,其中每个元素都是一个由两个元素组成的数组,包含键名和为该键报告的条目。报告的条目是完整的流条目,具有ID以及所有字段和值的列表。字段和值保证按照
XADD
添加的顺序报告。 - Nil reply: 如果给出了BLOCK选项并且发生超时,或者如果没有可以服务的流。
RESP3 回复
以下之一:
- Map reply: 一个键值元素的映射,其中每个元素由键名和为该键报告的条目组成。报告的条目是完整的流条目,包含ID以及所有字段和值的列表。字段和值保证按照
XADD
添加的顺序报告。 - Null reply: 如果给出了BLOCK选项并且发生超时,或者如果没有可以服务的流。