Redis 流

Redis流介绍

Redis流是一种数据结构,其行为类似于仅追加日志,但也实现了多种操作以克服典型仅追加日志的一些限制。这些操作包括O(1)时间内的随机访问和复杂的消费策略,例如消费者组。 您可以使用流来实时记录并同时分发事件。 Redis流的用例示例包括:

  • 事件溯源(例如,跟踪用户操作、点击等)
  • 传感器监控(例如,来自现场设备的读数)
  • 通知(例如,将每个用户的通知记录存储在单独的流中)

Redis为每个流条目生成一个唯一的ID。 您可以使用这些ID稍后检索它们相关的条目,或者读取和处理流中的所有后续条目。请注意,由于这些ID与时间相关,此处显示的ID可能会有所不同,并且与您在自己的Redis实例中看到的ID不同。

Redis 流支持多种修剪策略(以防止流无限增长)和多种消费策略(参见 XREAD, XREADGROUP, 和 XRANGE)。

基本命令

  • XADD 向流中添加一个新条目。
  • XREAD 读取一个或多个条目,从给定位置开始并随时间向前移动。
  • XRANGE 返回两个提供的条目ID之间的条目范围。
  • XLEN 返回流的长度。

查看完整的流命令列表

示例

  • When our racers pass a checkpoint, we add a stream entry for each racer that includes the racer's name, speed, position, and location ID:

  • Read two stream entries starting at ID 1692632086370-0:

  • Read up to 100 new stream entries, starting at the end of the stream, and block for up to 300 ms if no entries are being written:

性能

向流中添加一个条目的时间复杂度是O(1)。 访问任何单个条目的时间复杂度是O(n),其中n是ID的长度。 由于流ID通常较短且长度固定,这实际上减少为常数时间查找。 有关原因的详细信息,请注意流是作为基数树实现的。

简单来说,Redis流提供了高效的插入和读取功能。 详情请参见每个命令的时间复杂度。

流基础

流是一种仅追加的数据结构。基本的写入命令,称为XADD,将新条目追加到指定的流中。

每个流条目由一个或多个字段-值对组成,有点像字典或Redis哈希:

上述对XADD命令的调用将条目rider: Castilla, speed: 29.9, position: 1, location_id: 2添加到键race:france的流中,使用自动生成的条目ID,即命令返回的ID,具体为1692632147973-0。它的第一个参数是键名race:france,第二个参数是标识流中每个条目的条目ID。然而,在这种情况下,我们传递了*,因为我们希望服务器为我们生成一个新的ID。每个新ID都会单调递增,因此更简单地说,每个新添加的条目都将具有比所有过去条目更高的ID。服务器自动生成ID几乎总是您想要的,而明确指定ID的原因非常罕见。我们稍后会详细讨论这一点。每个流条目都有一个ID的事实是另一个与日志文件的相似之处,其中行号或文件内的字节偏移量可用于标识给定条目。回到我们的XADD示例,在键名和ID之后,接下来的参数是组成我们流条目的字段-值对。

可以使用XLEN命令来获取Stream中的项目数量:

条目ID

XADD命令返回的条目ID,用于唯一标识给定流中的每个条目,由两部分组成:

<millisecondsTime>-<sequenceNumber>

毫秒时间部分实际上是生成流ID的本地Redis节点中的本地时间,但是如果当前毫秒时间恰好小于前一个条目的时间,则使用前一个条目的时间,因此如果时钟向后跳转,单调递增的ID属性仍然保持。序列号用于在同一毫秒内创建的条目。由于序列号是64位宽的,实际上在同一毫秒内可以生成的条目数量没有限制。

这种ID的格式一开始可能看起来很奇怪,细心的读者可能会想知道为什么时间是ID的一部分。原因是Redis流支持按ID进行范围查询。由于ID与条目生成的时间相关,这基本上可以免费查询时间范围。我们将在介绍XRANGE命令时很快看到这一点。

如果出于某种原因,用户需要与时间无关但实际与另一个外部系统ID相关联的增量ID,如前所述,XADD命令可以接受一个显式ID,而不是触发自动生成的*通配符ID,如下例所示:

请注意,在这种情况下,最小ID为0-1,并且该命令不会接受等于或小于前一个ID的ID:

如果您正在运行 Redis 7 或更高版本,您还可以提供一个仅包含毫秒部分的显式 ID。在这种情况下,ID 的序列部分将自动生成。为此,请使用以下语法:

从流中获取数据

现在我们终于能够通过XADD在流中追加条目。然而,虽然向流中追加数据非常直观,但查询流以提取数据的方式并不那么直观。如果我们继续用日志文件的类比,一个显而易见的方法是模仿我们通常使用Unix命令tail -f的方式,也就是说,我们可以开始监听以获取追加到流中的新消息。请注意,与Redis的阻塞列表操作不同,在阻塞列表操作中,给定的元素将到达一个正在执行弹出式操作(如BLPOP)的单个客户端,而在流中,我们希望多个消费者能够看到追加到流中的新消息(就像许多tail -f进程可以看到添加到日志中的内容一样)。使用传统术语,我们希望流能够扇出消息到多个客户端。

