Redis发布订阅和Stream

一、前言

发布订单系统是日常开发中经常会用到的功能。简单来说,就是发布者发布消息,订阅者就会接受到消息并进行相应的处理,如下图所示。

消息队列.jpg

二、发布/订阅

Redis为我们提供了发布/订阅的功能模块PubSub,可以用于消息传递。

旅游行程制定流程图.jpg

其中发布者publisher、订阅者subscriber都是redis客户端,channel则是redis服务器。

发布者publisher向channel发送消息,订阅该channel的subscriber就会接收到消息。

2.1 常用命令

2.1.1 订阅频道subscribe

127.0.0.1:6379> subscribe test1 test2
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "test1"
3) (integer) 1
1) "subscribe"
2) "test2"
3) (integer) 2</pre>

发布消息publish

127.0.0.1:6379> publish test1 hello
(integer) 1
127.0.0.1:6379> publish test2 world
(integer) 1

订阅test1、test2的客户端会收到消息

1) "message"
2) "test1"
3) "hello"
1) "message"
2) "test2"
3) "world"

2.1.2 订阅模式psubscribe

按照上述这种方式,如果订阅者subscriber想要订阅多个channel则需要同时指定多个channel的名称,redis为了解决这个问题提供psubscribe模式匹配这种订阅方式,可以通过通配符的方式匹配频道。

psubscribe.jpg
127.0.0.1:6379> psubscribe ch*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "ch*"
3) (integer) 1

发布消息

127.0.0.1:6379> publish cha hello
(integer) 1
127.0.0.1:6379> publish china world
(integer) 1

之前订阅ch*的客户端就会收到cha频道和china频道的消息,这样就一次性订阅多个频道

1) "pmessage"
2) "ch*"
3) "cha"
4) "hello"
1) "pmessage"
2) "ch*"
3) "china"
4) "world"

2.2 实现原理

redis服务端存储了订阅频道/模式的客户端列表

struct redisServer { 
     ... 
     dict *pubsub_channels; // redis服务端进程中维护的订阅频道的客户端信息,key就是channel,value就是客户端列表
     list *pubsub_patterns; //redis server进程中维护的pattern; 
     ... 
};

相当于如果客户端订阅一个频道,那么服务端的pubsub_channels就会存储一条数据,pubsub_channels其实是一个链表,key对应channel,value对应客户端列表,根据key订阅的频道,就可以找到订阅该频道的所有客户端。

同时如果客户端订阅一个模式pubsub_patterns也会新增一条数据,记录当前客户端订阅的模式,pubsub_patterns也有自己的数据结构,其中就包含了客户端以及模式。

typedef struct pubsubPattern {
    client *client; // 客户端
    robj *pattern; // 模式
} pubsubPattern;

当发布者向某个频道发布消息时,就会遍历pubsub_channels找到订阅该频道的客户端列表,依次向这些客户端发送消息。

然后遍历pubsub_patterns找到符合当前频道的模式,同时找到模式对应的客户端,然后向客户端发送消息。

三、Stream

虽然Redis提供了发布/订阅的功能,但是并不完善,导致基本没有合适的场景能够使用。

PubSub缺点:

  • 订阅者如果部署多个节点,会出现重复消息的情况。

  • 没有ack机制,消息容易发生丢失。如果在订阅消息的期间有消费者宕机了,那么后续他重连之后也无法接收到宕机这段时间内发布的消息了。

  • 消息不会持久化。一旦Redis服务端宕机了,所有的消息都会丢失。

直到Redis5.0出现之后,出现了Stream这种数据结构,才终于完善了Redis的消息机制

Stream实际上就是一个消息列表,只是他几乎实现了消息队列所需要的所有功能,包括:

  • 消息ID的序列化生成

  • 消息遍历

  • 消息的阻塞和非阻塞读取

  • 消息的分组消费

  • 未完成消息的处理

  • 消息队列监控

同时需要注意的是Stream只是一个数据结构,他不会主动把消息推送给消费者,需要消费者主动来消费数据

每个Stream都有唯一的名称,它就是Redis的key,首次使用 xadd 指令追加消息时自动创建。

常见操作命令如下表:

命令名称 命令格式 描述
xadd xadd key id<*> field1 value1 将指定消息追加到指定队列(key)中,*表示自动生成id(当前时间+序列号)
xread xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] 从消息队列中读取,COUNT:读取条数,BLOCK:阻塞读(默认不阻塞),key:队列名称,id:消息ID(起始ID)
xrange xrange key start end [COUNT] 读取队列中给定ID范围的消息,COUNT:返回消息条数(消息id从小到大)
xrevrange xrevrange key start end [COUNT] 读取队列中给定ID范围的消息,COUNT:返回消息条数(消息id从大到小)
xdel xdel key id 删除队列的消息
xgroup create xgroup create key groupname id 创建一个新的消费组
xgroup destroy xgroup destroy key groupname 删除指定消费组
xgroup delconsumer xgroup delconsumer key groupname cname 删除指定消费组中的指定消费者
xgroup setid xgroup setid key id 修改指定消息的最大id
xreadgroup xreadgroup group groupname consumer COUNT streams key 消费消费组的数据(consumer不存在则创建)

3.1 使用示例

3.1.1 新增、读取消息

