基于Redis的任务调度设计方案

一个网关服务器就跟快餐店一样,总是希望客人来得快、去得也快,这样在相同时间内才可以服务更多的客人。如果快餐店的服务员在一个顾客点餐、等餐和结账时都全程跟陪的话,那么这个服务员大部分时间都是在空闲的等待。应该有专门的服务员负责点餐,专门的服务员负责送餐,专门的服务员负责结账,这样才能提高效率。同样道理,网关服务器中也需要分工明确。举个例子:

假设有一个申请发送重置密码邮件的网关接口,须知道发送一封邮件可能会花费上好几秒钟,如果网关服务器直接在线上给用户发送重置密码邮件,高并发的情况下就很容易造成网络拥挤。但实际上,网关服务器并非一定要等待邮件发送成功后才能响应用户,完全可以先告知用户邮件会发送的,而后再在线下把邮件发送出去(就像快餐店里点餐的服务员跟顾客说先去找位置坐,饭菜做好后有有人给他送过去)。

那么是谁来把邮件发送出去呢?

为了网关接口能够尽快响应用户请求,无需即时知道结果的耗时操作可以交由任务队列机制来处理。
任务队列机制中包含两种角色,一个是任务生产者,一个是任务消费者,而任务队列是两者之间的纽带:

  • 生产者往队列里放入任务;
  • 消费者从队列里取出任务。

任务队列的整体运行流程是:任务生产者把当前操作的关键信息(后续可以根据这些信息还原出当前操作)抽象出来,比如发送重置密码的邮件,我们只需要当前用户邮箱和用户名就可以了;任务生产者把任务放进队列,实际就是把任务的关键信息存储起来,这里会用到MySQL、Redis之类数据存储工具,常用的是Redis;而任务消费者就不断地从数据库中取出任务信息,逐一执行。

任务生产者的工作是任务分发,一般由线上的网关服务程序执行;任务消费者的工作是任务调度,一般由线下的程序执行,这样即使任务耗时再多,也不阻塞网关服务。

这里主要讨论的是任务调度(任务消费者)的程序设计。

简单直接

假设我们用Redis列表List存储任务信息,列表键名是queues:default,任务发布就是往列表queues:default后追加数据:

<?php
// PHP伪代码
    Redis::rpush('queues:default', serialize($task));

那么任务调度可以这样简单直接的实现:

<?php
// PHP伪代码
Class Worker {

    public function schedule() {
        while(1) {
            $seri = Redis::lpop('queues:default');
            if($seri) {
                $task = unserialize($seri);
                $this->handle($task);
                continue;
            }
            sleep(1);
        }
    }

    public function handle($task) {
        // do something time-consuming
    }
}

$worker = new Worker;
$worker->schedule();

意外保险

上面代码是直接从queues:default列表中移出第一个任务(lpop),因为handle($task)函数是一个耗时的操作,过程中若是遇到什么意外导致了整个程序退出,这个任务可能还没执行完成,可是任务信息已经完全丢失了。保险起见,对schedule()函数进行以下修改:

<?php
...
    public function schedule() {
        while(1) {
            $seri = Redis::lindex('queues:default', 0);
            if($seri) {
                $task = unserialize($seri);
                $this->handle($task);
                Redis::lpop('queues:default');
                continue;
            }
            sleep(1);
        }
    }
...

即在任务完成后才将任务信息从列表中移除。

延时执行

queues:default列表中的任务都是需要即时执行的,但是有些任务是需要间隔一段时间后或者在某个时间点上执行,那么可以引入一个有序集合,命名为queues:default:delayed,来存放这些任务。任务发布时需要指明执行的时间点$time

<?php
// PHP伪代码
    Redis::zadd('queues:default:delayed', $time, serialize($task));

任务调度时,如果queues:default列表已经空了,就从queues:default:delayed集合中取出到达执行时间的任务放入queues:default列表中:

<?php
...
    public function schedule() {
        while(1) {
            $seri = Redis::lindex('queues:default', 0);
            if($seri) {
                $task = unserialize($seri);
                $this->handle($task);
                Redis::lpop('queues:default');
                continue;
            }
            $seri_arr = Redis::zremrangebyscore('queues:default:delayed', 0, time());
            if($seri_arr) {
                Redis::rpush('queues:default', $seri_arr);
                continue;
            }
            sleep(1);
        }
    }
...

任务超时

