多进程开发3:计划任务调用脚本使用多进程处理任务

使用Laravel10框架,PHP语言使用8.2.21版本。
开发的需求如下:
1.每分钟执行一次计划任务,读取数据表test_task,查询status=3的任务
2.如果存在任务,通过多进程执行,根据test_task表里每条记录里num字段的大小,向test_news表插入 num条的数据。

这个脚本通过Laravel框架的artisan命令行来创建类文件,在这里面实现代码。

1.数据表结构

test_task 任务表

字段 说明
id 任务主键id
title 任务标题
num 当前任务要插入的新闻条数
status 1:任务进行中 3:定时处理 4:任务已完成

test_news 新闻表

字段 说明
id 主键id
content 内容
task_id 任务表主键ID

test_task表中插入记录
+----+-----------+-------+--------+------------+---------------------+
| id | title | num | status | created_at | updated_at |
+----+-----------+-------+--------+------------+---------------------+
| 1 | 任务一 | 500 | 4 | NULL | 2024-09-03 16:21:05 |
| 2 | 任务二 | 1500 | 4 | NULL | 2024-09-03 16:21:10 |
| 3 | 任务三 | 5500 | 4 | NULL | 2024-09-03 16:21:28 |
| 4 | 任务四 | 1500 | 4 | NULL | 2024-09-03 16:21:10 |
| 5 | 任务五 | 45000 | 4 | NULL | 2024-09-03 16:24:05 |
| 6 | 任务六 | 3000 | 4 | NULL | 2024-09-03 16:22:16 |
| 7 | 任务七 | 30000 | 4 | NULL | 2024-09-03 16:24:15 |
| 8 | 任务八 | 6000 | 4 | NULL | 2024-09-03 16:22:31 |
| 9 | 任务九 | 5000 | 4 | NULL | 2024-09-03 16:22:25 |
+----+-----------+-------+--------+------------+---------------------+

注意: 在最开始将ID 1-5 这五条记录设置为status=3,在第一批次被执行。
然后第二轮将 ID 6-9的这四条记录 改为status=3,作为第二批次运行。

2.将当前Artisan类添加到计划任务

进入文件 App\Console\Kernel.php里

引入命名空间

use App\Console\Commands\Cron\Jiemo;

将类文件添加到类属性$commands中

protected $commands = [
  \App\Console\Commands\Cron\Jiemo::class,
]

设置调度频率,每分钟执行一次脚本

$schedule->command('pro:jiemo')->everyMinute()->runInBackground();

3.主要代码展示

主要代码 在handle方法中,在这里完成了全部的业务逻辑,资源回收处理程序写在方法 handleSignal()中。

定义了一个静态成员属性 $pidPool ,存储创建的子进程id

在这个章节,展示和分析代码,在下一个章节,演示打印的日志结果

