Swoole Task

Task任务进程是Swoole中独立于Worker工作进程的一个异步工作进程,用于处理一些耗时较长的逻辑。这些逻辑如果在Task异步任务进程中处理,不会影响到Worker工作进程处理来自客户端的请求,因此大大提高了Swoole扩展并发处理能力。

Task异步任务进程和Worker工作进程之间通过UNIX的Sock管道进行通信,也可以配置通过消息队列进行通信,Task异步任务进程只能传递字符串格式的消息。

应用场景

Task异步任务进程主要用于执行耗时较长的操作上,如给多人发送邮件,广播消息等需要长时间等待的操作。

机制原理

Swoole的Task异步任务机制的本质是Worker工作进程将耗时的任务投递给异步的TaskWorker任务工作进程中进行处理,所以swoole.onTask事件回调是在Task异步任务进程中执行的。

TaskWorker进程
  • onTask回调函数用于执行Task异步任务
  • onFinish回调函数用于处理Task异步任务的返回结果

注意事项

  • Task异步任务进程传递的数据大小

当数据小于8KB时,在Swoole的结构中会直接通过管道进行传递,当数据大于8KB时会超出Swoole的buffer缓冲空间,此时数据会被先写入系统临时文件中进行传递。在onTask监听到后再去系统临时目录\tmp下读取文件。

  • Task异步任务进程传递对象

可以通过序列化传递一个对象的拷贝,注意这里并不是传递了一个对象的引用。由于Task异步任务进程和Worker工作进程是两个各自独立的进程,拥有各自不同的内存空间。因此,Task异步任务进程中对象的改变不会反映到Worker工作进程中。

Task异步任务进程中数据库连接、网络连接对象是不可以传递的。

  • Task异步任务进程的onFinish回调

Task异步任务进程的onFinish回调会返回并回调task方法的Worker工作进程,也就是返回给投递者的进程。

代码实践

实践1:客户端向服务器发送消息,服务器以异步方式处理后返回JSON字符串给客户端。

创建服务器

$ vim server.php
<?php
class Server
{
    public $test;
    public function __construct($host="0.0.0.0", $port=9501, $options=[])
    {
        $svr = new swoole_server($host, $port);
        if(!empty($options)){
            $svr->set($options);
        }
        $svr->on("Start",[$this, "onStart"]);
        $svr->on("Connect", [$this, "onConnect"]);
        $svr->on("Receive", [$this, "onReceive"]);
        $svr->on("Close", [$this, "onClose"]);
        $svr->on("Task", [$this, "onTask"]);
        $svr->on("Finish", [$this, "onFinish"]);
        $svr->on("WorkerStart", [$this, "onWorkerStart"]);
        $svr->start();
    }
    public function log($msg)
    {
        $filepath = __DIR__.DIRECTORY_SEPARATOR;
        $filename = date("Ymd").".log";
        $file = $filepath.$filename;

        $message = "[".date("Y-m-d H:i:s")."] ";
        $message .= $msg;
        $message .= PHP_EOL;
        file_put_contents($file, $message, FILE_APPEND);
    }
    public function onStart($svr)
    {
        echo __METHOD__.PHP_EOL;
    }
    public function onConnect($svr, $fd, $worker_id)
    {
        echo __METHOD__.":worker:{$worker_id}:client:{$fd}".PHP_EOL;
    }
    public function onClose($svr, $fd, $worker_id)
    {
        echo __METHOD__.":worker:{$worker_id}:client:{$fd}".PHP_EOL;
    }
    /**
     * 接收来自客户端的请求并转发数据
     */
    public function onReceive(swoole_server $svr, $fd, $worker_id, $data)
    {
        echo __METHOD__.":worker:{$worker_id}:client:{$fd}:data:{$data}".PHP_EOL;
        
        //收到客户端数据创建任务并投递
        $params = [];
        $params["client_id"] = $fd;//客户端描述符
        $params["unique_id"] = time().mt_rand(100000, 999999);
        $params["message"] = $data;
        //Worker进程使用task()向Task进程投递任务数据
        $svr->task(json_encode($params));//task只能传递字符串
    }
    /**
     * 监听并处理Worker工作线程
     */
    public function onWorkerStart($svr, $worker_id)
    {
        echo __METHOD__.":worker:{$worker_id}".PHP_EOL;
    }
    /**
     * Task进程的onTask方法
     * 接收Worker进程使用task()函数投递的任务数据
     */
    public function onTask($svr, $task_id, $worker_id, $data)
    {
        echo __METHOD__.":worker:{$worker_id}:task:{$task_id}:data:{$data}".PHP_EOL;

        //获取参数获取任务
        $params = json_decode($data, true);
        //处理数据
        $data = [];
        $data["client_id"] = $params["client_id"];
        $data["unique_id"] = $params["unique_id"];
        $data["message"] = "thank you";

        //发送数据给客户端
        $svr->send($params["client_id"], json_encode($data));
        
        //返回数据给onFinish
        return "finished";       
    }
    /**
     * Worker进程的onFinish方法
     * 用于接收Task进程执行finish后的参数
     **/
    public function onFinish($svr, $task_id, $data)
    {
        echo __METHOD__.":task:{$task_id}:data:{$data}".PHP_EOL;
    }
}
$host = "0.0.0.0";
$port = 9501;
$options = [];
$options["worker_num"] = 2;
$options["daemonize"] = false;
// $options["max_request"] = 1000;
// $options["dispatch_mode"] = 2;
$options["task_worker_num"] = 2;
$server = new Server($host, $port, $options);

