使用RabbitMQ 的php客户端

php-amqplib 是RabbitMQ队列的php 客户端实现。
网址:
https://github.com/php-amqplib/php-amqplib

可以基于php的 Socket函数或Stream流函数连接AMQP服务端,依据AMQP0-9-1协议的要求,发送报文数据。

http://rabbitmq.mr-ping.com/AMQP/amqp-0-9-1-quickref.html(中文简版)
https://www.rabbitmq.com/amqp-0-9-1-reference.html (英文完整版)

php 这边使用客户端的流程如下:

一、composer安装php-amqplib客户端

创建composer.json

{
    "require": {
        "php-amqplib/php-amqplib":"v3.0.0"
    }
}

切换非root账户,执行安装命令。

composer install

二、使用php-amqplib

php-amqplib提供的demo里 封装了Socket套接字和Stream流函数 两种类别来连接AMQP服务端。

默认提供的demo例子里,使用了Stream流函数。
需要在php文件里引入

use PhpAmqpLib\Connection\AMQPStreamConnection;

如果想使用Socket函数

use PhpAmqpLib\Connection\AMQPSocketConnection;

我的个人笔记里,使用Socket函数。

2.1 使用直连交换机 创建生产者和消费者

根据官方demo先整理出一个使用范例。

rabbitMQ的交换机有四种类型:

交换机类型 说明
直连型交换机 direct exchange
扇型交换机
主题交换机
头交换机

我先使用直连交换机,创建生产者。

2.1.1 开始前的准备工作

(1)首先需要加载客户端自动类

require __DIR__."/ext/vendor/autoload.php";

(2) 创建一个配置文件

define('HOST', getenv('TEST_RABBITMQ_HOST') ? getenv('TEST_RABBITMQ_HOST') : '192.168.56.102');
define('PORT', getenv('TEST_RABBITMQ_PORT') ? getenv('TEST_RABBITMQ_PORT') : 5672);
define('USER', getenv('TEST_RABBITMQ_USER') ? getenv('TEST_RABBITMQ_USER') : 'lafen');
define('PASS', getenv('TEST_RABBITMQ_PASS') ? getenv('TEST_RABBITMQ_PASS') : 'jiemo@094');
define('VHOST', '/');
define('AMQP_DEBUG', getenv('TEST_AMQP_DEBUG') !== false ? (bool)getenv('TEST_AMQP_DEBUG') : false);

HOST:队列服务器IP
PORT:端口号
USER:账号
PASS:密码
VHOST:默认用/ (根)
AMQP_DEBUG :是否开启调式模式

在创建生产者的时候,还需要引入下面三个类

//封装的使用socket进行AMQP服务连接
use PhpAmqpLib\Connection\AMQPSocketConnection;
//交换机类别
use PhpAmqpLib\Exchange\AMQPExchangeType;
//消息
use PhpAmqpLib\Message\AMQPMessage;

这一块的完整代码:

require __DIR__."/ext/vendor/autoload.php";
require __DIR__."/config/amqp.php";

use PhpAmqpLib\Connection\AMQPSocketConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;

连接AMQP服务

$connection = new AMQPSocketConnection(HOST,PORT,USER,PASS,VHOST);

创建通道

$channel = $connection->channel();

声明队列

$queue = 'wujin';
$channel->queue_declare($queue, false, true, false, false);

一共有8个参数,常用的是前5个参数

参数 说明
参数1 字符串,队列的名字
参数2 布尔值
参数3 布尔, true:标记队列是持久队列,服务器重启后,该队列也存在。false:临时队列
参数4 布尔,true: 标记为独占队列,只能由当前连接访问,连接关闭时删除队列。 false:共享队列
参数5 布尔,是否自动删除队列。 true:当队列使用完毕后,将自动删除
参数6 no-wait
参数7 数组,可以像队列中传递参数
参数8 未知

声明交换机

$exchange = 'latu';
$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
参数 说明
参数1 字符串,交换机的名字
参数2 交换机类型 ,直接交换机 AMQPExchangeType::DIRECT
参数3 布尔, true: 如果交换机不存在,将返回404错误,如果交换机已经存在,返回Declare Ok
参数4 布尔,true: 标记为持久交换机,false:临时交换机,如果服务器重启,将清除临时交换机
参数5 布尔,true:当所有队列都使用完该交换机时,交换机将被删除。
参数6 boolean,true:发布者不能直接使用交换机,只能在绑定到其他交换机时使用。
参数7 nowait
参数8 array,传递参数
参数9 未知

队列绑定到交换机

$channel->queue_bind($queue, $exchange);

注意参数3,是绑定时设置的路由键,根据不同的路由键,定义绑定。这样后期发送消息数据时,消息也会代着路由。然后交换机会跟据路由,转发到指定的队列里。

参数 说明
参数1 string,队列的名字
参数2 string ,交换机的名字
参数3 string, 绑定的路由键,可以为空
参数4 nowait
参数5 array,传递参数
参数6 保留字段

准备要发送的消息

$messageBody = implode(' ', array_slice($argv, 1));
$message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));

准备消息内容时,一定要实例化AMQPMessage这个类,参数1是传递的内容,参数是数组,添加一些关于传递数据的属性。其中参数2可以省略不添。