// 新增消息 队列名:mq 数据:score=100
127.0.0.1:6379> xadd mq * score 100 
"1627225715999-0"
127.0.0.1:6379> xadd mq * score 80
"1627225761166-0"

// 读取一条数据
127.0.0.1:6379> xread COUNT 1 STREAMS mq 0 
1) 1) "mq"
 2) 1) 1) "1627225715999-0"
 2) 1) "score"
 2) "100"

// 获取mq从小到大的所有数据
127.0.0.1:6379> xrange mq - + 
1) 1) "1627225715999-0"
 2) 1) "score"
 2) "100"
2) 1) "1627225761166-0"
 2) 1) "score"
 2) "80"

// 获取指定id范围的消息
127.0.0.1:6379> xrange mq 1627225761166-0 1627225761166-0 
1) 1) "1627225761166-0"
 2) 1) "score"
 2) "80"

如果客户端希望知道自身消费到第几条数据了,那么就需要记录一下当前消费的消息ID,下次再次消费的时候就从上次消费的消息ID开始读取数据即可。

3.2 消费组

消费组中多了一个游标last_delivered_id,表示当前消费到了哪一条数据。同时所有的数据都是待处理消息(PEL),只有消费者处理完毕之后使用ack指令告知redis服务器,数据才会从PEL中移除,确认后的消息就无法再次消费。

// 查看当前消息队列中有多少数据
127.0.0.1:6379> xrange mq - +
1) 1) "1627225715999-0"
 2) 1) "score"
 2) "100"
2) 1) "1627226969615-0"
 2) 1) "score"
 2) "70"

// 消息队列mq创建消费组mqGroup
127.0.0.1:6379> xgroup create mq mqGroup 0
OK

// 消费消费组mqGroup中的一条数据,>的意思是指从当前消费组的游标last_delivered_id开始读取
// 读取过后消费id会递增,也就是下次读取的时候就会是下一跳数据
127.0.0.1:6379> xreadgroup GROUP mqGroup consumer1 COUNT 1 STREAMS mq > 
1) 1) "mq"
 2) 1) 1) "1627225715999-0"
 2) 1) "score"
 2) "100"
127.0.0.1:6379> xreadgroup GROUP mqGroup consumer1 COUNT 1 STREAMS mq > 
1) 1) "mq"
 2) 1) 1) "1627226969615-0"
 2) 1) "score"
 2) "70"
127.0.0.1:6379> xreadgroup GROUP mqGroup consumer1 COUNT 1 STREAMS mq > 
(nil)

// 查看当前消息队列对应的消费组的情况
127.0.0.1:6379> xinfo groups mq
1) 1) "name"
 2) "mqGroup"  // 消费组的名称
 3) "consumers"
 4) (integer) 1 // 消费者的数量
 5) "pending"
 6) (integer) 2  // 待处理的数据数量,如果仅仅只是读取了数据,但是没有告知redis,那么数据就依旧处于待处理状态
 7) "last-delivered-id"
 8) "1627226969615-0" // 当前已经读取到的消息ID

// ack确认指定消息,返回的数值就是确认的数量
127.0.0.1:6379> xack mq mqGroup 1627226969615-0
(integer) 1

// 查看状态
127.0.0.1:6379> xinfo groups mq
1) 1) "name"
 2) "mqGroup"
 3) "consumers"
 4) (integer) 1
 5) "pending"
 6) (integer) 1  // 待处理的消息数量从2变成了1
 7) "last-delivered-id"
 8) "1627226969615-0"

// 已经确认过的消息,就不能再次消费了
127.0.0.1:6379> xreadgroup GROUP mqGroup consumer1 COUNT 1 STREAMS mq 1627225715999-0
1) 1) "mq"
 2) (empty list or set)

// 消息列表中的消息数量变少了
127.0.0.1:6379> xreadgroup GROUP mqGroup consumer1 COUNT 1 STREAMS mq 0
1) 1) "mq"
 2) 1) 1) "1627225715999-0"
 2) 1) "score"
 2) "100"

3.2 消息队列过长

如果接收到的消息比较多,为了避免Stream过长,可以选择指定Stream的最大长度,一旦到达了最大长度,就会从最早的消息开始清除,保证Stream中最新的消息。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,711评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,079评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,194评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,089评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,197评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,306评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,338评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,119评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,541评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,846评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,014评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,694评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,322评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,026评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,257评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,863评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,895评论 2 351

推荐阅读更多精彩内容

  • 发布订阅 发布/ 订阅系统 是 Web 系统中比较常用的一个功能。简单点说就是 发布者发布消息,订阅者接受消息。虽...
    kyo1992阅读 616评论 0 0
  • 综述 redis的消息是靠发布订阅实现的。 但这个消息和消息中间件不太一样,比消息中间件简易得多。没有队列,生产者...
    范柏柏阅读 196评论 0 1
  • [TOC] 1. 介绍 在软件架构中,发布-订阅是一种消息范式,消息的发送者(称为发布者)不会将消息直接发送给特定...
    为爱疯狂_3850阅读 315评论 0 0
  • 一、Redis的发布和订阅 Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订...
    AaronSimon阅读 463评论 0 1
  • 表情是什么,我认为表情就是表现出来的情绪。表情可以传达很多信息。高兴了当然就笑了,难过就哭了。两者是相互影响密不可...
    Persistenc_6aea阅读 124,639评论 2 7