Redis高级特性——消息队列

消息队列

消息队列的设计方案在开发过程中应用比较广泛,一般情况都是为了实现程序解耦,生产者发送一个消息,消费者或多个客户端消费者去消费消息,完成任务的异步处理。如果有多个消费者,一方面可以提高处理性能,另一方面可以使得消费者程序得容灾。基于这两方面得有点,消息队列当前应用的非常广泛,一般在启用消息队列时都会采用MQ服务中间件,比如RabbitMq、ZeroMQ等等,这些服务是专业的消息服务,在互联网公司应用的非常多,因此服务质量和分布式方案都是比较成熟的。

依据Redis的特点,他也有很多类型可以实现消息队列方案,在之前的项目中也有应用,并且有些场合是MQ无法替代的。同时Redis因为不是一个标准的MQ服务,因此很多方面无法满足需求,比如ACK,丢包的问题无法保证,有这方面的需求还是使用MQ中间件;即使有这方面的问题使用Redis作为消息中间件的团队仍大量存在,基于这些原因,Redis的作者开发了一个新的MQ服务,同Redis类似,使用方式也基本类似Disque文档,之前作者表示未来会将该组件加入到Redis标准组件中,这样的话,未来Redis也可以作为MQ的一个首选项。

在之前的项目中分别使用过列表、有序集合作为简单的消息队列,这里对这几种方式进行简单描述。

列表作为消息队列

列表作为消息队列,是最简单的选择,其特性天然适合队列。只要在一端分别使用LPUSH将信息添加到列表,在另一端使用RPOP消费一个消息即可。采用两个cli模拟列表的消息队列行为

# client 1  producer,对入库的资源ID进行上架操作
LPUSH mymq 1
LPUSH mymq 2
# client 2  consumer
# 获取消息,并消费
RPOP mymq
"1"
# 获取到资源1后,进行上架操作,操作完成后丢弃
# 处理完成后,需要再次获取消息
RPOP mymq
"2"
RPOP mymq
(nil)
# mymq中没有新的消息,消费者客户端,需要不断重复RPOP去检查有没有消息

采用列表作为消息队列,也有一个问题,如上述示例中演示的,消费者每次主动去获取消息,有没有消息没有办法判断,只能不断的读取。对于服务端采用JAVA的话,一般这种任务型的都会采用定期执行的方案,那么这种代码很可能如下所示:

public void runTask() {
    while(true) {
        Object id = redis.rpop(myMqKey);
        if (null == id) {
            // 没有消息了,需要停止,停止多久再次执行,这个时间不太好处理,并且没有什么经验值,因为资源的入库没有规律。
        }
        // 处理上架
        do(id);
    }
}

不断的去检查Redis是否有新消息,虽然不麻烦,但是也在耗费资源,并且是无效的投入。Redis的列表中有一个BRPOP方法,前面章节提到过,这里可以采用这个方法,解决不断检查Redis是否有数据,消耗资源的问题。仍然使用上述示例

# 客户端1,consumer
BRPOP mymq 0        # 0 表示永不过期
# mymq队列没有数据,此时一直阻塞,客户端2生产消息后,客户端1理解消费消息
# 1) "mymq"
# 2) "1"
# (30.82s)
# 等待了30.82秒,mymq出现数据并进行了消费,一次读取消息结束
# 客户端2,producer
LPUSH mymq 1
# (integer) 1

这种方式很好的解决了不断检查耗费资源的问题,此时JAVA代码的更改并不大

public void runTask() {
    while(true) {
        List<Object> ids = redis.brpop(myMqKey, 0);
        // 因为永久等待,只要弹出数据,就不为空
        // 处理上架, 数据在位置1中
        do(ids.get(1));
    }
}

使用有序集合作为消息队列

有序集合作为消息队列,除了能够解决列表中不断检查耗费资源的问题,同时将得分采用时间戳表示时,还能实现消息到期时间消亡或到期必须处理的需求。

在之前的产品开发中,有一个销售课程的子模块,用户下订单后,有一个支付时间的限制。在这个产品中要求用户下订单后15分钟内完成支付,如果15分钟内不完成支付,则取消订单。

这里就采用了有序集合作为队列的支撑,其具体实现流程为

  • 当用户下订单时,将订单号作为值,以订单过期时间15分钟作为得分,加入到有序队列中
  • 完成订单后,引导用户进入支付流程,略
  • 起一个持续的任务,用于处理定单消亡处理流程
  • 获取第一个订单信息,检查其得分是否超时,如果超时则处理订单消亡流程
  • 接收到一个过期订单,处理订单的消亡,由于订单支付过程没有更改队列,因此需要检查订单的状态再进行消亡
  • 如果得到的订单没有过期,则暂停,其时间可以比较准确,就是得分剩余的时间
  • 上述流程中,订单过期称为命中,一旦命中,则继续读取下一个订单,直到队列为空或一个未到期订单为止

