docker搭建rabbitmq,配合php-amqplib+supervisor使用(下)

前言:最近因为工作忙,没抽出时间来继续文章的下半部分,现在手头忙的差不多,便抽时间写了这篇文章!

  1. 上篇文章只是简单的介绍了,rabbitmp的搭建和基础发送队列,封装了一个公用类。
<?php
namespace common\tools;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Created by PhpStorm.
 * User: steven
 * Date: 2017/7/10
 * Time: 下午4:38
 */
class RabbitMq
{
    /**
     * @var AMQPStreamConnection
     */
    protected $connection;
    protected $queue_key;
    protected $exchange_key;
    protected $exchange_suffix;
    protected $priority;
    protected $channel;
    /**
     * RabbitQueue constructor.
     * @param $config
     * @param $queue_name
     * @param null $priority
     */
    public function __construct($config, $queue_name,$priority=null)
    {
        $this->connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['pass']);
        $this->queue_key = $queue_name;
        $this->exchange_suffix = $config['exchange'];
        $this->priority=$priority;
        $this->channel = $this->connection->channel();

        $this->bind_exchange();
        return $this->connection;
    }
    /**
     * 绑定交换机
     * @return mixed|null
     */
    protected function bind_exchange() {
        $queue_key=$this->queue_key;
        $exchange_key = $this->exchange_suffix;
        $this->exchange_key = $exchange_key;
        $channel = $this->channel;

        if(!empty($this->priority)){
            $priorityArr = array('x-max-priority' => array('I', $this->priority));
            $size = $channel->queue_declare($queue_key, false, true, false, false,false,$priorityArr);
        }else{
            $size = $channel->queue_declare($queue_key, false, true, false, false);
        }
        $channel->exchange_declare($exchange_key, 'topic', false, true, false);
        $channel->queue_bind($queue_key, $exchange_key,$queue_key);
        $this->channel=$channel;
        return $size ;
    }
    /**
     * 发送数据到队列
     * @param $data = array('key'=>'val')
     */
    public function put($data)
    {
        $channel = $this->channel;
        $value = json_encode($data);
        $toSend = new AMQPMessage($value, array('content_type' => 'application/json', 'delivery_mode' => 2));
        $channel->basic_publish($toSend, $this->exchange_key,$this->queue_key);
    }

    /**
     * 获取数据
     * @return mixed
     */
    public function get()
    {
        $channel = $this->channel;
        $message = $channel->basic_get($this->queue_key);
        if (!$message) {
            return  array(null,null);
        }
        $ack = function() use ($channel,$message) {
            $channel->basic_ack($message->delivery_info['delivery_tag']);
        };
        $result = json_decode($message->body,true);
        return array($ack,$result);
    }

    /**
     * 关闭链接
     */
    public function close() {
        $this->channel->close();
        $this->connection->close();
    }

    /**
     * 获得队列长度
     * @return int
     */
    public function length(){
        $info =  $this->bind_exchange();
        return $info[1];
    }
}
  1. 发送队列简单demo
<?php
namespace frontend\controllers;
use common\tools\RabbitMq;
use Yii;
use yii\web\Controller;
/**
 * Site controller
 */
class IndexController extends Controller
{
    public function actionIndex()
    {
         $queueName = 'queue_test';
        $rabbitMqConfig = [
            'exchange' => 'web',//自己手动添加交换机,这里就不做描述
            'host' => '172.17.0.6',//填写自己的容器ip
            'port' => '5672',
            'user' => 'guest',
            'pass' => 'guest',
        ];
        $queue = new RabbitMq($rabbitMqConfig, $queueName);
        $data = ['uuid' => rand(100,99999999)];
        $queue->put($data);
        echo '发送完成,发送的内容:'.print_r($data,1);
        exit();
    }
}
  1. 查看发送队列数据


    image.png

    image.png
  2. 编写消耗队列脚本

<?php
namespace console\controllers;
use common\tools\RabbitMq;
use yii\console\Controller;
class TestController extends Controller
{
    /**
     * console rabbit_mq demo
     */
    public function actionTest()
    {
        $queueName = 'queue_test';
       $rabbitMqConfig = [
            'exchange' => 'web',//自己手动添加交换机,这里就不做描述
            'host' => '172.17.0.6',//填写自己的容器ip
            'port' => '5672',
            'user' => 'guest',
            'pass' => 'guest',
        ];
        $queue = new RabbitMq($rabbitMqConfig, $queueName);
        $cnt = 0;
        while (1) {
            list($ack,$data) = $queue->get();
            if(!$data){
                $cnt++;
                if($cnt > 20){
                    $queue->close();
                    exit();
                }
                echo "no data:$cnt \n";
                sleep(1);
                continue;
            }

            //逻辑处理
            echo "start work \n";
            print_r($data);
            echo "end work \n";
            //确认消耗
            $ack();
        }
    }
}
  1. 配置supervisor的消耗队列进程(PS:我的demo.ini是放在我创建的docker项目容器里的,不熟悉的小伙伴可以看看我的关于创建容器的文章)
    a. 创建文件
vim demo.ini

b. 编写配置(PS:注意这里用的是yii的console,请根据你们的使用来进行更改)

;测试demo
[program:demo]
command= /usr/share/nginx/html/advanced/yii test/test
directory=/usr/share/nginx/html/advanced/
stdout_logfile=/tmp/demo.log
redirect_stderr=true
autostart=false
autorestart=false

c. 增加进程

supervisorctl update

d. 成功添加,demo进程已启动,通过supervisorctl tail -f demo 看进程消耗

image.png

e. 再查看队列状态,显示队列已被消耗

image.png

结尾

好了,到此为止,简单的队列消耗已经完成,可能有些地方不是说的太清楚,毕竟自己也在摸索中,文章中的如果有不对的地方欢迎指正,共同学习~
PS.搭建supervisor的docker容器文章:http://www.jianshu.com/p/40418711cf8aTask

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,752评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,100评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,244评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,099评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,210评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,307评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,346评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,133评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,546评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,849评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,019评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,702评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,331评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,030评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,260评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,871评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,898评论 2 351

推荐阅读更多精彩内容

  • 转载自 http://blog.opskumu.com/docker.html 一、Docker 简介 Docke...
    极客圈阅读 10,487评论 0 120
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • 九 人生所追求的一切 都是为了自由 因此,人终究带不走一切 十 人们喜欢天空的颜色 可那本是大海的影子 十一 当你...
    关长天阅读 163评论 8 26
  • 瑞峰敲开工厂的大门,春生正好从车间迎面走出,不禁暗自诧异:一般情况下,瑞峰这个时候是不来的,难道又有什么好事?春生...
    西岭布衣阅读 219评论 1 3
  • 01 一直想动笔写点什么,又不知道从何写起,自从去年把老婆儿子留在老家,独自一人来到上海,已经有一年零四个月了。记...
    云成鹏阅读 274评论 5 4