消息队列(MQ)
相信大家对MQ
这个词都不会陌生,不管用过还是没用过的,大多会对他有一定的了解,
那么消息队列有什么好处呢
- 解耦(接触服务之间的耦合度关系)
- 削峰(例如我某个促销活动在某个时间点有非常大的流量涌入,这个时候用Mq做缓存是最好的方式了)
- 异步化(例如有些服务是我不需要在同步链中进行调用的,那么可以用mq来做一个异步消费)
传统MQ的缺点
MQ基本上和缓存一样是居家必备之良药。然而消息队列虽然重要,但是同时其实是蛮重的一个组件。例如我们在用rabbitMq的话,我们需要为它搭建一个服务端,如果考虑到可用性,那么我们需要为服务端建立一个集群,同时,我们如果线上问题可能还需要在Mq中做查找,那么这些工作就可能加大我们整体的工作量。
利用redis来实现MQ
所以就想能不能先简单的通过Redis来实现消息队列呢?不考虑PubSub、分布式、持久化、事务等复杂的情况。就像JDK的各种Queue一样。答案当然是可以的,因为Redis提供的list数据结构就非常适合做消息队列。大家可能会发现,网上有很多redis的消息队列,但是目前为止,我没有发现一个消息队列是具有ack机制的。
这里我们会讲述怎么利用list的api中的lpush/brpoplpush来实现一个具有ack机制的消息队列
实现思路
初步实现
实现ack的话,(暂时先不考虑集群版,只是单机版本)
- 我可以用lpush做生产者,每次有消息需要生产的时候,就发送一个message到pending队列中。
- brpoplpush做消费者,每次取到消息的时候进行业务消费。在消费的同时吧消息放到另一个doing的队列中
- 每次消费者完成任务,从doing队列中删除任务msg,用来告知这个消息被成功消费掉了
- 然后开一个线程去定时轮询查doing中,如果一定时间(架设我们的message实现了我们的协议,message中带有任务开始的时间戳),这个任务还没被消费成功,那么就把这个doing队列的那个就重新塞到pending的队列里
发现问题
但是这时候可能会出现这样的问题,我轮询doing的队列在取任务的时候可能因为我消费者的任务因为某些原因做的慢了些,那么这时候就会被重新塞会pending队列里,但是过两秒我的doing确实消费完了。
那么怎么解决这个问题呢?
解决方式其实很简单,就是上面的进行步骤3的时候,如果从doing队列进行删除的时候,如果返回值表示删除失败的话,那么说明我们的任务被系统认为过期了,他被赛入pending中了,那么我们只需要在这个时候去pending中重新删除这个message消息即可
延伸问题
ok,那么大家觉得这时候已经完工了吗?其实并没有。。。为什么呢?
因为会出现如下这样一种比较极端的情况:
就是任务完成之后去doing队列中删除message失败,然后去pending中删除也失败,因为有可能在任务扫描的时候,吧任务刚放入pending队列中,没等doing完成呢,pending中重新放入的任务就被消费了。那么这时候依然是消息出现重复
这种情况下的最佳解决方案是什么呢?就是消费端做好幂等性处理(其实像阿里的RocketMq)也会出现消息重复的情况(虽然极低概率),但是在Mq中,似乎设计一个精确只发一次的模型,是一件比较难的事情。
深层延伸的方案
上面的消息重复其实还是有优化的余地,具体的实现思路如下:
- 优化扫描的模型,吧扫描doing过期任务变成一个延迟扫描(如用delayedQueue实现延迟任务扫描)
- 吧每个执行的任务模型用ExecutorService来管理,存储正在执行的Future
- 每次扫描到超时的任务就去内存中查找这个任务的Future是否存在,如果存在则不需要吧doing的message放到pending中
- 如果需要超时机制的话,找到对应的Future并且取消当前任务的执行,并把之前执行的操作进行业务回滚/rollback,把message放到pending中
不过我并不推荐这一套方案,因为这一套方案过于复杂,本身就是不是我们用redis作为消息队列的初衷。
总结
redis作为消息队列是有很大的局限性的,本身作为一个以缓存/内存存储为主的东西,只是因为某些api上的特性,我们得以实现一个简单的队列服务,本身我们要选择好业务的取舍,灵活的使用redis的MQ支持,才能实现一个好的服务。
基于上述思想的代码实践我已经放到了github上,部分代码还在做成中。
github地址 : https://github.com/wgd12389/redisses/