Swoole Task

Task任务进程是Swoole中独立于Worker工作进程的一个异步工作进程,用于处理一些耗时较长的逻辑。这些逻辑如果在Task异步任务进程中处理,不会影响到Worker工作进程处理来自客户端的请求,因此大大提高了Swoole扩展并发处理能力。

Task异步任务进程和Worker工作进程之间通过UNIX的Sock管道进行通信,也可以配置通过消息队列进行通信,Task异步任务进程只能传递字符串格式的消息。

应用场景

Task异步任务进程主要用于执行耗时较长的操作上,如给多人发送邮件,广播消息等需要长时间等待的操作。

机制原理

Swoole的Task异步任务机制的本质是Worker工作进程将耗时的任务投递给异步的TaskWorker任务工作进程中进行处理,所以swoole.onTask事件回调是在Task异步任务进程中执行的。

TaskWorker进程
  • onTask回调函数用于执行Task异步任务
  • onFinish回调函数用于处理Task异步任务的返回结果

注意事项

  • Task异步任务进程传递的数据大小

当数据小于8KB时,在Swoole的结构中会直接通过管道进行传递,当数据大于8KB时会超出Swoole的buffer缓冲空间,此时数据会被先写入系统临时文件中进行传递。在onTask监听到后再去系统临时目录\tmp下读取文件。

  • Task异步任务进程传递对象

可以通过序列化传递一个对象的拷贝,注意这里并不是传递了一个对象的引用。由于Task异步任务进程和Worker工作进程是两个各自独立的进程,拥有各自不同的内存空间。因此,Task异步任务进程中对象的改变不会反映到Worker工作进程中。

Task异步任务进程中数据库连接、网络连接对象是不可以传递的。

  • Task异步任务进程的onFinish回调

Task异步任务进程的onFinish回调会返回并回调task方法的Worker工作进程,也就是返回给投递者的进程。

代码实践

实践1:客户端向服务器发送消息,服务器以异步方式处理后返回JSON字符串给客户端。

创建服务器

$ vim server.php
<?php
class Server
{
    public $test;
    public function __construct($host="0.0.0.0", $port=9501, $options=[])
    {
        $svr = new swoole_server($host, $port);
        if(!empty($options)){
            $svr->set($options);
        }
        $svr->on("Start",[$this, "onStart"]);
        $svr->on("Connect", [$this, "onConnect"]);
        $svr->on("Receive", [$this, "onReceive"]);
        $svr->on("Close", [$this, "onClose"]);
        $svr->on("Task", [$this, "onTask"]);
        $svr->on("Finish", [$this, "onFinish"]);
        $svr->on("WorkerStart", [$this, "onWorkerStart"]);
        $svr->start();
    }
    public function log($msg)
    {
        $filepath = __DIR__.DIRECTORY_SEPARATOR;
        $filename = date("Ymd").".log";
        $file = $filepath.$filename;

        $message = "[".date("Y-m-d H:i:s")."] ";
        $message .= $msg;
        $message .= PHP_EOL;
        file_put_contents($file, $message, FILE_APPEND);
    }
    public function onStart($svr)
    {
        echo __METHOD__.PHP_EOL;
    }
    public function onConnect($svr, $fd, $worker_id)
    {
        echo __METHOD__.":worker:{$worker_id}:client:{$fd}".PHP_EOL;
    }
    public function onClose($svr, $fd, $worker_id)
    {
        echo __METHOD__.":worker:{$worker_id}:client:{$fd}".PHP_EOL;
    }
    /**
     * 接收来自客户端的请求并转发数据
     */
    public function onReceive(swoole_server $svr, $fd, $worker_id, $data)
    {
        echo __METHOD__.":worker:{$worker_id}:client:{$fd}:data:{$data}".PHP_EOL;
        
        //收到客户端数据创建任务并投递
        $params = [];
        $params["client_id"] = $fd;//客户端描述符
        $params["unique_id"] = time().mt_rand(100000, 999999);
        $params["message"] = $data;
        //Worker进程使用task()向Task进程投递任务数据
        $svr->task(json_encode($params));//task只能传递字符串
    }
    /**
     * 监听并处理Worker工作线程
     */
    public function onWorkerStart($svr, $worker_id)
    {
        echo __METHOD__.":worker:{$worker_id}".PHP_EOL;
    }
    /**
     * Task进程的onTask方法
     * 接收Worker进程使用task()函数投递的任务数据
     */
    public function onTask($svr, $task_id, $worker_id, $data)
    {
        echo __METHOD__.":worker:{$worker_id}:task:{$task_id}:data:{$data}".PHP_EOL;

        //获取参数获取任务
        $params = json_decode($data, true);
        //处理数据
        $data = [];
        $data["client_id"] = $params["client_id"];
        $data["unique_id"] = $params["unique_id"];
        $data["message"] = "thank you";

        //发送数据给客户端
        $svr->send($params["client_id"], json_encode($data));
        
        //返回数据给onFinish
        return "finished";       
    }
    /**
     * Worker进程的onFinish方法
     * 用于接收Task进程执行finish后的参数
     **/
    public function onFinish($svr, $task_id, $data)
    {
        echo __METHOD__.":task:{$task_id}:data:{$data}".PHP_EOL;
    }
}
$host = "0.0.0.0";
$port = 9501;
$options = [];
$options["worker_num"] = 2;
$options["daemonize"] = false;
// $options["max_request"] = 1000;
// $options["dispatch_mode"] = 2;
$options["task_worker_num"] = 2;
$server = new Server($host, $port, $options);

