2021-03-01RabbitMQ+PHP

介绍

RabbitMQ是一个消息代理器:它接受和转发消息。你可以把它当作一个邮局:当你把邮件放在信箱里时,你可以肯定邮差先生最终会把邮件送到你的收件人那里。在这个比喻中,RabbitMQ就是这里的邮箱,邮局和邮差。

RabbitMQ和邮局之间的主要区别是,它不处理纸张,而是接受、存储和转发二进制数据‒消息。

RabbitMQ,和一般的消息传递,使用专业术语。

生产者的工作就是发送消息。发送消息的程序是生产者:


队列类比一个邮箱,存在于RabbitMQ, 然而信息流通过RabbitMQ和您的应用程序,他们只能存储在一个队列。队列只受主机内存和磁盘限制的约束,它本质上是一个很大的消息缓冲区。会有许多生产者可以发送到一个队列的消息,许多消费者可以尝试从一个队列接收数据。这就是我们如何表示队列的方式:


消费者和生产者有着相似的意义. 消费者无非就是等待消息然后处理的程序:


请注意,生产者、消费者和代理不必同一主机上;事实上,在大多数应用程序中它们没有这样做。

"Hello World"

(使用PHP amqplib客户端)

在本教程的这一部分中,我们将用PHP编写两个程序;一个生产者发送一条消息,一个用户接收消息并将它们打印出来。我们会PHP amqplib API的忽略一些细节,集中在这个非常简单的事情刚刚开始。这是一个“Hello World”的消息传递。

在下图中,“p”是我们的生产商,“C”是我们的消费者。在中间的框是一个队列的消息缓冲区,RabbitMQ保持代表的消费。

PHP amqplib客户端库

RabbitMQ有很多协议。本教程介绍AMQP 0-9-1,这是一个开放的、通用的协议消息。有许多不同的语言RabbitMQ一批客户。我们将在本教程中使用PHP amqplib,composer解决依赖管理。

添加composer.json:

{
    "require": {
        "php-amqplib/php-amqplib": ">=2.6.1"
    }
}
composer install

# 或者 直接运行包引入
composer require php-amqplib/php-amqplib

现在我们可以开始我们的hello world

生产者(消息发送方)

我们命令我们的消息发布者(发送者)send.php和消息接收receive.php。发送者将连接到RabbitMQ,发送一条消息,然后退出。

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

现在我们能创建一个连接服务器的Connection:

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

该连接抽象套接字(socket)连接,并为我们负责协议版本协商和认证等。这里,我们连接到一个rabbitmq代理器在本地机器上-使用localhost。如果我们想在不同的机器上连接到一个代理,我们只需在这里指定它的名称或IP地址。

接下来,我们创建一个通道,这是处理事情的大部分API的地方。

发送消息前,我们必须声明一个队列为我们发送做准备;然后我们可以向队列发布消息:

$channel->queue_declare('hello', false, false, false, false);

$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent 'Hello World!'\n";

声明队列是幂等的(原句:Declaring a queue is idempotent,这里的idempotent不知道是什么意思) - 只有在它不存在时才会创建队列。消息内容是一个字节数组,因此您可以在那里编码用你喜欢的方式。

最后,我们关闭通道和连接;

$channel->close();
$connection->close();

上面我们完成了send.php.

接下来我们完成消费方的代码

消费者(接收方,任务处理方)

消费者从RabbitMQ接收推来的消息,我们会保持运行监听消息并打印出来。


require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

设置与发布程序相同;我们打开一个连接和一个通道,并声明将要消耗的队列。注意,这与发送发布的队列匹配。

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

注意,我们也在这里声明队列。因为我们可能在发布之前启动消费者,我们希望在我们尝试从它那里消费消息之前确定队列的存在。

我们将告诉服务器从队列中发送消息。我们将定义一个PHP可调用,它将接收服务器发送的消息。请记住,消息是从服务器异步发送到客户机的。

$callback = function($msg) {
  echo " [x] Received ", $msg->body, "\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

当调用basic_consume,我们的代码会阻塞。当我们收到消息时,我们的回调函数将通过接收到返回的消息传递。

以上是我们receive.php的代码

运行测试

运行消费者

php receive.php

运行消息发送方

php send.php

列出队列

rabbitmqctl list_queues

完整源码(调整过)

config.php

<?php
return [
    'vendor' => [
        'path' => dirname(dirname(__DIR__)) . '/vendor'
    ],
    'rabbitmq' => [
        'host' => '127.0.0.1',
        'port' => '5672',
        'login' => 'qkl',
        'password' => '123456',
        'vhost' => '/'
    ]
];
?>

receive.php

<?php
$config = require "../config.php";

require_once $config['vendor']['path'] . '/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection($config['rabbitmq']['host'], $config['rabbitmq']['port'],
    $config['rabbitmq']['login'], $config['rabbitmq']['password'], $config['rabbitmq']['vhost']);
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);
 
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg) {
    echo " [x] Received ", $msg->body, "\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();
?>

send.php

<?php
$config = require "../config.php";

require_once $config['vendor']['path'] . '/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection($config['rabbitmq']['host'], $config['rabbitmq']['port'],
    $config['rabbitmq']['login'], $config['rabbitmq']['password'], $config['rabbitmq']['vhost']);
$channel = $connection->channel();

//发送方其实不需要设置队列, 不过对于持久化有关,建议执行该行
$channel->queue_declare('hello', false, false, false, false);

$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent 'Hello World!'\n";

$channel->close();
$connection->close();
?>

了解如何构建一个简单的工作队列, 你可以阅读下一章节: RabbitMQ+PHP 教程二(Work Queues)


最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容