搭建RabbitMQ环境不在此文范围内,后面会单独出搭建的教程资料
与Laravel的结合使用
1.composer引入官方php-amqplib/php-amqplib包
2.封装消息生产者
/**
* 入消息队列
*
* @param $queue string 队列名
* @param $data mixed 数据
*/
public static function pushMessageQueue($queue, $data = null)
{
$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$channel = $connection->channel();
$table = new AMQPTable([
'x-queue-type' => 'classic'
]);
$channel->queue_declare($queue, false, true, false, false, false, $table);
$message = new AMQPMessage(json_encode($data, JSON_UNESCAPED_UNICODE));
$channel->basic_publish($message, '', $queue);
$channel->close();
try {
$connection->close();
} catch (\Exception $e) {
}
}
3.控制器调用封装好的入队方法
IndexController.php
SystemService::pushMessageQueue('other', ['date' => date('Y-m-d H:i:s')]);
4.封装消费者基类
<?php
namespace App\Console\Commands\Queue;
use PhpAmqpLib\Wire\AMQPTable;
use Illuminate\Console\Command;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class Base extends Command
{
/**
* rabbitMQ队列名称
*
* @var string
*/
protected $queue = '';
/**
* rabbitMQ连接
*
* @var AMQPStreamConnection|null
*/
protected $connection = null;
/**
* 连接频道
*
* @var \PhpAmqpLib\Channel\AMQPChannel|null
*/
protected $channel = null;
public function __construct()
{
parent::__construct();
if (!empty($this->queue)) {
$this->connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
$this->channel = $this->connection->channel();
$this->channel->queue_declare($this->queue, false, true, false, false, false, new AMQPTable([
'x-queue-type' => 'classic'
]));
$this->channel->basic_consume($this->queue, '', false, false, false, false, $this->handle());
while (count($this->channel->callbacks)) {
try {
$this->channel->wait();
} catch (\ErrorException $exception) {
}
}
}
}
public function handle()
{
return function ($message) {
};
}
/**
* 确认消息
*
* @param $message AMQPMessage 当前消息
*/
protected function ack($message)
{
$this->channel->basic_ack($message->delivery_info['delivery_tag']);
}
/**
* 拒收消息
*
* @param $message AMQPMessage 当前消息
* @param $multiple bool 是否应用于多消息
* @param $requeue bool 是否requeue
*/
protected function nack($message, $multiple = false, $requeue = false)
{
$this->channel->basic_nack($message->delivery_info['delivery_tag'], $multiple, $requeue);
}
/**
* 拒绝消息并选择是否重新入队
*
* @param $message AMQPMessage 当前消息
* @param $requeue bool 是否requeue true则重新入队列(该消费者还是会消费到该条被reject的消息),否则丢弃或者进入死信队列。
*/
protected function reject($message, $requeue = false)
{
$this->channel->basic_reject($message->delivery_info['delivery_tag'], $requeue);
}
/**
* 是否恢复消息到队列
*
* @param $requeue bool true则重新入队列并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费,false则消息会重新被投递给自己
*/
protected function recover($requeue = false)
{
$this->channel->basic_recover($requeue);
}
}
5.other队列消费者
<?php
namespace App\Console\Commands\Queue;
class Other extends Base
{
protected $queue = 'other';
protected $signature = 'command:other';
protected $description = '队列测试';
public function handle()
{
return function ($message) {
echo '收到消息:'.$message->body.PHP_EOL;
// 业务....
sleep(2);
$this->ack($message);
};
}
}
6.Kernel文件进行注册artison命令
<?php
namespace App\Console;
use App\Console\Commands\Queue\Other;
use Illuminate\Console\Scheduling\Schedule;
use Illuminate\Foundation\Console\Kernel as ConsoleKernel;
class Kernel extends ConsoleKernel
{
/**
* The Artisan commands provided by your application.
*
* @var array
*/
protected $commands = [
Other::class
];
/**
* Define the application's command schedule.
*
* @param \Illuminate\Console\Scheduling\Schedule $schedule
* @return void
*/
protected function schedule(Schedule $schedule)
{
// $schedule->command('inspire')
// ->hourly();
}
/**
* Register the commands for the application.
*
* @return void
*/
protected function commands()
{
$this->load(__DIR__.'/Commands');
require base_path('routes/console.php');
}
}
7.代码写完了,运行一下看看效果
7.1模拟请求入队,直接请求对应控制器
7.2消费者输出
7.3RabbitMQ控制台监控
Tips:laravel需要注意下这里,去除composer执行完毕的自动发现包(php artsion package:discover)否则composer install/update会一直阻塞在消费队列监听。修改后如下图: