前言
实测下来,该方案本地 redis + 8 协程可以每秒消费 14 万数据,下文是具体的实现。
代码:https://github.com/yaodongen/delay-queue
import delay "github.com/yaodongen/delay-queue"
delay.AddToQueue(ctx, rdb, "key", "value", 5, 86400)
delay.GetFromQueue(ctx, rdb, "key")
延迟队列的三种思路
思路一:仅使用 redis zset
- 添加一条数据:
zadd key 1645880088 '{"x":1}'
- 获取一条数据:
zrange key 0 0 withscores
- 删除数据:
zrem key value
这里添加数据的时候,直接用未来的时间戳作为 score 存入 redis。取数据的时候,先获取第一个数据,通过比对 score 和当前时间戳,判断是否满足消费的条件。(这里直接获取第一个数据是因为 redis 的 sort set 默认会把最小的 score 排在最前面)
然后,等消费完成后再删除对应的数据。其中获取数据的时候也可以使用 zrangebyscore key (0 1645880088
,把判断时间戳的逻辑交给 redis。
需要注意,在并发场景下,多个线程会获取到同一个数据,为了防止重复消费,需要修改一下取数逻辑。一种方案是通过执行 zrem key value
先删除对应的 key,获取删除的结果,如果该值为 1
则说明删除成功,也就是抢到了数据,可以进行消费。
思路一的优点是实现起来很简洁,缺点是对高并发的支持不友好。因为生产环境下,redis 通常是一个集群的,比如有 5 个机器。通过这种做法 key 只能在一个机器上,不能充分利用 redis 集群的特点。
思路二 redis zset + list
- 插入一条数据:
zadd key 1645880088 '{"x":1}'
- 生产者
-
zrange key 0 0 withscores
// 获取 '{"x":1}' -
rpush mylist01 '{"x":1}'
// 将数据推入 mylist 供消费者消费
-
- 消费数据
lpop mylist01
该思路在一的基础上,引入了一个生产者程序。生产者通过把数据推入多个不同的 list,实现了数据打散,供消费者并发消费。
该方案的优点是提供了对消费者端的高并发支持,但是在生产者端不能利用到 redis 集群的特点,瓶颈卡在了 zset 上。同时该方案引入了额外的生产者,增加了部署的复杂性。
参考实现:https://segmentfault.com/a/1190000022027194
思路三:一和二的合并
- 添加一条数据
zadd key key:1645880088
rpush key:1645880088 '{"x":1}'
- 获取一条数据
-
zrange key 0 0 withscores
// 获取 key:1645880088 -
lpop key:1645880088
// 获取具体值 '{"x":1}'
-
该方案是在 zset 中存储队列的 key(可以理解为一个时间片),然后将真正的数据存储在队列(时间片)中,实现了思路一和思路二的合并。
其中添加数据的时候直接往相应的队列中推数据,取数据的时候直接从队列中取数据的。而因为队列的 key 是不同的,所以天然分散在不同 redis 服务器上,可以支持高扩展性。同时也不需要引入额外的生产者。实测下来本地 redis + 8 协程可以每秒消费 14 万数据。
参考代码: https://github.com/yaodongen/delay-queue
创建日期:2022-02-26 修改日期:2022-02-26