一、前言
发布订单系统是日常开发中经常会用到的功能。简单来说,就是发布者发布消息,订阅者就会接受到消息并进行相应的处理,如下图所示。
二、发布/订阅
Redis为我们提供了发布/订阅的功能模块PubSub,可以用于消息传递。
其中发布者publisher、订阅者subscriber都是redis客户端,channel则是redis服务器。
发布者publisher向channel发送消息,订阅该channel的subscriber就会接收到消息。
2.1 常用命令
2.1.1 订阅频道subscribe
127.0.0.1:6379> subscribe test1 test2
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "test1"
3) (integer) 1
1) "subscribe"
2) "test2"
3) (integer) 2</pre>
发布消息publish
127.0.0.1:6379> publish test1 hello
(integer) 1
127.0.0.1:6379> publish test2 world
(integer) 1
订阅test1、test2的客户端会收到消息
1) "message"
2) "test1"
3) "hello"
1) "message"
2) "test2"
3) "world"
2.1.2 订阅模式psubscribe
按照上述这种方式,如果订阅者subscriber想要订阅多个channel则需要同时指定多个channel的名称,redis为了解决这个问题提供psubscribe模式匹配这种订阅方式,可以通过通配符的方式匹配频道。
127.0.0.1:6379> psubscribe ch*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "ch*"
3) (integer) 1
发布消息
127.0.0.1:6379> publish cha hello
(integer) 1
127.0.0.1:6379> publish china world
(integer) 1
之前订阅ch*的客户端就会收到cha频道和china频道的消息,这样就一次性订阅多个频道
1) "pmessage"
2) "ch*"
3) "cha"
4) "hello"
1) "pmessage"
2) "ch*"
3) "china"
4) "world"
2.2 实现原理
redis服务端存储了订阅频道/模式的客户端列表
struct redisServer {
...
dict *pubsub_channels; // redis服务端进程中维护的订阅频道的客户端信息,key就是channel,value就是客户端列表
list *pubsub_patterns; //redis server进程中维护的pattern;
...
};
相当于如果客户端订阅一个频道,那么服务端的pubsub_channels就会存储一条数据,pubsub_channels其实是一个链表,key对应channel,value对应客户端列表,根据key订阅的频道,就可以找到订阅该频道的所有客户端。
同时如果客户端订阅一个模式,pubsub_patterns也会新增一条数据,记录当前客户端订阅的模式,pubsub_patterns也有自己的数据结构,其中就包含了客户端以及模式。
typedef struct pubsubPattern {
client *client; // 客户端
robj *pattern; // 模式
} pubsubPattern;
当发布者向某个频道发布消息时,就会遍历pubsub_channels找到订阅该频道的客户端列表,依次向这些客户端发送消息。
然后遍历pubsub_patterns找到符合当前频道的模式,同时找到模式对应的客户端,然后向客户端发送消息。
三、Stream
虽然Redis提供了发布/订阅的功能,但是并不完善,导致基本没有合适的场景能够使用。
PubSub缺点:
订阅者如果部署多个节点,会出现重复消息的情况。
没有ack机制,消息容易发生丢失。如果在订阅消息的期间有消费者宕机了,那么后续他重连之后也无法接收到宕机这段时间内发布的消息了。
消息不会持久化。一旦Redis服务端宕机了,所有的消息都会丢失。
直到Redis5.0出现之后,出现了Stream这种数据结构,才终于完善了Redis的消息机制。
Stream实际上就是一个消息列表,只是他几乎实现了消息队列所需要的所有功能,包括:
消息ID的序列化生成
消息遍历
消息的阻塞和非阻塞读取
消息的分组消费
未完成消息的处理
消息队列监控
同时需要注意的是Stream只是一个数据结构,他不会主动把消息推送给消费者,需要消费者主动来消费数据。
每个Stream都有唯一的名称,它就是Redis的key,首次使用 xadd 指令追加消息时自动创建。
常见操作命令如下表:
命令名称 | 命令格式 | 描述 |
---|---|---|
xadd | xadd key id<*> field1 value1 | 将指定消息追加到指定队列(key)中,*表示自动生成id(当前时间+序列号) |
xread | xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] | 从消息队列中读取,COUNT:读取条数,BLOCK:阻塞读(默认不阻塞),key:队列名称,id:消息ID(起始ID) |
xrange | xrange key start end [COUNT] | 读取队列中给定ID范围的消息,COUNT:返回消息条数(消息id从小到大) |
xrevrange | xrevrange key start end [COUNT] | 读取队列中给定ID范围的消息,COUNT:返回消息条数(消息id从大到小) |
xdel | xdel key id | 删除队列的消息 |
xgroup create | xgroup create key groupname id | 创建一个新的消费组 |
xgroup destroy | xgroup destroy key groupname | 删除指定消费组 |
xgroup delconsumer | xgroup delconsumer key groupname cname | 删除指定消费组中的指定消费者 |
xgroup setid | xgroup setid key id | 修改指定消息的最大id |
xreadgroup | xreadgroup group groupname consumer COUNT streams key | 消费消费组的数据(consumer不存在则创建) |
3.1 使用示例
3.1.1 新增、读取消息
// 新增消息 队列名:mq 数据:score=100
127.0.0.1:6379> xadd mq * score 100
"1627225715999-0"
127.0.0.1:6379> xadd mq * score 80
"1627225761166-0"
// 读取一条数据
127.0.0.1:6379> xread COUNT 1 STREAMS mq 0
1) 1) "mq"
2) 1) 1) "1627225715999-0"
2) 1) "score"
2) "100"
// 获取mq从小到大的所有数据
127.0.0.1:6379> xrange mq - +
1) 1) "1627225715999-0"
2) 1) "score"
2) "100"
2) 1) "1627225761166-0"
2) 1) "score"
2) "80"
// 获取指定id范围的消息
127.0.0.1:6379> xrange mq 1627225761166-0 1627225761166-0
1) 1) "1627225761166-0"
2) 1) "score"
2) "80"
如果客户端希望知道自身消费到第几条数据了,那么就需要记录一下当前消费的消息ID,下次再次消费的时候就从上次消费的消息ID开始读取数据即可。
3.2 消费组
消费组中多了一个游标last_delivered_id,表示当前消费到了哪一条数据。同时所有的数据都是待处理消息(PEL),只有消费者处理完毕之后使用ack指令告知redis服务器,数据才会从PEL中移除,确认后的消息就无法再次消费。
// 查看当前消息队列中有多少数据
127.0.0.1:6379> xrange mq - +
1) 1) "1627225715999-0"
2) 1) "score"
2) "100"
2) 1) "1627226969615-0"
2) 1) "score"
2) "70"
// 消息队列mq创建消费组mqGroup
127.0.0.1:6379> xgroup create mq mqGroup 0
OK
// 消费消费组mqGroup中的一条数据,>的意思是指从当前消费组的游标last_delivered_id开始读取
// 读取过后消费id会递增,也就是下次读取的时候就会是下一跳数据
127.0.0.1:6379> xreadgroup GROUP mqGroup consumer1 COUNT 1 STREAMS mq >
1) 1) "mq"
2) 1) 1) "1627225715999-0"
2) 1) "score"
2) "100"
127.0.0.1:6379> xreadgroup GROUP mqGroup consumer1 COUNT 1 STREAMS mq >
1) 1) "mq"
2) 1) 1) "1627226969615-0"
2) 1) "score"
2) "70"
127.0.0.1:6379> xreadgroup GROUP mqGroup consumer1 COUNT 1 STREAMS mq >
(nil)
// 查看当前消息队列对应的消费组的情况
127.0.0.1:6379> xinfo groups mq
1) 1) "name"
2) "mqGroup" // 消费组的名称
3) "consumers"
4) (integer) 1 // 消费者的数量
5) "pending"
6) (integer) 2 // 待处理的数据数量,如果仅仅只是读取了数据,但是没有告知redis,那么数据就依旧处于待处理状态
7) "last-delivered-id"
8) "1627226969615-0" // 当前已经读取到的消息ID
// ack确认指定消息,返回的数值就是确认的数量
127.0.0.1:6379> xack mq mqGroup 1627226969615-0
(integer) 1
// 查看状态
127.0.0.1:6379> xinfo groups mq
1) 1) "name"
2) "mqGroup"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 1 // 待处理的消息数量从2变成了1
7) "last-delivered-id"
8) "1627226969615-0"
// 已经确认过的消息,就不能再次消费了
127.0.0.1:6379> xreadgroup GROUP mqGroup consumer1 COUNT 1 STREAMS mq 1627225715999-0
1) 1) "mq"
2) (empty list or set)
// 消息列表中的消息数量变少了
127.0.0.1:6379> xreadgroup GROUP mqGroup consumer1 COUNT 1 STREAMS mq 0
1) 1) "mq"
2) 1) 1) "1627225715999-0"
2) 1) "score"
2) "100"
3.2 消息队列过长
如果接收到的消息比较多,为了避免Stream过长,可以选择指定Stream的最大长度,一旦到达了最大长度,就会从最早的消息开始清除,保证Stream中最新的消息。