思路:首先swoole作为后台守护进程处理耗时的任务(Job),通过swoole_client将任务丢进后台Job去处理,等候处理完毕将结果通知到回调Url上,一个Job处理完毕。
话不多说,先看代码实现过程
1、队列处理(核心):server.php
<?php
$serv = new swoole_server("192.168.206.128", 9502);
// 设置异步任务的工作进程数量
$serv->set(['task_worker_num' => 4]);
$serv->on('receive', function($serv, $fd, $from_id, $data) {
$serv->send($fd, '{"status":true}');
//投递异步任务
$serv->task($data);
});
// 处理异步任务
$serv->on('task', function ($serv, $task_id, $from_id, $data) {
// 休眠模拟任务处理时间
sleep(5);
$serv->finish($data);
});
// 处理异步任务的结果
$serv->on('finish', function ($serv, $task_id, $data) {
$data = json_decode($data, true);
$callbackInfo = parse_url($data['callback']);
// 利用协程发起处理完成的通知请求
go(function () use ($callbackInfo) {
$cli = new Swoole\Coroutine\Http\Client($callbackInfo['host'], ($callbackInfo['scheme'] === 'http' ? 80 : 443));
// 发起Http POST请求
$cli->post($callbackInfo['path'], ['status' => true]);
echo $cli->body . PHP_EOL;
$cli->close();
});
});
$serv->start();
面向对象版
<?php
use Swoole\Server as SwooleServer;
use Swoole\Coroutine\Http\Client as HttpClient;
class Server
{
private $server = null;
public function __construct()
{
$this->server = new SwooleServer('192.168.206.128', 9502);
// 设置异步任务的工作进程数量
$this->server->set([
'worker_num' => 2,
'task_worker_num' => 2
]);
// 添加事件监听
$this->server->on('start', [$this, 'onStart']);
$this->server->on('connect', [$this, 'onConnect']);
$this->server->on('receive', [$this, 'onReceive']);
$this->server->on('task', [$this, 'onTask']);
$this->server->on('finish', [$this, 'onFinish']);
$this->server->on('close', [$this, 'onClose']);
$this->server->start();
}
public function onStart($server)
{
echo 'Server start...'.PHP_EOL;
}
public function onConnect($server, $fd, $reactorId)
{
echo '新的连接:'.$fd.PHP_EOL;
}
public function onReceive($server, $fd, $reactorId, $data)
{
echo '收到数据:'.$data.PHP_EOL;
$data = json_decode($data, true);
if (!$data) {
$server->send($fd, json_encode([
'status' => false,
'message' => '数据有误'
], JSON_UNESCAPED_UNICODE));
$server->close($fd, true);
}
$server->send($fd, json_encode([
'status' => true,
'message' => '等待处理'
], JSON_UNESCAPED_UNICODE));
// 开始执行异步任务
$server->task($data);
}
public function onTask($server, $taskId, $srcWorkerId, $data)
{
if (!isset($data['task_url'])) {
$server->finish($data);
}
// 利用协程发起处理请求
go(function () use ($server, $data) {
$callbackInfo = parse_url($data['task_url']);
$cli = new HttpClient($callbackInfo['host'], ($callbackInfo['scheme'] === 'http' ? 80 : 443));
$cli->setHeaders([
'Content-Type' => "application/json"
]);
// 发起Http POST请求
$cli->post($callbackInfo['path'], ['status' => true]);
// echo $cli->body.':'.date('H:i:s').PHP_EOL;
$cli->close();
$server->finish($data);
});
}
public function onFinish($server, $taskId, $data)
{
echo '处理完成:'.$taskId.PHP_EOL;
}
public function onClose($server, $fd, $reactorId)
{
echo '断开连接:'.$fd.PHP_EOL;
}
}
new Server();
run:php server.php
2、接口:client.php
(业务中可以自行封装至框架中)
<?php
$client = new swoole_client(SWOOLE_SOCK_TCP);
if (!$client->connect('192.168.206.128', 9502, -1)) {
exit("connect failed. Error: {$client->errCode}\n");
}
$data = [
'callback' => 'http://192.168.206.128/callback.php'// 处理完成通知地址
];
$client->send(json_encode($data));
// 输出任务响应信息 {"status":true}
$res = json_decode($client->recv(), true);
$client->close();
if ($res['status']) {
echo 'success';
} else {
echo 'error';
}
run:postman直接请求client.php
3、接收结果:callback.php
<?php
$data = file_get_contents('php://input');
file_put_contents('./result.log', $data . PHP_EOL, FILE_APPEND);
echo 'success';
最后callback.php
去完善任务处理成功做的一下事情
相信看完你已经了解swoole异步队列的一些应用了