在Redis Insight中管理流和消费者组

学习如何在Redis Insight中管理流和消费者组

一个是一个仅追加的日志文件。 当你向其中添加数据时,你不能更改它。 这看起来可能是一个缺点;然而,流作为日志或单一事实来源。 它也可以用作在不同速度下工作且不需要相互了解的进程之间的缓冲区。 有关流的更多概念信息,请参见Redis Streams

在本主题中,您将学习如何在Redis Insight中添加和使用流以及消费者组。

这里有一个模拟温度和湿度传感器的流。与流交互的进程执行两种角色之一:消费者生产者。 流的重点在于它不会结束,因此你无法捕获整个数据集并对它们进行一些处理。

在这个流中,传感器被视为生产者,它们广播数据。 一个消费者从流中读取数据并对其进行一些处理。 例如,如果温度超过某个阈值,它会发出消息以打开该单元的空调或通知维护人员。

可以有多个消费者执行不同的任务,一个测量湿度,另一个在一段时间内测量温度。 Redis 将整个数据集的副本存储在内存中,这是一种有限的资源。 为了避免数据失控,当您向流中添加内容时,可以对流进行修剪。 当使用 XADD 向流中添加内容时,您可以选择指定流应修剪为特定或近似数量的最新条目,或仅包括 ID 高于指定 ID 的条目。 您还可以使用键过期来管理流数据所需的存储。例如,通过将每天的数据写入 Redis 中的自己的流,并在一段时间后(例如一周)使每个流的键过期。 ID 可以是任何数字,但流中的每个新条目必须具有比流中最后添加的 ID 值更高的 ID。

添加新条目

使用XADD并指定*作为ID,让Redis自动为您生成一个新的ID,该ID由毫秒精度的时间戳、破折号和序列号组成。例如1656416957625-0。然后提供要存储在新流条目中的字段名称和值。

有几种检索数据的方法。你可以按时间范围检索条目,或者你可以请求自你指定的时间戳或ID以来发生的所有事情。使用一个命令,你可以请求从某一天的10:30到11:15之间的任何内容。

消费者组

一个更现实的用例是一个具有许多温度传感器的系统,Redis将其数据放入流中,记录它们到达的时间,并对它们进行排序。

在右侧,我们有两个读取流的消费者。其中一个在温度超过某个数值时发出警报,并通知维护人员需要采取行动,另一个是数据仓库,它接收数据并将其存入数据库。

它们彼此独立运行。 在右上角,我们有另一种任务。 假设警报和数据仓库非常快。 你会收到一条消息,无论温度是否大于特定值,这可能只需要一毫秒。 警报可以跟上数据流。 扩展消费者的一种方式是消费者组,它允许多个相同消费者或相同代码的实例作为一个团队来处理流。

在Redis Insight中管理流

您可以通过两种方式在Redis Insight中添加流:创建新流或添加到现有流。

要创建一个流,首先选择键类型(流)。 你不能设置生存时间(TTL),因为它不能应用于流中的消息;它只能在Redis键上设置。将流命名为mystream。 然后,将Entry ID设置为*以默认为时间戳。 如果你有自己的ID生成策略,请输入序列中的下一个ID。请记住,ID必须高于流中任何其他条目的ID。

然后,使用 + 输入字段和值以添加多个(例如,名称和位置)。 现在,您有一个出现在视图中的流,您可以继续向其添加字段和值。

Redis Insight 为您运行读取命令,以便您可以在视图中查看流条目。 而消费者组视图显示给定消费者组中的每个消费者以及Redis最后一次分配消息的时间、消息的ID以及该过程发生的次数,以及您是否已使用XACK命令告诉Redis您已完成该任务。

在Redis Insight中监控传感器的温度和湿度

此示例展示了如何将现有流引入Redis Insight并与之交互。