创建客户端

$ vim client.php
<?php
class Client
{
    private $client;
    public function __construct(){
        $this->client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
        $this->client->on("Connect", [$this, "onConnect"]);
        $this->client->on("Receive", [$this, "onReceive"]);
        $this->client->on("Close", [$this, "onClose"]);
        $this->client->on("Error", [$this, "onError"]);
    }
    /**
     * 连接服务器
     */
    public function connect($host, $port)
    {
        $fp = $this->client->connect($host, $port, 1);
        if(!$fp){
            echo __METHOD__.":ErrMsg:".$fp->errMsg.":ErrCode:".$fp->errCode.PHP_EOL;
            return;
        }
    }
    /**
     * 执行connect()方法后会自动调用onConnect()方法
     */
    public function onConnect($client)
    {
        //接收命令行CLI标准输入,并向服务器发送消息。
        fwrite(STDOUT, "Enter Message:");
        swoole_event_add(STDIN, function(){
            fwrite(STDOUT, "Enter Message:");
            $msg = trim(fgets(STDIN));
            $this->send($msg);
        });
    }
    /**
     * 向服务器发送消息
     */
    public function send($msg)
    {
        $this->client->send($msg);
    }
    /**
     * 判断客户端是否仍然连接
     */
    public function isConnected()
    {
        return $this->client->isConnected();
    }
    /**
     * 接收来自服务器的数据
     */
    public function onReceive($client, $data)
    {
        echo __METHOD__.":".$data.PHP_EOL;
    }
    public function onClose($client)
    {
        echo __METHOD__.":client close connection".PHP_EOL;
    }
    public function onError()
    {
        echo __METHOD__.":error";
    }
}
$client = new Client();
$client->connect("127.0.0.1", 9501);

运行服务器

$ php server.php

运行服务端并发送数据

$ php client.php
Enter Message: hello server

查看服务器命令行输出

$ php server.php
Server::onStart
Server::onWorkerStart:worker:3
Server::onWorkerStart:worker:2
Server::onWorkerStart:worker:0
Server::onWorkerStart:worker:1
Server::onConnect:worker:0:client:1
Server::onReceive:worker:0:client:1:data:hello server
Server::onTask:worker:1:task:0:data:{"client_id":1,"unique_id":"1555006210826270","message":"hello server"}
Server::onFinish:task:0:data:finished

查看客户端命令行输出

Enter Message:hello server
Enter Message:Client::onReceive:{"client_id":1,"unique_id":"1555006210826270","message":"thank you"}

案例2:使用Task异步任务进程传递对象副本而非引用