然而,这仅仅是一种潜在的访问模式。我们也可以以一种截然不同的方式看待流:不是作为消息系统,而是作为时间序列存储。在这种情况下,获取新追加的消息可能也有用,但另一种自然的查询模式是按时间范围获取消息,或者使用游标迭代消息以逐步检查所有历史记录。这无疑是另一种有用的访问模式。

最后,如果从消费者的角度来看待流,我们可能希望以另一种方式访问流,即作为一个可以被分区到多个处理这些消息的消费者的消息流,这样消费者组只能看到单个流中到达的消息的一个子集。通过这种方式,可以在不同的消费者之间扩展消息处理,而不需要单个消费者处理所有消息:每个消费者只会得到不同的消息来处理。这基本上就是Kafka (TM) 通过消费者组所做的。通过消费者组读取消息是从Redis流中读取的另一种有趣的方式。

Redis Streams 通过不同的命令支持上述所有三种查询模式。接下来的部分将展示所有这些模式,从最简单和直接使用的范围查询开始。

按范围查询:XRANGE 和 XREVRANGE

要按范围查询流,我们只需要指定两个ID,startend。返回的范围将包括具有start或end作为ID的元素,因此范围是包含的。两个特殊ID -+ 分别表示可能的最小和最大ID。

每个返回的条目都是一个包含两个项目的数组:ID和字段-值对列表。我们已经说过,条目ID与时间有关,因为-字符左侧的部分是创建流条目时本地节点的Unix时间(以毫秒为单位)(但请注意,流是通过完全指定的XADD命令复制的,因此副本将与主节点具有相同的ID)。这意味着我可以使用XRANGE查询一个时间范围。然而,为了做到这一点,我可能希望省略ID的序列部分:如果省略,在范围的开始部分将假定为0,而在结束部分将假定为可用的最大序列号。这样,仅使用两个毫秒的Unix时间进行查询,我们就可以以包含的方式获取在该时间范围内生成的所有条目。例如,如果我想查询一个两毫秒的时间段,我可以使用:

在这个范围内,我只有一个条目。然而,在实际的数据集中,我可以查询几个小时的范围,或者在两毫秒内可能有许多项目,返回的结果可能非常庞大。因此,XRANGE 支持在末尾使用可选的 COUNT 选项。通过指定一个计数,我可以只获取前 N 个项目。如果我想要更多,我可以获取返回的最后一个 ID,将序列部分加一,然后再次查询。让我们在下面的例子中看看这一点。假设流 race:france 中填充了 4 个项目。为了开始我的迭代,每次命令获取 2 个项目,我从整个范围开始,但计数为 2。

