[原创]Swoole和Swoft的那些事(Task投递/定时任务篇)

Swoft的任务功能基于Swoole的Task机制,或者说Swoft的Task机制本质就是对SwooleTask机制的封装和加强。

任务投递

//Swoft\Task\Task.php
class Task
{
    /**
     * Deliver coroutine or async task
     *
     * @param string $taskName
     * @param string $methodName
     * @param array  $params
     * @param string $type
     * @param int    $timeout
     *
     * @return bool|array
     * @throws TaskException
     */
    public static function deliver(string $taskName, string $methodName, array $params = [], string $type = self::TYPE_CO, $timeout = 3)
    {
        $data   = TaskHelper::pack($taskName, $methodName, $params, $type);

        if(!App::isWorkerStatus() && !App::isCoContext()){
            return self::deliverByQueue($data);//见下文Command章节
        }

        if(!App::isWorkerStatus() && App::isCoContext()){
            throw new TaskException('Please deliver task by http!');
        }


        $server = App::$server->getServer();
        // Delier coroutine task
        if ($type == self::TYPE_CO) {
            $tasks[0]  = $data;
            $prifleKey = 'task' . '.' . $taskName . '.' . $methodName;

            App::profileStart($prifleKey);
            $result = $server->taskCo($tasks, $timeout);
            App::profileEnd($prifleKey);

            return $result;
        }

        // Deliver async task
        return $server->task($data);
    }
}

任务投递Task::deliver()将调用参数打包后根据$type参数通过swoole的$server->taskCo()$server->task()接口投递到Task进程。
Task本身始终是同步执行的,$type仅仅影响投递这一操作的行为,Task::TYPE_ASYNC对应的$server->task()是异步投递,Task::deliver()调用后马上返回;Task::TYPE_CO对应的$server->taskCo()是协程投递,投递后让出协程控制,任务完成或执行超时后Task::deliver()才从协程返回。

任务执行

//Swoft\Task\Bootstrap\Listeners\TaskEventListener 
/**
 * The listener of swoole task
 * @SwooleListener({
 *     SwooleEvent::ON_TASK,
 *     SwooleEvent::ON_FINISH,
 * })
 */
class TaskEventListener implements TaskInterface, FinishInterface
{
    /**
     * @param \Swoole\Server $server
     * @param int            $taskId
     * @param int            $workerId
     * @param mixed          $data
     * @return mixed
     * @throws \InvalidArgumentException
     */
    public function onTask(Server $server, int $taskId, int $workerId, $data)
    {
        try {
            /* @var TaskExecutor $taskExecutor*/
            $taskExecutor = App::getBean(TaskExecutor::class);
            $result = $taskExecutor->run($data);
        } catch (\Throwable $throwable) {
            App::error(sprintf('TaskExecutor->run %s file=%s line=%d ', $throwable->getMessage(), $throwable->getFile(), $throwable->getLine()));
            $result = false;

            // Release system resources
            App::trigger(AppEvent::RESOURCE_RELEASE);

            App::trigger(TaskEvent::AFTER_TASK);
        }
        return $result;
    }
}

此处是swoole.onTask的事件回调,其职责仅仅是将将worker进程投递来的打包后的数据转发给TaskExecutor

Swoole的Task机制的本质是worker进程将耗时任务投递给同步的Task(又名TaskWorker)进程处理,所以swoole.onTask的事件回调是在Task进程中执行的。上文说过,worker进程是你大部分http服务代码执行的环境,但是从TaskEventListener.onTask()方法开始,代码的执行环境都是Task进程,也就是说,TaskExecutor和具体的TaskBean都是执行在Task进程中的。

//Swoft\Task\TaskExecutor
/**
 * The task executor
 *
 * @Bean()
 */