<?php
class Test
{
    public $index = 0;
}
class Server
{
    public $test;
    public function __construct($host="0.0.0.0", $port=9501, $options=[])
    {
        $svr = new swoole_server($host, $port);
        if(!empty($options)){
            $svr->set($options);
        }
        $svr->on("Start",[$this, "onStart"]);
        $svr->on("Connect", [$this, "onConnect"]);
        $svr->on("Receive", [$this, "onReceive"]);
        $svr->on("Close", [$this, "onClose"]);
        $svr->on("Task", [$this, "onTask"]);
        $svr->on("Finish", [$this, "onFinish"]);
        $svr->on("WorkerStart", [$this, "onWorkerStart"]);
        $svr->start();
    }
    public function log($msg)
    {
        $filepath = __DIR__.DIRECTORY_SEPARATOR;
        $filename = date("Ymd").".log";
        $file = $filepath.$filename;

        $message = "[".date("Y-m-d H:i:s")."] ";
        $message .= $msg;
        $message .= PHP_EOL;
        file_put_contents($file, $message, FILE_APPEND);
    }
    public function onStart($svr)
    {
        echo __METHOD__.PHP_EOL;
    }
    public function onConnect($svr, $fd, $worker_id)
    {
        echo __METHOD__.":worker:{$worker_id}:client:{$fd}".PHP_EOL;
    }
    public function onClose($svr, $fd, $worker_id)
    {
        echo __METHOD__.":worker:{$worker_id}:client:{$fd}".PHP_EOL;
    }
    /**
     * 接收来自客户端的请求并转发数据
     */
    public function onReceive(swoole_server $svr, $fd, $worker_id, $data)
    {
        echo __METHOD__.":worker:{$worker_id}:client:{$fd}:data:{$data}".PHP_EOL;
        
        //创建对象
        $this->test = new Test();
        $svr->task(serialize($this->test));
    }
    /**
     * 监听并处理Worker工作线程
     */
    public function onWorkerStart($svr, $worker_id)
    {
        echo __METHOD__.":worker:{$worker_id}".PHP_EOL;
    }
    /**
     * Task进程的onTask方法
     * 接收Worker进程使用task()函数投递的任务数据
     */
    public function onTask($svr, $task_id, $worker_id, $data)
    {
        echo __METHOD__.":worker:{$worker_id}:task:{$task_id}:data:{$data}".PHP_EOL;
        //获取序列化后的对象
        $obj = unserialize($data);
        $this->log("onTask:index:".$obj->index);
        $obj->index = 1;
        //返回数据给onFinish
        return "finished";       
    }
    /**
     * Worker进程的onFinish方法
     * 用于接收Task进程执行finish后的参数
     **/
    public function onFinish($svr, $task_id, $data)
    {
        echo __METHOD__.":task:{$task_id}:data:{$data}".PHP_EOL;

        $this->log("onFinish:index:".$this->test->index);
    }
}
$host = "0.0.0.0";
$port = 9501;
$options = [];
$options["worker_num"] = 2;
$options["daemonize"] = false;
// $options["max_request"] = 1000;
// $options["dispatch_mode"] = 2;
$options["task_worker_num"] = 2;
$server = new Server($host, $port, $options);
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335

推荐阅读更多精彩内容

  • 进程 什么是进程 进程Process是计算机中的程序关于某数据集合上的一次运行活动,是系统分配资源和调度的基本单位...
    JunChow520阅读 2,011评论 2 9
  • 前文再续,就书接上一回,随着与Server、TCP、Protocol的邂逅,Swoole终于迎来了自己的故事,今天...
    蜗牛淋雨阅读 1,702评论 1 14
  • 参考资料 官方网站 https://www.swoole.com/page/download PHP没有像Pyth...
    JunChow520阅读 2,928评论 0 6
  • swoole 中的swerver,一个异步服务器程序,支持TCP、UDP、UnixSocket 3种协议,仅需要设...
    小小小胡阅读 697评论 0 0
  • Swoft的任务功能基于Swoole的Task机制,或者说Swoft的Task机制本质就是对SwooleTask机...
    bromine阅读 7,174评论 3 8