要继续迭代接下来的两个项目,我必须选择返回的最后一个ID,即1692632094485-0,并为其添加前缀(。生成的独占范围区间,即本例中的(1692632094485-0,现在可以用作下一个XRANGE调用的新start参数:

既然我们已经从一个只有4个条目的流中检索了4个项目,如果我们尝试检索更多的项目,我们将得到一个空数组:

由于XRANGE的复杂度是O(log(N))来查找,然后O(M)来返回M个元素,当计数较小时,该命令具有对数时间复杂度,这意味着每次迭代的步骤都很快。因此,XRANGE也是事实上的流迭代器,并且不需要XSCAN命令。

命令 XREVRANGE 等同于 XRANGE,但返回的元素顺序相反,因此 XREVRANGE 的一个实际用途是检查流中的最后一个项目:

请注意,XREVRANGE 命令以相反的顺序接受 startstop 参数。

使用XREAD监听新项目

当我们不想通过范围访问流中的项目时,通常我们想要的是订阅到达流的新项目。这个概念可能与Redis Pub/Sub相关,在那里你订阅一个频道,或者与Redis阻塞列表相关,在那里你等待一个键获取新元素来获取,但在消费流的方式上有根本的区别:

  1. 一个流可以有多个客户端(消费者)等待数据。默认情况下,每个新项目将传递给每个消费者,这些消费者正在给定流中等待数据。这种行为与阻塞列表不同,在阻塞列表中,每个消费者将获得不同的元素。然而,向多个消费者分发的能力与发布/订阅类似。
  2. 虽然Pub/Sub消息是即发即忘的,并且无论如何都不会被存储,而使用阻塞列表时,当客户端接收到消息时,它会从列表中弹出(实际上被移除),流的工作方式则完全不同。所有消息都会无限期地附加到流中(除非用户明确要求删除条目):不同的消费者将通过记住接收到的最后一条消息的ID来知道从他们的角度来看什么是新消息。
  3. Streams Consumer Groups 提供了一种控制级别,这是 Pub/Sub 或阻塞列表无法实现的,对于同一个流有不同的组,明确确认已处理的项目,能够检查待处理的项目,认领未处理的消息,以及每个客户端一致的历史可见性,即只能看到其私有的过去消息历史。

提供监听新消息到达流的能力的命令被称为XREAD。它比XRANGE稍微复杂一些,因此我们将从展示简单的形式开始,稍后将提供完整的命令布局。

以上是XREAD的非阻塞形式。请注意,COUNT选项不是强制性的,实际上该命令的唯一强制性选项是STREAMS选项,它指定了一个键列表以及调用消费者已经看到的每个流的相应最大ID,以便该命令仅向客户端提供ID大于我们指定的消息。

在上述命令中,我们写了STREAMS race:france 0,因此我们希望获取流race:france中所有ID大于0-0的消息。正如你在上面的示例中所看到的,该命令返回了键名,因为实际上可以调用此命令同时从多个流中读取数据。例如,我可以写:STREAMS race:france race:italy 0 0。请注意,在STREAMS选项之后,我们需要提供键名,然后是ID。因此,STREAMS选项必须始终是最后一个选项。任何其他选项必须在STREAMS选项之前。

除了XREAD可以同时访问多个流,并且我们可以指定我们拥有的最后一个ID以仅获取新消息之外,在这种简单形式下,该命令与XRANGE相比并没有太大的不同。然而,有趣的部分是,我们可以通过指定BLOCK参数,轻松地将XREAD变成一个阻塞命令

> XREAD BLOCK 0 STREAMS race:france $

请注意,在上面的例子中,除了移除COUNT,我还指定了新的BLOCK选项,超时时间为0毫秒(这意味着永不超时)。此外,我没有为流mystream传递一个普通的ID,而是传递了特殊的ID$。这个特殊的ID意味着XREAD应该使用流mystream中已存储的最大ID作为最后一个ID,这样我们将只接收到从我们开始监听时起的消息。这在某种程度上类似于Unix命令tail -f

请注意,当使用BLOCK选项时,我们不必使用特殊的ID $。我们可以使用任何有效的ID。如果命令能够立即满足我们的请求而不阻塞,它就会这样做,否则它将阻塞。通常,如果我们想从新条目开始消费流,我们从ID $开始,然后继续使用接收到的最后一条消息的ID来进行下一次调用,依此类推。

XREAD的阻塞形式也能够监听多个流,只需指定多个键名即可。如果请求可以同步处理,因为至少有一个流的元素大于我们指定的相应ID,它将返回结果。否则,命令将阻塞,并返回第一个接收到新数据的流的项目(根据指定的ID)。

与阻塞列表操作类似,从等待数据的客户端角度来看,阻塞流读取是公平的,因为语义是FIFO(先进先出)风格的。当有新项目可用时,第一个阻塞在给定流上的客户端将首先被解除阻塞。

XREAD 除了 COUNTBLOCK 之外没有其他选项,因此它是一个非常基础的命令,具有将消费者附加到一个或多个流的特定目的。使用消费者组 API 可以获得更强大的流消费功能,然而通过消费者组进行读取是通过一个名为 XREADGROUP 的不同命令实现的,这将在本指南的下一节中介绍。

消费者组

当手头的任务是从不同的客户端消费相同的流时,XREAD 已经提供了一种向 N 个客户端进行扇出的方式,可能还使用副本来提供更多的读取可扩展性。然而,在某些问题中,我们想要做的不是向多个客户端提供相同的消息流,而是从同一个流中向多个客户端提供不同的子集的消息。一个明显的用例是处理速度较慢的消息:能够有 N 个不同的工作器接收流的不同部分,使我们能够通过将不同的消息路由到准备做更多工作的不同工作器来扩展消息处理。

实际上,如果我们想象有三个消费者C1、C2、C3,以及一个包含消息1、2、3、4、5、6、7的流,那么我们想要的是根据以下图表来分发消息:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

为了实现这一点,Redis使用了一个称为消费者组的概念。从实现的角度来看,Redis消费者组与Kafka(TM)消费者组没有任何关系,这一点非常重要。然而,它们在功能上是相似的,所以我决定保留Kafka(TM)的术语,因为它最初普及了这个概念。

消费者组就像一个伪消费者,它从流中获取数据,实际上服务于多个消费者,提供某些保证:

  1. 每条消息都会被发送给不同的消费者,因此不可能将同一条消息发送给多个消费者。
  2. 消费者在消费者组内通过名称进行标识,这是一个区分大小写的字符串,实现消费者的客户端必须选择。这意味着即使断开连接后,流消费者组仍保留所有状态,因为客户端将再次声明为同一消费者。然而,这也意味着客户端需要提供一个唯一的标识符。
  3. 每个消费者组都有从未消费的第一个ID的概念,因此,当消费者请求新消息时,它可以只提供之前未传递的消息。
  4. 然而,消费消息需要使用特定命令进行显式确认。Redis 将确认解释为:此消息已正确处理,因此可以从消费者组中移除。
  5. 消费者组跟踪所有当前待处理的消息,即已传递给消费者组中某个消费者但尚未确认处理的消息。由于此功能,当访问流的消息历史时,每个消费者只会看到传递给它的消息

在某种程度上,消费者组可以被想象为关于流的某种状态量

+----------------------------------------+
| consumer_group_name: mygroup           |
| consumer_group_stream: somekey         |
| last_delivered_id: 1292309234234-92    |
|                                        |
| consumers:                             |
|    "consumer-1" with pending messages  |
|       1292309234234-4                  |
|       1292309234232-8                  |
|    "consumer-42" with pending messages |
|       ... (and so forth)               |
+----------------------------------------+

如果你从这个角度来看,理解消费者组能做什么、它如何能够仅向消费者提供他们待处理消息的历史记录,以及请求新消息的消费者如何只会收到大于last_delivered_id的消息ID,就非常简单了。同时,如果你将消费者组视为Redis流的辅助数据结构,很明显,单个流可以有多个消费者组,这些消费者组拥有不同的消费者集合。实际上,同一个流甚至可以有客户端通过XREAD在没有消费者组的情况下读取,以及客户端通过XREADGROUP在不同的消费者组中读取。

现在是时候放大查看基本的消费者组命令了。它们如下:

  • XGROUP 用于创建、销毁和管理消费者组。
  • XREADGROUP 用于通过消费者组从流中读取数据。
  • XACK 是允许消费者将待处理消息标记为已正确处理的命令。

创建消费者组

假设我已经有一个类型为流的键 race:france,为了创建一个消费者组,我只需要执行以下操作:

正如你在上面的命令中所看到的,当创建消费者组时,我们必须指定一个ID,在示例中只是$。这是必要的,因为消费者组在其他状态中,必须知道在第一个消费者连接时接下来要提供什么消息,也就是说,当组刚创建时,最后的消息ID是什么。如果我们像我们做的那样提供$,那么从现在开始只有到达流中的新消息才会提供给组中的消费者。如果我们指定0,那么消费者组将从头开始消费流历史中的所有消息。当然,你可以指定任何其他有效的ID。你所知道的是,消费者组将开始传递大于你指定的ID的消息。因为$意味着流中当前最大的ID,指定$将产生只消费新消息的效果。

XGROUP CREATE 还支持在流不存在时自动创建流,使用可选的 MKSTREAM 子命令作为最后一个参数:

现在消费者组已经创建,我们可以立即尝试使用XREADGROUP命令通过消费者组读取消息。我们将从消费者那里读取,我们称之为Alice和Bob,看看系统如何向Alice或Bob返回不同的消息。

XREADGROUPXREAD 非常相似,并提供了相同的 BLOCK 选项,否则它是一个同步命令。然而,有一个必须的选项必须始终指定,即 GROUP,它有两个参数:消费者组的名称和尝试读取的消费者的名称。还支持 COUNT 选项,并且与 XREAD 中的选项相同。

我们将向比赛:italy流中添加骑手,并尝试使用消费者组读取一些内容: 注意:这里的rider是字段名称,name是关联的值。请记住,流项目是小字典。

XREADGROUP 的回复与 XREAD 的回复类似。但请注意上面提供的 GROUP 。它表示我想使用消费者组 mygroup 从流中读取数据,并且我是消费者 Alice。每次消费者使用消费者组执行操作时,都必须指定其名称,以唯一标识该消费者在组中的身份。

在上面的命令行中,还有一个非常重要的细节,在强制性的STREAMS选项之后,为键mystream请求的ID是特殊的ID>。这个特殊的ID仅在消费者组的上下文中有效,它的意思是:到目前为止从未传递给其他消费者的消息

这几乎总是你想要的,然而也可以指定一个真实的ID,例如0或任何其他有效的ID,在这种情况下,我们请求XREADGROUP只提供给我们待处理消息的历史记录,并且在这种情况下,永远不会看到组中的新消息。所以基本上XREADGROUP根据我们指定的ID有以下行为:

  • 如果ID是特殊ID >,则该命令将仅返回迄今为止从未传递给其他消费者的新消息,并且作为副作用,将更新消费者组的最后ID
  • 如果ID是任何其他有效的数字ID,那么该命令将允许我们访问我们的待处理消息的历史记录。也就是说,这些消息是传递给这个指定消费者(由提供的名称标识)的,并且到目前为止从未通过XACK进行确认。

我们可以立即通过指定ID为0来测试这种行为,而不使用任何COUNT选项:我们只会看到唯一一条待处理的消息,即关于Castilla的消息:

然而,如果我们确认消息已处理,它将不再属于待处理消息历史记录的一部分,因此系统将不再报告任何内容:

如果你还不知道XACK是如何工作的,不用担心,其核心思想是处理过的消息不再是我们可以访问的历史记录的一部分。

现在是鲍勃的阅读时间:

Bob 要求最多两条消息,并通过同一个组 mygroup 进行读取。因此,Redis 只报告消息。正如你所看到的,"Castilla" 消息没有被传递,因为它已经传递给了 Alice,所以 Bob 收到了 Royce 和 Sam-Bodden 等等。

这样,Alice、Bob以及组中的任何其他消费者都能够从同一个流中读取不同的消息,读取他们尚未处理的消息的历史记录,或者将消息标记为已处理。这允许为从流中消费消息创建不同的拓扑和语义。

有几件事情需要注意:

  • 消费者在第一次被提及时会自动创建,无需显式创建。
  • 即使使用XREADGROUP,你也可以同时从多个键读取,然而,要实现这一点,你需要在每个流中创建一个同名的消费者组。这不是一个常见的需求,但值得一提的是,这个功能在技术上是可用的。
  • XREADGROUP 是一个写命令,因为即使它从流中读取数据,消费者组也会因为读取的副作用而被修改,所以它只能在主实例上调用。

一个使用消费者组实现的消费者示例,用Ruby语言编写,可能如下所示。这段Ruby代码旨在让几乎任何有经验的程序员都能读懂,即使他们不懂Ruby:

require 'redis'

if ARGV.length == 0
    puts "Please specify a consumer name"
    exit 1
end

ConsumerName = ARGV[0]
GroupName = "mygroup"
r = Redis.new

def process_message(id,msg)
    puts "[#{ConsumerName}] #{id} = #{msg.inspect}"
end

$lastid = '0-0'

puts "Consumer #{ConsumerName} starting..."
check_backlog = true
while true
    # Pick the ID based on the iteration: the first time we want to
    # read our pending messages, in case we crashed and are recovering.
    # Once we consumed our history, we can start getting new messages.
    if check_backlog
        myid = $lastid
    else
        myid = '>'
    end

    items = r.xreadgroup('GROUP',GroupName,ConsumerName,'BLOCK','2000','COUNT','10','STREAMS',:my_stream_key,myid)

    if items == nil
        puts "Timeout!"
        next
    end

    # If we receive an empty reply, it means we were consuming our history
    # and that the history is now empty. Let's start to consume new messages.
    check_backlog = false if items[0][1].length == 0

    items[0][1].each{|i|
        id,fields = i

        # Process the message
        process_message(id,fields)

        # Acknowledge the message as processed
        r.xack(:my_stream_key,GroupName,id)

        $lastid = id
    }
end

正如你所看到的,这里的想法是从消费历史记录开始,也就是我们的待处理消息列表。这很有用,因为消费者可能之前崩溃过,所以在重启的情况下,我们希望重新读取那些已经发送给我们但未被确认的消息。请注意,我们可能会多次处理一条消息,或者只处理一次(至少在消费者失败的情况下,但还涉及到Redis持久化和复制的限制,请参阅关于此主题的特定部分)。

一旦历史记录被消费完毕,并且我们得到一个空的消息列表,我们就可以切换到使用>特殊ID来消费新消息。

从永久性故障中恢复

上面的例子允许我们编写参与同一消费者组的消费者,每个消费者处理一部分消息,并在从故障中恢复时重新读取仅发送给它们的待处理消息。然而,在现实世界中,消费者可能会永久失败并且永远不会恢复。那么,当消费者因任何原因停止后永远不会恢复时,其待处理的消息会发生什么情况呢?

Redis 消费者组提供了一种功能,用于在这些情况下认领给定消费者的待处理消息,以便这些消息将改变所有权并重新分配给不同的消费者。该功能非常明确。消费者必须检查待处理消息列表,并且必须使用特殊命令认领特定消息,否则服务器将永远保留这些消息并将其分配给旧的消费者。通过这种方式,不同的应用程序可以选择是否使用此功能,以及具体如何使用它。

这个过程的第一步只是一个命令,它提供了消费者组中待处理条目的可观察性,被称为XPENDING。 这是一个只读命令,调用它总是安全的,不会改变任何消息的所有权。 在最简单的形式中,该命令使用两个参数调用,这两个参数是流的名称和消费者组的名称。

当以这种方式调用时,命令输出消费者组中待处理消息的总数(在这种情况下为两个),待处理消息中的最低和最高消息ID,最后是消费者列表及其待处理消息的数量。 我们只有Bob有两个待处理的消息,因为Alice请求的单个消息已使用XACK确认。

我们可以通过给XPENDING提供更多的参数来请求更多信息,因为完整的命令签名如下:

XPENDING <key> <groupname> [[IDLE <min-idle-time>] <start-id> <end-id> <count> [<consumer-name>]]

通过提供一个开始和结束的ID(可以是XRANGE中的-+)和一个控制命令返回信息量的计数,我们能够了解更多关于待处理消息的信息。如果我们想将输出限制为仅针对特定消费者的待处理消息,可以使用可选的最后一个参数——消费者名称,但在以下示例中不会使用此功能。

现在我们有了每条消息的详细信息:ID、消费者名称、空闲时间(以毫秒为单位),这是自上次将消息传递给某个消费者以来经过的毫秒数,最后是给定消息被传递的次数。 我们有两条来自Bob的消息,它们已经空闲了60000多毫秒,大约一分钟。

请注意,没有人阻止我们通过使用XRANGE来检查第一条消息的内容。

我们只需要在参数中重复相同的ID两次。现在我们有了一些想法,Alice可能会决定,如果在1分钟内没有处理消息,Bob可能不会很快恢复,是时候认领这些消息并代替Bob恢复处理了。为此,我们使用XCLAIM命令。

这个命令非常复杂,完整形式下有很多选项,因为它用于复制消费者组的更改,但我们通常只使用我们需要的参数。在这种情况下,它非常简单:

XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>

基本上我们说,对于这个特定的键和组,我希望指定的消息ID将改变所有权,并将分配给指定的消费者名称。然而,我们还提供了一个最小空闲时间,以便只有当提到的消息的空闲时间大于指定的空闲时间时,操作才会生效。这很有用,因为可能有两个客户端同时尝试认领一条消息:

Client 1: XCLAIM race:italy italy_riders Alice 60000 1692632647899-0
Client 2: XCLAIM race:italy italy_riders Lora 60000 1692632647899-0

然而,作为副作用,认领消息将重置其空闲时间并增加其投递计数器,因此第二个客户端将无法认领它。通过这种方式,我们避免了消息的简单重新处理(即使一般情况下无法实现完全一次处理)。

这是命令执行的结果:

消息已成功被Alice认领,她现在可以处理消息并确认它,即使原始消费者没有恢复,也可以推动事情向前发展。

从上面的例子可以清楚地看出,成功认领给定消息的副作用是,XCLAIM 命令也会返回它。然而,这并不是强制性的。可以使用 JUSTID 选项,以便仅返回成功认领的消息的ID。如果您想减少客户端和服务器之间使用的带宽(以及命令的性能),并且您对消息不感兴趣,因为您的消费者是以一种会不时重新扫描待处理消息历史的方式实现的,这将非常有用。

声明也可以通过一个单独的进程来实现:该进程仅检查待处理消息列表,并将空闲消息分配给看起来活跃的消费者。可以使用Redis流的可观察性功能之一来获取活跃的消费者。这是下一节的主题。

自动索赔

XAUTOCLAIM 命令,在 Redis 6.2 中添加,实现了我们上面描述的认领过程。 XPENDINGXCLAIM 提供了不同类型恢复机制的基本构建块。 该命令通过让 Redis 管理它来优化通用过程,并为大多数恢复需求提供了一个简单的解决方案。

XAUTOCLAIM 识别空闲的待处理消息并将它们的所有权转移给消费者。 该命令的签名如下所示:

XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT count] [JUSTID]

因此,在上面的例子中,我可以使用自动声明来声明一条消息,如下所示:

XCLAIM类似,该命令会返回一个包含已认领消息的数组,但它还会返回一个流ID,允许迭代待处理的条目。 流ID是一个游标,我可以在下一次调用中使用它来继续认领空闲的待处理消息:

XAUTOCLAIM返回“0-0”流ID作为游标时,这意味着它已经到达了消费者组待处理条目列表的末尾。 这并不意味着没有新的空闲待处理消息,因此通过从流的开头调用XAUTOCLAIM来继续该过程。

索赔和交付计数器

你在XPENDING输出中观察到的计数器是每条消息的传递次数。计数器在两种情况下会增加:当消息通过XCLAIM成功认领时,或者当使用XREADGROUP调用来访问待处理消息的历史时。

当出现故障时,消息通常会多次传递,但最终它们通常会被处理并确认。然而,处理某些特定消息时可能会出现问题,因为消息已损坏或以某种方式制作,触发了处理代码中的错误。在这种情况下,消费者将无法处理这条特定消息。由于我们有传递尝试的计数器,我们可以使用该计数器来检测由于某种原因无法处理的消息。因此,一旦传递计数器达到您选择的某个较大数值,将此类消息放入另一个流并向系统管理员发送通知可能是更明智的做法。这基本上是Redis Streams实现死信概念的方式。

流式可观测性

缺乏可观察性的消息系统非常难以使用。不知道谁在消费消息、哪些消息正在等待处理、在给定流中活跃的消费者组集合,使得一切都变得不透明。因此,Redis Streams 和消费者组有不同的方式来观察正在发生的事情。我们已经介绍了 XPENDING,它允许我们检查在给定时刻正在处理的消息列表,以及它们的空闲时间和传递次数。

然而,我们可能希望做的不仅仅是这些,XINFO 命令是一个可观察性接口,可以与子命令一起使用,以获取有关流或消费者组的信息。

此命令使用子命令来显示有关流及其消费者组状态的不同信息。例如,XINFO STREAM 报告有关流本身的信息。

输出显示了关于流如何在内部编码的信息,并且还显示了流中的第一条和最后一条消息。另一个可用的信息是与该流关联的消费者组的数量。我们可以进一步挖掘,要求更多关于消费者组的信息。

正如你在这个和之前的输出中所看到的,XINFO 命令输出了一系列字段-值对。因为它是一个可观察性命令,这使得人类用户能够立即理解所报告的信息,并且允许命令在未来通过添加更多字段来报告更多信息,而不会破坏与旧客户端的兼容性。其他必须更节省带宽的命令,如 XPENDING,则只报告信息而不包含字段名称。

上面的示例输出,其中使用了GROUPS子命令,通过观察字段名称应该很清楚。我们可以通过检查注册在组中的消费者来更详细地查看特定消费者组的状态。

如果你不记得命令的语法,只需向命令本身寻求帮助:

> XINFO HELP
1) XINFO <subcommand> [<arg> [value] [opt] ...]. Subcommands are:
2) CONSUMERS <key> <groupname>
3)     Show consumers of <groupname>.
4) GROUPS <key>
5)     Show the stream consumer groups.
6) STREAM <key> [FULL [COUNT <count>]
7)     Show information about the stream.
8) HELP
9)     Prints this help.

