翻译-使用redis做任务队列

前段时间,在工作中我们使用azure storage队列作为任务队列引擎,但过段时间后我们发现它并没有我们希望的那么快,随之,我们集中地使用redis并开始考虑将redis作为任务队列。虽然有许多的文档介绍如何使用redis的发布/订阅服务,但使用redis作为任务队列的屈指可数,所以我决定来描述如何去做。

什么是任务队列?

任务队列允许某些服务的客户端异步发送任务给它。通常服务有许多clients,可能许多处理的workers。总之整个工作流程是这样的:

  1. client将任务放到队列中
  2. workers定期循环检查队列中的新任务,如何存在,则执行任务
    但也有一个队列一些额外的要求:
  3. 服务质量:client不应该阻塞其他client的请求。
  4. 批处理:clients & workers应该能够获得多个任务以获得更好的性能。
  5. 可靠性:如果worker在处理task失败时,该任务可以被其他worker再次处理。
  6. 死信:如果某些任务被多次尝试后失败,它可以放在死信存储。
  7. 一个任务仅可以被成功处理一次

每个客户端将使用一个redis list,list key将使用一个任务队列名作为前缀,第二部分将是client id。当client准备把消息放入队列前,会将队列名和它自己的ID的进行关联成列表键。我们将使用Redis的list为每个客户端。列表键将使用一个任务队列名作为前缀和第二部分将是客户端ID。当客户希望将消息放入队列会由队列名和它自己的ID的级联计算列表键。会有很多的list存入单独的redis db中,但在该情况下,我们须共用一个redis db和其他一些代码,同时允许在它们的名字前添加额外的前缀,如:"queues:"。因此,我们定义一个类RedisQueue来隐藏这些细节。

import json
import datetime
import pytz
from random import randint
import logging
import time

main_prefix = "bqueues:"

class ClientRedisQueue():
    def __init__(self, qname, clientid, redis):
        self.client_id = clientid
        self.queue_name = main_prefix + qname + ":" + clientid
        logging.debug("created queue %s" % self.queue_name)
        self.redis = redis

    def send(self, msgs):
        jmsgs = [json.dumps({ "client_id" : self.client_id, "msg" :msg, "attempts" : 0}) for msg in msgs]
        self.redis.lpush(self.queue_name, *jmsgs)

    def exists(self):
        return self.redis.exists(self.queue_name)

    def length(self):
        return self.redis.llen(self.queue_name) 

    def delete(self):
        self.redis.delete(self.queue_name)

r = redis.StrictRedis("localhost")
cq = ClientRedisQueue("worker1", "client", r)

cq2 = ClientRedisQueue("worker1", "client2", r)
cq.send([1,2])
cq2.send([3,4,0]) 

所以发送端容易实现,那接受端呢?首先,我们需要找到所有队列列表list。有三种方式:

  1. 使用KEYS“prefix:*"命令, 该命令能够列出来所有列表。但这个命令可能会导致生产出现严重的问题,当针对大型数据库中执行它可能毁性能。所以永远不要使用此方式。
  2. 使用SCAN命令, 该命令的作用相当于上一条命令,但没有性能问题。
  3. 使用redis set存储所有list名字,即发送消息时将list名字添加到redis set中,当消息被处理时,删除名字。不幸的是,该步需要额外的代码来实现,所以我们将使用第二个选项。

当我们发现的所有的队列,我们​​需要他们随机排序以保证所有的list以相同的概率处理。之后,我们需通过redis pipeline的方式,一次处理一批大量的消息,随后,如果没有找到的消息,我们需要删除它们。此外,我们需要防止消息的双重处理列表,并防止消息因失败等异常情况造成的消息丢失。要做到这一点,我们将使用RPOPLPUSH命令,它以原子从列表中删除的消息,并把它变成一个额外的“processing”列表,并返回至调用者。因此,我们将使用其他列表中为每个队列与关键“processing:queue_name”。消息处理后,我们必须从prccessing列表中删除。但在几次不成功的尝试过程中消息的情况下,我们需要最终将其移动到死信中。并将之设置为:"dead:queue_name"。不时,我们需要检查的处理列表,如果算上尝试的消息低于允许最大计数然后把它返回到客户端列表或在其他情况下,把它设置成一纸空文。

AX_ATTEMPTS_COUNT = 4
class WorkerRedisQueue():
    def __init__(self, name, redis):
        self.queue_name = main_prefix + name
        self.processing_lname = main_prefix + "processing:" + name
        self.dead_sname = main_prefix + "dead:" + name
        self.refresh_period = datetime.timedelta(seconds=2)
        self.queue_pattern = self.queue_name + "*"
        self.redis = redis
        self.last_refresh_time = datetime.datetime.now(pytz.utc) - self.refresh_period - self.refresh_period
        self.list_names = []

    def proccessed(self, msg):
        self.redis.lrem(self.processing_lname, 0, json.dumps(msg))

    # start this from time to time
    def recover(self):
        recovered = 0
        proc_msgs = self.redis.lrange(self.processing_lname, -5,-1)
        for (msg, orig) in [(json.loads(msg),msg) for msg in proc_msgs if msg]:
            if msg["attempts"] > MAX_ATTEMPTS_COUNT:
                print "found dead letter"
                self.redis.sadd(self.dead_sname, orig)
            else:
                print "recovering"
                recovered = recovered + 1
                msg["attempts"] = msg["attempts"] + 1
                self.redis.lpush("%s:%s" % (self.queue_name, msg["client_id"]), json.dumps(msg))
            self.redis.lrem(self.processing_lname, 0, orig)

        return recovered

    def get_list_names(self):
        lists = []
        print "searching pattern", self.queue_pattern
        for l in self.redis.scan_iter(self.queue_pattern):
            print "found list", l
            lists.append(l)
        return lists

    def refresh(self, force = False):
        now = datetime.datetime.now(pytz.utc)
        time_to_refresh = now - self.last_refresh_time > self.refresh_period
        if force or time_to_refresh:
            self.list_names = self.get_list_names()
            self.last_refresh_time = now
        else:
            print "skip refresh"

    def receive(self, msg_count):
        self.refresh()
        count = len(self.list_names)
        if count == 0:
            print "queues not found"
            return []
        p = self.redis.pipeline()
        for i in range(msg_count):
            l = self.list_names[randint(0, count - 1)]
            p.rpoplpush(l,self.processing_lname)
        msgs = p.execute()
        return [json.loads(msg) for msg in msgs if msg]

    def length(self):
        self.refresh(True)
        res = 0
        for l in self.list_names:
            res = res + self.redis.llen(l)
        return res

wq = WorkerRedisQueue("worker1", r)
while(True):
    time.sleep(1)
    msgs = wq.receive(2)
    if len(msgs) == 0:
        if randint(0, 10) == 0 and wq.length() == 0 and wq.recover() == 0:
            print "sleeping"
            time.sleep(1)
            
    for msg in msgs:
        print "received msg", msg
        try:
            a = 10/msg["msg"]
            wq.proccessed(msg)
        except Exception,e: 
            print "exception", str(e)   

原文:http://hodzanassredin.github.io/2016/03/29/redis_task_queue.html
翻译:yyt030

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342

推荐阅读更多精彩内容