RabbitMQ与Laravel项目中结合

搭建RabbitMQ环境不在此文范围内,后面会单独出搭建的教程资料

与Laravel的结合使用

1.composer引入官方php-amqplib/php-amqplib包

2.封装消息生产者

/**
 * 入消息队列
 *
 * @param $queue string 队列名
 * @param $data mixed 数据
 */
public static function pushMessageQueue($queue, $data = null)
{
    $connection  = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $table = new AMQPTable([
        'x-queue-type' => 'classic'
    ]);

    $channel->queue_declare($queue, false, true, false, false, false, $table);

    $message = new AMQPMessage(json_encode($data, JSON_UNESCAPED_UNICODE));
    $channel->basic_publish($message, '', $queue);

    $channel->close();
    try {
        $connection->close();
    } catch (\Exception $e) {
    }
}

3.控制器调用封装好的入队方法

IndexController.php
SystemService::pushMessageQueue('other', ['date' => date('Y-m-d H:i:s')]);

4.封装消费者基类

<?php

namespace App\Console\Commands\Queue;

use PhpAmqpLib\Wire\AMQPTable;
use Illuminate\Console\Command;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class Base extends Command
{
    /**
     * rabbitMQ队列名称
     *
     * @var string
     */
    protected $queue = '';

    /**
     * rabbitMQ连接
     *
     * @var AMQPStreamConnection|null
     */
    protected $connection = null;

    /**
     * 连接频道
     *
     * @var \PhpAmqpLib\Channel\AMQPChannel|null
     */
    protected $channel = null;

    public function __construct()
    {
        parent::__construct();

        if (!empty($this->queue)) {
            $this->connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
            $this->channel = $this->connection->channel();
            $this->channel->queue_declare($this->queue, false, true, false, false, false, new AMQPTable([
                'x-queue-type' => 'classic'
            ]));

            $this->channel->basic_consume($this->queue, '', false, false, false, false, $this->handle());

            while (count($this->channel->callbacks)) {
                try {
                    $this->channel->wait();
                } catch (\ErrorException $exception) {
                }
            }
        }
    }

    public function handle()
    {
        return function ($message) {

        };
    }

    /**
     * 确认消息
     *
     * @param $message AMQPMessage 当前消息
     */
    protected function ack($message)
    {
        $this->channel->basic_ack($message->delivery_info['delivery_tag']);
    }

    /**
     * 拒收消息
     *
     * @param $message AMQPMessage 当前消息
     * @param $multiple bool 是否应用于多消息
     * @param $requeue bool 是否requeue
     */
    protected function nack($message, $multiple = false, $requeue = false)
    {
        $this->channel->basic_nack($message->delivery_info['delivery_tag'], $multiple, $requeue);
    }

    /**
     * 拒绝消息并选择是否重新入队
     *
     * @param $message AMQPMessage 当前消息
     * @param $requeue bool 是否requeue true则重新入队列(该消费者还是会消费到该条被reject的消息),否则丢弃或者进入死信队列。
     */
    protected function reject($message, $requeue = false)
    {
        $this->channel->basic_reject($message->delivery_info['delivery_tag'], $requeue);
    }

    /**
     * 是否恢复消息到队列
     *
     * @param $requeue bool true则重新入队列并且尽可能的将之前recover的消息投递给其他消费者消费,而不是自己再次消费,false则消息会重新被投递给自己
     */
    protected function recover($requeue = false)
    {
        $this->channel->basic_recover($requeue);
    }
}

5.other队列消费者

<?php

namespace App\Console\Commands\Queue;

class Other extends Base
{
    protected $queue = 'other';

    protected $signature = 'command:other';

    protected $description = '队列测试';

    public function handle()
    {
        return function ($message) {
            echo '收到消息:'.$message->body.PHP_EOL;
            // 业务....
            sleep(2);
            $this->ack($message);
        };
    }
}

6.Kernel文件进行注册artison命令

<?php

namespace App\Console;

use App\Console\Commands\Queue\Other;
use Illuminate\Console\Scheduling\Schedule;
use Illuminate\Foundation\Console\Kernel as ConsoleKernel;

class Kernel extends ConsoleKernel
{
    /**
     * The Artisan commands provided by your application.
     *
     * @var array
     */
    protected $commands = [
        Other::class
    ];

    /**
     * Define the application's command schedule.
     *
     * @param \Illuminate\Console\Scheduling\Schedule $schedule
     * @return void
     */
    protected function schedule(Schedule $schedule)
    {
        // $schedule->command('inspire')
        //          ->hourly();
    }

    /**
     * Register the commands for the application.
     *
     * @return void
     */
    protected function commands()
    {
        $this->load(__DIR__.'/Commands');

        require base_path('routes/console.php');
    }
}

7.代码写完了,运行一下看看效果

7.1模拟请求入队,直接请求对应控制器
7.2消费者输出
image.png
7.3RabbitMQ控制台监控
image.png
Tips:laravel需要注意下这里,去除composer执行完毕的自动发现包(php artsion package:discover)否则composer install/update会一直阻塞在消费队列监听。修改后如下图:
image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351