与Kafka (TM)分区的差异

Redis流中的消费者组在某些方面可能类似于Kafka(TM)基于分区的消费者组,但请注意,实际上Redis流是非常不同的。分区只是逻辑上的,消息只是被放入一个单一的Redis键中,因此不同客户端的服务方式基于谁准备好处理新消息,而不是客户端从哪个分区读取。例如,如果消费者C3在某个时候永久失败,Redis将继续为C1和C2提供所有到达的新消息,就像现在只有两个逻辑上的分区一样。

同样地,如果某个消费者处理消息的速度比其他消费者快得多,那么在相同的时间单位内,这个消费者将按比例接收更多的消息。这是可能的,因为Redis显式地跟踪所有未确认的消息,并记住谁收到了哪条消息以及从未传递给任何消费者的第一条消息的ID。

然而,这也意味着在Redis中,如果你真的想将同一流中的消息分区到多个Redis实例中,你必须使用多个键和一些分片系统,如Redis集群或其他特定于应用程序的分片系统。单个Redis流不会自动分区到多个实例。

我们可以说,以下内容在示意图上是正确的:

  • 如果你使用1个流 -> 1个消费者,你正在按顺序处理消息。
  • 如果您使用N个流和N个消费者,使得只有特定的消费者访问N个流的一个子集,您可以扩展上述1个流 -> 1个消费者的模型。
  • 如果您使用1个流 -> N个消费者,您正在将负载均衡到N个消费者,但在这种情况下,关于同一逻辑项的消息可能会被无序消费,因为某个消费者可能比另一个消费者更快地处理消息3,而另一个消费者仍在处理消息4。

