简介Rabbitmq的几种消费模式

前言

在日常开发中,消息队列能帮我们解决系统的异步问题,流量的控制和服务解耦,不同的消息队列有不同的消费模型

思考

redis也可以实现消息队列(list和stream),也称为轻量级消息队列,list实现的缺点在哪里?stream类型怎么用?

RabbitMq

具体概念的东西网上很多,文档也有详细描述这里不做过多阐述,本文主要以PHP代码为主进行实验,消息队列之rabbitmq

docker run -d --name mq \
-p 5672:5672 -p 15672:15672 \
-v /home/docker/mq/data:/var/lib/rabbitmq --hostname myRabbit \
-e RABBITMQ_DEFAULT_VHOST=my_vhost \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin ee045987e252

--hostname myRabbit 是因为rabbitmq是基于Node节点名的

官方讲rabbitmq比喻为邮局,queue比喻为邮局里的邮箱,我们要寄信(producer发送message),就需要把信赛到(send)邮箱,或者你交给前台窗口(exchange)让他帮你寄,但是不同的前台窗口提供的服务不同,因为呀不同的邮箱他发往的地址不同,有的需要你指定投到哪些邮箱(exchange类型为direct类型时要求完全匹配routing-key),有的只需要你告诉他投到邮箱的大致有什么特点就行(exchange类型为topic时routing-key模糊匹配就行),还有的是把信件复制多个(魔法)往每个邮箱都塞一封(exchange类为fanout)。最后有的邮箱在派送员派送完信件后要求收信人(consumer)签个字(ack验证)才能把信给他

下面例子基于官方提供的包,
composer require php-amqplib/php-amqplib

例子来自小猴子喝牛奶的博客

Direct-Exchange

生产者

<?php 
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;


//获取终端提示用户输入的数据
fwrite(STDOUT, "Please enter a message:\n");
$msg_str = fgets(STDIN);

//建立生产者与mq之间的连接 -------动身前往邮局   
//参数:地址,端口,账号,密码,虚拟机名
//注意这个虚拟机名为绑定-e RABBITMQ_DEFAULT_VHOST=my_vhost参数时指定的
$connection = new AMQPStreamConnection('容器ip', 5672, 'admin', 'my_vhost');

//在已连接基础上建立生产者与mq之间的通道-----进邮局门
$channel = $connection->channel();

//声明初始化交换机,交换机不存在则创建----找前台窗口   
//参数:交换机名,路由类型,是否检测同名队列,是否开启队列持久化,通道关闭后是否删除队列
$channel->exchange_declare('ex_direct', 'direct', false, true, false); 

//声明初始化一条队列,队列不存在则创建-----告诉前台窗口要什么邮箱
//参数:队列名,是否检测同名队列,是否开启队列持久化,是否能被其他队列访问,通道关闭后是否删除队列
$channel->queue_declare('ex_direct_queue', false, false, false, false);

//前台窗口找你要的邮箱
//将队列与某个交换机进行绑定,并使用路由关键字
//参数:队列名,交换机名,路由键名
$channel->queue_bind('ex_direct_queue', 'ex_direct', 'hello'); 

//把信给封好
//生成消息
$msg = new AMQPMessage($msg_str);

//推送消息到某个交换机------把信给前台
//参数:消息,交换机名,路由键名
//就如同前面所讲,你只需要把信给前台,并告诉他投给哪些指定的邮箱即可
$channel->basic_publish($msg, 'ex_direct', 'hello');
echo " [x] Sent: $msg_str \n";

$channel->close();
$connection->close();

消费者

<?php 
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
//快递员进门
$connection = new AMQPStreamConnection('容器ip', 5672, 'admin', 'my_vhost');
$channel = $connection->channel();
//快递员先看看给自己任务的前台在不在
$channel->exchange_declare('ex_direct', 'direct', false, true, false); 
//在看看自己负责的邮箱在不在
$channel->queue_declare('ex_direct_queue', false, false, false, false);
$channel->queue_bind('ex_direct_queue', 'ex_direct', 'hello'); 
//执行上面的步骤主要是为保证这些目标交换机和队列已经存在
//这里是收信人的动作
$callback = function($msg) {
    //打印消息
    echo " [x] Received ", $msg->body, "\n";
    //消息确认
    $msg->ack();
};
//第三个参数为true表示了这个邮箱规定了收信人必须签名
//参数:队列名,消费者标识符,不接收此使用者发布的消息,使用者是否使用自动确认模式,请求独占使用者访问,不等待,消息回调函数
$channel->basic_consume('ex_direct_queue', 'consumer1', false, true, false, false, $callback);
//快递员看有没有信,有就立马寄
//监听通道消息
while(count($channel->callbacks)) {
    $channel->wait();
}

思考

1.万一RabbitMQ崩溃了退出了怎么办?里面的队列和消息会不会消失,这需要我们在声明交换机和队列时候,让他保证持久化

//第4个参数为true
$channel->exchange_declare('ex_direct', 'direct', false, true, false); 
//第三个参数为true,这里不能在已存在的队列上加持久化
$channel->queue_declare('hello', false, true, false, false);
//里面的消息也保证持久化
//第二个参数为数组
$msg = new AMQPMessage('你的消息', ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT] );

2.Rabbitmq默认一旦发送消息给客户端后就立即删除,那万一消费者收到消息后要执行一个耗时任务,但是中途异常退出了,那么这个消息不就丢了吗(比如上面的回调函数中sleep(10)但是我们中涂把他kill掉导致没发送ack码)。
们希望消费者完成消息处理后发送ack确认,rabbitMQ收到后才能对消息删除。

