php and python 初探 rabbitmq

  • 本章使用 docker 部署 rabbitmq, 并且使用php 以及python 进行连接和通讯

  1. 使用docker 部署 rabbitmq
    部署rabbitmq
    首先我们调查得知rabbitmq的端口号都有哪些
4369 -- erlang发现口

5672 --client端通信口

15672 -- 管理界面ui端口

25672 -- server间内部通信口

生成docker 容器

docker pull rabbitmq #不带ui管理界面
docker run -d --name my-rabbitmq  -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15672:15672 rabbitmq

可以通过15672这个端口访问ui管理,账号密码默认为guest。
修改默认帐密:
RABBITMQ_DEFAULT_USER and RABBITMQ_DEFAULT_PASS

docker pull rabbitmq-management #有ui管理界面
docker run -d  docker run -d --name my-rabbitmq-management  -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15672:15672 rabbitmq:management

进入docker容器管理

docker exec -it my-rabbitmq bash

  1. 编写代码连接测试

php 代码

在 php代码编写前需要 composer 引入 rabbitmq 的依赖
composer.json 代码如下

{
    "require": {
        "php-amqplib/php-amqplib": ">=2.6.1"
    }
}
send 代码 send.php :
<?php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

require_once __DIR__ . '/vendor/autoload.php';

$conn_args = [
    'host' => '127.0.0.1',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
    'vhost' => '/'
];

$conn = new AMQPStreamConnection($conn_args['host'], $conn_args['port'], $conn_args['login'], $conn_args['password']);

$channel = $conn->channel();
$channel->queue_declare('hello', false, false, false, false);

$msgData = new amqpData('zpc', '20');
$msg = new AMQPMessage($msgData->getData()); // 注意这里如果想要传输对象的话需要进行序列化

//$msg = new AMQPMessage('test-msg-01');
$channel->basic_publish($msg, '', 'hello');

echo "[x] 'Sent Hello world!' \n";

$channel->close();
$conn->close();



class amqpData
{
    public $name;
    public $age;

    public function amqpData($name, $age)
    {
        $this->name = $name;
        $this->age = $age;
    }

    public function getData()
    {
        return json_encode($this);
    }
}
receive 代码 receive.php:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

echo " [*] Waiting for messages. To exit press CTRL+C\n";

$callback = function ($msg) {
//    echo "'[X] Received' {$msg->body} \n";

    $body = $msg->body;
    // 如果传输的是序列化数据的话这里进行 反序列化
    $data = json_decode($body);

    echo '[X] Received ' . $data->name;  echo "\n";
};

$channel->basic_consume('hello', '',false,true,false,false, $callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();
测试
php send.php  

[x] 'Sent Hello world!'

此时前往容器去查看 rabbitmq 的队列信息你会发现这样的信息,说明hello这个队列里面已经进去了一条数据

cd /sbin/
rabbitmqctl list_queues

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
hello   1

开始接收

php receive.php  

[*] Waiting for messages. To exit press CTRL+C
[X] Received zpc
至此php代码测试成功

python 代码测试

首先先pip 安装库

pip install pika

由于 php 的发送rabbitmq 的代码已经写好了,就直接使用 send.php 进行测试,代码大同小异,只是 python 的简洁性真的很棒

receive 代码 receive.py:
import pika,json

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672))

channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print ("[x] Received ")
    # print (body)
    
    data = json.loads(str(body, 'utf-8'))
    print (data['name'])

channel.basic_consume(callback, queue="hello", no_ack=True)

print ('[*] Waiting for messages. To exit press CTRL+C')

channel.start_consuming()

至此 python 和 php 的代码测试完成了


可以同时开启多个消费者来接收队列内容,

消息确认

执行任务可能需要几秒钟。您可能想知道如果其中一个消费者开始执行长任务并且仅在部分完成时死亡会发生什么。使用我们当前的代码,一旦RabbitMQ向客户发送消息,它立即将其标记为删除。在这种情况下,如果你杀死一个工人,我们将丢失它刚刚处理的消息。我们还将丢失分发给这个特定工作者但尚未处理的所有消息。

但我们不想失去任何任务。如果工人死亡,我们希望将任务交付给另一名工人。

为了确保消息永不丢失,RabbitMQ支持 消息确认。消费者发回ack(nowledgement)告诉RabbitMQ已收到,处理了特定消息,RabbitMQ可以自由删除它。

