php-amqplib 是RabbitMQ队列的php 客户端实现。
网址:
https://github.com/php-amqplib/php-amqplib
可以基于php的 Socket函数或Stream流函数连接AMQP服务端,依据AMQP0-9-1协议的要求,发送报文数据。
http://rabbitmq.mr-ping.com/AMQP/amqp-0-9-1-quickref.html(中文简版)
https://www.rabbitmq.com/amqp-0-9-1-reference.html (英文完整版)
php 这边使用客户端的流程如下:
一、composer安装php-amqplib客户端
创建composer.json
{
"require": {
"php-amqplib/php-amqplib":"v3.0.0"
}
}
切换非root账户,执行安装命令。
composer install
二、使用php-amqplib
php-amqplib提供的demo里 封装了Socket套接字和Stream流函数 两种类别来连接AMQP服务端。
默认提供的demo例子里,使用了Stream流函数。
需要在php文件里引入
use PhpAmqpLib\Connection\AMQPStreamConnection;
如果想使用Socket函数
use PhpAmqpLib\Connection\AMQPSocketConnection;
我的个人笔记里,使用Socket函数。
2.1 使用直连交换机 创建生产者和消费者
根据官方demo先整理出一个使用范例。
rabbitMQ的交换机有四种类型:
交换机类型 | 说明 |
---|---|
直连型交换机 | direct exchange |
扇型交换机 | |
主题交换机 | |
头交换机 |
我先使用直连交换机,创建生产者。
2.1.1 开始前的准备工作
(1)首先需要加载客户端自动类
require __DIR__."/ext/vendor/autoload.php";
(2) 创建一个配置文件
define('HOST', getenv('TEST_RABBITMQ_HOST') ? getenv('TEST_RABBITMQ_HOST') : '192.168.56.102');
define('PORT', getenv('TEST_RABBITMQ_PORT') ? getenv('TEST_RABBITMQ_PORT') : 5672);
define('USER', getenv('TEST_RABBITMQ_USER') ? getenv('TEST_RABBITMQ_USER') : 'lafen');
define('PASS', getenv('TEST_RABBITMQ_PASS') ? getenv('TEST_RABBITMQ_PASS') : 'jiemo@094');
define('VHOST', '/');
define('AMQP_DEBUG', getenv('TEST_AMQP_DEBUG') !== false ? (bool)getenv('TEST_AMQP_DEBUG') : false);
HOST:队列服务器IP
PORT:端口号
USER:账号
PASS:密码
VHOST:默认用/ (根)
AMQP_DEBUG :是否开启调式模式
在创建生产者的时候,还需要引入下面三个类
//封装的使用socket进行AMQP服务连接
use PhpAmqpLib\Connection\AMQPSocketConnection;
//交换机类别
use PhpAmqpLib\Exchange\AMQPExchangeType;
//消息
use PhpAmqpLib\Message\AMQPMessage;
这一块的完整代码:
require __DIR__."/ext/vendor/autoload.php";
require __DIR__."/config/amqp.php";
use PhpAmqpLib\Connection\AMQPSocketConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
连接AMQP服务
$connection = new AMQPSocketConnection(HOST,PORT,USER,PASS,VHOST);
创建通道
$channel = $connection->channel();
声明队列
$queue = 'wujin';
$channel->queue_declare($queue, false, true, false, false);
一共有8个参数,常用的是前5个参数
参数 | 说明 |
---|---|
参数1 | 字符串,队列的名字 |
参数2 | 布尔值 |
参数3 | 布尔, true:标记队列是持久队列,服务器重启后,该队列也存在。false:临时队列 |
参数4 | 布尔,true: 标记为独占队列,只能由当前连接访问,连接关闭时删除队列。 false:共享队列 |
参数5 | 布尔,是否自动删除队列。 true:当队列使用完毕后,将自动删除 |
参数6 | no-wait |
参数7 | 数组,可以像队列中传递参数 |
参数8 | 未知 |
声明交换机
$exchange = 'latu';
$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
参数 | 说明 |
---|---|
参数1 | 字符串,交换机的名字 |
参数2 | 交换机类型 ,直接交换机 AMQPExchangeType::DIRECT |
参数3 | 布尔, true: 如果交换机不存在,将返回404错误,如果交换机已经存在,返回Declare Ok |
参数4 | 布尔,true: 标记为持久交换机,false:临时交换机,如果服务器重启,将清除临时交换机 |
参数5 | 布尔,true:当所有队列都使用完该交换机时,交换机将被删除。 |
参数6 | boolean,true:发布者不能直接使用交换机,只能在绑定到其他交换机时使用。 |
参数7 | nowait |
参数8 | array,传递参数 |
参数9 | 未知 |
队列绑定到交换机
$channel->queue_bind($queue, $exchange);
注意参数3,是绑定时设置的路由键,根据不同的路由键,定义绑定。这样后期发送消息数据时,消息也会代着路由。然后交换机会跟据路由,转发到指定的队列里。
参数 | 说明 |
---|---|
参数1 | string,队列的名字 |
参数2 | string ,交换机的名字 |
参数3 | string, 绑定的路由键,可以为空 |
参数4 | nowait |
参数5 | array,传递参数 |
参数6 | 保留字段 |
准备要发送的消息
$messageBody = implode(' ', array_slice($argv, 1));
$message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
准备消息内容时,一定要实例化AMQPMessage这个类,参数1是传递的内容,参数是数组,添加一些关于传递数据的属性。其中参数2可以省略不添。
通道执行发送数据
$channel->basic_publish($message, $exchange);
参数 | 说明 |
---|---|
参数1 | string,要发送的消息数据 |
参数2 | string ,交换机 |
参数3 | string, 路由键,交换机将跟据路由键,将消息转发到指定队列 |
参数4 | boolean, true:如果消息无法路由到队列,服务器如何做出反应 false:服务器会自动删除消息 |
参数5 | boolean, true:如果消息不能立即路由到队列使用者,服务器将使用return方法返回无法传递的消息。 false: 服务器将对消息进行排队,但不能保证消息将被使用。 |
关闭通道,关闭连接
//关闭通道
$channel->close();
//关闭连接
$connection->close();
以上是一个最简单的生产者使用例子,都没有设置路由键。现在记录一个消费者使用例子。
消费者
AMQP 0-9-1 模型中,有两种消费者模式:
(1) 消息队列将消息数据推送给应用程序
(2) 应用程序 主动从队列中获取消息
下面演示的例子就是第二种,主动从队列中获取消息数据。
通过AMQP中的 basic.consume方法,告知服务器开启一个“消费者”,此消费者实质是一个针对特定队列消息的持久化请求。消费者在声明过的信道(channel)中会一直存在,直到客户端清除他们为止。
代码:
use 引入AMQPSocketConnection 和 AMQPExchangeType
include(__DIR__ . '/config.php');
use PhpAmqpLib\Connection\AMQPSocketConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
然后 定义三个变量,交换机名称,队列名称,还有消费者标签。
一个队列中,可以注册多个消费者,也可以注册一个独家享用的消费者,当独享消费者存在时,其他消费者被排除在外。
每个消费者都有一个身份标识,就是这个消费者标签,它是一个字符串,可以被用来退订消息。
$exchange = 'latu';
$queue = 'msgs';
//消费者标签
$consumerTag = 'consumer';
连接AMQP消费者代理服务
$connection = new AMQPSocketConnection(HOST, PORT, USER, PASS, VHOST);
$channel = $connection->channel();
声明队列,声明交换机,队列绑定交换机
$channel->queue_declare($queue, false, true, false, false);
$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
$channel->queue_bind($queue, $exchange);
从队列里获取消息数据
$channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');
参数 | 说明 |
---|---|
参数1 | string,队列名称 |
参数2 | 消费者标签 |
参数3 | true:服务器将不会向发布消息的连接发送消息 |
参数4 | true:消息从服务器传递到客户机时,服务端会立刻将消息清除出队列。 |
参数5 | true:消费者将独占使用此队列 |
参数6 | true:服务器将不响应该方法。客户端不应等待应答方法。如果服务器无法完成该方法,它将引发通道或连接异常。 |
参数7 | 回调函数,可以为null,如果设置了回调函数,从队列取得的消息,将会传到这个回调函数里进行处理 |
说明:参数4如果设置为true,服务端将消息从队列中传出后,就直接认定传输成功,然后从队列里清除这条消息。虽然会提高一定性能,但如果消息发出后,客户端程序发生崩溃,没有接收成功这条件,那么此消息就可能会彻底丢失。
这个php客户端里 如果像下面这样使用 basic_consume()
$result=$channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');
变量$result得到将是消费者标签。不是返回的消息数据。
所以参数7的回调函数一定要设置。
官方demo定义的回调函数,
function process_message($message)
{
echo "\n--------\n";
echo $message->body;
echo "\n--------\n";
$message->ack();
// Send a message with the string "quit" to cancel the consumer.
if ($message->body === 'quit') {
$message->getChannel()->basic_cancel($message->getConsumerTag());
}
}
分析一下这个函数:
(1)参数$message 就是AMQPMessage对象。消息数据存在于body属性中。
(2)$message->ack() 方法是向服务器端,发送回执消息,确认队列消息接收成功。
(3)发送字符串为“quit”的消息以取消消费者。然后执行basic_cancel() 发出请求,这样服务器端会清除消费者。
它不会影响到已经成功投递的消息,但是会使得服务器不再将新的消息投送给此消费者。
while ($channel->is_consuming()) {
$channel->wait();
}
最后注消退出通道和连接。
function shutdown($channel, $connection)
{
$channel->close();
$connection->close();
}
register_shutdown_function('shutdown', $channel, $connection);