protected $signature = 'pro:jiemo';

    protected $description = '赢邦象自定义处理数据脚本';
    
    protected static $pidPool = [];

    public function __construct()
    {
        parent::__construct();
    }

    public function handle()
    {
        //启用异步接收信号,收到信号立即调用处理程序
        pcntl_async_signals(true);
        pcntl_signal(SIGCHLD,[self::class,'handleSignal']);
        
        $result = DB::table('test_task')->where('status',3)->get();

        if($result->isNotEmpty()){

            foreach($result as $value){

                //开启子进程
                $pid = pcntl_fork();
                $task_title = $value->title;

                if($pid == -1){

                    Log::channel('wechat')->debug('创建子进程失败',['test_task'=>$task_title]);
                }elseif($pid){
                    self::$pidPool[] = $pid;
                    Log::channel('wechat')->debug('创建子进程成功',['test_task'=>$task_title,'pid'=>$pid]);
                    
                }else{
                    //子进程逻辑
                    
                    Log::channel('wechat')->debug('子进程开始工作 ChildPid:'.posix_getpid());

                    //修改status=1 代表当前任务运行中
                    DB::table('test_task')->where('id',$value->id)->update([
                        'status'=>1,
                        'updated_at'=>date('Y-m-d H:i:s',time())
                    ]);

                    $num = $value->num;

                    //向数据表test_news 写入数据
                    for($i=1;$i<=$num;$i++){

                        $insert = [
                            'content'=>'向数据表写入程序',
                            'task_id'=>$value->id,
                            'created_at'=>date('Y-m-d H:i:s',time())
                        ];

                        DB::table('test_news')->insert($insert);
                    }

                    DB::table('test_task')->where('id',$value->id)->update([
                        'status'=>4,
                        'updated_at'=>date('Y-m-d H:i:s',time())
                    ]);

                    Log::channel('wechat')->debug('子进程完成工作准备退出 ChildPid:'.posix_getpid());
                    //写入完毕,退出子进程
                    exit(0);

                }


                

            }

          //在foreach结束后,添加这个while循环
          while(count(self::$pidPool)>0){
                sleep(1);
            }
    

        }

    }


    private static function handleSignal($signo){

        switch($signo){
            case SIGCHLD:

                while(($childPid = pcntl_waitpid(-1,$status,WNOHANG)) > 0){

                    $key = array_search($childPid,self::$pidPool);

                    if($key !== false){
                        unset(self::$pidPool[$key]);
                    }

                    Log::channel('wechat')->debug('子进程已经被回收',['childPid'=>$childPid,'total'=>count(self::$pidPool)]);
                }    

            
            break;
        }

    }

3.1 第一部分代码分析

 //启用异步接收信号,收到信号立即调用处理程序
pcntl_async_signals(true);
pcntl_signal(SIGCHLD,[self::class,'handleSignal']);

在handle方法内的第一行,开启异步信号接收,此时只要有子进程退出,它都会收到信号,并调用指定的信号处理器。

使用函数 pcntl_async_signals(true) ,参数用true时,代表异步接收信号,false:代表同步接收信号了。

pcntl_signal 这个函数的作用是,安装信号处理器。这样收到指定的信号时,会用专门的自定义方法去处理。

pcntl_signal函数的第二个参数 可以使用闭包,全局函数,也可以是静态方法,在当前的类里,我这里使用的是静态方法的形式。

3.2 自定义信号处理器

private static function handleSignal($signo){

        switch($signo){
            case SIGCHLD:

                while(($childPid = pcntl_waitpid(-1,$status,WNOHANG)) > 0){

                    $key = array_search($childPid,self::$pidPool);

                    if($key !== false){
                        unset(self::$pidPool[$key]);
                    }

                    Log::channel('wechat')->debug('子进程已经被回收',['childPid'=>$childPid,'total'=>count(self::$pidPool)]);
                }
            
            break;
        }

}

这段代码,最核心的部分就是通过pcntl_waitpid函数,进行非阻塞回收

while(($childPid = pcntl_waitpid(-1,$status,WNOHANG)) > 0){

                    $key = array_search($childPid,self::$pidPool);

                    if($key !== false){
                        unset(self::$pidPool[$key]);
                    }

                    Log::channel('wechat')->debug('子进程已经被回收',['childPid'=>$childPid,'total'=>count(self::$pidPool)]);
 }

如果有任意一个子进程退出时,在这里$childPid变量都会大于0,然后打印记录日志

下面这些代码的作用将在后面产生详细介绍

$key = array_search($childPid,self::$pidPool);

if($key !== false){
      unset(self::$pidPool[$key]);
}

3.3 创建多进程 处理业务

首先,查询数据表,只查询status=3的表记录,如果存在记录就进行遍历,每次遍历时,
通过pcntl_fork函数创建子进程,记住从pcntl_fork往下开始就进入了子进程环节。

