直接上代码说明:
/**
* 链接
*/
private function connectBeanstalk():Beanstalk{
$b = new Beanstalk();
$b->connect("127.0.0.1",11300);
return $b;
}
/**
* 守护进程运行
* @param Pool $pool
* @param int $workerId
*/
public function workerStart(Pool $pool,int $workerId){
swoole_set_process_name("ansyc:image:{$workerId}");
$b = $this->connectBeanstalk();
$run =true;
// 判断是否连接成功
if(isset($b->connection) ? true : false){
$b->watch($this->tubeName); // 连接成功 监听 队列
}else{
echo "connect falied.\n";
$run = false;
sleep(12);// 连接失败,退出当前子进程,重新启动新的进程,重新连接
return false;
}
pcntl_signal(SIGTERM, function ($signo) use(&$run,$pool){
echo "pcntl 收到 SIGTERM 信号\n";
$run=false;
});
ini_set("default_socket_timeout",-1);
echo "进程 {$workerId} 启动\n";
while($run){
pcntl_signal_dispatch();
// 检测连接是否有效
if(!isset($bean->connection)){
echo "重新生成进程 \n";
$run = false; // 连接失败,重新fork 进程
}
echo "等待任务进入 \n";
$job = $b->reserve(12);//没有任务的时候 阻塞 12s
if($job == "TIMED_OUT\r\n"){
// "心跳检测"; 超时啦
continue;
}
if(($jobId = $job["id"] ?? 0) > 0){
echo "Pid:{$workerId} 存在下载任务 {$jobId} \n";
$data = $job["data"];
$std = @json_decode($data);
if($std){
echo "存在合法任务. \n";
// todo 处理任务 处理成功删除任务($b->delete($jobId)); 处理失败保存任务($b->bury($jobId))
}else{
echo "非法任务直接删除. \n";
$b->delete($jobId);
}
}
}
}