RabbitMQ死信队列实战

前言:

之前有写过死信队列的使用场景以及通过管控台创建死信。这次就通过代码实现死信队列的创建,同时也分享一下RabbitMQ封装的类。

准备:

1. 先准备一个死信队列(最后用来消费)的参数配置,包括虚拟机,交换机,队列,有效时间等,如下。

193d6acca3f67a453bde93c881e1019a_up-b60af12822bd93f33a2771887041ce07594.png

2. 按照上面在RabbitMQ中创建虚拟机和交换机,死信队列。并让交换机与死信队列绑定,操作方法前面有介绍。

bff1e0452fb45ee5cde0bc46d1ccc749_up-5cb1ea1636e62ce35729ea91c8ec8e6d49f.png

3. 这里就直接提供rabbitMQ操作的基本封装的类,包括一个基类,生产者类,消费者类。

3.1. 基类。

<?php
namespace rabbitmq;

/** Member
 *  AMQPChannel
 *  AMQPConnection
 *  AMQPEnvelope
 *  AMQPExchange
 *  AMQPQueue
 * Class BaseMQ
 * @package rabbitMQ
 */
class BaseMQ
{
    /** MQ Channel
    * @var \AMQPChannel
    */
    public $AMQPChannel ;

    /** MQ Link
    * @var \AMQPConnection
    */
    public $AMQPConnection ;

    /** MQ Envelope
    * @var \AMQPEnvelope
    */
    public $AMQPEnvelope ;

    /** MQ Exchange
    * @var \AMQPExchange
    */
    public $AMQPExchange ;

    /** MQ Queue
    * @var \AMQPQueue
    */
    public $AMQPQueue ;

    /** conf
    * @var
    */
    public $conf ;

    /** exchange
    * @var
    */
    public $exchange ;

    /**
     * queue
     * @var
     */
    public $queue;

    /**
     * routes
     * @var
     */
    public $route;

    /**
     * queue_args
     * @var
     */
    public $queueArgs;

    /** link
    * BaseMQ constructor.
    * @throws \AMQPConnectionException
    */
    public function __construct($host,$options,$args = [])
    {

        $config = include 'config/config.php';

        if (!$config)
            throw new \AMQPConnectionException('config error!');

        $this->host  = array_merge($config,$host);

        isset($options['vhost']) && $this->host['vhost'] = $options['vhost'];

        $this->exchange = $options['exchange'];

        $this->queue = $options['queue'];

        $this->route = $options['route'];

        $this->queueArgs = $args;

        $this->AMQPConnection = new \AMQPConnection($this->host);

        if (!$this->AMQPConnection->connect())
            throw new \AMQPConnectionException("Cannot connect to the broker!\n");
    }

    /**
    * close link
    */
    public function close()
    {

        $this->AMQPConnection->disconnect();
    }

    /** Channel
    * @return \AMQPChannel
    * @throws \AMQPConnectionException
    */
    public function channel()
    {

        if (!$this->AMQPChannel) {
            $this->AMQPChannel = new \AMQPChannel($this->AMQPConnection);
        }

        return $this->AMQPChannel;
    }

    /** Exchange
    * @return \AMQPExchange
    * @throws \AMQPConnectionException
    * @throws \AMQPExchangeException
    */
    public function exchange()
    {

        if (!$this->AMQPExchange) {

            $this->AMQPExchange = new \AMQPExchange($this->channel());

            $this->AMQPExchange->setName($this->exchange);
        }

        return $this->AMQPExchange ;
    }

    /** queue
    * @return \AMQPQueue
    * @throws \AMQPConnectionException
    * @throws \AMQPQueueException
    */
    public function queue()
    {

        if (!$this->AMQPQueue) {
            $this->AMQPQueue = new \AMQPQueue($this->channel());
        }

        return $this->AMQPQueue ;
    }