创建客户端

$ vim client.php
<?php
class Client
{
    private $client;
    public function __construct(){
        $this->client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
        $this->client->on("Connect", [$this, "onConnect"]);
        $this->client->on("Receive", [$this, "onReceive"]);
        $this->client->on("Close", [$this, "onClose"]);
        $this->client->on("Error", [$this, "onError"]);
    }
    /**
     * 连接服务器
     */
    public function connect($host, $port)
    {
        $fp = $this->client->connect($host, $port, 1);
        if(!$fp){
            echo __METHOD__.":ErrMsg:".$fp->errMsg.":ErrCode:".$fp->errCode.PHP_EOL;
            return;
        }
    }
    /**
     * 执行connect()方法后会自动调用onConnect()方法
     */
    public function onConnect($client)
    {
        //接收命令行CLI标准输入,并向服务器发送消息。
        fwrite(STDOUT, "Enter Message:");
        swoole_event_add(STDIN, function(){
            fwrite(STDOUT, "Enter Message:");
            $msg = trim(fgets(STDIN));
            $this->send($msg);
        });
    }
    /**
     * 向服务器发送消息
     */
    public function send($msg)
    {
        $this->client->send($msg);
    }
    /**
     * 判断客户端是否仍然连接
     */
    public function isConnected()
    {
        return $this->client->isConnected();
    }
    /**
     * 接收来自服务器的数据
     */
    public function onReceive($client, $data)
    {
        echo __METHOD__.":".$data.PHP_EOL;
    }
    public function onClose($client)
    {
        echo __METHOD__.":client close connection".PHP_EOL;
    }
    public function onError()
    {
        echo __METHOD__.":error";
    }
}
$client = new Client();
$client->connect("127.0.0.1", 9501);

运行服务器

$ php server.php

运行服务端并发送数据

$ php client.php
Enter Message: hello server

查看服务器命令行输出

$ php server.php
Server::onStart
Server::onWorkerStart:worker:3
Server::onWorkerStart:worker:2
Server::onWorkerStart:worker:0
Server::onWorkerStart:worker:1
Server::onConnect:worker:0:client:1
Server::onReceive:worker:0:client:1:data:hello server
Server::onTask:worker:1:task:0:data:{"client_id":1,"unique_id":"1555006210826270","message":"hello server"}
Server::onFinish:task:0:data:finished

查看客户端命令行输出

Enter Message:hello server
Enter Message:Client::onReceive:{"client_id":1,"unique_id":"1555006210826270","message":"thank you"}

案例2:使用Task异步任务进程传递对象副本而非引用