预估任务正常执行所需的最大时间值,若是任务执行超过了这个时间,可能是过程中遇到一些意外,如果任由它继续卡着,那么后面的任务就会无法被执行了。
首先我们给任务设定一个时限属性timeout,然后在执行任务前先给进程本身设置一个闹钟信号,timeout后收到信号说明任务执行超时,需要退出当前进程(用supervisor守护进程时,进程自身退出,supervisor会自动再拉起)。
注意:pcntl_alarm($timeout)会覆盖之前闹钟信号,而pcntl_alarm(0)会取消闹钟信号;任务超时后,当前任务放入queues:default:delayed集合中延时执行,以免再次阻塞队列。

<?php
...
    public function schedule() {
        while(1) {
            $seri = Redis::lindex('queues:default', 0);
            if($seri) {
                $task = unserialize($seri);
                $this->timeoutHanle($task);
                $this->handle($task);
                Redis::lpop('queues:default');
                continue;
            }
            $seri_arr = Redis::zremrangebyscore('queues:default:delayed', 0, time());
            if($seri_arr) {
                Redis::rpush('queues:default', $seri_arr);
                continue;
            }
            pcntl_alarm(0);
            sleep(1);
        }
    }

    public function timeoutHanle($task) {
        $timeout = $task->timeout;
        pcntl_signal(SIGALRM, function () {
            $seri = Redis::lpop('queues:default');
            Redis::zadd('queues:default:delayed', time() + ($timeout > 0 ? $timeout * 2 : 10), $seri_arr);
            posix_kill(getmypid(), SIGKILL);
        });
        pcntl_alarm($timeout > 0 ? $timeout : 0);
    }
...

并发执行

上面代码,直观上没什么问题,但是在多进程并发执行的时候,有些任务可能会被重复执行,是因为没能及时将当前执行的任务从queues:default列表中移出,其他进程也可以读取到。为了避免重复执行的问题,我们需要引入一个有序集合SortedSet存放正在执行的任务,命名为queues:default:reserved
首先任务是从queues:default列表中直接移出,然后开始执行任务前先把任务放进queues:default:reserved集合中,任务完成了再从queues:default:reserved集合中移出。
假设一个任务执行时间不可能超过60*60秒(可以按需调整),在queues:default列表为空的时候,queues:default:reserved集合中有任务已经存放超过了60*60秒,那么有可能是某些进程在执行任务是意外退出了,所以把这些任务放到queues:default:delayed集合中稍后执行。

<?php
...
    public function schedule() {
        while(1) {
            $seri = Redis::lpop('queues:default', 0);
            if($seri) {
                Redis::zadd('queues:default:reserved', time()+10, $seri);
                $task = unserialize($seri);                
                $this->timeoutHanle($task);
                $this->handle($task);
                Redis::zrem('queues:default:reserved', $seri);
                continue;
            }
            $seri_arr = Redis::zremrangebyscore('queues:default:delayed', 0, time());
            if($seri_arr) {
                Redis::rpush('queues:default', $seri_arr);
                continue;
            }
            $seri_arr = Redis::zremrangebyscore('queues:default:reserved', 0, time()-60*60);
            if($seri_arr) {
                foreach($seri_arr as $seri) {
                    Redis::zadd('queues:default:delayed', time()+10, $seri);
                }
            }

            sleep(1);
        }
    }

    public function timeoutHanle($task) {
        $timeout = $task->timeout;
        pcntl_signal(SIGALRM, function () {
            posix_kill(getmypid(), SIGKILL);
        });
        pcntl_alarm($timeout > 0 ? $timeout : 0);
    }
...

其他

失败重试

以上代码没有检验任务是否执行成功,应该有任务失败的处理机制:比如给任务设定一个最多重试次数属性retry_times,任务每执行一次retry_times,任务执行失败时,若是retry_times等于0,则将任务放入queues:default:failed列表中不在执行;否则放入放到queues:default:delayed集合中稍后执行。

休眠时间

以上代码是进程忙时连续执行,闲时休眠一秒,可以按需调整优化。

事件监听

若是需要在任务执行成功或失败时进行某些操作,可以给任务设定成功操作方法afterSucceeded()或失败操作方法afterFailed(),在相应的时候回调。

最后

以上讲述了一个任务调度程序的逐步演变,设计方案很大程度上参考了Laravel Queue,用工具,知其然,知其所以然。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,776评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,527评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,361评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,430评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,511评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,544评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,561评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,315评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,763评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,070评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,235评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,911评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,554评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,173评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,424评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,106评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,103评论 2 352

推荐阅读更多精彩内容