前一篇文章《Redis消息队列》介绍了一种简单的FIFO队列的实现。
FIFO队列中的消息一经发送出去,便从队列里删除。如果由于网络原因消费者没有收到消息,或者消费者在处理这条消息的过程中崩溃了,就再也无法还原出这条消息。也就是说,FIFO队列不能保证消息会传递成功。
究其原因,在于FIFO队列缺乏消息确认机制,即消费者向队列报告消息已收到或已处理的机制。可靠队列便是加入了这一机制的消息队列。
Redis在RPOPLPUSH
命令的文档中提供了一种利用这一命令实现可靠队列的方式。这个命令可以在从一个list中获取消息的同时把这条消息复制到另一个list里,并且这个过程是原子的。
利用RPOPLPUSH
实现的可靠队列由两个列表组成,一个存储待处理(pending)的消息,另一个存储处理中(processing)的消息。
生产者通过LPUSH
将消息发送到待处理列表:
127.0.0.1:6379> LPUSH queue:pending "message"
消费者使用RPOPLPUSH
从待处理列表获取消息,同时将它加入处理中列表:
127.0.0.1:6379> RPOPLPUSH queue:pending queue:processing
"message"
此时这条消息已经从待处理列表中删除,并且复制到了处理中列表:
127.0.0.1:6379> LRANGE queue:pending 0 -1
(empty list or set)
127.0.0.1:6379> LRANGE queue:processing 0 -1
1) "message"
消费者在收到消息或者处理完消息后,使用LREM
命令从处理中列表删除这条消息,即完成了消息确认:
127.0.0.1:6379> LREM queue:processing 1 "message"
使用LREM
而不是RPOP
的原因在于,在并发时,不能保证处理中的消息能按加入列表的先后顺序被确认;而RPOP
会按顺序删除消息。
没有被确认的消息会一直存储在处理中列表。如果一个消息在处理中列表存在的时间过长,那么可以认为这个消息的传递或处理失败了。我们可以设定一个超时时间,定时扫描处理中列表,将超时的消息重新放回待处理列表等待重新传递。