class TaskExecutor
{
    /**
     * @param string $data
     * @return mixed
    */
    public function run(string $data)
    {
        $data = TaskHelper::unpack($data);

        $name   = $data['name'];
        $type   = $data['type'];
        $method = $data['method'];
        $params = $data['params'];
        $logid  = $data['logid'] ?? uniqid('', true);
        $spanid = $data['spanid'] ?? 0;


        $collector = TaskCollector::getCollector();
        if (!isset($collector['task'][$name])) {
            return false;
        }

        list(, $coroutine) = $collector['task'][$name];
        $task = App::getBean($name);
        if ($coroutine) {
            $result = $this->runCoTask($task, $method, $params, $logid, $spanid, $name, $type);
        } else {
            $result = $this->runSyncTask($task, $method, $params, $logid, $spanid, $name, $type);
        }

        return $result;
    }

任务执行思路很简单,将Worker进程发过来的数据解包还原成原来的调用参数,根据$name参数找到对应的TaskBean并调用其对应的Task方法。其中TaskBean使用类级别注解@Task(name="TaskName")或者@Task("TaskName")声明。

值得一提的一点是,@Task注解除了name属性,还有一个coroutine属性,上述代码会根据该参数选择使用协程的runCoTask()或者同步的runSyncTask()执行Task。但是由于而且由于Swoole的Task进程的执行是完全同步的,不支持协程,所以目前版本请该参数不要配置为true。同样的在TaskBean中编写的任务代码必须的同步的或者是要能根据环境自动将异步和协程降级为同步的

从Process中投递任务

前面我们提到:

Swoole的Task机制的本质是worker进程将耗时任务投递给同步的Task进程(又名TaskWorker进程)处理。

换句话说,swoole的$server->taskCo()$server->task()都只能在worker进程中使用。
这个限制大大的限制了使用场景。 如何能够为了能够在Process中投递任务呢?Swoft为了绕过这个限制提供了Task::deliverByProcess()方法。其实现原理也很简单,通过Swoole的$server->sendMessage()方法将调用信息从Process中投递到Worker进程中,然后由Worker进程替其投递到Task进程当中,相关代码如下:

//Swoft\Task\Task.php
    /**
     * Deliver task by process
     *
     * @param string $taskName
     * @param string $methodName
     * @param array  $params
     * @param string $type
     * @param int    $timeout
     * @param int    $workId
     *
     * @return bool
     */
    public static function deliverByProcess(string $taskName, string $methodName, array $params = [], int $timeout = 3, int $workId = 0, string $type = self::TYPE_ASYNC): bool
    {
        /* @var PipeMessageInterface $pipeMessage */
        $server      = App::$server->getServer();
        $pipeMessage = App::getBean(PipeMessage::class);
        $data = [
            'name'    => $taskName,
            'method'  => $methodName,
            'params'  => $params,
            'timeout' => $timeout,
            'type'    => $type,
        ];

        $message = $pipeMessage->pack(PipeMessage::MESSAGE_TYPE_TASK, $data);
        return $server->sendMessage($message, $workId);
    }

数据打包后使用$server->sendMessage()投递给woerker:

//Swoft\Bootstrap\Server\ServerTrait.php
    /**
     * onPipeMessage event callback
     *
     * @param \Swoole\Server $server
     * @param int            $srcWorkerId
     * @param string         $message
     * @return void
     * @throws \InvalidArgumentException
     */
    public function onPipeMessage(Server $server, int $srcWorkerId, string $message)
    {
        /* @var PipeMessageInterface $pipeMessage */
        $pipeMessage = App::getBean(PipeMessage::class);
        list($type, $data) = $pipeMessage->unpack($message);

        App::trigger(AppEvent::PIPE_MESSAGE, null, $type, $data, $srcWorkerId);
    }

$server->sendMessage后,worker进程收到数据时会触发一个swoole.pipeMessage事件的回调,Swoft会将其转换成自己的swoft.pipeMessage事件并触发.

//Swoft\Task\Event\Listeners\PipeMessageListener.php
/**
 * The pipe message listener
 *
 * @Listener(event=AppEvent::PIPE_MESSAGE)
 */
class PipeMessageListener implements EventHandlerInterface
{
    /**
     * @param \Swoft\Event\EventInterface $event
     */
    public function handle(EventInterface $event)
    {
        $params = $event->getParams();
        if (count($params) < 3) {
            return;
        }

        list($type, $data, $srcWorkerId) = $params;

        if ($type != PipeMessage::MESSAGE_TYPE_TASK) {
            return;
        }

        $type       = $data['type'];
        $taskName   = $data['name'];
        $params     = $data['params'];
        $timeout    = $data['timeout'];
        $methodName = $data['method'];

        // delever task
        Task::deliver($taskName, $methodName, $params, $type, $timeout);
    }
}

swoft.pipeMessage事件最终由PipeMessageListener处理。在相关的监听其中,如果发现swoft.pipeMessage事件由Task::deliverByProcess()产生的,worker进程会替其执行一次Task::deliver(),最终将任务数据投递到TaskWorker进程中。

一道简单的回顾练习:从Task::deliverByProcess()到某TaskBean 最终执行任务,经历了哪些进程,而调用链的哪些部分又分别是在哪些进程中执行?

从Command进程或其子进程中投递任务

//Swoft\Task\QueueTask.php
    /**
     * @param string $data
     * @param int    $taskWorkerId
     * @param int    $srcWorkerId
     *
     * @return bool
     */
    public function deliver(string $data, int $taskWorkerId = null, $srcWorkerId = null)
    {
        if ($taskWorkerId === null) {
            $taskWorkerId = mt_rand($this->workerNum + 1, $this->workerNum + $this->taskNum);
        }

        if ($srcWorkerId === null) {
            $srcWorkerId = mt_rand(0, $this->workerNum - 1);
        }

        $this->check();
        $data   = $this->pack($data, $srcWorkerId);
        $result = \msg_send($this->queueId, $taskWorkerId, $data, false);
        if (!$result) {
            return false;
        }

        return true;
    }

对于Command进程的任务投递,情况会更复杂一点。
上文提到的Process,其往往衍生于Http/Rpc服务,作为同一个Manger的子孙进程,他们能够拿到Swoole\Server的句柄变量,从而通过$server->sendMessage(),$server->task()等方法进行任务投递。

但在Swoft的体系中,还有一个十分路人的角色: Command
Command的进程从shell或cronb独立启动,和Http/Rpc服务相关的进程没有亲缘关系。因此Command进程以及从Command中启动的Process进程是没有办法拿到Swoole\server的调用句柄直接通过UnixSocket进行任务投递的。
为了为这种进程提供任务投递支持,Swoft利用了Swoole的Task进程的一个特殊功能----消息队列。

使用消息队列的Task进程.png

同一个项目中Command和Http\RpcServer 通过约定一个message_queue_key获取到系统内核中的同一条消息队列,然后Comand进程就可以通过该消息队列向Task进程投递任务了。
该机制没有提供对外的公开方法,仅仅被包含在Task::deliver()方法中,Swoft会根据当前环境隐式切换投递方式。但该消息队列的实现依赖Semaphore拓展,如果你想使用,需要在编译PHP时加上--enable-sysvmsg参数。

定时任务

除了手动执行的普通任务,Swoft还提供了精度为秒的定时任务功能用来在项目中替代Linux的Crontab功能.

Swoft用两个前置Process---任务计划进程:CronTimerProcess和任务执行进程CronExecProcess
,和两张内存数据表-----RunTimeTable任务(配置)表OriginTable(任务)执行表)用于定时任务的管理调度。
两张表的每行记录的结构如下:

