Task任务进程是Swoole中独立于Worker工作进程的一个异步工作进程,用于处理一些耗时较长的逻辑。这些逻辑如果在Task异步任务进程中处理,不会影响到Worker工作进程处理来自客户端的请求,因此大大提高了Swoole扩展并发处理能力。
Task异步任务进程和Worker工作进程之间通过UNIX的Sock管道进行通信,也可以配置通过消息队列进行通信,Task异步任务进程只能传递字符串格式的消息。
应用场景
Task异步任务进程主要用于执行耗时较长的操作上,如给多人发送邮件,广播消息等需要长时间等待的操作。
机制原理
Swoole的Task异步任务机制的本质是Worker工作进程将耗时的任务投递给异步的TaskWorker任务工作进程中进行处理,所以swoole.onTask
事件回调是在Task异步任务进程中执行的。
-
onTask
回调函数用于执行Task异步任务 -
onFinish
回调函数用于处理Task异步任务的返回结果
注意事项
- Task异步任务进程传递的数据大小
当数据小于8KB时,在Swoole的结构中会直接通过管道进行传递,当数据大于8KB时会超出Swoole的buffer缓冲空间,此时数据会被先写入系统临时文件中进行传递。在onTask
监听到后再去系统临时目录\tmp
下读取文件。
- Task异步任务进程传递对象
可以通过序列化传递一个对象的拷贝,注意这里并不是传递了一个对象的引用。由于Task异步任务进程和Worker工作进程是两个各自独立的进程,拥有各自不同的内存空间。因此,Task异步任务进程中对象的改变不会反映到Worker工作进程中。
Task异步任务进程中数据库连接、网络连接对象是不可以传递的。
- Task异步任务进程的
onFinish
回调
Task异步任务进程的onFinish
回调会返回并回调task
方法的Worker工作进程,也就是返回给投递者的进程。
代码实践
实践1:客户端向服务器发送消息,服务器以异步方式处理后返回JSON字符串给客户端。
创建服务器
$ vim server.php
<?php
class Server
{
public $test;
public function __construct($host="0.0.0.0", $port=9501, $options=[])
{
$svr = new swoole_server($host, $port);
if(!empty($options)){
$svr->set($options);
}
$svr->on("Start",[$this, "onStart"]);
$svr->on("Connect", [$this, "onConnect"]);
$svr->on("Receive", [$this, "onReceive"]);
$svr->on("Close", [$this, "onClose"]);
$svr->on("Task", [$this, "onTask"]);
$svr->on("Finish", [$this, "onFinish"]);
$svr->on("WorkerStart", [$this, "onWorkerStart"]);
$svr->start();
}
public function log($msg)
{
$filepath = __DIR__.DIRECTORY_SEPARATOR;
$filename = date("Ymd").".log";
$file = $filepath.$filename;
$message = "[".date("Y-m-d H:i:s")."] ";
$message .= $msg;
$message .= PHP_EOL;
file_put_contents($file, $message, FILE_APPEND);
}
public function onStart($svr)
{
echo __METHOD__.PHP_EOL;
}
public function onConnect($svr, $fd, $worker_id)
{
echo __METHOD__.":worker:{$worker_id}:client:{$fd}".PHP_EOL;
}
public function onClose($svr, $fd, $worker_id)
{
echo __METHOD__.":worker:{$worker_id}:client:{$fd}".PHP_EOL;
}
/**
* 接收来自客户端的请求并转发数据
*/
public function onReceive(swoole_server $svr, $fd, $worker_id, $data)
{
echo __METHOD__.":worker:{$worker_id}:client:{$fd}:data:{$data}".PHP_EOL;
//收到客户端数据创建任务并投递
$params = [];
$params["client_id"] = $fd;//客户端描述符
$params["unique_id"] = time().mt_rand(100000, 999999);
$params["message"] = $data;
//Worker进程使用task()向Task进程投递任务数据
$svr->task(json_encode($params));//task只能传递字符串
}
/**
* 监听并处理Worker工作线程
*/
public function onWorkerStart($svr, $worker_id)
{
echo __METHOD__.":worker:{$worker_id}".PHP_EOL;
}
/**
* Task进程的onTask方法
* 接收Worker进程使用task()函数投递的任务数据
*/
public function onTask($svr, $task_id, $worker_id, $data)
{
echo __METHOD__.":worker:{$worker_id}:task:{$task_id}:data:{$data}".PHP_EOL;
//获取参数获取任务
$params = json_decode($data, true);
//处理数据
$data = [];
$data["client_id"] = $params["client_id"];
$data["unique_id"] = $params["unique_id"];
$data["message"] = "thank you";
//发送数据给客户端
$svr->send($params["client_id"], json_encode($data));
//返回数据给onFinish
return "finished";
}
/**
* Worker进程的onFinish方法
* 用于接收Task进程执行finish后的参数
**/
public function onFinish($svr, $task_id, $data)
{
echo __METHOD__.":task:{$task_id}:data:{$data}".PHP_EOL;
}
}
$host = "0.0.0.0";
$port = 9501;
$options = [];
$options["worker_num"] = 2;
$options["daemonize"] = false;
// $options["max_request"] = 1000;
// $options["dispatch_mode"] = 2;
$options["task_worker_num"] = 2;
$server = new Server($host, $port, $options);
创建客户端
$ vim client.php
<?php
class Client
{
private $client;
public function __construct(){
$this->client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
$this->client->on("Connect", [$this, "onConnect"]);
$this->client->on("Receive", [$this, "onReceive"]);
$this->client->on("Close", [$this, "onClose"]);
$this->client->on("Error", [$this, "onError"]);
}
/**
* 连接服务器
*/
public function connect($host, $port)
{
$fp = $this->client->connect($host, $port, 1);
if(!$fp){
echo __METHOD__.":ErrMsg:".$fp->errMsg.":ErrCode:".$fp->errCode.PHP_EOL;
return;
}
}
/**
* 执行connect()方法后会自动调用onConnect()方法
*/
public function onConnect($client)
{
//接收命令行CLI标准输入,并向服务器发送消息。
fwrite(STDOUT, "Enter Message:");
swoole_event_add(STDIN, function(){
fwrite(STDOUT, "Enter Message:");
$msg = trim(fgets(STDIN));
$this->send($msg);
});
}
/**
* 向服务器发送消息
*/
public function send($msg)
{
$this->client->send($msg);
}
/**
* 判断客户端是否仍然连接
*/
public function isConnected()
{
return $this->client->isConnected();
}
/**
* 接收来自服务器的数据
*/
public function onReceive($client, $data)
{
echo __METHOD__.":".$data.PHP_EOL;
}
public function onClose($client)
{
echo __METHOD__.":client close connection".PHP_EOL;
}
public function onError()
{
echo __METHOD__.":error";
}
}
$client = new Client();
$client->connect("127.0.0.1", 9501);
运行服务器
$ php server.php
运行服务端并发送数据
$ php client.php
Enter Message: hello server
查看服务器命令行输出
$ php server.php
Server::onStart
Server::onWorkerStart:worker:3
Server::onWorkerStart:worker:2
Server::onWorkerStart:worker:0
Server::onWorkerStart:worker:1
Server::onConnect:worker:0:client:1
Server::onReceive:worker:0:client:1:data:hello server
Server::onTask:worker:1:task:0:data:{"client_id":1,"unique_id":"1555006210826270","message":"hello server"}
Server::onFinish:task:0:data:finished
查看客户端命令行输出
Enter Message:hello server
Enter Message:Client::onReceive:{"client_id":1,"unique_id":"1555006210826270","message":"thank you"}
案例2:使用Task异步任务进程传递对象副本而非引用
<?php
class Test
{
public $index = 0;
}
class Server
{
public $test;
public function __construct($host="0.0.0.0", $port=9501, $options=[])
{
$svr = new swoole_server($host, $port);
if(!empty($options)){
$svr->set($options);
}
$svr->on("Start",[$this, "onStart"]);
$svr->on("Connect", [$this, "onConnect"]);
$svr->on("Receive", [$this, "onReceive"]);
$svr->on("Close", [$this, "onClose"]);
$svr->on("Task", [$this, "onTask"]);
$svr->on("Finish", [$this, "onFinish"]);
$svr->on("WorkerStart", [$this, "onWorkerStart"]);
$svr->start();
}
public function log($msg)
{
$filepath = __DIR__.DIRECTORY_SEPARATOR;
$filename = date("Ymd").".log";
$file = $filepath.$filename;
$message = "[".date("Y-m-d H:i:s")."] ";
$message .= $msg;
$message .= PHP_EOL;
file_put_contents($file, $message, FILE_APPEND);
}
public function onStart($svr)
{
echo __METHOD__.PHP_EOL;
}
public function onConnect($svr, $fd, $worker_id)
{
echo __METHOD__.":worker:{$worker_id}:client:{$fd}".PHP_EOL;
}
public function onClose($svr, $fd, $worker_id)
{
echo __METHOD__.":worker:{$worker_id}:client:{$fd}".PHP_EOL;
}
/**
* 接收来自客户端的请求并转发数据
*/
public function onReceive(swoole_server $svr, $fd, $worker_id, $data)
{
echo __METHOD__.":worker:{$worker_id}:client:{$fd}:data:{$data}".PHP_EOL;
//创建对象
$this->test = new Test();
$svr->task(serialize($this->test));
}
/**
* 监听并处理Worker工作线程
*/
public function onWorkerStart($svr, $worker_id)
{
echo __METHOD__.":worker:{$worker_id}".PHP_EOL;
}
/**
* Task进程的onTask方法
* 接收Worker进程使用task()函数投递的任务数据
*/
public function onTask($svr, $task_id, $worker_id, $data)
{
echo __METHOD__.":worker:{$worker_id}:task:{$task_id}:data:{$data}".PHP_EOL;
//获取序列化后的对象
$obj = unserialize($data);
$this->log("onTask:index:".$obj->index);
$obj->index = 1;
//返回数据给onFinish
return "finished";
}
/**
* Worker进程的onFinish方法
* 用于接收Task进程执行finish后的参数
**/
public function onFinish($svr, $task_id, $data)
{
echo __METHOD__.":task:{$task_id}:data:{$data}".PHP_EOL;
$this->log("onFinish:index:".$this->test->index);
}
}
$host = "0.0.0.0";
$port = 9501;
$options = [];
$options["worker_num"] = 2;
$options["daemonize"] = false;
// $options["max_request"] = 1000;
// $options["dispatch_mode"] = 2;
$options["task_worker_num"] = 2;
$server = new Server($host, $port, $options);