    /** Envelope
    * @return \AMQPEnvelope
    */
    public function envelope()
    {

        if (!$this->AMQPEnvelope) {
            $this->AMQPEnvelope = new \AMQPEnvelope();
        }

        return $this->AMQPEnvelope;
    }
}

3.2. 生产者类。

<?php
//生产
namespace rabbitmq;

class ProductMQ extends BaseMQ
{
    /** 只控制发送成功 不接受消费者是否收到
    * @throws \AMQPChannelException
    * @throws \AMQPConnectionException
    * @throws \AMQPExchangeException
    */
    public function publish($message)
    {
        $message = is_array($message)?json_encode($message):$message;

        //频道
        $channel = $this->channel();
        //创建交换机对象
        $ex = $this->exchange();

        return $ex->publish($message, $this->route, AMQP_NOPARAM, array('delivery_mode' => 2));
    }
}

3.3. 消费者。


image.png

编码:

上面的死信队列已经创建好了,接下来主要就是通过代码创建一个用于直接生产消息的普通队列,但是这个队列需要设置三个参数。

x-dead-letter-exchange:       关联死信的交换机
x-dead-letter-routing-key      关联死信的路由key
x-message-ttl                  当前队列消息的有效期,也就是多久后消息自动进行死信队列,并且从本队列删除

1. 代码部分:

public function addToDlx()
    {
        $host = [
            'host' => '127.0.0.1',
            'port' => '5672',
            'login' => 'guest',
            'password' => 'guest',
            'vhost' => 'report',
            'heartbeat' => 60
        ];

        // 普通队列
        $normal = [
            'vhost' => 'report',                    // 虚拟机
            'exchange' => 'normal',                 // 交换机
            'route' => 'normal_route',              // 路由key - 用于交换机与队列进行绑定
            'queue' => 'normal_queue',              // 队列
            'expire' => 1000*60,                    // 有效时间单位:毫秒   - 1分钟
        ];

        // 死信队列
        $normal_dlx = [
            'vhost' => 'report',
            'exchange' => 'normal_dlx',
            'route' => 'normal_dlx_route',
            'queue' => 'normal_dlx_queue'
        ];

        // 给普通队列关联死信队列,携带的参数
        $dlx_args = [
            'x-dead-letter-exchange' => $normal_dlx['exchange'],
            'x-dead-letter-routing-key' => $normal_dlx['route'],
            'x-message-ttl' => $normal['expire'],
        ];

        //////////////// 通过消费者方式创建死信队列/////////////
        $dlx_mq = new ConsumerMQ($host,$normal,$dlx_args);
        $dlx_mq->run(null);
        ////////////////////////////////////////////////////////

        //////////////// 将消息放入普通队列/////////////////////
        $mq = new ProductMQ($host, $normal);

        $param = json_encode([
            'name' => 'test',
            'id' => 11568,
            'remark' => '测试一下'
        ]);

        $mq->publish($param);

        $mq->close();
        ////////////////////////////////////////////////////////
    }

2. 测试结果:

通过postman点击上面接口,控制台就可以看出多出了一个normal队列,并且队列的 Features 为“ D TTL DLX DLK ”,$param的消息也会首先进入“normal”队列。

8538bb6c973820bae77b8e4601e0eb5b_up-52587cdf0b0ef8301beedf3e619d74da6f5.png

23bc7cf9fa2852712f41630df5760952_up-1b87db327b9ee87edce63d0c4e08e79db49.png

2. 1分钟后(自己设置的),normal的消息会失效,进而开始添加到了死信队列“normal_dxl”,可以点击死信查看最新的消息信息。


5c658f0b8a63608811e48ef3003b3c4e_up-ae6e91185ebc419a6dce5e3bc8ff43ac6ad.png
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 221,198评论 6 514
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,334评论 3 398
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 167,643评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,495评论 1 296
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,502评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,156评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,743评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,659评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,200评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,282评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,424评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,107评论 5 349
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,789评论 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,264评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,390评论 1 271
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,798评论 3 376
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,435评论 2 359

推荐阅读更多精彩内容