如果消费者死亡(其通道关闭,连接关闭或TCP连接丢失)而不发送确认,RabbitMQ将理解消息未完全处理并将重新排队。如果其他消费者同时在线,则会迅速将其重新发送给其他消费者。这样你就可以确保没有消息丢失,即使工人偶尔会死亡。

没有任何消息超时; 当消费者死亡时,RabbitMQ将重新发送消息。即使处理消息需要非常长的时间,也没关系。

默认情况下,消息确认已关闭。现在是时候通过设置第四个参数来打开它们basic_consume (true表示没有ACK),并从工作人员发送适当的确认,

修改我们的receive.php代码

<?php

$callback = function ($msg) {
    ...
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

//$channel->basic_consume('hello', '',false,true,false,false, $callback);
$channel->basic_consume('hello', '',false,false,false,false, $callback);

使用此代码,我们可以确定即使您在处理消息时使用CTRL + C杀死一名工作人员,也不会丢失任何内容。工人死后不久,所有未经确认的消息将被重新传递。

确认必须在收到的交付的同一信道上发送。尝试使用不同的通道进行确认将导致通道级协议异常。有关确认文档指南,请参阅了解更多信息。

被遗忘的确认
错过ack是一个常见的错误。这是一个简单的错误,但后果是严重的。当您的客户端退出时,消息将被重新传递(这可能看起来像随机重新传递),但RabbitMQ将会占用越来越多的内存,因为它无法释放任何未经处理的消息。

为了调试这种错误,您可以使用rabbitmqctl 来打印messages_unacknowledged字段:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
在Windows上,删除sudo:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久性

我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。

当RabbitMQ退出或崩溃时,它将忘记队列和消息,除非你告诉它不要。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久。

首先,我们需要确保RabbitMQ永远不会丢失我们的队列。为此,我们需要声明它是持久的。为此,我们将第三个参数传递给queue_declare为true:

$channel->queue_declare('hello2', false, true, false, false);

此时我们确信即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要将消息标记为持久性 - 通过设置delivery_mode = 2消息属性,AMQPMessage将其作为属性数组的一部分。

$msg = new AMQPMessage('test-msg' . $i, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

有关消息持久性的注释

将消息标记为持久性并不能完全保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ接受消息并且尚未保存消息时,仍然有一个短时间窗口。此外,RabbitMQ不会为每条消息执行fsync(2) - 它可能只是保存到缓存而不是真正写入磁盘。持久性保证不强,但对于我们简单的任务队列来说已经足够了。如果您需要更强的保证,那么您可以使用 发布者确认

公平派遣

您可能已经注意到调度仍然无法完全按照我们的意愿运行。例如,在有两个工人的情况下,当所有奇怪的消息都很重,甚至消息很轻时,一个工人将经常忙碌而另一个工作人员几乎不会做任何工作。好吧,RabbitMQ对此一无所知,仍然会均匀地发送消息。

发生这种情况是因为RabbitMQ只是在消息进入队列时调度消息。它不会查看消费者未确认消息的数量。它只是盲目地向第n个消费者发送每个第n个消息。
为了打败我们可以使用basic_qos方法和 prefetch_count = 1设置。这告诉RabbitMQ不要一次向一个worker发送一条消息。或者,换句话说,在处理并确认前一个消息之前,不要向工作人员发送新消息。相反,它会将它发送给下一个仍然不忙的工人。

#receive.php
$channel->basic_qos(null, 1, null);

参考资料:
rabbitmq官方的php 代码实例:http://www.rabbitmq.com/tutorials/tutorial-one-php.html
rabbitmq官方的python代码实例:http://www.rabbitmq.com/tutorials/tutorial-one-python.html
rabbitmq 的 docker 仓库: https://hub.docker.com/_/rabbitmq/

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容

  • Awesome PHP 一个PHP资源列表,内容包括:库、框架、模板、安全、代码分析、日志、第三方库、配置工具、W...
    guanguans阅读 5,753评论 0 47
  • Composer Repositories Composer源 Firegento - Magento模块Comp...
    零一间阅读 3,956评论 1 66
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,637评论 18 139
  • awesome-php 收集整理一些常用的PHP类库, 资源以及技巧. 以便在工作中迅速的查找所需... 这个列表...
    guanguans阅读 4,416评论 0 34
  • 每天早晨,南营小区门口,买卖各类蔬菜,水果,水产品的人,乌泱泱一大片。儿子太小,一般买菜什么的,都去超市,干净不说...
    风起云湮阅读 570评论 7 5