曾以为理解了队列先进先出的原理,封装一个Redis队列不过是手到擒来之事。无非是利用Redis列表结构配合 Supervisor守护进程,在列表尾部压入数据,从头部取出数据,一个完美的消息队列就诞生了。然而上线后却遭遇了各种意料之外的问题,本文将分享完整的实现流程与关键的优化步骤。
一、初始架构与核心实现
1.1 Redis基础封装 (RedisBase.php)
namespace app\common\lib\redis;
use think\facade\Cache;
class RedisBase extends Cache
{
protected $handle = null;
protected $options = [];
protected $select_db_config = [/* 分库配置 */];
public function __construct($cache_config_type = '')
{
$config = config('cache.stores.redis');
// 动态选择数据库
if ($cache_config_type && isset($this->select_db_config[$cache_config_type])) {
$config['select'] = $this->select_db_config[$cache_config_type];
}
$this->options = $config;
$this->options['prefix'] = $this->options['prefix'] ?? '';
$this->handle = self::store('redis')->handler();
}
public function getCacheKey($name): string {
return $this->options['prefix'] . $name;
}
public function expire(string $name, int $time) {
return $this->handle->expire($this->getCacheKey($name), $time);
}
}
1.2 队列核心类 (RedisQueue.php)
namespace app\common\lib\redis;
class RedisQueue extends RedisBase
{
// 序列化处理(支持数组/对象)
protected function serialize($data): string {
return is_scalar($data) ? $data : json_encode($data, JSON_UNESCAPED_UNICODE);
}
protected function unserialize($data) {
return ($data && $decoded = json_decode($data, true)) ? $decoded : $data;
}
// 关键队列操作
public function rPush(string $name, $value): int {
return $this->handle->rPush(
$this->getCacheKey($name),
$this->serialize($value)
);
}
public function lPop(string $name) {
return $this->unserialize(
$this->handle->lPop($this->getCacheKey($name))
);
}
}
1.3 队列枚举管理 (Queue.php)
namespace app\common\enum;
class Queue
{
const WECHAT_QUEUE = 'wechat_queue';
const CAINIAO_PUT_IN_STORAGE_QUEUE = 'cainiao_put_in_storage_queue';
public static function getAllQueue(): array {
return [
self::WECHAT_QUEUE => '微信消息推送',
self::CAINIAO_PUT_IN_STORAGE_QUEUE => '菜鸟订单入库'
];
}
public static function isExist($queue): bool {
return array_key_exists($queue, self::getAllQueue());
}
}
1.4 消费者脚本 (QueueJob.php)
namespace app\command\queue;
use app\common\enum\Queue;
use app\common\lib\redis\RedisQueue;
use think\console\Command;
class QueueJob extends Command
{
protected function configure() {
$this->setName('queueJob')
->addArgument('business', Argument::REQUIRED, '队列名称');
}
protected function execute(Input $input, Output $output) {
$business = $input->getArgument('business');
$this->doJob($business);
}
// 核心消费逻辑
protected function doJob(string $queueName) {
$redis = new RedisQueue();
while (true) {
while ($data = $redis->lPop($queueName)) {
try {
// 执行任务逻辑
$instance = app()->make($data['class']);
$instance->{$data['method']}(...$data['data']);
} catch (\Exception $e) {
// 简单重试逻辑
$redis->lPush($queueName, $data);
}
}
sleep(1); // 空队列休眠
}
}
}
1.5 生产者调用
// 投递任务示例
$redisQueue = new RedisQueue();
$redisQueue->rPush(
Queue::CAINIAO_PUT_IN_STORAGE_QUEUE,
[
'class' => 'yejoin\YJTDeliveryOrder',
'method' => 'processOrder',
'data' => ['order_id' => 1001]
]
);
二、生产环境暴露的核心问题
- 连接不稳定:夜间Redis连接断开导致队列停止
- 阻塞消费缺失:轮询模式造成CPU空转
- 异常处理薄弱:失败任务缺乏有效管理
- 资源泄漏风险:长运行导致内存持续增长
- 日志可观测性差:关键错误未能有效捕获
三、系统性优化方案
3.1 架构级优化
1. 引入单例模式
namespace app\common\traits;
trait Singleton
{
private static $instance = null;
public static function getInstance() {
return self::$instance ?? (self::$instance = new static());
}
private function __clone() {}
public function __sleep() { return []; }
}
// RedisQueue中引入
class RedisQueue extends RedisBase {
use Singleton;
}
2. 阻塞消费改造
- public function lPop(string $name) { ... }
+ public function blPop(string $name, int $timeout = 30) {
+ $result = $this->handle->blPop($this->getCacheKey($name), $timeout);
+ return $result ? $this->unserialize($result[1]) : null;
+ }
3. 生产者工具函数
function rPushQueueJob(string $queue, string $class, string $method, array $params) {
if (!Queue::isExist($queue)) {
throw new \InvalidArgumentException("未定义队列: {$queue}");
}
RedisQueue::getInstance()->rPush($queue, [
'class' => $class,
'method' => $method,
'data' => $params
]);
}
// 调用简化为
rPushQueueJob(Queue::ORDER_QUEUE, OrderProcessor::class, 'handle', [$orderId]);
3.2 健壮性增强
1. 连接保活机制
class RedisBase extends Cache {
private $lastPingTime = 0;
// 心跳检测+断线重连
public function ensureConnected() {
try {
if (time() - $this->lastPingTime > 3600) {
$this->handle->ping();
$this->lastPingTime = time();
}
} catch (\RedisException $e) {
$this->reconnect(); // 包含关闭旧连接+新建连接
}
}
// 关键操作前调用
public function blPop(...) {
$this->ensureConnected();
// ...原有逻辑
}
}
2. 消费端内存保护
class QueueJob extends Command {
const MAX_MEMORY = 134217728; // 128MB
protected function processJobs() {
while (true) {
// 内存超标重启
if (memory_get_usage() > self::MAX_MEMORY) {
exit(1); // Supervisor自动重启
}
// ...消费逻辑
}
}
}
3. 方法反射缓存
private $methodCache = [];
private function getMethodHandler(string $class, string $method): array {
$key = "{$class}::{$method}";
if (!isset($this->methodCache[$key])) {
$reflection = new \ReflectionMethod($class, $method);
$this->methodCache[$key] = [
$reflection->isStatic() ? null : app()->make($class),
$method,
$reflection->isStatic()
];
}
return $this->methodCache[$key];
}
3.3 可观测性提升
统一错误日志
private function logQueueError(
string $queue,
array $data,
string $error
) {
event('GeneralLog', [
'content' => [
'queue' => $queue,
'job_data' => $data,
'error' => $error
],
'type' => Log::QUEUE_ERROR
]);
}
四、终极陷阱:数据库连接超时
问题现象:
- 队列夜间停止消费
- 无Redis相关错误日志
- 重启Supervisor后恢复
根本原因:
MySQL服务器主动关闭了闲置超时的连接(默认8小时),而ThinkPHP未启用自动重连
解决方案:
// config/database.php
return [
'connections' => [
'mysql' => [
'break_reconnect' => true // 启用断线重连
]
]
];
五、最终架构拓扑

deepseek_mermaid_20250530_73cbc4.png
六、核心优化总结
-
连接管理
- Redis心跳检测(每小时)
- 断线自动重连机制
- MySQL连接池自动恢复
-
消费模式升级
- 阻塞式消费(BLPOP)
- 内存保护重启机制
- 异常任务隔离处理
-
性能优化
- 单例模式减少资源开销
- 反射方法缓存
- 序列化优化
-
可观测性
- 统一错误日志格式
- 关键指标监控(内存/队列深度)
- 结构化日志上下文
经验启示:生产级队列系统需考虑四维稳定性——网络连接、资源管理、错误恢复和可观测性。通过本次实践,我们不仅实现了基于Redis的高效队列,更构建了符合生产要求的可靠消息处理系统。