\\Swoft\Task\Crontab\TableCrontab.php
    /**
     * 任务表,记录用户配置的任务信息
     * 表每行记录包含的字段如下,其中`rule`,`taskClass`,`taskMethod`生成key唯一确定一条记录
     * @var array $originStruct 
     */
    private $originStruct = [
        'rule'       => [\Swoole\Table::TYPE_STRING, 100],//定时任务执行规则,对应@Scheduled注解的cron属性
        'taskClass'  => [\Swoole\Table::TYPE_STRING, 255],//任务名 对应@Task的name属性(默认为类名)
        'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//Task方法,对应@Scheduled注解所在方法
        'add_time'   => [\Swoole\Table::TYPE_STRING, 11],//初始化该表内容时的10位时间戳
    ];

    /**
     * 执行表,记录短时间内要执行的任务列表及其执行状态
     * 表每行记录包含的字段如下,其中`taskClass`,`taskMethod`,`minute`,`sec`生成key唯一确定一条记录
     * @var array $runTimeStruct 
     */
    private $runTimeStruct = [
        'taskClass'  => [\Swoole\Table::TYPE_STRING, 255],//同上
        'taskMethod' => [\Swoole\Table::TYPE_STRING, 255],//同上
        'minute'      => [\Swoole\Table::TYPE_STRING, 20],//需要执行任务的时间,精确到分钟 格式date('YmdHi')
        'sec'        => [\Swoole\Table::TYPE_STRING, 20],//需要执行任务的时间,精确到分钟 10位时间戳
        'runStatus'  => [\Swoole\TABLE::TYPE_INT, 4],//任务状态,有 0(未执行)  1(已执行)  2(执行中) 三种。 
        //注意:这里的执行是一个容易误解的地方,此处的执行并不是指任务本身的执行,而是值`任务投递`这一操作的执行,从宏观上看换成 _未投递_,_已投递_,_投递中_描述会更准确。
    ];

此处为何要使用Swoole的内存Table?

Swoft的的定时任务管理是分别由 任务计划进程任务执行进程 进程负责的。两个进程的运行共同管理定时任务,如果使用进程间独立的array()等结构,两个进程必然需要频繁的进程间通信。而使用跨进程的Table(本文的Table,除非特别说明,都指Swoole的Swoole\Table结构)直接进行进程间数据共享,不仅性能高,操作简单 还解耦了两个进程。

为了Table能够在两个进程间共同使用,Table必须在Swoole Server启动前创建并分配内存。具体代码在Swoft\Task\Bootstrap\Listeners->onBeforeStart()中,比较简单,有兴趣的可以自行阅读。

背景介绍完了,我们来看看这两个定时任务进程的行为

