消息队列
消息队列的设计方案在开发过程中应用比较广泛,一般情况都是为了实现程序解耦,生产者发送一个消息,消费者或多个客户端消费者去消费消息,完成任务的异步处理。如果有多个消费者,一方面可以提高处理性能,另一方面可以使得消费者程序得容灾。基于这两方面得有点,消息队列当前应用的非常广泛,一般在启用消息队列时都会采用MQ服务中间件,比如RabbitMq、ZeroMQ等等,这些服务是专业的消息服务,在互联网公司应用的非常多,因此服务质量和分布式方案都是比较成熟的。
依据Redis的特点,他也有很多类型可以实现消息队列方案,在之前的项目中也有应用,并且有些场合是MQ无法替代的。同时Redis因为不是一个标准的MQ服务,因此很多方面无法满足需求,比如ACK,丢包的问题无法保证,有这方面的需求还是使用MQ中间件;即使有这方面的问题使用Redis作为消息中间件的团队仍大量存在,基于这些原因,Redis的作者开发了一个新的MQ服务,同Redis类似,使用方式也基本类似Disque,文档,之前作者表示未来会将该组件加入到Redis标准组件中,这样的话,未来Redis也可以作为MQ的一个首选项。
在之前的项目中分别使用过列表、有序集合作为简单的消息队列,这里对这几种方式进行简单描述。
列表作为消息队列
列表作为消息队列,是最简单的选择,其特性天然适合队列。只要在一端分别使用LPUSH
将信息添加到列表,在另一端使用RPOP
消费一个消息即可。采用两个cli模拟列表的消息队列行为
# client 1 producer,对入库的资源ID进行上架操作
LPUSH mymq 1
LPUSH mymq 2
# client 2 consumer
# 获取消息,并消费
RPOP mymq
"1"
# 获取到资源1后,进行上架操作,操作完成后丢弃
# 处理完成后,需要再次获取消息
RPOP mymq
"2"
RPOP mymq
(nil)
# mymq中没有新的消息,消费者客户端,需要不断重复RPOP去检查有没有消息
采用列表作为消息队列,也有一个问题,如上述示例中演示的,消费者每次主动去获取消息,有没有消息没有办法判断,只能不断的读取。对于服务端采用JAVA的话,一般这种任务型的都会采用定期执行的方案,那么这种代码很可能如下所示:
public void runTask() {
while(true) {
Object id = redis.rpop(myMqKey);
if (null == id) {
// 没有消息了,需要停止,停止多久再次执行,这个时间不太好处理,并且没有什么经验值,因为资源的入库没有规律。
}
// 处理上架
do(id);
}
}
不断的去检查Redis是否有新消息,虽然不麻烦,但是也在耗费资源,并且是无效的投入。Redis的列表中有一个BRPOP
方法,前面章节提到过,这里可以采用这个方法,解决不断检查Redis是否有数据,消耗资源的问题。仍然使用上述示例
# 客户端1,consumer
BRPOP mymq 0 # 0 表示永不过期
# mymq队列没有数据,此时一直阻塞,客户端2生产消息后,客户端1理解消费消息
# 1) "mymq"
# 2) "1"
# (30.82s)
# 等待了30.82秒,mymq出现数据并进行了消费,一次读取消息结束
# 客户端2,producer
LPUSH mymq 1
# (integer) 1
这种方式很好的解决了不断检查耗费资源的问题,此时JAVA代码的更改并不大
public void runTask() {
while(true) {
List<Object> ids = redis.brpop(myMqKey, 0);
// 因为永久等待,只要弹出数据,就不为空
// 处理上架, 数据在位置1中
do(ids.get(1));
}
}
使用有序集合作为消息队列
有序集合作为消息队列,除了能够解决列表中不断检查耗费资源的问题,同时将得分采用时间戳表示时,还能实现消息到期时间消亡或到期必须处理的需求。
在之前的产品开发中,有一个销售课程的子模块,用户下订单后,有一个支付时间的限制。在这个产品中要求用户下订单后15分钟内完成支付,如果15分钟内不完成支付,则取消订单。
这里就采用了有序集合作为队列的支撑,其具体实现流程为
- 当用户下订单时,将订单号作为值,以订单过期时间15分钟作为得分,加入到有序队列中
- 完成订单后,引导用户进入支付流程,略
- 起一个持续的任务,用于处理定单消亡处理流程
- 获取第一个订单信息,检查其得分是否超时,如果超时则处理订单消亡流程
- 接收到一个过期订单,处理订单的消亡,由于订单支付过程没有更改队列,因此需要检查订单的状态再进行消亡
- 如果得到的订单没有过期,则暂停,其时间可以比较准确,就是得分剩余的时间
- 上述流程中,订单过期称为命中,一旦命中,则继续读取下一个订单,直到队列为空或一个未到期订单为止
发布订阅
Redis提供了发布订阅模式,可以实现异步消息通信。发布订阅模式中,订阅者可以定于一个或多个频道,发布者可以向一个或多个频道发送消息,只要订阅了这些频道的客户端,都能够收到发布的消息。
Redis得发布订阅模式没有持久化消息的操作,如果某一个频道没有客户端订阅,则发布出去的消息就会被丢弃,即使该频道后续有客户端订阅,之前发布出去的消息也无法被再次收到。
发布订阅模式主要由以下指令完成
# 发布消息,将消息message发布到指定的频道
PUBLISH channel message
# 订阅频道,客户端订阅一个或多个频道
SUBSCRIBE channel [channel...]
# 按模式订阅,客户端按照channel得模式进行订阅,类似MQ的queue模式
PSUBSCRIBE pattern [pattern...]
# 退订一个或多个频道,与SUBSCRIBE指令配合使用
UNSUBSCRIBE channel [channel...]
# 按给定的模式退订一个或多个频道,与PSUBSCRIBE指令配合使用
PUNSUBSCRIBE pattern [pattern...]
PUBLISH指令比较简单,就是用于给一个频道发布消息。其返回值表示当前消息被几个订阅者接收。
# 向订单处理channel发送一个订单id=1的消息
PUBLISH mymq.order 1
# (integer) 0
# 由于此时没有客户端订阅该频道,因此返回值为0,订单1得消息再不会被接收处理
对于其他几个指令就比较复杂一些,不过SUBSCRIBE等几个关于订阅的指令,其收到的消息格式是类似的
UNSUBSCRIBE my
# 1) "unsubscribe"
# 2) "my"
# 3) (integer) 0
PUNSUBSCRIBE my*
# 1) "punsubscribe"
# 2) "my*"
# 3) (integer) 0
SUBSCRIBE mymq.order
# 1) "subscribe"
# 2) "mymq.order"
# 3) (integer) 1
PSUBSCRIBE my*
# 1) "psubscribe"
# 2) "my*"
# 3) (integer) 1
# 已订阅成功,收到的消息
# 1) "message"
# 2) "mymq.order"
# 3) "2"
收到的消息是一个列表,长度为三,其具体得信息分别表示:
- 当调用的是指令时,第一个位置表示了当前的指令信息,如SUBSCRIBE/UNSUBSCIRBE等;如果是消息的话,那么第一个位置为
message
,消息类型对业务来说是最重要的 - 第二个位置,表示频道或订阅的频道模式,如my/my*等
- 第三个位置,如果是消息,则为消息内容;如果是指令,订阅指令,表示该频道、频道模式的订阅数量;退订指令表示,还有多少客户端订阅该频道
分别启用三个客户端,一个客户端用于订阅频道,一个用于按模式订阅,一个用来发布消息。
# 客户端1,SUBSCRIBE
SUBSCRIBE mymq.order
# Reading messages... (press Ctrl-C to quit)
# 1) "subscribe"
# 2) "mymq.order"
# 3) (integer) 1
# 成功订阅,客户端等待消息
### 客户端3发送PUBLISH mymq.order world
# 1) "message"
# 2) "mymq.order"
# 3) "world"
# 客户端2,PSUBSCRIBE mymq.order.? mymq.order*
PSUBSCRIBE mymq.order.? mymq.order*
# Reading messages... (press Ctrl-C to quit)
# 1) "psubscribe"
# 2) "mymq.order.?"
# 3) (integer) 1
# 1) "psubscribe"
# 2) "mymq.order*"
# 3) (integer) 2
# 对于mymq.order*已经有两个客户端订阅,因此该模式返回的订阅者为2,等待发送消息
# 客户端3发送 PUBLISH mymq.order.1 hello
# 1) "pmessage"
# 2) "mymq.order.?"
# 3) "mymq.order.1"
# 4) "hello"
# 1) "pmessage"
# 2) "mymq.order*"
# 3) "mymq.order.1"
# 4) "hello"
#### PUBLISH mymq.order world
# 1) "pmessage"
# 2) "mymq.order*"
# 3) "mymq.order"
# 4) "world"
# 客户端3,发送消息
PUBLISH mymq.order.1 hello
# (integer) 2
# 由于客户端2有两个符合mymq.order.1的模式,因此接收到的客户端为2
PUBLISH mymq.order world
# (integer) 2
# 客户端1满足订阅频道,客户端2满足mymq.order*的模式,因此接收到的客户端仍然是2