该数据结构需要 Redis 5.0.0 + 版本才可用使用
Redis stream 是 Redis 5 引入的一种新的数据结构,它是一个高性能、高可靠性的消息队列,主要用于异步消息处理和流式数据处理。在此之前,想要使用 Redis 实现消息队列,通常可以使用例如:列表,有序集合、发布与订阅 3 种数据结构。但是 stream 相比它们具有以下的优势:
话不多说,接下来具体看看如何使用它。(PS:万字长文,行驶途中请系好安全带)
XADD 命令的语法格式如下:
XADD stream-name id field value [field value]
*
关于使用 XADD 添加元素,还有以下特点:
my-stream
下面是一个使用 XADD 命令添加新消息的示例:
XADD my-stream * name John age 30 email john@example.com
上述命令的说明:
name
age
email
John
30
john@example.com
XADD 命令在成功执行后会返回元素 ID 作为结果:
"1681138020163-0"
每个元素的 ID 是一个递增的唯一标识符,由两部分组成:一个时间戳和一个序列号。
为了证明,我们可以指定消息 ID 向指定流中发送一条消息:
XADD my-stream 1681138020163-1 name Mary age 25 email mary@example.com
返回结果:
"1681138020163-1"
最后,可以提前使用 XRANGE 指令查看推入流中的数据
XRANGE
XRANGE my-stream - +
1) 1) "1681138020163-0" 2) 1) "name" 2) "John" 3) "age" 4) "30" 5) "email" 6) "john@example.com"2) 1) "1681138020163-1" 2) 1) "name" 2) "Mary" 3) "age" 4) "25" 5) "email" 6) "mary@example.com"
1) 1) "1681138020163-0"
2) 1) "name"
2) "John"
3) "age"
4) "30"
5) "email"
6) "john@example.com"
2) 1) "1681138020163-1"
2) "Mary"
4) "25"
6) "mary@example.com"
元素 ID 在 Redis stream 中扮演着非常重要的角色,它不仅保证了元素的唯一性和顺序性,还提供了高效的范围查询和分析功能。在使用 Redis stream 时,需要特别注意元素 ID 的限制,并保证 ID 的唯一性和递增性。
限制如下::
还有一些长度和特殊字符的限制等等,不符合上述限制的添加元素操作,会被 redis 拒绝,并且返回一个错误等。
最大元素 ID 是如何更新的 ?
在成功执行XADD命令之后,流的最大元素ID也会随之更新。
为什么要限制 新元素的 ID 必须比流中所有已有元素的 ID 都要大 ?
限制新元素的 ID 必须比流中所有已有元素的 ID 都要大,是为了保证 stream 中每个元素的唯一性和顺序性。这种特性对于使用流实现消息队列和事件系统的用户来说是非常重要的:用户可以确信,新的消息和事件只会出现在已有消息和事件之后,就像现实世界里新事件总是发生在已有事件之后一样,一切都是有序进行的。
示例开始就演示自动生成消息向流中推送数据,在日常使用非常方便,这里说一下它的生成规则:
流的数据大多只是临时保存的,如果不对流的长度进行限制,会出现以下情况:
为了避免该问题,在使用 Redis stream 时,可以使用 MAXLEN 选项指定 stream 的最大长度,命令格式如下:
XADD stream [MAXLEN len] id field value [field value ...]
示例:
XADD mini-stream MAXLEN 3 * k1 v1XADD mini-stream MAXLEN 3 * k2 v2XADD mini-stream MAXLEN 3 * k3 v3XADD mini-stream MAXLEN 3 * k4 v4# 我们向一个限制长度为 3 的 `mini-stream` 流中添加 4 条数据,然后查看流内的消息:XRANGE mini-stream - +1) 1) "1681140898447-0" 2) 1) "k2" 2) "v2"2) 1) "1681140901790-0" 2) 1) "k3" 2) "v3"3) 1) "1681140906703-0" 2) 1) "k4" 2) "v4"
XADD mini-stream MAXLEN 3 * k1 v1
XADD mini-stream MAXLEN 3 * k2 v2
XADD mini-stream MAXLEN 3 * k3 v3
XADD mini-stream MAXLEN 3 * k4 v4
# 我们向一个限制长度为 3 的 `mini-stream` 流中添加 4 条数据,然后查看流内的消息:
XRANGE mini-stream - +
1) 1) "1681140898447-0"
2) 1) "k2"
2) "v2"
2) 1) "1681140901790-0"
2) 1) "k3"
2) "v3"
3) 1) "1681140906703-0"
2) 1) "k4"
2) "v4"
最后会看到最早创建的 k1 消息已经被移除,redis 删除在流中存在时间最长的元素,从而来保证流的整体长度。
k1
除了在 XADD 命令时限制流,Redis 还提供单独限制流长度的 MAXLEN 命令,基础语法如下:
XTRIM stream MAXLEN len
XTRIM my-stream MAXLEN 2(integer) 1
XTRIM my-stream MAXLEN 2
(integer) 1
这条命令 XTRIM my-stream MAXLEN 2 的作用是将名为 my-stream 的流修剪为最多包含 2 条消息。换句话说,流中超出这个长度的较旧消息将被移除。
XDEL 用于从流中删除特定的消息。这个命令需要提供流的键(key)和一个或多个消息 ID 作为参数。当消息被成功删除时,XDEL 命令会返回被删除消息的数量。
XDEL
XDEL 的基本语法如下:
XDEL key ID [ID ...]
# 这个命令将从名为 `mystream` 的流中删除消息 ID 为 `1681480521617-0` 的消息。XDEL my-stream 1681480521617-0(integer) 1# 你也可以传入多个 `id` 参数进行批量删除XDEL my-stream 1681480524451-0 1681480526810-0 1681480965273-0(integer) 3
# 这个命令将从名为 `mystream` 的流中删除消息 ID 为 `1681480521617-0` 的消息。
XDEL my-stream 1681480521617-0
# 你也可以传入多个 `id` 参数进行批量删除
XDEL my-stream 1681480524451-0 1681480526810-0 1681480965273-0
(integer) 3
注意:,XDEL 不会修改流的长度计数,这意味着删除消息后,流的长度保持不变。
XLEN 用于获取流中消息的数量。这个命令非常简单且高效,因为它只要一个参数。
XLEN 的基本语法如下:
XLEN
XLEN key
XLEN my-stream(integer) 4
XLEN my-stream
(integer) 4
注意:XLEN 命令仅返回流中消息的数量,并不提供消息的具体内容。获取消息内容的命令,看下面的 XRANGE
XRANG 主要用于获取流中的一段连续消息,它还有一个非常相似的 XREVRANGE 命令,区别:
XRANG
XREVRANGE
XRANG 的的基本语法如下:
XRANGE key start end [COUNT count]
获取指定消息,我们可以把 start 和 end 设置同一条消息 ID,可以用来达到查询指定消息 ID 的效果。使用示例:
start
end
# 获取指定消息 IDXRANGE my-stream 1681480968241-0 1681480968241-0
# 获取指定消息 ID
XRANGE my-stream 1681480968241-0 1681480968241-0
获取多条消息,可以利用 COUNT 选项参数,使用示例:
# 获取流中最早的 5 条消息XRANGE my-stream - + COUNT 5
# 获取流中最早的 5 条消息
XRANGE my-stream - + COUNT 5
这条命令获取流中最早的 5 条消息(按消息 ID 顺序排序)。- 和 + 分别表示最小和最大的消息 ID,用于获取流中的所有消息。
-
+
想要读取流中全部消息内容,移除 COUNT 即可:
# 获取全部消息XRANGE my-stream - +
# 获取全部消息
XREVRANGE 按照消息 ID 逆序返回结果,基本语法如下:
XREVRANGE key end start [COUNT count]
用法完全和 XRANGE 一样,这里就不过多介绍了,使用示例:
XREVRANGE my-stream + - COUNT 5
这个命令将返回名为 mystream 的流中的最新的 3 条消息(按消息 ID 逆序排序)。
mystream
在实际业务场景中,可以利用 XRANGE 和 XREVRANGE 命令可以用于实现以下功能:
COUNT
相比 XRANGE,XREVRANGE 类似,XREAD 也是用于从流中读取消息的命令,但它们之间有一些关键区别:
BLOCK
XREAD 的阻塞模式,可以更好的构建实时数据处理应用程序,如事件驱动系统、实时分析系统等。
XREAD 命令的基本语法如下:
XREAD
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
查询的话,除了同时读取多个流的特点外,其他和 XRANGE,XREVRANGE 类似。
使用示例:
XREAD STREAMS my-stream 0
这个命令将从名为 my-stream 的流中读取消息,0 代表读取所有消息,如果指定的消息 ID,表示从该消息 ID 之后开始读取
XREAD STREAMS my-stream mini-stream 0 0
这个命令将从名为 my-stream 和 mini-stream 的流中分别读取所有消息,后面的 2 个参数 0 分别对应 2 个消息 ID 0 开始的位置
mini-stream
0
当使用阻塞模式时,XREAD 命令会在以下几种情况下表现出不同的行为:
如果流中有满足条件的消息(即从指定的消息 ID 之后的新消息),那么 XREAD 命令会立即返回这些消息,不会发生阻塞。
XREAD BLOCK 1000000 COUNT 1 STREAMS my-stream 01) 1) "my-stream" 2) 1) 1) "1681480968241-0" 2) 1) "k5" 2) "v5"
XREAD BLOCK 1000000 COUNT 1 STREAMS my-stream 0
1) 1) "my-stream"
2) 1) 1) "1681480968241-0"
2) 1) "k5"
2) "v5"
XREAD 命令解除阻塞也分 2 情况:超时,新消息到达
示例代码:
# 超时: 阻塞超时,没有新消息到达,解除阻塞XREAD BLOCK 5000 STREAMS my-stream 1681482023346-0(nil)(5.09s)# 新消息到达: 新消息到达,且满足读取条件 (新消息的 ID 大于指定的消息 ID) 解除阻塞XREAD BLOCK 50000 STREAMS my-stream 1681482023346-01) 1) "my-stream" 2) 1) 1) "1681485525804-0" 2) 1) "newMessage" 2) "v1"(18.46s)
# 超时: 阻塞超时,没有新消息到达,解除阻塞
XREAD BLOCK 5000 STREAMS my-stream 1681482023346-0
(nil)
(5.09s)
# 新消息到达: 新消息到达,且满足读取条件 (新消息的 ID 大于指定的消息 ID) 解除阻塞
XREAD BLOCK 50000 STREAMS my-stream 1681482023346-0
2) 1) 1) "1681485525804-0"
2) 1) "newMessage"
2) "v1"
(18.46s)
如果设置的阻塞等待时间为 0,那么 XREAD 命令会一直阻塞:
XREAD BLOCK 0 STREAMS my-stream $
这个命令将一直阻塞等待,直到新消息到达。$ 符号表示只读取新消息。
$
当然如果客户端主动断开连接,阻塞的 XREAD 命令也会被取消
在实际应用中,XREAD 使用阻塞模式,可以在新消息到达时立即处理,实现实时消息处理。
在 Redis 流的消息模型中,是通过消费者组(Consumer Group)来组织和管理多个消费者以协同处理来自同一个流的消息的机制。消费者组的主要目的是在多个消费者之间分发消息,实现负载均衡、高可用性和容错能力。
工作原理:
如图所示:
使用消费者组这种模型的设计,以为在 Redis Stream 中实现以下功能:
接下来我们再详细说明消费组相关的命令使用
通过 XGROUP 命令可以为你的 Redis Stream 创建和管理消费组。
XGROUP
命令格式如下:
XGROUP CREATE stream group id
参数说明:
<stream>
<group>
<id>
[MKSTREAM]
# 创建消费组,如果流不存在则自动创建XGROUP CREATE mystream mygroup $ MKSTREAMOK# 查看流中的消费组XINFO GROUPS mystream1) 1) "name" 2) "mygroup" 3) "consumers" 4) (integer) 0 5) "pending" 6) (integer) 0 7) "last-delivered-id" 8) "0-0" 9) "entries-read" 10) (nil) 11) "lag" 12) (integer) 0
# 创建消费组,如果流不存在则自动创建
XGROUP CREATE mystream mygroup $ MKSTREAM
OK
# 查看流中的消费组
XINFO GROUPS mystream
1) 1) "name"
2) "mygroup"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "0-0"
9) "entries-read"
10) (nil)
11) "lag"
12) (integer) 0
以上命令是使用 XGROUP CREATE 命令创建一个名为 mygroup 的消费组,从最新的消息开始消费,使用 MKSTREAM 选项,如果流不存在则会自动创建流,返回 OK 既代表创建成功。最后使用 XINFO 查看结果。
XGROUP CREATE
mygroup
MKSTREAM
在某些情况下,你可能想要消费组忽略某些消息,或者重新处理某些消息来重现 bug,那么可以使用 XGROUP SETID 命令设置消费组的起始消息 ID。
XGROUP SETID
命令格式非常简单:
XGROUP SETID stream group id
# 设置 mygroup 组的最新消息为指定 IDXGROUP SETID mystream mygroup 1681655893911-0OK# 查看消费组XINFO GROUPS mystream1) 1) "name" 2) "mygroup" 3) "consumers" 4) (integer) 0 5) "pending" 6) (integer) 0 7) "last-delivered-id" 8) "1681655893911-0" # 已被改变 9) "entries-read" 10) (nil) 11) "lag" 12) (integer) 4 # 设置 mygroup 组的最新消息为流的最新消息 IDXGROUP SETID mystream mygroup $# 查看消费组127.0.0.1:6379> XINFO GROUPS mystream1) 1) "name" 2) "mygroup" 3) "consumers" 4) (integer) 0 5) "pending" 6) (integer) 0 7) "last-delivered-id" 8) "1681655916001-0" # 已更新 9) "entries-read" 10) (nil) 11) "lag" 12) (integer) 0
# 设置 mygroup 组的最新消息为指定 ID
XGROUP SETID mystream mygroup 1681655893911-0
# 查看消费组
8) "1681655893911-0" # 已被改变
12) (integer) 4
# 设置 mygroup 组的最新消息为流的最新消息 ID
XGROUP SETID mystream mygroup $
127.0.0.1:6379> XINFO GROUPS mystream
8) "1681655916001-0" # 已更新
以上命令将 mygroup 组的最新消息 ID 更新为指定 ID 和流的最新 ID 的使用示例。
使用 XREADGROUP 命令读取消费组里面的消息,基本语法:
XREADGROUP
XREADGROUP GROUP <group> <consumer> [COUNT <n>] [BLOCK <ms>] STREAMS <stream_key_1> <stream_key_2> ... <id_1> <id_2> ...
<consumer>
<n>
<ms>
<stream_key_1>
<stream_key_2>
<id_1>
<id_2>
>
我们创建一个 myconsumer 的消费组读取上面创建 mygroup 消费组的信息,以下是多种用法示例:
myconsumer
# 以 myconsumer 消费者身份从 mystream 中读取分配给 mygroup 的消息# 读取所有最新的消息(常用)XREADGROUP GROUP mygroup myconsumer STREAMS mystream >(nil)# 其他用法:# 读取最多 10 条消息XREADGROUP GROUP mygroup myconsumer COUNT 10 STREAMS mystream ># 进行阻塞读取最新消息XREADGROUP GROUP mygroup myconsumer BLOCK 5000 STREAMS mystream >
# 以 myconsumer 消费者身份从 mystream 中读取分配给 mygroup 的消息
# 读取所有最新的消息(常用)
XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
# 其他用法:
# 读取最多 10 条消息
XREADGROUP GROUP mygroup myconsumer COUNT 10 STREAMS mystream >
# 进行阻塞读取最新消息
XREADGROUP GROUP mygroup myconsumer BLOCK 5000 STREAMS mystream >
这里拿不到数据是因为我们上面把消费组 mygroup 的消息 ID 设置为最新,我们尝试修改消息 ID 重新消费试试
# 设置消费组的消息 ID,进行重新消费XGROUP SETID mystream mygroup 1681655893911-0# 消费组 myconsumer 读取消费组的消息XREADGROUP GROUP mygroup myconsumer STREAMS mystream >1) 1) "mystream" 2) 1) 1) "1681655897993-0" 2) 1) "k1" 2) "v1" 2) 1) "1681655899297-0" 2) 1) "k1" 2) "v1" 3) 1) "1681655915496-0" 2) 1) "k1" 2) "v1" 4) 1) "1681655916001-0" 2) 1) "k1" 2) "v1" # 查看消费组的信息XINFO GROUPS mystream1) 1) "name" 2) "mygroup" 3) "consumers" 4) (integer) 1 # 消费组有一个消费者 5) "pending" 6) (integer) 4 # 有 4 条正在处理的消息 7) "last-delivered-id" 8) "1681655916001-0" 9) "entries-read" 10) (nil) 11) "lag" 12) (integer) 0
# 设置消费组的消息 ID,进行重新消费
# 消费组 myconsumer 读取消费组的消息
1) 1) "mystream"
2) 1) 1) "1681655897993-0"
2) 1) "k1"
2) 1) "1681655899297-0"
3) 1) "1681655915496-0"
4) 1) "1681655916001-0"
# 查看消费组的信息
4) (integer) 1 # 消费组有一个消费者
6) (integer) 4 # 有 4 条正在处理的消息
8) "1681655916001-0"
通过以上命令可以确认,myconsumer 消费者拿到 mygroup 消费组的消息未确认处理,所以看到有 4 条消息正在等待处理中。
通过 XPENDING 命令,可以获取指定流的指定消费者组目前的待处理消息的相关信息。在很多场景下,你需要通过它来观察和了解消费者的处理情况,从而做出处理,例如以下场景:
XPENDING
基本语法:
XPENDING stream group [start stop count] [consumer]
<start>
<stop>
<count>
使用 XPENDING 命令查看上面的 mygroup 组的消息去哪儿了:
XPENDING mystream mygroup1) (integer) 4 # 待处理消息数量2) "1681655897993-0" # 首条消息 ID3) "1681655916001-0" # 最后一条消息的 ID4) 1) 1) "myconsumer" # 各消费者正在处理的消息数量 2) "4"
XPENDING mystream mygroup
1) (integer) 4 # 待处理消息数量
2) "1681655897993-0" # 首条消息 ID
3) "1681655916001-0" # 最后一条消息的 ID
4) 1) 1) "myconsumer" # 各消费者正在处理的消息数量
2) "4"
以上展示的汇总信息,你还可以通过以下命令,查看待处理消息更详细的信息:
# 查看指定待处理消息XPENDING mystream mygroup 1681655897993-0 1681655897993-0 11) 1) "1681655897993-0" # 消息 ID 2) "myconsumer" # 所属消费者 3) (integer) 2397387 # 最后一次投递时间 4) (integer) 1 # 投递次数
# 查看指定待处理消息
XPENDING mystream mygroup 1681655897993-0 1681655897993-0 1
1) 1) "1681655897993-0" # 消息 ID
2) "myconsumer" # 所属消费者
3) (integer) 2397387 # 最后一次投递时间
4) (integer) 1 # 投递次数
从以上信息你可以看到消息正在被谁处理和处理的时间,你也可以指定消费者查看信息:
XPENDING mystream mygroup - + 10 myconsumer1) 1) "1681655897993-0" 2) "myconsumer" 3) (integer) 2591145 4) (integer) 12) 1) "1681655899297-0" 2) "myconsumer" 3) (integer) 2591145 4) (integer) 13) 1) "1681655915496-0" 2) "myconsumer" 3) (integer) 2591145 4) (integer) 14) 1) "1681655916001-0" 2) "myconsumer" 3) (integer) 2591145 4) (integer) 1
XPENDING mystream mygroup - + 10 myconsumer
1) 1) "1681655897993-0"
2) "myconsumer"
3) (integer) 2591145
4) (integer) 1
以上命令列出 myconsumer 消费者所有待处理的消息的详细信息
XACK 用于确认消费组中的特定消息已被处理。在消费者成功处理消息后,应使用 XACK 命令通知 Redis,以便从消费组的挂起消息列表中移除该消息。
XACK
命令格式:
XACK stream group id [id id ...]
通过 XACK 命令,我们将上面 myconsumer 消费者的消息进行确认处理:
# 确认消息XACK mystream mygroup 1681655897993-0(integer) 1# .....
# 确认消息
XACK mystream mygroup 1681655897993-0
# .....
当消费者对所有消息进行处理后,再查看消费组内容进行验证:
XPENDING mystream mygroup - + 10 myconsumer(empty array)XPENDING mystream mygroup1) (integer) 02) (nil)3) (nil)4) (nil)
(empty array)
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)
使用 XACK 可以确保消息不会重复处理防止其他消费者或相同消费者在故障恢复后重复处理该消息等等好处。
XCLAIM 消息转移类似我们生活中的呼叫转移,当一个消费者无法处理某个消息或出现故障时,XCLAIM 可以确保其他消费者接管并处理这些消息。命令格式非常简单:
XCLAIM
XCLAIM stream group new_consumer max_pending_time id [id id id]
# 使用 XPENDING 命令查询消费组中挂起的消息XPENDING mystream mygroup1) (integer) 22) "1681660259887-0"3) "1681660263096-0"4) 1) 1) "myconsumer" 2) "2" # 使用 XCLAIM 命令将消息转移XCLAIM mystream mygroup myconsumer2 10000 1681660259887-01) 1) "1681660259887-0" # 被转移的消息 ID 2) 1) "k1" # 消息内容 2) "v1"
# 使用 XPENDING 命令查询消费组中挂起的消息
1) (integer) 2
2) "1681660259887-0"
3) "1681660263096-0"
4) 1) 1) "myconsumer"
2) "2"
# 使用 XCLAIM 命令将消息转移
XCLAIM mystream mygroup myconsumer2 10000 1681660259887-0
1) 1) "1681660259887-0" # 被转移的消息 ID
2) 1) "k1" # 消息内容
上面的命令意思是:如果消息 ID 1681660259887-0 处理时间超过 10000ms,那么消息转移给 myconsumer2,我们使用 XPENDING 命令来验证:
XPENDING mystream mygroup1) (integer) 22) "1681660259887-0"3) "1681660263096-0"4) 1) 1) "myconsumer" 2) "1" 2) 1) "myconsumer2" 2) "1"
2) "1"
2) 1) "myconsumer2"
XINFO 用于获取流或消费组的详细信息。XINFO 命令有多个子命令,可以提供不同类型的信息。
XINFO
以下是一些常用的 XINFO 子命令及其介绍:
XINFO STREAM:此子命令用于获取流的详细信息,包括长度、消费组数量、第一个和最后一个条目等。例如:
XINFO STREAM mystream
XINFO GROUPS:此子命令用于获取流中消费组的列表及其相关信息。例如:
XINFO CONSUMERS:此子命令用于获取消费组中消费者的列表及其相关信息。例如:
XINFO CONSUMERS mystream mygroup
通过使用这些子命令,您可以了解流、消费组和消费者的状态,从而监控和优化 Redis Stream 应用程序的性能。在处理问题或分析系统性能时,这些信息可能特别有用。
当用户不再需要某个消费者的时候,可以通过执行以下命令将其删除,命令格式:
XGROUP DELCONSUMER stream group consumer
# 删除 myconsumer 消费者XGROUP DELCONSUMER mystream mygroup myconsumer(integer) 1
# 删除 myconsumer 消费者
XGROUP DELCONSUMER mystream mygroup myconsumer
当你不需要消费组时,可以通过以下命令删除它,命令格式:
XGROUP DESTROY stream group
# 删除 mygroup 消费组XGROUP DESTROY mystream mygroup(integer) 1
# 删除 mygroup 消费组
XGROUP DESTROY mystream mygroup
以下是本篇文章涉及的 Redis Stream 命令命令和简要总结:
这些命令提供了对 Redis Stream 的全面操作支持,包括添加、删除、读取、修剪消息以及管理消费组和消费者。通过熟练使用这些命令,您可以实现高效且可扩展的消息传递和日志处理系统。edis Stream 是 Redis 提供的一种强大、持久且可扩展的数据结构,用于实现消息传递和日志处理等场景。Stream 数据结构类似于日志文件,消息以有序的方式存储在流中,同时还支持消费组的概念,允许多个消费者并行处理消息。
原文链接:https://www.cnblogs.com/xiao2shiqi/p/17324547.html
本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728