所以基本上,Kafka分区更像是使用N个不同的Redis键,而Redis消费者组是一个服务器端的消息负载均衡系统,将来自给定流的消息分配给N个不同的消费者。

封顶流

许多应用程序不希望永远将数据收集到流中。有时,在流中最多包含给定数量的项目是有用的,其他时候,一旦达到给定的大小,将数据从Redis移动到不在内存中且速度不快但适合存储历史记录的存储中是有用的,可能用于未来几十年。Redis流对此有一些支持。一个是XADD命令的MAXLEN选项。这个选项使用起来非常简单:

使用MAXLEN时,当达到指定长度时,旧条目会自动被移除,从而使流保持恒定大小。目前没有选项可以告诉流只保留不超过给定时间段的项目,因为这样的命令为了保持一致运行,可能会长时间阻塞以移除项目。例如,想象一下如果有一个插入高峰,然后是一个长时间的暂停,接着是另一个插入,所有这些都有相同的最大时间。流将阻塞以移除在暂停期间变得过旧的数据。因此,用户需要做一些规划并了解所需的最大流长度。此外,虽然流的长度与使用的内存成正比,但按时间修剪更难以控制和预测:它取决于插入速率,而插入速率通常会随时间变化(如果它不变化,那么按大小修剪就很简单了)。

