0%

Redis 消息队列

Reids 实现消息队列存在几种方式:

  • Redis的列表(lists)数据结构
  • Redis自带的PUB/SUB机制,即发布-订阅模式
  • Stream流结构

本文主要介绍通过 Stream 流的方式

1. 生产&消费

使用stream进行消费有两种情况:

  • 只有一个消费者的独立消费
  • 多消费者的消费者组

1.1. 独立消费

1

类似于List,生产者往list中写数据,消费者从list中读数据,只能有一个消费者。

生产消息

  • redis

    1
    XADD "test" * num 1
  • python

    1
    r.xadd("test", {"num": i})

消费消息

  • 阻塞模式

    1
    2
    3
    xread count 1 streams "test" 0-0 # 从头开始读一条
    xread count 1 streams "test" 1562980142175-0 # 从某个ID开始
    xread count 1 streams "test" $ # 从最后一条消息开始,此时(非阻塞模式)没有意义,因为最后一条信息的后面永远为空
  • 非阻塞模式

    1
    2
    XREAD block 1000 streams memberMessage $ # 阻塞模式读取最后一条消息 (如果为空则等待1秒)
    XREAD block 0 streams memberMessage $ # 如果为空则一直等待

案例 python代码实现

生产者

1
2
3
4
5
6
7
import redis

pool = redis.ConnectionPool(host='***', port=6379, password='', decode_responses=True)
r = redis.Redis(connection_pool=pool)

for i in range(0, 100):
r.xadd("consumer", {"num": i}, id="*")

消费者

消费后并不会删除队列中的消息。如果想连续消费,需要不断传入 ID 值

1
2
3
4
5
6
7
8
9
10
11
12
13
import redis

pool = redis.ConnectionPool(host='***', port=6379, password='', decode_responses=True)
r = redis.Redis(connection_pool=pool)

STREAM_NAME = "consumer"

next_id = "0-0"
while True:
data = r.xread({STREAM_NAME: next_id}, count=1, block=0)
print(data)
# if data[0][1]:
next_id = data[0][1][0][0]

1.2. 多消费者

此时存在两个概念,需要区分开。

  • 消费组

    一个消费队列可以有多个消费组,消费组之间互不影响. 每个消费组(Consumer Group)的状态都是独立的,也就是说同一份Stream内部的消息会被每个消费组都消费到。

  • 消费者

    一个消费组中可以存在多个消费者,这些消费者将共同消费此时的队列。这些消费者之间是竞争关系

2

上图是消费者组的简单流程:

  1. 创建消费组
  2. 创建消费者 并使用 XREADGROUP GROUP ... > 进行消费。此时消费的消息进入 pending 队列中。
  3. 当向服务器发送 每条消息的ACK 时,此消息被认为成功消费,由 pending 删除。

创建消费组

1
xgroup create "test" test_g1 0-0 # 0-0 从头开始消费
1
2
g1 = r.xgroup_create("test", "test_g1", 0)
print(g1)

创建消费者 & 消费

1
XREADGROUP GROUP test_g1 consumer_1 count 1 streams "test" >
1
a = r.xreadgroup("test_g1", "consumer_1", {"test": ">"}, 1, block=0)

test_g1 是刚才创建的消费组,consumer_1 是新建的消费者 > 表示从 last_delivered_id 开始消费,每当消费者读取一条消息,last_delivered_id变量就会前进.

如果使用ID是任何其他有效的数字ID,则该命令将允许我们访问没有被ACK的消息(即处于pending中的消息)。

pendding队列

为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,STREAM 设计了 Pending 列表,用于记录读取但并未处理完毕的消息。

消费者首先根据 last_delivered_id 读取消息,然后将消息放到 pendding 队列中。当用户提交 ACK 时,表示该消息完全处理完,将pendding 中的消息删除。

XREADGROUP GROUP test_g1 consumer_1 count 1 streams "test" ID 使用ID时,将会从pendding中读取内容。

XACK

XACK将消息标记为已处理,其他的消费者将不会在处理该消息。同时将pending中的内容删除。但是此消息仍然保留在redis中。

案例

生产者

1
2
3
4
5
6
7
import redis

pool = redis.ConnectionPool(host='192.168.137.3', port=6379, password='', decode_responses=True)
r = redis.Redis(connection_pool=pool)

for i in range(0, 100):
r.xadd("consumer", {"num": i}, id="*")

消费者

先遍历pending中内容,然后遍历流中的内容

consumer1 https://gist.github.com/f8fea383d367df1eb25bb359df2d17e5
consumer2 https://gist.github.com/f8fea383d367df1eb25bb359df2d17e5