//Swoft\Task\Bootstrap\Process\CronTimerProcess.php
/**
 * Crontab timer process
 *
 * @Process(name="cronTimer", boot=true)
 */
class CronTimerProcess implements ProcessInterface
{
    /**
     * @param \Swoft\Process\Process $process
     */
    public function run(SwoftProcess $process)
    {
        //code....
        /* @var \Swoft\Task\Crontab\Crontab $cron*/
        $cron = App::getBean('crontab');

        // Swoole/HttpServer
        $server = App::$server->getServer();

        $time = (60 - date('s')) * 1000;
        $server->after($time, function () use ($server, $cron) {
            // Every minute check all tasks, and prepare the tasks that next execution point needs
            $cron->checkTask();
            $server->tick(60 * 1000, function () use ($cron) {
                $cron->checkTask();
            });
        });
    }
}
//Swoft\Task\Crontab\Crontab.php
    /**
     * 初始化runTimeTable数据
     *
     * @param array $task        任务
     * @param array $parseResult 解析crontab命令规则结果,即Task需要在当前分钟内的哪些秒执行
     * @return bool
     */
    private function initRunTimeTableData(array $task, array $parseResult): bool
    {
        $runTimeTableTasks = $this->getRunTimeTable()->table;

        $min = date('YmdHi');
        $sec = strtotime(date('Y-m-d H:i'));
        foreach ($parseResult as $time) {
            $this->checkTaskQueue(false);
            $key = $this->getKey($task['rule'], $task['taskClass'], $task['taskMethod'], $min, $time + $sec);
            $runTimeTableTasks->set($key, [
                'taskClass'  => $task['taskClass'],
                'taskMethod' => $task['taskMethod'],
                'minute'     => $min,
                'sec'        => $time + $sec,
                'runStatus'  => self::NORMAL
            ]);
        }

        return true;
    }

CronTimerProcess是Swoft的定时任务调度进程,其核心方法是Crontab->initRunTimeTableData()
该进程使用了Swoole的定时器功能,通过Swoole\Timer在每分钟首秒时执行的回调,CronTimerProcess每次被唤醒后都会遍历任务表计算出当前这一分钟内的60秒分别需要执行的任务清单,写入执行表并标记为 未执行

//Swoft\Task\Bootstrap\Process
/**
 * Crontab process
 *
 * @Process(name="cronExec", boot=true)
 */
class CronExecProcess implements ProcessInterface
{
    /**
     * @param \Swoft\Process\Process $process
     */
    public function run(SwoftProcess $process)
    {
        $pname = App::$server->getPname();
        $process->name(sprintf('%s cronexec process', $pname));

        /** @var \Swoft\Task\Crontab\Crontab $cron */
        $cron = App::getBean('crontab');

        // Swoole/HttpServer
        $server = App::$server->getServer();

        $server->tick(0.5 * 1000, function () use ($cron) {
            $tasks = $cron->getExecTasks();
            if (!empty($tasks)) {
                foreach ($tasks as $task) {
                    // Diliver task
                    Task::deliverByProcess($task['taskClass'], $task['taskMethod']);
                    $cron->finishTask($task['key']);
                }
            }
        });
    }
}

CronExecProcess作为定时任务的执行者,通过Swoole\Timer每0.5s唤醒自身一次,然后把 执行表 遍历一次,挑选当下需要执行的任务,通过sendMessage()投递出去并更新该 任务执行表中的状态。
该执行进程只负责任务的投递,任务的实际实际执行仍然在Task进程中由TaskExecutor处理。

定时任务的宏观执行情况如下:


Swoft定时任务机制.png

Swoft源码剖析系列目录:https://www.jianshu.com/p/2f679e0b4d58

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,755评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,369评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,799评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,910评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,096评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,159评论 3 411
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,917评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,360评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,673评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,814评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,509评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,156评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,123评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,641评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,728评论 2 351

推荐阅读更多精彩内容

  • Swoft在PHPer圈中是一个门槛较高的Web框架,不仅仅由于框架本身带来了很多新概念和前沿的设计,还在于Swo...
    bromine阅读 15,285评论 0 18
  • 前文再续,就书接上一回,随着与Server、TCP、Protocol的邂逅,Swoole终于迎来了自己的故事,今天...
    蜗牛淋雨阅读 1,725评论 1 14
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,638评论 18 139
  • swoole 中的swerver,一个异步服务器程序,支持TCP、UDP、UnixSocket 3种协议,仅需要设...
    小小小胡阅读 712评论 0 0
  • 今日体验:无论是新客户还是老客户.进厂保养维修一定要听清楚客户的描述.以及故障点必要时一定要用笔记到接车单上面.往...
    京心达何海港阅读 198评论 0 0