Redis做消息队列

要使用Redis实现消息队列,需要了解消息队列都要满足哪些特性,消息队列在存取消息时,必须要满足消息的消费顺序不能乱,不能重复处理同一个消息,不能漏发消息,对应的我们可以分别称其为消息保序、消息幂等处理和消息可靠性保证。

基于 List 的消息队列解决方案

List 本身就是按先进先出的顺序对数据进行存取的,所以,如果使用 List 作为消息队列保存消息的话,就已经能满足消息保序的需求了,具体来说就是一个push,pop的操作,如下图所示:


image.png

这里生产者有消息写入时,不会通知消费者,为了解决这个问题,Redis 提供了 BRPOP 命令。BRPOP 命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。
第一个保序的问题解决了,还有重复消息处理的问题,这里消息队列要能给每一个消息提供全局唯一的 ID 号;另一方面,消费者程序要把已经处理过的消息的 ID 号记录下来。
这里消息的全局唯一 ID 号就需要生产者程序在发送消息前自行生成。

LPUSH mq "101030001:stock:5"      //将ID为101030001,库存量为5的消息插入消息队列
(integer) 1

最后,List 类型是如何保证消息可靠性的呢,List 类型提供了 BRPOPLPUSH 命令,这个命令的作用是让消费者程序从一个 List 中读取消息,同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份 List)留存。
这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了。如下图所示:


image.png

基于 Streams 的消息队列解决方案

使用List做消息队列,我们还可能遇到过一个问题,生产者消息发送很快,而消费者处理消息的速度比较慢,这就导致 List 中的消息越积越多,给 Redis 的内存带来很大压力。
Streams 是 Redis 专门为消息队列设计的数据类型,它提供了丰富的消息队列操作命令。

XADD:插入消息,保证有序,可以自动生成全局唯一 ID;
XREAD:用于读取消息,可以按 ID 读取数据;
XREADGROUP:按消费组形式读取消息;
XPENDING 和 XACK:XPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息,而 XACK 命令用于向消息队列确认消息处理已完成。

XADD 命令可以往消息队列中插入新消息,消息的格式是键 - 值对形式。

XADD mqstream * repo 5   //往名称为mqstream的消息队列中插入一条消息,消息键为repo,值为5,带*的标识表示为这个消息队列自动生成一个全局唯一ID
"1599203861727-0"     // 全局唯一ID,前面的1599203861727标识生成时间,后面的0表示1599203861727毫秒这个时间的第一条消息

XREAD 在读取消息时,可以指定一个消息 ID,并从这个消息 ID 的下一条消息开始进行读取。

XREAD BLOCK 100 STREAMS  mqstream 1599203861727-0    //这里设置了block参数,100表示阻塞时长,这里表示读取1599203861727-0后续消息,一共读取了三条
1) 1) "mqstream"
   2) 1) 1) "1599274912765-0"
         2) 1) "repo"
            2) "3"
      2) 1) "1599274925823-0"
         2) 1) "repo"
            2) "2"
      3) 1) "1599274927910-0"
         2) 1) "repo"
            2) "1"

Streams 本身可以使用 XGROUP 创建消费组,创建消费组之后,Streams 可以使用 XREADGROUP 命令让消费组内的消费者读取消息。

XGROUP create mqstream group1 0    //创建一个名为group1的消费组,消费组中有消息队列mqstream
OK

//让group1中的消费者consumer1从mastream中读取所有消息,参数“>”,表示从第一条尚未被消费的消息开始读取

XREADGROUP group group1 consumer1 streams mqstream >
1) 1) "mqstream"
   2) 1) 1) "1599203861727-0"
         2) 1) "repo"
            2) "5"
      2) 1) "1599274912765-0"
         2) 1) "repo"
            2) "3"
      3) 1) "1599274925823-0"
         2) 1) "repo"
            2) "2"
      4) 1) "1599274927910-0"
         2) 1) "repo"
            2) "1"

使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。
Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams“消息已经处理完成”。

XPENDING mqstream group2    //查看group2中已经读取但是还没确认的消息个数,这里是3个
1) (integer) 3
2) "1599203861727-0"    //读取消息的最小ID
3) "1599274925823-0"    //读取消息的最大ID
4) 1) 1) "consumer1"
      2) "1"
   2) 1) "consumer2"
      2) "1"
   3) 1) "consumer3"
      2) "1"

一旦消息 1599274912765-0 被 consumer2 处理了,consumer2 就可以使用 XACK 命令通知 Streams,然后这条消息就会被删除。

 XACK mqstream group2 1599274912765-0
(integer) 1
XPENDING mqstream group2 - + 10 consumer2
(empty list or set)

最后我们总结一下基于List和基于Streams实现消息队列的特点,如下图所示:


image.png

这里要注意,Streams是Redis5.0的版本加入的,也就是要在5.0以上的版本才能使用它。

Redis做消息队列的使用场景

Redis 是一个非常轻量级的键值数据库,部署一个 Redis 实例就是启动一个进程,部署 Redis 集群,也就是部署多个 Redis 实例。而 Kafka、RabbitMQ 部署时,涉及额外的组件,例如 Kafka 的运行就需要再部署 ZooKeeper。相比 Redis 来说,Kafka 和 RabbitMQ 一般被认为是重量级的消息队列。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,997评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,603评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,359评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,309评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,346评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,258评论 1 300
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,122评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,970评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,403评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,596评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,769评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,464评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,075评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,705评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,848评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,831评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,678评论 2 354

推荐阅读更多精彩内容

  • 在工作中,我们经常会使用队列,在Python中也有原生队列,但是原生的队列是存储在内存中,当重启系统后队列中的数据...
    0Ver1阅读 528评论 0 1
  • 消息队列的需求 1.消息保序 虽然消费者是异步处理消息,但是,消费者仍然需要按照生产者发送消息的顺序来处理消息,避...
    扮鬼之梦阅读 2,089评论 0 2
  • 夜莺2517阅读 127,719评论 1 9
  • 版本:ios 1.2.1 亮点: 1.app角标可以实时更新天气温度或选择空气质量,建议处女座就不要选了,不然老想...
    我就是沉沉阅读 6,889评论 1 6
  • 我是一名过去式的高三狗,很可悲,在这三年里我没有恋爱,看着同龄的小伙伴们一对儿一对儿的,我的心不好受。怎么说呢,高...
    小娘纸阅读 3,387评论 4 7