通道执行发送数据

$channel->basic_publish($message, $exchange);
参数 说明
参数1 string,要发送的消息数据
参数2 string ,交换机
参数3 string, 路由键,交换机将跟据路由键,将消息转发到指定队列
参数4 boolean, true:如果消息无法路由到队列,服务器如何做出反应 false:服务器会自动删除消息
参数5 boolean, true:如果消息不能立即路由到队列使用者,服务器将使用return方法返回无法传递的消息。 false: 服务器将对消息进行排队,但不能保证消息将被使用。

关闭通道,关闭连接

//关闭通道
$channel->close();
//关闭连接
$connection->close();

以上是一个最简单的生产者使用例子,都没有设置路由键。现在记录一个消费者使用例子。

消费者

AMQP 0-9-1 模型中,有两种消费者模式:
(1) 消息队列将消息数据推送给应用程序
(2) 应用程序 主动从队列中获取消息

下面演示的例子就是第二种,主动从队列中获取消息数据。

通过AMQP中的 basic.consume方法,告知服务器开启一个“消费者”,此消费者实质是一个针对特定队列消息的持久化请求。消费者在声明过的信道(channel)中会一直存在,直到客户端清除他们为止。

代码:

use 引入AMQPSocketConnection 和 AMQPExchangeType

include(__DIR__ . '/config.php');
use PhpAmqpLib\Connection\AMQPSocketConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;

然后 定义三个变量,交换机名称,队列名称,还有消费者标签。
一个队列中,可以注册多个消费者,也可以注册一个独家享用的消费者,当独享消费者存在时,其他消费者被排除在外。

每个消费者都有一个身份标识,就是这个消费者标签,它是一个字符串,可以被用来退订消息。

$exchange = 'latu';
$queue = 'msgs';
//消费者标签
$consumerTag = 'consumer';

连接AMQP消费者代理服务

$connection = new AMQPSocketConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();

声明队列,声明交换机,队列绑定交换机

$channel->queue_declare($queue, false, true, false, false);
$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
$channel->queue_bind($queue, $exchange);

从队列里获取消息数据

$channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');
参数 说明
参数1 string,队列名称
参数2 消费者标签
参数3 true:服务器将不会向发布消息的连接发送消息
参数4 true:消息从服务器传递到客户机时,服务端会立刻将消息清除出队列。
参数5 true:消费者将独占使用此队列
参数6 true:服务器将不响应该方法。客户端不应等待应答方法。如果服务器无法完成该方法,它将引发通道或连接异常。
参数7 回调函数,可以为null,如果设置了回调函数,从队列取得的消息,将会传到这个回调函数里进行处理

说明:参数4如果设置为true,服务端将消息从队列中传出后,就直接认定传输成功,然后从队列里清除这条消息。虽然会提高一定性能,但如果消息发出后,客户端程序发生崩溃,没有接收成功这条件,那么此消息就可能会彻底丢失。

这个php客户端里 如果像下面这样使用 basic_consume()

$result=$channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');

变量$result得到将是消费者标签。不是返回的消息数据。

所以参数7的回调函数一定要设置。

官方demo定义的回调函数,

function process_message($message)
{
    echo "\n--------\n";
    echo $message->body;
    echo "\n--------\n";

    $message->ack();

    // Send a message with the string "quit" to cancel the consumer.
    if ($message->body === 'quit') {
        $message->getChannel()->basic_cancel($message->getConsumerTag());
    }
}

分析一下这个函数:

(1)参数$message 就是AMQPMessage对象。消息数据存在于body属性中。

(2)$message->ack() 方法是向服务器端,发送回执消息,确认队列消息接收成功。
(3)发送字符串为“quit”的消息以取消消费者。然后执行basic_cancel() 发出请求,这样服务器端会清除消费者。
它不会影响到已经成功投递的消息,但是会使得服务器不再将新的消息投送给此消费者。

while ($channel->is_consuming()) {
    $channel->wait();
}

最后注消退出通道和连接。

function shutdown($channel, $connection)
{
    $channel->close();
    $connection->close();
}

register_shutdown_function('shutdown', $channel, $connection);
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,634评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,951评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,427评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,770评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,835评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,799评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,768评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,544评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,979评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,271评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,427评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,121评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,756评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,375评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,579评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,410评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,315评论 2 352

推荐阅读更多精彩内容

  • 本文章翻译自http://www.rabbitmq.com/api-guide.html,并没有及时更新。 术语对...
    joyenlee阅读 7,651评论 0 3
  • 一、安装: 1.安装rabbitmq运行环境 1)下载地址:https://www.erlang.org/down...
    ParadigmSh_f7ad阅读 347评论 0 1
  • RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。 消息...
    彩虹之梦阅读 1,086评论 2 1
  • 表情是什么,我认为表情就是表现出来的情绪。表情可以传达很多信息。高兴了当然就笑了,难过就哭了。两者是相互影响密不可...
    Persistenc_6aea阅读 124,811评论 2 7
  • 16宿命:用概率思维提高你的胜算 以前的我是风险厌恶者,不喜欢去冒险,但是人生放弃了冒险,也就放弃了无数的可能。 ...
    yichen大刀阅读 6,044评论 0 4