发布订阅

Redis提供了发布订阅模式,可以实现异步消息通信。发布订阅模式中,订阅者可以定于一个或多个频道,发布者可以向一个或多个频道发送消息,只要订阅了这些频道的客户端,都能够收到发布的消息。

Redis得发布订阅模式没有持久化消息的操作,如果某一个频道没有客户端订阅,则发布出去的消息就会被丢弃,即使该频道后续有客户端订阅,之前发布出去的消息也无法被再次收到。

发布订阅模式主要由以下指令完成

# 发布消息,将消息message发布到指定的频道
PUBLISH channel message

# 订阅频道,客户端订阅一个或多个频道
SUBSCRIBE channel [channel...]

# 按模式订阅,客户端按照channel得模式进行订阅,类似MQ的queue模式
PSUBSCRIBE pattern [pattern...]

# 退订一个或多个频道,与SUBSCRIBE指令配合使用
UNSUBSCRIBE channel [channel...]

# 按给定的模式退订一个或多个频道,与PSUBSCRIBE指令配合使用
PUNSUBSCRIBE pattern [pattern...]

PUBLISH指令比较简单,就是用于给一个频道发布消息。其返回值表示当前消息被几个订阅者接收

# 向订单处理channel发送一个订单id=1的消息
PUBLISH mymq.order 1
# (integer) 0
# 由于此时没有客户端订阅该频道,因此返回值为0,订单1得消息再不会被接收处理

对于其他几个指令就比较复杂一些,不过SUBSCRIBE等几个关于订阅的指令,其收到的消息格式是类似的

UNSUBSCRIBE my
# 1) "unsubscribe"
# 2) "my"
# 3) (integer) 0

PUNSUBSCRIBE my*
# 1) "punsubscribe"
# 2) "my*"
# 3) (integer) 0

SUBSCRIBE mymq.order
# 1) "subscribe"
# 2) "mymq.order"
# 3) (integer) 1

PSUBSCRIBE my*
# 1) "psubscribe"
# 2) "my*"
# 3) (integer) 1

# 已订阅成功,收到的消息
# 1) "message"
# 2) "mymq.order"
# 3) "2"

收到的消息是一个列表,长度为三,其具体得信息分别表示:

  • 当调用的是指令时,第一个位置表示了当前的指令信息,如SUBSCRIBE/UNSUBSCIRBE等;如果是消息的话,那么第一个位置为message,消息类型对业务来说是最重要的
  • 第二个位置,表示频道或订阅的频道模式,如my/my*等
  • 第三个位置,如果是消息,则为消息内容;如果是指令,订阅指令,表示该频道、频道模式的订阅数量;退订指令表示,还有多少客户端订阅该频道

分别启用三个客户端,一个客户端用于订阅频道,一个用于按模式订阅,一个用来发布消息。

# 客户端1,SUBSCRIBE
SUBSCRIBE mymq.order
# Reading messages... (press Ctrl-C to quit)
# 1) "subscribe"
# 2) "mymq.order"
# 3) (integer) 1
# 成功订阅,客户端等待消息

### 客户端3发送PUBLISH mymq.order world
# 1) "message"
# 2) "mymq.order"
# 3) "world"
# 客户端2,PSUBSCRIBE mymq.order.? mymq.order*
PSUBSCRIBE mymq.order.? mymq.order*
# Reading messages... (press Ctrl-C to quit)
# 1) "psubscribe"
# 2) "mymq.order.?"
# 3) (integer) 1
# 1) "psubscribe"
# 2) "mymq.order*"
# 3) (integer) 2
# 对于mymq.order*已经有两个客户端订阅,因此该模式返回的订阅者为2,等待发送消息
# 客户端3发送 PUBLISH mymq.order.1 hello
# 1) "pmessage"
# 2) "mymq.order.?"
# 3) "mymq.order.1"
# 4) "hello"
# 1) "pmessage"
# 2) "mymq.order*"
# 3) "mymq.order.1"
# 4) "hello"

#### PUBLISH mymq.order world
# 1) "pmessage"
# 2) "mymq.order*"
# 3) "mymq.order"
# 4) "world"

# 客户端3,发送消息
PUBLISH mymq.order.1 hello
# (integer) 2
# 由于客户端2有两个符合mymq.order.1的模式,因此接收到的客户端为2
PUBLISH mymq.order world
# (integer) 2
# 客户端1满足订阅频道,客户端2满足mymq.order*的模式,因此接收到的客户端仍然是2
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容