python利用redis做延时队列

场景:平时工作中需要用到一些延时操作,同时避免多个人同时操作一个数据
1、创建一个redis连接池

import redis
import uuid
import time
import threading
pool = redis.ConnectionPool(host="127.0.0.1", port=6379, decode_responses=True, db=3)
r = redis.Redis(connection_pool=pool)

2、我们这边用到的是redis里面的Zset(有序数组),把任务根据延迟的时间进行排队{job:delayTime},然后依次进行执行

#创建一个延时器
def delayTask(name, delayTime):
    task_id = str(uuid.uuid4())
    processTime = delayTime + time.time()
    #往Zset里面的delay-queue键里面加入值
    r.zadd("delay-queue", {name + task_id: processTime}

#创建一个任务函数
def handleTask(task_id, ):
    """ do somethings"""
    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())),"{0}延迟执行完毕".format(task_id, ))

#创建一个loop函数
def loop():
    while True:
        #将数组里面的元素根据delayTime排队之后,在0到当前时间范围内循环取值,其实每次都只能取到一个,一定是这个时间和当前时间一样了才会被取到
        task_list = r.zrangebyscore("delay-queue", 0, time.time(), 0, 1)
        #如果为空,证明没有任务需要执行,进入下一个循环
        if not task_list:
            print("cost 1s")
            time.sleep(1)
            continue
        else:
            #如果有值,就取值
            task_id = task_list[0]
            # 能删除成功,证明被该线程抢到了这个任务,其他线程无法删除,也就无法拿到任务
            is_del = r.zrem("delay-queue", task_id)
            if is_del:
                handleTask(task_id)


3、执行函数

if __name__ == '__main__':
#方便演示,我们开2个线程,其实只会有一个线程抢到该任务
    t1 = threading.Thread(target=loop)
    t2 = threading.Thread(target=loop)
    t1.start()
    t2.start()
    delayTask("任务一", 3)
    delayTask("任务二", 1)
    delayTask("任务三", 5)
2024-04-23 11:36:40
2024-04-23 11:36:40
2024-04-23 11:36:41 任务二7d455dc0-ff89-47de-a170-cbcbaea87c94延迟执行完毕
2024-04-23 11:36:41
2024-04-23 11:36:41
2024-04-23 11:36:42
2024-04-23 11:36:42
2024-04-23 11:36:43 任务一a98fc5fd-2bff-4d6f-8ca9-5c42ab2d42cc延迟执行完毕
2024-04-23 11:36:43
2024-04-23 11:36:43
2024-04-23 11:36:44
2024-04-23 11:36:44
2024-04-23 11:36:45 任务三aa7dc283-f5bc-4f71-b2ab-1e200bb32588延迟执行完毕
2024-04-23 11:36:45
2024-04-23 11:36:45
可以看出主程序是40s开始执行,我们在41s,43s,45s分别执行了job
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容