从踩坑到稳定:基于Redis+Supervisor的PHP队列实践与优化之路

曾以为理解了队列先进先出的原理,封装一个Redis队列不过是手到擒来之事。无非是利用Redis列表结构配合 Supervisor守护进程,在列表尾部压入数据,从头部取出数据,一个完美的消息队列就诞生了。然而上线后却遭遇了各种意料之外的问题,本文将分享完整的实现流程与关键的优化步骤。


一、初始架构与核心实现

1.1 Redis基础封装 (RedisBase.php)

namespace app\common\lib\redis;

use think\facade\Cache;

class RedisBase extends Cache
{
    protected $handle = null;
    protected $options = [];
    protected $select_db_config = [/* 分库配置 */];

    public function __construct($cache_config_type = '')
    {
        $config = config('cache.stores.redis');
        // 动态选择数据库
        if ($cache_config_type && isset($this->select_db_config[$cache_config_type])) {
            $config['select'] = $this->select_db_config[$cache_config_type];
        }
        
        $this->options = $config;
        $this->options['prefix'] = $this->options['prefix'] ?? '';
        $this->handle = self::store('redis')->handler();
    }

    public function getCacheKey($name): string {
        return $this->options['prefix'] . $name;
    }

    public function expire(string $name, int $time) {
        return $this->handle->expire($this->getCacheKey($name), $time);
    }
}

1.2 队列核心类 (RedisQueue.php)

namespace app\common\lib\redis;

class RedisQueue extends RedisBase
{
    // 序列化处理(支持数组/对象)
    protected function serialize($data): string {
        return is_scalar($data) ? $data : json_encode($data, JSON_UNESCAPED_UNICODE);
    }
    
    protected function unserialize($data) {
        return ($data && $decoded = json_decode($data, true)) ? $decoded : $data;
    }

    // 关键队列操作
    public function rPush(string $name, $value): int {
        return $this->handle->rPush(
            $this->getCacheKey($name),
            $this->serialize($value)
        );
    }

    public function lPop(string $name) {
        return $this->unserialize(
            $this->handle->lPop($this->getCacheKey($name))
        );
    }
}

1.3 队列枚举管理 (Queue.php)

namespace app\common\enum;

class Queue
{
    const WECHAT_QUEUE = 'wechat_queue';
    const CAINIAO_PUT_IN_STORAGE_QUEUE = 'cainiao_put_in_storage_queue';

    public static function getAllQueue(): array {
        return [
            self::WECHAT_QUEUE => '微信消息推送',
            self::CAINIAO_PUT_IN_STORAGE_QUEUE => '菜鸟订单入库'
        ];
    }

    public static function isExist($queue): bool {
        return array_key_exists($queue, self::getAllQueue());
    }
}

1.4 消费者脚本 (QueueJob.php)

namespace app\command\queue;

use app\common\enum\Queue;
use app\common\lib\redis\RedisQueue;
use think\console\Command;

class QueueJob extends Command
{
    protected function configure() {
        $this->setName('queueJob')
             ->addArgument('business', Argument::REQUIRED, '队列名称');
    }

    protected function execute(Input $input, Output $output) {
        $business = $input->getArgument('business');
        $this->doJob($business);
    }

    // 核心消费逻辑
    protected function doJob(string $queueName) {
        $redis = new RedisQueue();
        while (true) {
            while ($data = $redis->lPop($queueName)) {
                try {
                    // 执行任务逻辑
                    $instance = app()->make($data['class']);
                    $instance->{$data['method']}(...$data['data']);
                } catch (\Exception $e) {
                    // 简单重试逻辑
                    $redis->lPush($queueName, $data);
                }
            }
            sleep(1); // 空队列休眠
        }
    }
}

1.5 生产者调用

// 投递任务示例
$redisQueue = new RedisQueue();
$redisQueue->rPush(
    Queue::CAINIAO_PUT_IN_STORAGE_QUEUE,
    [
        'class' => 'yejoin\YJTDeliveryOrder',
        'method' => 'processOrder',
        'data' => ['order_id' => 1001]
    ]
);

二、生产环境暴露的核心问题

  1. 连接不稳定:夜间Redis连接断开导致队列停止
  2. 阻塞消费缺失:轮询模式造成CPU空转
  3. 异常处理薄弱:失败任务缺乏有效管理
  4. 资源泄漏风险:长运行导致内存持续增长
  5. 日志可观测性差:关键错误未能有效捕获

