- 本章使用 docker 部署 rabbitmq, 并且使用php 以及python 进行连接和通讯
- 使用docker 部署 rabbitmq
部署rabbitmq
首先我们调查得知rabbitmq的端口号都有哪些
4369 -- erlang发现口
5672 --client端通信口
15672 -- 管理界面ui端口
25672 -- server间内部通信口
生成docker 容器
docker pull rabbitmq #不带ui管理界面
docker run -d --name my-rabbitmq -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15672:15672 rabbitmq
可以通过15672这个端口访问ui管理,账号密码默认为guest。
修改默认帐密:
RABBITMQ_DEFAULT_USER and RABBITMQ_DEFAULT_PASS
docker pull rabbitmq-management #有ui管理界面
docker run -d docker run -d --name my-rabbitmq-management -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15672:15672 rabbitmq:management
进入docker容器管理
docker exec -it my-rabbitmq bash
- 编写代码连接测试
php 代码
在 php代码编写前需要 composer 引入 rabbitmq 的依赖
composer.json 代码如下
{
"require": {
"php-amqplib/php-amqplib": ">=2.6.1"
}
}
send 代码 send.php :
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
require_once __DIR__ . '/vendor/autoload.php';
$conn_args = [
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
];
$conn = new AMQPStreamConnection($conn_args['host'], $conn_args['port'], $conn_args['login'], $conn_args['password']);
$channel = $conn->channel();
$channel->queue_declare('hello', false, false, false, false);
$msgData = new amqpData('zpc', '20');
$msg = new AMQPMessage($msgData->getData()); // 注意这里如果想要传输对象的话需要进行序列化
//$msg = new AMQPMessage('test-msg-01');
$channel->basic_publish($msg, '', 'hello');
echo "[x] 'Sent Hello world!' \n";
$channel->close();
$conn->close();
class amqpData
{
public $name;
public $age;
public function amqpData($name, $age)
{
$this->name = $name;
$this->age = $age;
}
public function getData()
{
return json_encode($this);
}
}
receive 代码 receive.php:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
// echo "'[X] Received' {$msg->body} \n";
$body = $msg->body;
// 如果传输的是序列化数据的话这里进行 反序列化
$data = json_decode($body);
echo '[X] Received ' . $data->name; echo "\n";
};
$channel->basic_consume('hello', '',false,true,false,false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
测试
php send.php
[x] 'Sent Hello world!'
此时前往容器去查看 rabbitmq 的队列信息你会发现这样的信息,说明hello这个队列里面已经进去了一条数据
cd /sbin/
rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
hello 1
开始接收
php receive.php
[*] Waiting for messages. To exit press CTRL+C
[X] Received zpc
至此php代码测试成功
python 代码测试
首先先pip 安装库
pip install pika
由于 php 的发送rabbitmq 的代码已经写好了,就直接使用 send.php 进行测试,代码大同小异,只是 python 的简洁性真的很棒
receive 代码 receive.py:
import pika,json
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print ("[x] Received ")
# print (body)
data = json.loads(str(body, 'utf-8'))
print (data['name'])
channel.basic_consume(callback, queue="hello", no_ack=True)
print ('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
至此 python 和 php 的代码测试完成了
可以同时开启多个消费者来接收队列内容,
消息确认
执行任务可能需要几秒钟。您可能想知道如果其中一个消费者开始执行长任务并且仅在部分完成时死亡会发生什么。使用我们当前的代码,一旦RabbitMQ向客户发送消息,它立即将其标记为删除。在这种情况下,如果你杀死一个工人,我们将丢失它刚刚处理的消息。我们还将丢失分发给这个特定工作者但尚未处理的所有消息。
但我们不想失去任何任务。如果工人死亡,我们希望将任务交付给另一名工人。
为了确保消息永不丢失,RabbitMQ支持 消息确认。消费者发回ack(nowledgement)告诉RabbitMQ已收到,处理了特定消息,RabbitMQ可以自由删除它。
如果消费者死亡(其通道关闭,连接关闭或TCP连接丢失)而不发送确认,RabbitMQ将理解消息未完全处理并将重新排队。如果其他消费者同时在线,则会迅速将其重新发送给其他消费者。这样你就可以确保没有消息丢失,即使工人偶尔会死亡。
没有任何消息超时; 当消费者死亡时,RabbitMQ将重新发送消息。即使处理消息需要非常长的时间,也没关系。
默认情况下,消息确认已关闭。现在是时候通过设置第四个参数来打开它们basic_consume (true表示没有ACK),并从工作人员发送适当的确认,
修改我们的receive.php代码
<?php
$callback = function ($msg) {
...
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
//$channel->basic_consume('hello', '',false,true,false,false, $callback);
$channel->basic_consume('hello', '',false,false,false,false, $callback);
使用此代码,我们可以确定即使您在处理消息时使用CTRL + C杀死一名工作人员,也不会丢失任何内容。工人死后不久,所有未经确认的消息将被重新传递。
确认必须在收到的交付的同一信道上发送。尝试使用不同的通道进行确认将导致通道级协议异常。有关确认的文档指南,请参阅了解更多信息。
被遗忘的确认
错过ack是一个常见的错误。这是一个简单的错误,但后果是严重的。当您的客户端退出时,消息将被重新传递(这可能看起来像随机重新传递),但RabbitMQ将会占用越来越多的内存,因为它无法释放任何未经处理的消息。
为了调试这种错误,您可以使用rabbitmqctl 来打印messages_unacknowledged字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在Windows上,删除sudo:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息持久性
我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。
当RabbitMQ退出或崩溃时,它将忘记队列和消息,除非你告诉它不要。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久。
首先,我们需要确保RabbitMQ永远不会丢失我们的队列。为此,我们需要声明它是持久的。为此,我们将第三个参数传递给queue_declare为true:
$channel->queue_declare('hello2', false, true, false, false);
此时我们确信即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要将消息标记为持久性 - 通过设置delivery_mode = 2消息属性,AMQPMessage将其作为属性数组的一部分。
$msg = new AMQPMessage('test-msg' . $i, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
有关消息持久性的注释
将消息标记为持久性并不能完全保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接受消息并且尚未保存消息时,仍然有一个短时间窗口。此外,RabbitMQ不会为每条消息执行fsync(2) - 它可能只是保存到缓存而不是真正写入磁盘。持久性保证不强,但对于我们简单的任务队列来说已经足够了。如果您需要更强的保证,那么您可以使用 发布者确认。
公平派遣
您可能已经注意到调度仍然无法完全按照我们的意愿运行。例如,在有两个工人的情况下,当所有奇怪的消息都很重,甚至消息很轻时,一个工人将经常忙碌而另一个工作人员几乎不会做任何工作。好吧,RabbitMQ对此一无所知,仍然会均匀地发送消息。
发生这种情况是因为RabbitMQ只是在消息进入队列时调度消息。它不会查看消费者未确认消息的数量。它只是盲目地向第n个消费者发送每个第n个消息。
为了打败我们可以使用basic_qos方法和 prefetch_count = 1设置。这告诉RabbitMQ不要一次向一个worker发送一条消息。或者,换句话说,在处理并确认前一个消息之前,不要向工作人员发送新消息。相反,它会将它发送给下一个仍然不忙的工人。
#receive.php
$channel->basic_qos(null, 1, null);
参考资料:
rabbitmq官方的php 代码实例:http://www.rabbitmq.com/tutorials/tutorial-one-php.html
rabbitmq官方的python代码实例:http://www.rabbitmq.com/tutorials/tutorial-one-python.html
rabbitmq 的 docker 仓库: https://hub.docker.com/_/rabbitmq/