$result = DB::table('test_task')->where('status',3)->get();
if($result->isNotEmpty()){

            foreach($result as $value){

                //开启子进程
                $pid = pcntl_fork();
                $task_title = $value->title;

然后通过if选择条件判断,pid= -1 代表子进程创建失败,pid存在并大于0,说明创建成功,返回的子进程的id, 这时将子进程ID存入到静态数组$pidPool里。
当返回的值为0时,说明进入子进程空间,else部分处理的就是子进程的业务逻辑。

if($pid == -1){

   Log::channel('wechat')->debug('创建子进程失败',['test_task'=>$task_title]);
                }elseif($pid){
     self::$pidPool[] = $pid;
     Log::channel('wechat')->debug('创建子进程成功',['test_task'=>$task_title,'pid'=>$pid]);
                    
                }else{

}

3.4 子进程业务逻辑

在子进程里,想打印当前子进程的ID时,只能通过getmypid() 和 posix_getpid() 这两个函数。

然后更改当前的任务状态 设置 status=1,防止计划任务重复执行

                    
Log::channel('wechat')->debug('子进程开始工作 ChildPid:'.posix_getpid());

//修改status=1 代表当前任务运行中
DB::table('test_task')->where('id',$value->id)->update([
       'status'=>1,
       'updated_at'=>date('Y-m-d H:i:s',time())
]);

$num = $value->num;

向数据表test_news 写入数据,写入操作完成后,更新当前任务状态 status=4 代表这条任务已经执行完成。

这个时候可以在记录一下日志。

最后一定要执行 exit(0) 退出,否则会一直在子进程里循环,这点一定要注意

for($i=1;$i<=$num;$i++){

                        $insert = [
                            'content'=>'向数据表写入程序',
                            'task_id'=>$value->id,
                            'created_at'=>date('Y-m-d H:i:s',time())
                        ];

                        DB::table('test_news')->insert($insert);
                    }

                    DB::table('test_task')->where('id',$value->id)->update([
                        'status'=>4,
                        'updated_at'=>date('Y-m-d H:i:s',time())
                    ]);

                    Log::channel('wechat')->debug('子进程完成工作准备退出 ChildPid:'.posix_getpid());
                    //写入完毕,退出子进程
                    exit(0);

3.5 主进程监控静态数组

while(count(self::$pidPool)>0){
                sleep(1);
}

在foreach执行完成后,添加了while循环,检查静态数组 $pidPool,当它为空时,才会结束脚本,在这个循环里,添加了sleep(1) 防止这个死循环消耗太高。

为什么要添加这个循环呢,因为在脚本的主进程里,foreach遍历,创建子进程,这个过程操作完成后,主进程就结束退出了。

后面当子进程完成工作,并退出后,找不到父进程了,无法回收资源,这些子进程就会变成孤儿进程,交由系统的Init进程来回收资源。

所以为了避免这种情况,在创建子进程成功后,将子进程的ID存入到静态数组 pidPool里,当每个子进程退出,并回收成功后,在从pidPool数组里,进行剔除。

while(($childPid = pcntl_waitpid(-1,$status,WNOHANG)) > 0){

                    $key = array_search($childPid,self::$pidPool);

                    if($key !== false){
                        unset(self::$pidPool[$key]);
                    }

                    Log::channel('wechat')->debug('子进程已经被回收',['childPid'=>$childPid,'total'=>count(self::$pidPool)]);
}

4.查看日志

计划任务运行了两次,打印了这些日志,第一个批次,任务一到任务五里,只有任务五没有完成,其余四个完成任务并得到了回收。

然后计划任务第二次运行时,处理任务六到任务九,这四项任务都回收成功。
并且最后当任务五执行完成后,信号处理器,也成功的回收了任务五。

[2024-09-03 16:21:01] production.DEBUG: 创建子进程成功 {"test_task":"任务一","pid":5880} 
[2024-09-03 16:21:01] production.DEBUG: 子进程开始工作 ChildPid:5880  
[2024-09-03 16:21:01] production.DEBUG: 创建子进程成功 {"test_task":"任务二","pid":5882} 
[2024-09-03 16:21:01] production.DEBUG: 子进程开始工作 ChildPid:5882  
[2024-09-03 16:21:01] production.DEBUG: 创建子进程成功 {"test_task":"任务三","pid":5883} 
[2024-09-03 16:21:01] production.DEBUG: 子进程开始工作 ChildPid:5883  
[2024-09-03 16:21:01] production.DEBUG: 创建子进程成功 {"test_task":"任务四","pid":5884} 
[2024-09-03 16:21:01] production.DEBUG: 子进程开始工作 ChildPid:5884  
[2024-09-03 16:21:01] production.DEBUG: 创建子进程成功 {"test_task":"任务五","pid":5885} 
[2024-09-03 16:21:01] production.DEBUG: 子进程开始工作 ChildPid:5885  

[2024-09-03 16:21:05] production.DEBUG: 子进程准备退出 ChildPid:5880 {"ppid":5874} 
[2024-09-03 16:21:05] production.DEBUG: 子进程已经被回收 {"childPid":5880,"total":4} 
[2024-09-03 16:21:10] production.DEBUG: 子进程准备退出 ChildPid:5884 {"ppid":5874} 
[2024-09-03 16:21:10] production.DEBUG: 子进程准备退出 ChildPid:5882 {"ppid":5874} 
[2024-09-03 16:21:10] production.DEBUG: 子进程已经被回收 {"childPid":5884,"total":3} 
[2024-09-03 16:21:10] production.DEBUG: 子进程已经被回收 {"childPid":5882,"total":2} 



[2024-09-03 16:21:28] production.DEBUG: 子进程准备退出 ChildPid:5883 {"ppid":5874} 
[2024-09-03 16:21:28] production.DEBUG: 子进程已经被回收 {"childPid":5883,"total":1}



[2024-09-03 16:22:01] production.DEBUG: 创建子进程成功 {"test_task":"任务六","pid":6013} 
[2024-09-03 16:22:01] production.DEBUG: 子进程开始工作 ChildPid:6013  
[2024-09-03 16:22:01] production.DEBUG: 创建子进程成功 {"test_task":"任务七","pid":6014} 
[2024-09-03 16:22:01] production.DEBUG: 子进程开始工作 ChildPid:6014  
[2024-09-03 16:22:01] production.DEBUG: 创建子进程成功 {"test_task":"任务八","pid":6016} 
[2024-09-03 16:22:01] production.DEBUG: 子进程开始工作 ChildPid:6016  
[2024-09-03 16:22:01] production.DEBUG: 创建子进程成功 {"test_task":"任务九","pid":6017} 
[2024-09-03 16:22:01] production.DEBUG: 子进程开始工作 ChildPid:6017  




[2024-09-03 16:22:16] production.DEBUG: 子进程准备退出 ChildPid:6013 {"ppid":6001} 
[2024-09-03 16:22:16] production.DEBUG: 子进程已经被回收 {"childPid":6013,"total":3} 
[2024-09-03 16:22:25] production.DEBUG: 子进程准备退出 ChildPid:6017 {"ppid":6001} 
[2024-09-03 16:22:25] production.DEBUG: 子进程已经被回收 {"childPid":6017,"total":2} 
[2024-09-03 16:22:31] production.DEBUG: 子进程准备退出 ChildPid:6016 {"ppid":6001} 
[2024-09-03 16:22:31] production.DEBUG: 子进程已经被回收 {"childPid":6016,"total":1} 

[2024-09-03 16:24:05] production.DEBUG: 子进程准备退出 ChildPid:5885 {"ppid":5874} 
[2024-09-03 16:24:05] production.DEBUG: 子进程已经被回收 {"childPid":5885,"total":0} 
[2024-09-03 16:24:15] production.DEBUG: 子进程准备退出 ChildPid:6014 {"ppid":6001} 
[2024-09-03 16:24:15] production.DEBUG: 子进程已经被回收 {"childPid":6014,"total":0} 
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容