<?php
class Test
{
    public $index = 0;
}
class Server
{
    public $test;
    public function __construct($host="0.0.0.0", $port=9501, $options=[])
    {
        $svr = new swoole_server($host, $port);
        if(!empty($options)){
            $svr->set($options);
        }
        $svr->on("Start",[$this, "onStart"]);
        $svr->on("Connect", [$this, "onConnect"]);
        $svr->on("Receive", [$this, "onReceive"]);
        $svr->on("Close", [$this, "onClose"]);
        $svr->on("Task", [$this, "onTask"]);
        $svr->on("Finish", [$this, "onFinish"]);
        $svr->on("WorkerStart", [$this, "onWorkerStart"]);
        $svr->start();
    }
    public function log($msg)
    {
        $filepath = __DIR__.DIRECTORY_SEPARATOR;
        $filename = date("Ymd").".log";
        $file = $filepath.$filename;

        $message = "[".date("Y-m-d H:i:s")."] ";
        $message .= $msg;
        $message .= PHP_EOL;
        file_put_contents($file, $message, FILE_APPEND);
    }
    public function onStart($svr)
    {
        echo __METHOD__.PHP_EOL;
    }
    public function onConnect($svr, $fd, $worker_id)
    {
        echo __METHOD__.":worker:{$worker_id}:client:{$fd}".PHP_EOL;
    }
    public function onClose($svr, $fd, $worker_id)
    {
        echo __METHOD__.":worker:{$worker_id}:client:{$fd}".PHP_EOL;
    }
    /**
     * 接收来自客户端的请求并转发数据
     */
    public function onReceive(swoole_server $svr, $fd, $worker_id, $data)
    {
        echo __METHOD__.":worker:{$worker_id}:client:{$fd}:data:{$data}".PHP_EOL;
        
        //创建对象
        $this->test = new Test();
        $svr->task(serialize($this->test));
    }
    /**
     * 监听并处理Worker工作线程
     */
    public function onWorkerStart($svr, $worker_id)
    {
        echo __METHOD__.":worker:{$worker_id}".PHP_EOL;
    }
    /**
     * Task进程的onTask方法
     * 接收Worker进程使用task()函数投递的任务数据
     */
    public function onTask($svr, $task_id, $worker_id, $data)
    {
        echo __METHOD__.":worker:{$worker_id}:task:{$task_id}:data:{$data}".PHP_EOL;
        //获取序列化后的对象
        $obj = unserialize($data);
        $this->log("onTask:index:".$obj->index);
        $obj->index = 1;
        //返回数据给onFinish
        return "finished";       
    }
    /**
     * Worker进程的onFinish方法
     * 用于接收Task进程执行finish后的参数
     **/
    public function onFinish($svr, $task_id, $data)
    {
        echo __METHOD__.":task:{$task_id}:data:{$data}".PHP_EOL;

        $this->log("onFinish:index:".$this->test->index);
    }
}
$host = "0.0.0.0";
$port = 9501;
$options = [];
$options["worker_num"] = 2;
$options["daemonize"] = false;
// $options["max_request"] = 1000;
// $options["dispatch_mode"] = 2;
$options["task_worker_num"] = 2;
$server = new Server($host, $port, $options);
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 进程 什么是进程 进程Process是计算机中的程序关于某数据集合上的一次运行活动,是系统分配资源和调度的基本单位...
    JunChow520阅读 6,246评论 2 9
  • 前文再续,就书接上一回,随着与Server、TCP、Protocol的邂逅,Swoole终于迎来了自己的故事,今天...
    蜗牛淋雨阅读 5,796评论 1 14
  • 参考资料 官方网站 https://www.swoole.com/page/download PHP没有像Pyth...
    JunChow520阅读 8,131评论 0 6
  • swoole 中的swerver,一个异步服务器程序,支持TCP、UDP、UnixSocket 3种协议,仅需要设...
    小小小胡阅读 4,110评论 0 0
  • Swoft的任务功能基于Swoole的Task机制,或者说Swoft的Task机制本质就是对SwooleTask机...
    bromine阅读 12,079评论 3 8

友情链接更多精彩内容