thinkphp5 的 redis 队列实现原理与bug修复

使用方式

composer require topthink/think-queue:1.1.6 // 官方版
composer require sorry510/think-queue // 修复bug版,断线重连

注意事项

redis 服务端

设置超时时间为 0 或一个尽量大的数,否则队列的某个任务执行时间过程,会导致服务器端主动断开连接

timeout 300

redis 客户端

application\extra\queue.php, timeout 为 0,永不超时

<?php
use think\Env;
return [
    //Redis驱动
    'connector' => 'redis',
    "expire" => 60, //任务过期时间默认为秒,禁用为null
    "default" => "default", //默认队列名称
    "host" => ENV::get('redis.host', '127.0.0.1'), //Redis主机IP地址
    "port" => ENV::get('redis.port', '6379'), //Redis端口
    "password" => ENV::get('redis.password', ''), //Redis密码
    "select" => ENV::get('redis.index', 0), //Redis数据库索引
    "timeout" => 0, //Redis连接超时时间
    "persistent" => false, //是否长连接
];

使用方式

  • listen 模式,每次调用都会开启新的进程,实时调用最新的代码,部署不推荐,特别消耗 cpu 资源
php think queue:listen --queue=xxx
  • work 模式,常驻内存模式,无法调用到修改后的代码,需要重启
php think queue:work --daemon --queue=xxx

work 实现原理

redis 数据

2 种数据类型

list

存储将要执行的任务,key 为 queues:队列名称, 例如,投递一个任务queue($job, '', 0, 'test_queue'),就会生成一个 key 为 queues:test_queue 的 list 数据, 数据元素结构如下:

{
    job:"app\job\JobTest"
    data:""
    id:"CW03tlfol59ClkE7BsC6SuHTIiMkzcVM"
    attempts:1
}
sorted set
  • 执行中的任务

key 为 queues:队列名称:reserved, score为当前时间 + 过期时间(--expire),数据内容与 list 一致

  • 投递的延时的任务

key 为 queues:队列名称:delayed, score为当前时间 + 延时时间(--delay),数据内容与 list 一致

  • 执行失败时的重新发布的任务

key 为 queues:队列名称:delayed, score为当前时间 + 延时时间(--delay),数据内容与 list 一致

运行示意图

tp5.0 redis 队列.jpg

运行方式

  1. 无延时投递任务时时以 left 方式 push 到 list,延时时写入到 sorted set 中
  2. 运行时使用轮训(查看代码 vendor\topthink\think-queue\src\queue\command\Work.php 的 107 行 while (true) {)方式运行,不断循环执行,每次执行后都会睡眠 3 秒钟(可以配置 --sleep 1 修改为 1 秒)
  3. 执行时首先会查询延迟任务(key 为 queues:队列名称:delayed),如果延时到期了,就从set中删除此任务移动到list中(right push)
  4. 然后查询执行失败的任务,从 set 移动到 list(right push),并记录执行次数+1
  5. 之后开始正式的执行环节,首先会从 list 中 right pop 最新的记录,移动到 sorted set(执行中的任务) 中然后开始执行 job
  6. 执行时,首先会判断执行次数是否大于尝试次数(可以配置 --tries 3 设置为3,默认为 0 永久重试),如果大于尝试次数,就从 sorted set 中删除对应数据,并执行 job 的 failed 方法,反之,就正常执行 job 的 fire 方法,成功之后删除 set 中的对应数据,等待休眠,进行下一次循环中,如果 job 的 fire 方法抛出异常且没有被删除(手动调用 $job->delete() 方法),就重新发布 job
  7. 发布逻辑为从 set 中删除之前的对应数据,重新添加一条数据, queues:队列名称:delayed, score 为 当前时间 + 延时时间(--delay),并记录执行次数+1
  8. 下面为主体执行代码逻辑,详情请查看源码 vendor\topthink\think-queue\src\queue\Worker.php 文件
public function process(Job $job, $maxTries = 0, $delay = 0)
{
    if ($maxTries > 0 && $job->attempts() > $maxTries) {
        return $this->logFailedJob($job);
    }

    try {
        $job->fire();

        return ['job' => $job, 'failed' => false];
    } catch (Exception $e) {
        if (!$job->isDeleted()) {
            $job->release($delay);
        }

        throw $e;
    }
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容