三、系统性优化方案

3.1 架构级优化

1. 引入单例模式

namespace app\common\traits;

trait Singleton
{
    private static $instance = null;

    public static function getInstance() {
        return self::$instance ?? (self::$instance = new static());
    }
    
    private function __clone() {}
    public function __sleep() { return []; }
}

// RedisQueue中引入
class RedisQueue extends RedisBase {
    use Singleton;
}

2. 阻塞消费改造

- public function lPop(string $name) { ... }
+ public function blPop(string $name, int $timeout = 30) {
+   $result = $this->handle->blPop($this->getCacheKey($name), $timeout);
+   return $result ? $this->unserialize($result[1]) : null;
+ }

3. 生产者工具函数

function rPushQueueJob(string $queue, string $class, string $method, array $params) {
    if (!Queue::isExist($queue)) {
        throw new \InvalidArgumentException("未定义队列: {$queue}");
    }
    
    RedisQueue::getInstance()->rPush($queue, [
        'class' => $class,
        'method' => $method,
        'data' => $params
    ]);
}

// 调用简化为
rPushQueueJob(Queue::ORDER_QUEUE, OrderProcessor::class, 'handle', [$orderId]);

3.2 健壮性增强

1. 连接保活机制

class RedisBase extends Cache {
    private $lastPingTime = 0;
    
    // 心跳检测+断线重连
    public function ensureConnected() {
        try {
            if (time() - $this->lastPingTime > 3600) {
                $this->handle->ping();
                $this->lastPingTime = time();
            }
        } catch (\RedisException $e) {
            $this->reconnect(); // 包含关闭旧连接+新建连接
        }
    }
    
    // 关键操作前调用
    public function blPop(...) {
        $this->ensureConnected();
        // ...原有逻辑
    }
}

2. 消费端内存保护

class QueueJob extends Command {
    const MAX_MEMORY = 134217728; // 128MB
    
    protected function processJobs() {
        while (true) {
            // 内存超标重启
            if (memory_get_usage() > self::MAX_MEMORY) {
                exit(1); // Supervisor自动重启
            }
            
            // ...消费逻辑
        }
    }
}

3. 方法反射缓存

private $methodCache = [];

private function getMethodHandler(string $class, string $method): array {
    $key = "{$class}::{$method}";
    
    if (!isset($this->methodCache[$key])) {
        $reflection = new \ReflectionMethod($class, $method);
        $this->methodCache[$key] = [
            $reflection->isStatic() ? null : app()->make($class),
            $method,
            $reflection->isStatic()
        ];
    }
    
    return $this->methodCache[$key];
}

3.3 可观测性提升

统一错误日志

private function logQueueError(
    string $queue, 
    array $data, 
    string $error
) {
    event('GeneralLog', [
        'content' => [
            'queue' => $queue,
            'job_data' => $data,
            'error' => $error
        ],
        'type' => Log::QUEUE_ERROR
    ]);
}

四、终极陷阱:数据库连接超时

问题现象

  • 队列夜间停止消费
  • 无Redis相关错误日志
  • 重启Supervisor后恢复

根本原因
MySQL服务器主动关闭了闲置超时的连接(默认8小时),而ThinkPHP未启用自动重连

解决方案

// config/database.php
return [
    'connections' => [
        'mysql' => [
            'break_reconnect' => true // 启用断线重连
        ]
    ]
];

五、最终架构拓扑

deepseek_mermaid_20250530_73cbc4.png

六、核心优化总结

  1. 连接管理

    • Redis心跳检测(每小时)
    • 断线自动重连机制
    • MySQL连接池自动恢复
  2. 消费模式升级

    • 阻塞式消费(BLPOP)
    • 内存保护重启机制
    • 异常任务隔离处理
  3. 性能优化

    • 单例模式减少资源开销
    • 反射方法缓存
    • 序列化优化
  4. 可观测性

    • 统一错误日志格式
    • 关键指标监控(内存/队列深度)
    • 结构化日志上下文

经验启示:生产级队列系统需考虑四维稳定性——网络连接资源管理错误恢复可观测性。通过本次实践,我们不仅实现了基于Redis的高效队列,更构建了符合生产要求的可靠消息处理系统。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容