XREAD
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
- Available since:
- 5.0.0
- Time complexity:
- ACL categories:
-
@read
,@stream
,@slow
,@blocking
,
从一个或多个流中读取数据,仅返回ID大于调用者报告的最后一个接收ID的条目。
此命令有一个选项,如果项目不可用,则可以阻塞,类似于BRPOP
或BZPOPMIN
等。
请注意,在阅读此页面之前,如果您是流的新手,我们建议您阅读我们的Redis流介绍。
非阻塞用法
如果未使用BLOCK选项,该命令是同步的,可以认为与XRANGE
有些相关:它将返回流中的一系列项目,然而即使我们只考虑同步使用,它与XRANGE
相比有两个根本区别:
- 如果我们想同时从多个键读取数据,可以使用此命令调用多个流。这是
XREAD
的一个关键特性,特别是在使用BLOCK进行阻塞时,能够通过单个连接监听多个键是一个至关重要的功能。 - 虽然
XRANGE
返回的是ID范围内的条目,但XREAD
更适合用于从我们迄今为止看到的任何条目都大的第一个条目开始消费流。因此,我们传递给XREAD
的是,对于每个流,我们从中接收到的最后一个元素的ID。
例如,如果我有两个流 mystream
和 writers
,并且我想从它们包含的第一个元素开始读取数据,我可以像下面的示例中那样调用 XREAD
。
注意:我们在示例中使用了COUNT选项,以便对于每个流,调用将返回每个流最多两个元素。
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
2) 1) 1) 1526984818136-0
2) 1) "duration"
2) "1532"
3) "event-id"
4) "5"
5) "user-id"
6) "7782813"
2) 1) 1526999352406-0
2) 1) "duration"
2) "812"
3) "event-id"
4) "9"
5) "user-id"
6) "388234"
2) 1) "writers"
2) 1) 1) 1526985676425-0
2) 1) "name"
2) "Virginia"
3) "surname"
4) "Woolf"
2) 1) 1526985685298-0
2) 1) "name"
2) "Jane"
3) "surname"
4) "Austen"
STREAMS 选项是强制性的,并且必须是最后一个选项,因为该选项以以下格式获取可变长度的参数:
STREAMS key_1 key_2 key_3 ... key_N ID_1 ID_2 ID_3 ... ID_N
所以我们从一个键列表开始,然后继续处理所有相关的ID,这些ID代表我们从该流接收到的最后一个ID,这样调用将只为我们提供来自同一流的更大ID。
例如在上面的例子中,我们收到的流mystream
的最后一项的ID是1526999352406-0
,而流writers
的ID是1526985685298-0
。
要继续迭代这两个流,我将调用:
> XREAD COUNT 2 STREAMS mystream writers 1526999352406-0 1526985685298-0
1) 1) "mystream"
2) 1) 1) 1526999626221-0
2) 1) "duration"
2) "911"
3) "event-id"
4) "7"
5) "user-id"
6) "9488232"
2) 1) "writers"
2) 1) 1) 1526985691746-0
2) 1) "name"
2) "Toni"
3) "surname"
4) "Morrison"
2) 1) 1526985712947-0
2) 1) "name"
2) "Agatha"
3) "surname"
4) "Christie"
依此类推。最终,调用将不会返回任何项目,而只是一个空数组,然后我们知道从我们的流中已经没有更多内容可以获取了(因此我们必须重试操作,因此此命令也支持阻塞模式)。
不完整的ID
使用不完整的ID是有效的,就像对XRANGE
一样有效。然而,在这里,如果ID的序列部分缺失,它总是被解释为零,所以命令:
> XREAD COUNT 2 STREAMS mystream writers 0 0
完全等同于
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
数据阻塞
在其同步形式中,只要还有更多可用的项目,命令就可以获取新数据。然而,在某些时候,我们将不得不等待数据生产者使用XADD
将新条目推入我们正在消费的流中。为了避免以固定或自适应间隔进行轮询,如果根据指定的流和ID无法返回任何数据,命令能够阻塞,并在请求的键之一接受数据时自动解除阻塞。
重要的是要理解,这个命令会分发给所有等待相同ID范围的客户端,因此每个消费者都会获得数据的副本,这与使用阻塞列表弹出操作时发生的情况不同。
为了进行阻塞,使用BLOCK选项,以及我们希望在超时前阻塞的毫秒数。通常Redis的阻塞命令以秒为单位设置超时,然而这个命令以毫秒为单位设置超时,即使通常服务器的超时分辨率接近0.1秒。这次在某些使用场景中,可以阻塞更短的时间,如果服务器的内部机制随着时间的推移有所改进,超时的分辨率也可能会提高。
当传递了BLOCK命令,但至少在一个传递的流中有数据要返回时,该命令将同步执行就像BLOCK选项不存在一样。
这是一个阻塞调用的示例,其中命令稍后返回一个空回复,因为超时已过而没有新数据到达:
> XREAD BLOCK 1000 STREAMS mystream 1526999626221-0
(nil)
特殊的 $
ID。
在阻塞时,有时我们只想接收从我们开始阻塞的那一刻起通过XADD
添加到流中的条目。在这种情况下,我们对已经添加的条目的历史不感兴趣。对于这种用例,我们必须检查流的顶部元素ID,并在XREAD
命令行中使用该ID。这不够简洁并且需要调用其他命令,因此可以使用特殊的$
ID来向流发出信号,表示我们只想要新的内容。
理解这一点非常重要:你应该只在第一次调用XREAD
时使用$
ID。之后,ID应该是流中最后报告的项目的ID,否则你可能会错过中间添加的所有条目。
这是一个典型的XREAD
调用在消费者第一次迭代时只愿意消费新条目的样子:
> XREAD BLOCK 5000 COUNT 100 STREAMS mystream $
一旦我们得到一些回复,下一次调用将会是这样的:
> XREAD BLOCK 5000 COUNT 100 STREAMS mystream 1526999644174-3
等等。
特殊的 +
ID
你可以使用XREVRANGE
命令轻松读取单个流中的最后一条记录,如下所示:
> XREVRANGE stream + - COUNT 1
但是,随着你添加更多的流,这种方法会变得很慢,因为你必须为每个流发出一个单独的命令。
相反,从Redis 7.4开始,你可以使用+
符号作为特殊ID。
这请求流中最后可用的条目。例如:
> XREAD STREAMS streamA streamB streamC streamD + + + +
请注意,当使用此特殊ID作为流时,COUNT选项将被忽略(对于特定流),因为只能返回最后一个条目。
多个客户端在单个流上被阻塞时如何提供服务
对列表或有序集合的阻塞列表操作具有弹出行为。基本上,元素会从列表或有序集合中移除,以便返回给客户端。在这种情况下,您希望根据客户端在给定键上阻塞的时刻来公平地消费这些项目。通常,Redis在这种使用情况下使用FIFO语义。
然而请注意,对于流来说这不是问题:当客户端被服务时,流条目不会从流中移除,因此每个等待的客户端将在XADD
命令向流提供数据时立即得到服务。
强烈建议阅读Redis Streams介绍,以便更好地理解流的整体行为和语义。
RESP2 回复
以下之一:
- Array reply: 一个数组,其中每个元素都是一个由两个元素组成的数组,包含键名和为该键报告的条目。报告的条目是完整的流条目,具有ID以及所有字段和值的列表。字段和值保证按照
XADD
添加的顺序报告。 - Nil reply: 如果给出了BLOCK选项并且发生超时,或者如果没有可以服务的流。
RESP3 回复
以下之一:
- Map reply: 一个键值元素的映射,其中每个元素由键名和为该键报告的条目组成。报告的条目是完整的流条目,包含ID和所有字段及值的列表。字段和值的报告顺序保证与
XADD
添加的顺序相同。 - Null reply: 如果给出了BLOCK选项并且发生超时,或者如果没有可以服务的流。