//即在消费者绑定队列时第4个参数为false
$channel->basic_consume('ex_direct_queue', 'consumer1', false, false, false, false, $callback);

3.快递员有多个,那么万一有的快递员要寄很多信,有的在偷懒怎么办?
利用函数进行公平调度

//消费者代码添加,表示在等待消费者处理完消息后才能再接受消息,不堆积消息
$channel->basic_qos(null, 1, null);

Topic-Exchange

和Direct代码基本相同不同的是绑定交换机是时的'direct'成了'topic'

注意,routing-key是模糊匹配,这里并不是参考正则,*表示多个字符,#表示一个字符如 .log. 匹配 aaa.log.aaa

Fanout-Exchange

又称发布与订阅,即向与交换机的所有队列广播消息,既然是广播,那么我们就不需要考虑消息的ack了
生产者

<?php 
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;


//获取终端提示用户输入的数据
fwrite(STDOUT, "Please enter a message:\n");
$msg_str = fgets(STDIN);

//建立生产者与mq之间的连接    
//参数:地址,端口,账号,密码
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

//在已连接基础上建立生产者与mq之间的通道
$channel = $connection->channel();

//声明初始化交换机   
//参数:交换机名,路由类型,是否检测同名队列,是否开启队列持久化,通道关闭后是否删除队列
$channel->exchange_declare('mq_sms_send_ex3', AMQPExchangeType::FANOUT, false, false, false); 


//生成消息
$msg = new AMQPMessage($msg_str);

//推送消息到某个交换机
//参数:消息,交换机名,路由键名
$channel->basic_publish($msg, 'mq_sms_send_ex3');
echo " [x] Sent: $msg_str \n";

$channel->close();
$connection->close();

消费者

<?php 
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

//声明初始化交换机   
//参数:交换机名,路由类型,是否检测同名队列,是否开启队列持久化,通道关闭后是否删除队列
$channel->exchange_declare('mq_sms_send_ex3', AMQPExchangeType::FANOUT, false, false, false);

//声明初始化一条队列
//参数:队列名,是否检测同名队列,是否开启队列持久化,是否能被其他队列访问,通道关闭后是否删除队列
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

//将队列与某个交换机进行绑定,并使用路由关键字
//参数:队列名,交换机名,路由键名
$channel->queue_bind($queue_name, 'mq_sms_send_ex3');

echo ' [*] Waiting for messages', "\n";

$callback = function($msg) {
  echo " [x] Received ", $msg->body, "\n";

  //判断获取到quit后
  if (trim($msg->body) == 'quit') { 
        $msg->getChannel()->basic_cancel($msg->getConsumerTag());
    }

};

$channel->basic_qos(null, 1, null);

//参数:队列名,消费者标识符,不接收此使用者发布的消息,使用者是否使用自动确认模式,请求独占使用者访问,不等待,消息回调函数
$channel->basic_consume($queue_name, 'consumer1', false, true, false, false, $callback);

死信队列

即延迟队列,讲消息发送到指定的队列,消息要在队列中待到指定时间(ttl)后才能被发送给消费者

RabbtMq实现大致示意图
image.png

如何保证死信队列在消息过去后才把消息发给业务交换机---不设置消费者(快递员)不就行了
生产者

require_once '../vendor/autoload.php';


use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;
use PhpAmqpLib\Wire\AMQPWriter;

fwrite(STDOUT, "Please enter a message:\n");
$msg_str = fgets(STDIN);

$connection = new AMQPStreamConnection(
    '172.17.0.5','5672','admin','admin','my_vhost'
);
$channel = $connection->channel();
//业务交换机,负责处理过期消息
$channel->exchange_declare(
    'ex_dl','direct',false,true
);
//死信交换机
$channel->exchange_declare(
    'ex_normal','fanout',false,true
);
//因此创建死信队列的配置参数要求是AMQPTable类型
$args=new AMQPTable();
//设置消息过期时间
$args->set('x-message-ttl',120000);
//过期后发送给哪个交换机
$args->set('x-dead-letter-exchange','ex_dl');
//设置路由键
$args->set('x-dead-letter-routing-key','ex_qu');
//也就是说normal队列上的消息存活时间都是2分组
//死信队列
$channel->queue_declare('queue_normal',false,true,
false,false,false,$args
);
//业务队列
$channel->queue_declare('queue_dlx',false,true,
false,false
);
$channel->queue_bind('queue_normal','ex_normal');
$channel->queue_bind('queue_dlx','ex_dl','ex_qu');
$message =new AMQPMessage($msg_str);
//只发送消息给死信交换机,因为业务交换机的消息是死信队列给的
$channel->basic_publish($message,'ex_normal','ex_qu');

$channel->close();
$connection->close();

消费者

require_once '../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
$connection = new AMQPStreamConnection(
    '172.17.0.5','5672','admin','admin','my_vhost'
);
$channel= $connection->channel();
//我们只要保证业务交换机和业务队列在就行了
//死信队列不给消费者消费消息
$channel->exchange_declare(
    'ex_dl','direct',false,true
);
$channel->queue_declare('queue_dlx',false,true,
false,false
);

$channel->queue_bind('queue_dlx','ex_dl','ex_qu');
echo '[*]Waiting for message';

$callback = function($msg){
    echo " [x] Received ", $msg->body, "\n";
    $msg->ack();
    if(trim($msg->body)=='quit'){
        echo 2;
        $msg->getChannel()->basic_cancel($msg->getConsumerTag());
    }
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume('queue_dlx','c1',false,false,false,false,$callback);
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,427评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,551评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,747评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,939评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,955评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,737评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,448评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,352评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,834评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,992评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,133评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,815评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,477评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,022评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,147评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,398评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,077评论 2 355

推荐阅读更多精彩内容