然而,使用MAXLEN进行修剪可能会很昂贵:流通过宏节点在基数树中表示,以实现非常高的内存效率。改变由几十个元素组成的单个宏节点并不是最优的。因此,可以以以下特殊形式使用该命令:

XADD race:italy MAXLEN ~ 1000 * ... entry fields here ...

MAXLEN 选项与实际计数之间的 ~ 参数意味着,我并不真的需要它正好是1000个项目。它可以是1000、1010或1030,只要确保至少保存1000个项目。使用这个参数,修剪只在我们能移除整个节点时进行。这使得它更加高效,通常也是你想要的。你会注意到,客户端库对此有各种实现。例如,Python客户端默认是近似的,必须显式设置为真实长度。

还有一个XTRIM命令,它的功能与上面的MAXLEN选项非常相似,不同之处在于它可以单独运行:

或者,对于XADD选项:

然而,XTRIM 被设计为接受不同的修剪策略。另一种修剪策略是 MINID,它会驱逐ID低于指定值的条目。

由于XTRIM是一个明确的命令,用户应了解不同修剪策略的潜在缺点。

另一个未来可能添加到XTRIM的有用驱逐策略是通过一系列ID进行删除,以便在需要时将数据从Redis移动到其他存储系统时,简化XRANGEXTRIM的使用。

