Reids 实现消息队列存在几种方式:
- Redis的列表(lists)数据结构
- Redis自带的PUB/SUB机制,即发布-订阅模式
- Stream流结构
本文主要介绍通过 Stream
流的方式
1. 生产&消费
使用stream进行消费有两种情况:
- 只有一个消费者的独立消费
- 多消费者的消费者组
1.1. 独立消费
类似于List,生产者往list中写数据,消费者从list中读数据,只能有一个消费者。
生产消息
redis
1
XADD "test" * num 1
python
1
r.xadd("test", {"num": i})
消费消息
阻塞模式
1
2
3xread count 1 streams "test" 0-0 # 从头开始读一条
xread count 1 streams "test" 1562980142175-0 # 从某个ID开始
xread count 1 streams "test" $ # 从最后一条消息开始,此时(非阻塞模式)没有意义,因为最后一条信息的后面永远为空非阻塞模式
1
2XREAD block 1000 streams memberMessage $ # 阻塞模式读取最后一条消息 (如果为空则等待1秒)
XREAD block 0 streams memberMessage $ # 如果为空则一直等待
案例 python代码实现
生产者
1 | import redis |
消费者
消费后并不会删除队列中的消息。如果想连续消费,需要不断传入 ID 值
1 | import redis |
1.2. 多消费者
此时存在两个概念,需要区分开。
消费组
一个消费队列可以有多个消费组,消费组之间互不影响. 每个消费组(Consumer Group)的状态都是独立的,也就是说同一份Stream内部的消息会被每个消费组都消费到。
消费者
一个消费组中可以存在多个消费者,这些消费者将共同消费此时的队列。这些消费者之间是竞争关系。
上图是消费者组的简单流程:
- 创建消费组
- 创建消费者 并使用
XREADGROUP GROUP ... >
进行消费。此时消费的消息进入 pending 队列中。 - 当向服务器发送 每条消息的ACK 时,此消息被认为成功消费,由 pending 删除。
创建消费组
1 | xgroup create "test" test_g1 0-0 # 0-0 从头开始消费 |
1 | g1 = r.xgroup_create("test", "test_g1", 0) |
创建消费者 & 消费
1 | XREADGROUP GROUP test_g1 consumer_1 count 1 streams "test" > |
1 | a = r.xreadgroup("test_g1", "consumer_1", {"test": ">"}, 1, block=0) |
⚠
test_g1
是刚才创建的消费组,consumer_1
是新建的消费者>
表示从last_delivered_id
开始消费,每当消费者读取一条消息,last_delivered_id
变量就会前进.如果使用ID是任何其他有效的数字ID,则该命令将允许我们访问没有被ACK的消息(即处于pending中的消息)。
pendding队列
为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,STREAM
设计了 Pending
列表,用于记录读取但并未处理完毕的消息。
消费者首先根据 last_delivered_id
读取消息,然后将消息放到 pendding 队列中。当用户提交 ACK 时,表示该消息完全处理完,将pendding 中的消息删除。
当 XREADGROUP GROUP test_g1 consumer_1 count 1 streams "test" ID
使用ID时,将会从pendding中读取内容。
XACK
XACK将消息标记为已处理,其他的消费者将不会在处理该消息。同时将pending中的内容删除。但是此消息仍然保留在redis中。
案例
生产者
1 | import redis |
消费者
先遍历pending中内容,然后遍历流中的内容
consumer1 https://gist.github.com/f8fea383d367df1eb25bb359df2d17e5
consumer2 https://gist.github.com/f8fea383d367df1eb25bb359df2d17e5
探讨ack与last_delievered_id
创建一个组
xgroup create consumer g1 0-0
查看以下pending
1
2
3
4
5
6
7
8
9
10xinfo groups consumer
1) 1) "name"
2) "g1"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0 # 此时为0
7) "last-delivered-id"
8) "0-0"消费2条消息
XREADGROUP GROUP g1 c1 count 1 STREAMS consumer >
再次查看pending
1
2
3
4
5
6
7
81) 1) "name"
2) "g1"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 2 # 为2了
7) "last-delivered-id"
8) "1592227256475-0"重置
last_delivered_id
1
xgroup setid consumer g1 0
查看pendign信息
1
2
3
4
5
6
7
81) 1) "name"
2) "g1"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 2 # pengding 仍然为2
7) "last-delivered-id"
8) "0-0" # last-delivered-id 已经变成0重新消费
1
2
3
4
5
6
7
8
9
10
11XREADGROUP GROUP g1 c1 count 1 STREAMS consumer >
1) 1) "consumer"
2) 1) 1) "1592227256474-0"
2) 1) "num"
2) "0"
XREADGROUP GROUP g1 c1 count 1 STREAMS consumer >
1) 1) "consumer"
2) 1) 1) "1592227256475-0"
2) 1) "num"
2) "1"查看队列
1
2
3
4
5
6
7
81) 1) "name"
2) "g1"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 2 # 仍然为2
7) "last-delivered-id"
8) "1592227256475-0"查看pending中的内容
1
2
3
4
5
6
7
8
9
10
11
12
13XREADGROUP group g1 c1 count 1 STREAMS consumer 0-0
1) 1) "consumer"
2) 1) 1) "1592227256474-0"
2) 1) "num"
2) "0"
XREADGROUP group g1 c1 count 1 STREAMS consumer 1592227256474-0
1) 1) "consumer"
2) 1) 1) "1592227256475-0"
2) 1) "num"
2) "1"
XREADGROUP group g1 c1 count 1 STREAMS consumer 1592227256475-0
1) 1) "consumer"
2) (empty list or set)ack num 0
1
xack consumer g1 1592227256474-0
查看pending
1
2
3
4
5
6
7
81) 1) "name"
2) "g1"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 1 # 只剩1个了
7) "last-delivered-id"
8) "1592227256475-0"重置
last_delivered_id
1
xgroup setid consumer g1 0
消费消息
1
2
3
4
5
6XREADGROUP GROUP g1 c1 count 1 STREAMS consumer >
1) 1) "consumer"
2) 1) 1) "1592227256474-0"
2) 1) "num"
2) "0" # 被重新消费查看队列
1
2
3
4
5
6
7
81) 1) "name"
2) "g1"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 2 # 又变成了2,所以通过重置 last_delivered_id 。可以重新消费已经ack的内容
7) "last-delivered-id"
8) "1592227256474-0"
2. 命令速查
2.1. XADD
生产消息
1 | XADD <stream_name> * <key> <value> <key> <value> |
1 | r.xadd("test", {"num": i}) |
2.2. XREAD
非阻塞模式
1 | xread count 1 streams "test" 0-0 # 从头开始读一条 |
阻塞模式
1 | XREAD block 1000 streams memberMessage $ # 阻塞模式读取最后一条消息 (如果为空则等待1秒) |
python
1 | print(r.xread({"test": "0-0"}, count=1, block=0)) |
2.3. XINFO
查看当前流的信息
1 | XINFO STREAM <stream_name> |
1 | r.xinfo_stream("test") |
查看当流的组的信息
1 | XINFO GROUPS <stream_name> |
1 | r.xinfo_groups("test") |
查看流中组的消费者信息
1 | XINFO CONSUMERS <流> <组> |
1 | r.xinfo_consumers("test","test") |
2.4. XRANGE
遍历消息
1 | xrange "test" # 查询所有消息 |
1 | print(r.xrange("test", min="-", max="+", count=2)) |
2.5. XGROUP
创建消费者组
1 | xgroup create "test" test_g1 0-0 # 0-0 从头开始消费 |
1 | g1 = r.xgroup_create("test", "test_g1", 0) |
删除组
1 | XGROUP DESTROY <stream> <some-consumer-group> |
1 | r.xgroup_destroy(STREAM_NAME,GROUP_NAME) |
删除消费者
1 | XGROUP DELCONSUMER <stream> <consumer-group-name> <consumer-name> |
1 | r.xgroup_delconsumer(STREAM_NAME, GROUP_NAME, CONSUMER_NAME) |
设置 last_delivered_id
1 | XGROUP SETID <stream> <group> <id> |
1 | r.xgroup_setid("test","g1","0-0") |
2.6. XREADGROUP
创建消费者并消费
1 | XREADGROUP GROUP test_g1 consumer_1 count 1 STREAMS "test" > |
1 | a = r.xreadgroup("test_g1", "consumer_1", {"test": ">"}, 1, block=0) |
2.7. XACK
确认消息
1 | xack <stream-name> <group-name> <id> |
2.8. XDEL
删除消息
1 | XDEL <stream_name> <id> |
1 | print(r.xdel("test", "1592201926623-0")) |
2.9. XLEN
查看流的长度
1 | xlen <stream_name> |
1 | print(r.xlen("test")) |
2.10. XTRIM
设置流的最大长度
依据先进先出的原则,自动删除超出最长长度的消息
1 | XTRIM mystream MAXLEN 1000 |