探讨ack与last_delievered_id

  1. 创建一个组 xgroup create consumer g1 0-0

  2. 查看以下pending

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    xinfo groups consumer

    1) 1) "name"
    2) "g1"
    3) "consumers"
    4) (integer) 0
    5) "pending"
    6) (integer) 0 # 此时为0
    7) "last-delivered-id"
    8) "0-0"
  3. 消费2条消息 XREADGROUP GROUP g1 c1 count 1 STREAMS consumer >

  4. 再次查看pending

    1
    2
    3
    4
    5
    6
    7
    8
    1) 1) "name"
    2) "g1"
    3) "consumers"
    4) (integer) 1
    5) "pending"
    6) (integer) 2 # 为2了
    7) "last-delivered-id"
    8) "1592227256475-0"
  5. 重置 last_delivered_id

    1
    xgroup setid consumer g1 0
  6. 查看pendign信息

    1
    2
    3
    4
    5
    6
    7
    8
    1) 1) "name"
    2) "g1"
    3) "consumers"
    4) (integer) 1
    5) "pending"
    6) (integer) 2 # pengding 仍然为2
    7) "last-delivered-id"
    8) "0-0" # last-delivered-id 已经变成0
  7. 重新消费

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    XREADGROUP GROUP g1 c1 count 1 STREAMS consumer >
    1) 1) "consumer"
    2) 1) 1) "1592227256474-0"
    2) 1) "num"
    2) "0"

    XREADGROUP GROUP g1 c1 count 1 STREAMS consumer >
    1) 1) "consumer"
    2) 1) 1) "1592227256475-0"
    2) 1) "num"
    2) "1"
  8. 查看队列

    1
    2
    3
    4
    5
    6
    7
    8
    1) 1) "name"
    2) "g1"
    3) "consumers"
    4) (integer) 1
    5) "pending"
    6) (integer) 2 # 仍然为2
    7) "last-delivered-id"
    8) "1592227256475-0"
  9. 查看pending中的内容

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    XREADGROUP group g1 c1 count 1 STREAMS consumer 0-0
    1) 1) "consumer"
    2) 1) 1) "1592227256474-0"
    2) 1) "num"
    2) "0"
    XREADGROUP group g1 c1 count 1 STREAMS consumer 1592227256474-0
    1) 1) "consumer"
    2) 1) 1) "1592227256475-0"
    2) 1) "num"
    2) "1"
    XREADGROUP group g1 c1 count 1 STREAMS consumer 1592227256475-0
    1) 1) "consumer"
    2) (empty list or set)
  10. ack num 0

    1
    xack consumer g1 1592227256474-0
  11. 查看pending

    1
    2
    3
    4
    5
    6
    7
    8
    1) 1) "name"
    2) "g1"
    3) "consumers"
    4) (integer) 1
    5) "pending"
    6) (integer) 1 # 只剩1个了
    7) "last-delivered-id"
    8) "1592227256475-0"
  12. 重置 last_delivered_id

    1
    xgroup setid consumer g1 0
  13. 消费消息

    1
    2
    3
    4
    5
    6
    XREADGROUP GROUP g1 c1 count 1 STREAMS consumer >

    1) 1) "consumer"
    2) 1) 1) "1592227256474-0"
    2) 1) "num"
    2) "0" # 被重新消费
  14. 查看队列

    1
    2
    3
    4
    5
    6
    7
    8
    1) 1) "name"
    2) "g1"
    3) "consumers"
    4) (integer) 1
    5) "pending"
    6) (integer) 2 # 又变成了2,所以通过重置 last_delivered_id 。可以重新消费已经ack的内容
    7) "last-delivered-id"
    8) "1592227256474-0"

2. 命令速查

2.1. XADD

生产消息

1
XADD <stream_name> * <key> <value> <key> <value>
1
r.xadd("test", {"num": i})

2.2. XREAD

非阻塞模式

1
2
3
xread count 1 streams "test" 0-0 # 从头开始读一条
xread count 1 streams "test" 1562980142175-0 # 从某个ID开始
xread count 1 streams "test" $ # 从最后一条消息开始,此时(非阻塞模式)没有意义,因为最后一条信息的后面永远为空

阻塞模式

1
2
XREAD block 1000 streams memberMessage $ # 阻塞模式读取最后一条消息 (如果为空则等待1秒)
XREAD block 0 streams memberMessage $ # 如果为空则一直等待

python

1
print(r.xread({"test": "0-0"}, count=1, block=0))

2.3. XINFO

查看当前流的信息

1
XINFO STREAM <stream_name>
1
r.xinfo_stream("test")

查看当流的组的信息

1
XINFO GROUPS <stream_name>
1
r.xinfo_groups("test")

查看流中组的消费者信息

1
XINFO CONSUMERS <流> <组>
1
r.xinfo_consumers("test","test")

2.4. XRANGE

遍历消息

1
2
3
4
xrange "test" # 查询所有消息
xrange "test" - + # -表示最小值, +表示最大值
xrange "test" 1562980142175-0 + # 指定最小消息ID的列表
xrange "test" - 1562980142175-0 # 指定最大消息ID的列表
1
print(r.xrange("test", min="-", max="+", count=2))

2.5. XGROUP

创建消费者组

1
xgroup create "test" test_g1 0-0 # 0-0 从头开始消费
1
2
g1 = r.xgroup_create("test", "test_g1", 0)
print(g1)

删除组

1
XGROUP DESTROY <stream> <some-consumer-group>
1
r.xgroup_destroy(STREAM_NAME,GROUP_NAME)

删除消费者

1
XGROUP DELCONSUMER <stream> <consumer-group-name> <consumer-name>
1
r.xgroup_delconsumer(STREAM_NAME, GROUP_NAME, CONSUMER_NAME)

设置 last_delivered_id

1
XGROUP SETID <stream> <group> <id>
1
r.xgroup_setid("test","g1","0-0")

2.6. XREADGROUP

创建消费者并消费

1
XREADGROUP GROUP test_g1 consumer_1 count 1 STREAMS "test" >
1
a = r.xreadgroup("test_g1", "consumer_1", {"test": ">"}, 1, block=0)

2.7. XACK

确认消息

1
xack <stream-name> <group-name> <id>

2.8. XDEL

删除消息

1
XDEL <stream_name> <id>
1
print(r.xdel("test", "1592201926623-0"))

2.9. XLEN

查看流的长度

1
xlen <stream_name>
1
print(r.xlen("test"))

2.10. XTRIM

设置流的最大长度

依据先进先出的原则,自动删除超出最长长度的消息

1
XTRIM mystream MAXLEN 1000