设置

  1. 安装 Redis Insight.
  2. 下载并安装 Node.js(LTS 版本)。
  3. 安装Redis。在Docker中,检查Redis是否在默认端口6379上本地运行(未设置密码)。
  4. 克隆此示例的代码仓库。 查看README以获取有关此示例的更多信息和安装提示。
  5. 在命令行中,导航到包含代码库的文件夹并安装Node.js包管理器(npm)。
npm install

运行生产者

要启动生产者,它将每隔几秒向流中添加一个新条目,请输入:

npm run producer

> streams@1.0.0 producer
> node producer.js

Starting producer...
Adding reading for location: 62, temperature: 40.3, humidity: 36.5
Added as 1632771056648-0
Adding reading for location: 96, temperature: 15.4, humidity: 70
Added as 1632771059039-0
...

生产者无限期运行。 选择 Ctrl+C 来停止它。 如果你想更快地向流中添加条目,可以启动多个生产者实例。

运行消费者

要启动消费者,它会每隔几秒从流中读取数据,请输入:

npm run consumer

> streams@1.0.0 consumer
> node consumer.js

Starting consumer...
Resuming from ID 1632744741693-0
Reading stream...
Received entry 1632771056648-0:
[ 'location', '62', 'temp', '40.3', 'humidity', '36.5' ]
Finished working with entry 1632771056648-0
Reading stream...
Received entry 1632771059039-0:
[ 'location', '96', 'temp', '15.4', 'humidity', '70' ]

消费者将最后读取的条目ID存储在Redis字符串中,键为consumer:lastid。它使用这个字符串在重启后从上次停止的地方继续读取。尝试通过使用Ctrl+C停止并重新启动它来测试这一点。

一旦消费者处理完流中的每个条目,它将无限期地等待生产者实例添加更多内容:

Reading stream...
No new entries since entry 1632771060229-0.
Reading stream...
No new entries since entry 1632771060229-0.
Reading stream...

使用 Ctrl+C 停止它。

运行一个消费者组

消费者组由多个消费者实例共同工作组成。Redis管理从流中读取的条目分配给消费者组的成员。组中的消费者将接收条目的一个子集,而整个组将接收所有条目。在消费者组中工作时,消费者进程必须确认每个条目的接收/处理。

使用多个终端窗口,启动消费者组消费者的三个实例,每个实例赋予一个唯一的名称:

npm run consumergroup consumer1

> streams@1.0.0 consumergroup
> node consumer_group.js -- "consumer1"

Starting consumer consumer1...
Consumer group temphumidity_consumers exists, not created.
Reading stream...
Received entry 1632771059039-0:
[ 'location', '96', 'temp', '15.4', 'humidity', '70' ]
Acknowledged processing of entry 1632771059039-0.
Reading stream...

在第二个终端中:

npm run consumergroup consumer2

在第三个中:

npm run consumergroup consumer3

消费者将无限期运行,等待生产者实例在它们共同消费完整个流后向流中添加新消息。 请注意,在此模型中,每个消费者实例不会接收到流中的所有条目,而是组中的三个成员各自接收到一个子集。

在Redis Insight中查看流

  1. 启动 Redis Insight。
  2. 选择 localhost:6379
  3. 选择STREAM。可选地,从右上角选择全屏以扩展视图。

您现在可以在消费者组视图之间切换以查看您的数据。 正如本主题前面提到的,流是一个仅追加的日志,因此您无法修改条目的内容,但您可以删除整个条目。 这种情况在所谓的毒丸消息事件中非常有用,这种消息可能会导致消费者崩溃。您可以在视图中物理删除此类消息,或在命令行界面(CLI)中使用XDEL命令。

您可以继续在CLI中与您的流进行交互。例如,要获取流的当前长度,请使用XLEN命令:

XLEN ingest:temphumidity

使用流进行审计和处理银行、游戏、供应链、物联网、社交媒体等领域的事件。

RATE THIS PAGE
Back to top ↑