mq因为延迟队列这种有优先级的队列实现很影响性能,所以很多mq都没有很灵活的支持。比如rocketmq,只支持固定的1min,2min什么的延迟时间。
基于redis的zset做延迟队列。
参考:有赞延迟队列设计
大概的思想:
一、把延迟队列独立成一个服务。
1.1 通过http请求,发送put延迟任务。
1)指定topic(比如,订单topic)。
2) id(任务标识)。
3)delay(延迟多久执行)。
4)ttl(job超时时间,超过了多少秒还没有执行,则删除,或者,移入其他地方,之后进行排查,为什么这个任务执行不成功)。
5) body(延迟队列的具体的数据,json格式。比如,订单id)。
1.2 设计数据结构
1.3 流程
1、客户端,put一个job任务。服务端接收job,放入job pool(job pool是任务存放的真正地方),再根据delay时间,转换成绝对时间,然后以(value:topic 连接 jobid,score:delayTime)按delayTime排序放入redis的zset中。这个zset就是delay bucket。topic和jobid连接时因为,所有的topic统一轮询,一个轮询就够了。
2、有一个timer,轮询delay bucket,如果delayTime大于当前时间,则说明时间到了。只需要遍历zset的前面的元素,前面没有到期,则后面肯定没有到期。比如:每一秒轮询一次,一次轮询,则取出所有到期的job。
3、如果到时间了,则把这个delay bucket中的元素取出来,然后放到,然后解析出topic和jobid, 然后放到对一个的topic的ready queue(这个ready Queue也可以有序,不过无序也没关系)。
4、客户端,每次发送订阅topic请求, 然后这个topic如果没有到期的任务,则把这个http夯住,直到topic有数据,或者,http超时了。所以客户端需要while true不断发送http请求。
5、重试机制怎么做?
每次客户端从服务端拉取任务的时候,要有重试机制,把任务延长5秒,重新放入添加延迟队列,直到超时时间。
6、超时了怎么办?
如果超时了,则可以持久化到数据库,事后运维检查,这些任务为什么会超时。
7、应答机制怎么做?
客户端处理了这个topic后,发送finish给服务端,服务端,把这个任务删掉。
8、客户端处理任务的幂等性要做:比如订单id,则查一下这个订单id是否已处理。或者update where 这种本身就是幂等的操作。
其实有很多细节:
思考:
1、这种机制,其实延迟队列服务端压力不会很大,因为总共也就一秒轮询一次。
2、服务端的压力很大应该来自于,客户端的轮询http,如果每个topic对应一个消费者,然后可能有很多台客户端服务器,也就是一个topic对应很多个http连接。
那么,服务端的服务端连接池可能就几百个上千个。所以,如果topic太多,可能需要扩容。
3、如果扩容的话。上面提到的各个数据结构的操作,可能就要用分布式锁锁住了。
4、客户端一个一个取任务,任务太多的话,网络传输压力太大,可以考虑,一次指定取多少个,如果没有这么多个,则有多少个取多少个。
缺点:
1、需要引入一个定时服务器。所以,如果数据库轮询能处理,就数据库轮询吧。
1、http轮询,比较麻烦。最好时做到mq那样,直接推。
2、总感觉,这种方式很麻烦。
这里有一个别人的实现:这里有一些地方值得商榷,比如,分布式锁方面(他没有做),消息重试方面(无限重试,客户端收到消息后,把过期时间设置到超时时间,然后重新放入队列),消息超时方面(消息超时无限重试)。http轮询方面。
但是,如果有需要,可以直接修改这个代码,或者直接推翻重写。只是作为一个参考。
https://github.com/yangwenjie88/delay-queue.git