PHP的微服务其实和正常的crontab脚本区别不大,唯一的区别就是微服务可以自动判断进程是否被杀死 杀死的话就会重启
言语表达能力有限 废话不多说 直接上代码
<?php
/**
* 异步队列拼单
* @author yjq
*/
include_once __DIR__ . ('/../../kis_lib_zhangyusport/load.php');
set_time_limit(0);
ini_set('memory_limit', '256M');
$argv[1]();
function check() {
$max = 1;
$buffer = array();
$buf = array();
$handle = popen ( "ps aux | grep 'asyncQueue/spelllog'| grep 'process' | grep -v grep | awk '{print $2}'", "r" );
while ( ! feof ( $handle ) ) {
$buffer = fgets ( $handle );
$buf[] = $buffer;
}
pclose($handle);
$han_cou = count($buf);
while ($han_cou <= $max) {
$path = dirname(__FILE__) . "/spelllog.php";
$coms = "nohup php {$path} process >> /tmp/spelllog.log 2>&1 &";
exec($coms);
$han_cou++;
}
unset($buffer,$buf);
}
function process(){
while (true){
$pid = pcntl_fork();
if ($pid == - 1) {
// 错误处理:创建子进程失败时返回-1.
die('could not fork');
}else if($pid){
pcntl_waitpid($pid, $status);
}else{
try{
$list = lib_pay_plan_dao_order::getAppointTypeOrder();
hlp_log::log('spelllog','start:'.$list);
if($list){
lib_pay_plan_model_order::getNoCompleteSpell($list);
}
} catch (Exception $ex) {
hlp_log::log('spelllog',$ex->getMessage());
}
exit;
}
usleep(1000000);
}
}
1、check方法是运行整个微服务的,具体用法就是crontab
*/2 * * * * source /etc/profile && /usr/local/php/bin/php -f /opt/wwwroot/crontab.zhangyusport.com/asyncQueue/spelllog.php check > /dev/null 2>&1 &
每两分钟运行check一次就是让微服务跑起来
2、process 就是进程代码了
*注意 子父进程不能共用同一个数据库链接 这样会报错的
3、上面的代码只控制跑了一个进程 进程条数的控制在check方法里面的 $max就代表是最大进程 usleep里面的是微秒 切记是微秒 不是毫秒
下面再贴出2个进程的代码。其实微服务也很简单,就是要对linux的命令灵活运行和掌握才能写出优秀的微服务
<?php
/**
* 异步队列记录投票日志
* @author zs
*/
include_once __DIR__ . ('/../../kis_lib_zhangyusport/load.php');
set_time_limit(0);
ini_set('memory_limit', '256M');
$argv[1]();
function check() {
$max = 2;
$buffer = array();
$buf = array();
$handle = popen ( "ps aux | grep 'queue/votelog'| grep 'process' | grep -v grep | awk '{print $2}'", "r" );
while ( ! feof ( $handle ) ) {
$buffer = fgets ( $handle );
$buf[] = $buffer;
}
pclose($handle);
$han_cou = count($buf);
while ($han_cou <= $max) {
$path = dirname(__FILE__) . "/votelog.php";
$coms = "nohup php {$path} process >> /tmp/votelog.log 2>&1 &";
exec($coms);
$han_cou++;
}
unset($buffer,$buf);
}
function process(){
$queue_key = lib_queue_redis::queuekey_votlog;
while (true){
$message = lib_queue_redis::getItem($queue_key);
if($message){
excePid($message);
}else{
usleep(20000);
}
}
}
/**
* fork子线程执行逻辑
*/
function excePid($message){
$pid = pcntl_fork();
if ($pid == - 1) {
// 错误处理:创建子进程失败时返回-1.
die('could not fork');
}else if($pid){
pcntl_waitpid($pid, $status);
}else{
try{
hlp_log::log('votelog_queue','start:'.$message);
$message = json_decode($message,true);
if(is_array($message)){
lib_news_model_vote::insertVotelog($message);
}
} catch (Exception $ex) {
hlp_log::log('votelog_queue',$ex->getMessage());
}
exit;
}
}