流API中的特殊ID

你可能已经注意到,在Redis API中有几个特殊的ID可以使用。这里是一个简短的回顾,以便它们在将来更有意义。

前两个特殊的ID是-+,它们用于与XRANGE命令的范围查询中。这两个ID分别表示可能的最小ID(基本上是0-1)和可能的最大ID(即18446744073709551615-18446744073709551615)。正如你所看到的,写-+比写这些数字要简洁得多。

然后有一些API,我们想要表示流中具有最大ID的项目的ID。这就是$的含义。例如,如果我只想要新的条目使用XREADGROUP,我使用这个ID来表示我已经拥有所有现有的条目,但不包括将来会插入的新条目。同样地,当我创建或设置消费者组的ID时,我可以将最后传递的项目设置为$,以便只向组中的消费者传递新条目。

如你所见,$ 并不意味着 +,它们是两个不同的东西,因为 + 是每个可能流中最大的ID,而 $ 是包含给定条目的给定流中最大的ID。此外,API通常只理解 +$,但避免加载具有多重含义的给定符号是有用的。

另一个特殊的ID是>,这个特殊含义仅与消费者组相关,并且仅在XREADGROUP命令使用时有效。这个特殊的ID意味着我们只想要那些到目前为止从未被传递给其他消费者的条目。所以基本上,> ID是消费者组的最后传递的ID

最后是特殊ID *,它只能与XADD命令一起使用,意味着为新条目自动选择一个ID。

所以我们有-+$>*,它们都有不同的含义,大多数时候可以在不同的上下文中使用。

持久性、复制和消息安全性

一个流,就像任何其他Redis数据结构一样,会被异步复制到副本并持久化到AOF和RDB文件中。然而,可能不那么明显的是,消费者组的完整状态也会传播到AOF、RDB和副本中,因此如果主服务器中有消息待处理,副本也会有相同的信息。同样,在重启后,AOF将恢复消费者组的状态。

