Redis的消息队列并不是很专业的消息队列,他并没有很多的高级特性,没有ack保证(注:ack即确认字符),没有办法保证对信息可靠性的极高要求。
异步消息队列
我们之前说数据结构的时候说过,list可以作为栈和队列使用,队列的效果就是右进左出,需要的命令也就是rpush和lpop两条命令,Redis可以支持多个生产者和消费者并发进出信息,每个消费者拿到的消息都是不同的列表元素。举个例子:
rpush queue banana apple peach pear
lpop queue
llen queue
lpop queue
llen queue
队列空了的问题
客户端通过对于队列pop的操作来获取消息,然后对其进行处理,处理完之后再次获取新的消息,然后继续处理,这样的一个周期便是消费者的生命周期。但是如果队列空了,pop便会陷入死循环,这样不断的死循环查询不仅会提高客户端的cpu占用率,而且会使Redis的QPS拉高。我们平时的操作一般都是用睡眠的方式,让线程歇一会儿,这样cpu使用率和QPS都会有明显的改善,睡得时间不用很久1s就可以了。
time.sleep(1)
但是这种睡眠会导致消息延迟,如果只有一个消费者,那么延迟就是1s,如果有多个消费者那么延迟会下降,因为每个消费者的睡眠时间是岔开的
阻塞读---blpop/brpop
这种阻塞读在队列没有数据的时候,会立即进入休眠状态,一旦出现数据,就立即醒过来,消息的延迟几乎为零。但是阻塞读依然有问题,因为如果消息到来很慢,线程一直都阻塞在那里,redis的连接变成了闲置连接,闲置太久,服务器一般会断开连接,这时候的blpop和brpush会出现异常。
锁冲突处理
之前我们说过分布式锁,分布式锁的有能抢到锁的,当然也有抢不到锁的,那么没有抢到锁的可怜人咋办呢,我们有三种策略来处理加锁失败。
①直接抛出异常,通知用户过一会儿再试。这种情况的确可以起到延时的效果,用户发起了直接的请求,用户看到了错误的对话框以及点击重试的操作,的确可以起到延时的作用,但在考虑用户体验的情况下,我们可以通过前端的代码替代用户进行延时重试控制,它的本质是放弃本次请求,等待下次的时机。
②sleep一会儿,这种方式是让线程沉睡,以阻塞的方式来进行延时,但是会影响队列的后续信息出现延迟,如果出现的是死锁,那么线程会被完全堵死,后续信息永远也无法得到处理。
③将请求转移至延时队列,过一会儿再试,这种方式是将当前冲突的请求扔到另一个队列延后处理以避开冲突。
延时队列的实现
延时队列是使用zset实现的,我们将消息序列化成字符串存储于zset的value,这个消息到期处理时间作为score,然后用多个线程轮询zset到期任务的处理,多个线程是为了保障可用性,万一挂了还有其他线程可以继续处理,因为有多个线程,所以要考虑争抢任务的事情,确保任务不会被多次执行。实现代码如下:
def delay (msg):
msg.id = str(uuid.uuid4())
#uuid是一种通用唯一标识符,它是通过MAC地址, 时间戳, 命名空间, 随机数, 伪随机数来保证生成ID的唯一性, 有着固定的大小(128bit)
value = json.dumps(msg)
#将消息内容序列化
retry_ts = time.time()+5
#设置重试时间,5秒之后重试
redis.zadd("delay-queue",retry_ts,value)
#添加队列,score为retry_ts+5
def loop():
while True:
values = redis.zrangebyscore("delay-queue",0,time.time(),start=0,num=1)
#最多只能取到一条
if no values:
time.sleep(1)
#如果队列里面没有数据,就睡一秒
continue
value = value[0]
#只有一条数据,也只取第一条
success = redis.zrem("delay-queue",value)
#将已经执行的语句移除队列,并且以此来判断是否成功抢到
if success:
msg = json.loads(value)
handle_msg(msg)
优化方式
因为是多线程多进程的抢任务的关键是zrem,zrem是判断是否争抢到的重要条件,但是因为不是原子操作,这就好比恋爱都谈一半了,你突然告诉我你要和别的男人结婚的感觉,这个线程也受不了啊,浪费了许多的时间,这种情况就又要利用我们的LUA来优化了,在服务器端将zrangebyscore和zrem原子化操作,这样多线程争抢任务就不会出现这种浪费了。