php-beanstalk 操作beanstalk
安装:
$ git clone https://gitee.com/qzfzz/php-beanstalk.git
$ cd php-beanstalk
$ /usr/local/php/bin/phpize
$ ./configure --with-php-config=/usr/local/php/bin/php-config && make && make install
# 追加 extension=beanstalk.so 至 php.ini
使用:
消费者
$b = new Beanstalk();
$b->connect( "127.0.0.1",11300 );
if(isset($b->connection) ? true : false){
$run = true;
$b->watch("user"); // 连接成功 监听 队列
}else{
echo "connect falied.\n";
sleep(12);
return false;// 连接失败,退出当前子进程,重新启动新的进程,重新连接
}
ini_set("default_socket_timeout",-1);
while($run){
if(!isset($b->connection)){
// 重新连接并监听
$b = new Beanstalk();
$b->connect( "127.0.0.1",11300 );
$b->watch("user");
}
// 默认监听60s,最好不要设置监听60s,实际生产者 12 s
$job = $b->reserve(12);
if($job == "TIMED_OUT\r\n"){
continue; // 重点:检测到超时情况
}
try{
if(($id = $job["id"] ?? 0) > 0){
$b->delete($id);
$std = @json_decode($job["data"] ?? "");
var_dump($job);
$bean->delete($id);
}
}catch(Exception $e){
echo "产生异常 : ".$e->getMessage()."\n";
}
}
生产者:
$b = new Beanstalk(); //
$b->connect("127.0.0.1",11300);
$jobId=$b->putInTube("user",json_encode([
"content"=>"我是消息内容",
"userId"=>12,
],JSON_UNESCAPED_UNICODE));
$b->close();
var_dump($jobId);
维护者方法
$b = new Beanstalk(); //
$b->connect("127.0.0.1",11300);
// 取出预留的任务
$job = $b->peekBuried($tube); // 一次取出一个 预留的任务,返回 $job 数组
var_dump($job); // ["id"=>int ,"data"=>"sfdsdfsfsfdsdfs"]
//$r = $b->kickJob($job["id"]); // 将消息 重新放回管道给消费者消费
$r = $b->delete($job["id"]); // 删除一个任务
/*
$b 所有的方法
string(7) "connect"
string(5) "close"
string(3) "put"
string(4) "peek"
string(9) "peekReady"
string(6) "delete"
string(5) "stats"
string(4) "bury"
string(6) "ignore"
string(4) "kick"
string(7) "kickJob"
string(9) "listTubes"
string(16) "listTubesWatched"
string(12) "listTubeUsed"
string(9) "pauseTube"
string(10) "resumeTube"
string(11) "peekDelayed"
string(10) "peekBuried"
string(9) "putInTube"
string(7) "release"
string(7) "reserve"
string(8) "statsJob"
string(9) "statsTube"
string(5) "touch"
string(7) "useTube"
string(5) "watch"
*/
$tubestatus = $b->statsTube("user");
$listTube = $b->listTubes();
$b->close();