然而请注意,Redis流和消费者组是使用Redis默认复制进行持久化和复制的,因此:

  • 如果消息持久化在您的应用程序中很重要,则必须将AOF与强fsync策略一起使用。
  • 默认情况下,异步复制不会保证XADD命令或消费者组状态更改被复制:在故障转移后,某些内容可能会丢失,这取决于副本从主节点接收数据的能力。
  • WAIT 命令可用于强制将更改传播到一组副本。然而,请注意,虽然这使得数据丢失的可能性非常小,但由 Sentinel 或 Redis Cluster 操作的 Redis 故障转移过程仅执行尽力而为的检查,以故障转移到最新的副本,并且在某些特定的故障条件下,可能会提升缺少某些数据的副本。

因此,在设计使用Redis流和消费者组的应用程序时,请确保理解应用程序在故障期间应具备的语义属性,并相应地配置事物,评估其对您的用例是否足够安全。

从流中移除单个项目

流还有一个特殊的命令,仅通过ID从流中删除项目。通常对于一个仅追加的数据结构来说,这可能看起来像是一个奇怪的功能,但它实际上对于涉及隐私法规的应用程序非常有用。该命令称为XDEL,并接收流的名称,后跟要删除的ID:

然而在当前实现中,内存直到宏节点完全为空时才会真正回收,因此您不应滥用此功能。

零长度流

流与其他Redis数据结构之间的一个区别是,当其他数据结构不再有任何元素时,作为调用删除元素命令的副作用,键本身将被删除。例如,当调用ZREM删除有序集合中的最后一个元素时,有序集合将被完全删除。另一方面,流被允许保持在零元素状态,无论是由于使用MAXLEN选项且计数为零(XADDXTRIM命令),还是因为调用了XDEL

存在这种不对称性的原因是,Streams 可能有关联的消费者组,我们不希望仅仅因为流中不再有任何项目而丢失消费者组定义的状态。目前,即使流没有关联的消费者组,也不会被删除。

消费消息的总延迟

XRANGEXREADXREADGROUP这样的非阻塞流命令,如果没有BLOCK选项,会像其他Redis命令一样同步执行,因此讨论这些命令的延迟是没有意义的:更有趣的是查看Redis文档中这些命令的时间复杂度。可以说,在提取范围时,流命令至少与有序集合命令一样快,而且XADD非常快,如果使用管道技术,在普通机器上每秒可以轻松插入50万到100万个项目。

然而,如果我们想要理解在消费者组中阻塞消费者的情况下处理消息的延迟,从通过XADD生成消息的那一刻,到消费者通过XREADGROUP返回消息的那一刻,延迟就成为一个有趣的参数。

如何服务被阻止的消费者

在提供执行测试的结果之前,了解Redis使用什么模型来路由流消息(以及通常如何管理任何等待数据的阻塞操作)是很有趣的。

  • 被阻塞的客户端被引用在一个哈希表中,该哈希表将至少有一个阻塞消费者的键映射到等待该键的消费者列表。这样,给定一个接收到数据的键,我们可以解析所有等待该数据的客户端。
  • 当发生写入操作时,在这种情况下,当调用XADD命令时,它会调用signalKeyAsReady()函数。该函数会将键放入需要处理的键列表中,因为这些键可能包含阻塞消费者的新数据。请注意,这些就绪键将在稍后处理,因此在同一个事件循环周期内,键可能会接收到其他写入操作。
  • 最后,在返回到事件循环之前,就绪键最终会被处理。对于每个键,会扫描等待数据的客户端列表,如果适用,这些客户端将接收到到达的新数据。在流的情况下,数据是消费者请求的适用范围内的消息。

正如你所看到的,基本上,在返回到事件循环之前,调用XADD的客户端和阻塞以消费消息的客户端都将在输出缓冲区中有他们的回复,因此XADD的调用者应该在大约相同的时间从Redis接收到回复,消费者也将接收到新消息。

该模型是基于推送的,因为将数据添加到消费者缓冲区将直接通过调用XADD的操作来执行,因此延迟往往相当可预测。

延迟测试结果

为了检查这些延迟特性,我们使用多个Ruby程序实例进行了测试,这些程序推送的消息包含计算机毫秒时间作为附加字段,并且Ruby程序从消费者组读取消息并处理它们。消息处理步骤包括将当前计算机时间与消息时间戳进行比较,以了解总延迟。

获得的结果:

Processed between 0 and 1 ms -> 74.11%
Processed between 1 and 2 ms -> 25.80%
Processed between 2 and 3 ms -> 0.06%
Processed between 3 and 4 ms -> 0.01%
Processed between 4 and 5 ms -> 0.02%

因此,99.9%的请求延迟小于等于2毫秒,剩余的异常值仍然非常接近平均值。

向流中添加数百万条未确认的消息并不会改变基准测试的要点,大多数查询仍然以非常短的延迟处理。

几点说明:

  • 在这里,我们每次迭代处理多达10k条消息,这意味着XREADGROUPCOUNT参数被设置为10000。这增加了大量的延迟,但为了使慢速消费者能够跟上消息流,这是必要的。因此,您可以预期实际延迟会小得多。
  • 用于此基准测试的系统与当今标准相比非常慢。

了解更多

RATE THIS PAGE
Back to top ↑