【RabbitMq】快速入门之work queue模式、fanout模式、direct模式、topic模式、RPC实现、publisher confirm机制

消息队列(MQ),很多场景都有它的身影,MQ的主要功能包括应用解耦流量削峰异步处理。本文主要讲解RabbitMq的原理及应用实例,将参考官网文档重点介绍RabbitMq基本概念work queue模式fanout模式direct模式topic模式RPC实现publisher confirms机制,从而达到快速入门的目的。

0.RabbitMq基本概念

  • vhost,虚拟主机,提供了完全隔离独立的环境,包括exchange、queue等,可通过插件web管理后台或者rabbitmqctl命令设置user的vhost权限。
  • connection,要使用rabbitmq必然要与服务器建立连接了,AMQP协议是基于TCP连接的应用层协议
  • channel,信道用于复用connection,减少TCP连接带来的资源开销,当访问量大的时候则需要开辟多个connection,并分摊到chennel。
  • routing_key,路由键在pub/sub模式下作为exchange匹配binding到queue的条件;在work queue模式下,可视为队列名称发送消息。
  • exchange,交换机在信道内,负责接受并转发消息。根据交换机的类型,有不同的匹配方式。
  • binding_key,绑定值可视为exchange与queue之间的映射关系值,绑定值与queue之间的关系是n:n,当一个queue对应exchange的多个binding_key时,exchange只会发送一次到该queue。
  • queue,消息队列
  • message,消息是要传递及处理的数据,通过RabbitMq指定的类来构造,可配置消息的参数属性,如correlation_id(请求标识),delivery_mode(投递模式)等。
  • producer/publisher,消息的生产者/发布者,携带routing_key和msg。
  • consumer/subscriber,消息的消费者/订阅者,按照不同的模式处理队列中的消息。

1.work queues模式

1.work queues

常规的消息队列模式,不涉及交换机exchange和队列绑定queue_binding,执行过程:生产者发送消息至队列,消费者从队列中取数据消费。

producer代码示例(PHP)

//1.建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'root', 'root');
//2.信道
$channel = $connection->channel();
//3.信道中声明队列
$queue_name='task_queue';
$channel->queue_declare($queue_name, false, true, false, false);
$message = "Hello Task";
//4.生成amqp消息
$msg = new AMQPMessage($message, [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);//投递模式设置为消息持久化
//5.发布消息
$channel->basic_publish($msg, '', $queue_name);
echo "publisher  Sent '{$message}!'\n";
$channel->close();
$connection->close();

consumer代码示例

$connection = new AMQPStreamConnection('localhost', 5672, 'root', 'root');
$channel = $connection->channel();
$queue_name='task_queue';
$channel->queue_declare($queue_name, false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
    echo "consumer received : " . $msg->body . PHP_EOL;
    sleep(1);
    echo "Done" . PHP_EOL;
    //确认消息
    $msg->ack();
};
//公平调度, 设置预加载个数
$channel->basic_qos(null, 1, null);
//持续监听,回调处理消息
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);
while ($channel->is_open()) {
    $channel->wait();
}

下面介绍publish/subscribe模式,并引入exchangequeue_binding。该模式根据exchange的不同类型有不同的转发规则,exchange的类型主要有fanout、direct、topic

2.fanout模式

2.fanout

该模式引入exchange、queue_binding,但不涉及routing_key和binding_key,因为publisher把消息投递给exchange后,所有绑定在该交换机上的队列都能接收到消息。

publisher代码

...
//通用连接部分参考上面,后面代码同理,只展示核心变更部分;完整代码可看官网
//该模式不用声明队列,只需声明exchange
$channel->exchange_declare('fanout_logs', 'fanout', false, true, false);//1.fanout交换机
..
//消息投递到交换机
$channel->basic_publish($msg, 'fanout_logs');//2.fanout模式

subscriber代码

...
$channel->exchange_declare('fanout_logs', 'fanout', false, true, false);//1.声明交换机
...
$channel->queue_bind($queue_name, 'fanout_logs');//2.队列绑定交换机
...

比起work queue,该模式更灵活,利用exchange可将消息转发到多个queue中。

3.direct模式

3.direct

如果在pub/sub模式下,只想将交换机的消息转发给指定的队列,fanout模式显然无法满足。此时可以利用direct模式,该模式将exchange和queue通过binding_key绑定在一起;exchange在接收publisher消息时依据routing_key和binding_key是否完全匹配,决定是否转发到对应queue。

publisher代码

$channel->exchange_declare('direct_logs', 'direct', false, true, false);//1.direct交换机
$routing_key = 'black';
$channel->basic_publish($msg, 'direct_logs', $routing_key);//2.发布消息至交换机,携带routing_key

subscriber代码

...
$channel->exchange_declare('direct_logs', 'direct', false, true, false);
$bindingKey = 'black';
$channel->queue_bind($queue_name, 'direct_logs', $bindingKey);//队列绑定交换机,声明binding_key
...

4.topic模式

4.topic

topic模式在direct模式基础上升级,routing_key和binding_key非完全匹配,支持更灵活的匹配规则;routing_key/binding_key可以通过word1.word2.wordn方式进行灵活扩展。【符号*代表1个word,符号#可代表0或n个words】

publisher代码

$channel->exchange_declare('topic_logs', 'topic', false, false, false); //3.1.topics路由
$routing_key = 'black.tall.big';
$channel->basic_publish($msg, 'topic_logs', $routing_key);//2.发布消息至交换机,携带routing_key

subscriber代码

$channel->exchange_declare('topic_logs', 'topic', false, true, false);//topic模式
$bindingKey = '#';//相当于全部消息都能接收
$channel->queue_bind($queue_name, 'topic_logs', $bindingKey);//队列绑定交换机,声明binding_key

bindingKey的举🌰
成功:black.#,自动匹配2个words、'black.tall.*'匹配1个word,占位匹配时必须要有点号.
失败:black.short.*
失败-错误使用符号:black#

5.RPC模式

5.RPC

RPC, 全称remote procedure call即远程程序调用,比起常规的远程调用,基于RabbitMq的RPC优点有:1.异步调用2.方便扩展提升服务端性能(开启多个server)

5.1.实现原理

  • 服务端和客户端,通过两个队列进行通信,RPC队列rpc_queue和回调队列reply_to_queue
  • 客户端携带请求标识correlation_idreply_to_queue回调队列信息,发送请求至rpc_queue,服务端监听rpc_queue,消费消息并发送消息至指定回调队列reply_to_queue。
  • 客户端监听回调队列reply_to_queue并通过correlation_id获取请求处理结果

下面以计算斐波那契数为作为RPC示例。

client端代码

class FibonacciRpcClient
{
    private $connection;
    private $channel;
    private $callback_queue;
    private $response;
    private $corr_id;

    //构造函数,监听回调队列,处理
    public function __construct()
    {
        $this->connection = new AMQPStreamConnection(
            'localhost',
            5672,
            'root',
            'root'
        );
        $this->channel = $this->connection->channel();
        //1.生成回调队列
        $this->callback_queue = 'reply_to';
        $this->channel->queue_declare($this->callback_queue, false, true, false, false);

        //2.1.轮训消费
        $this->channel->basic_consume(
            $this->callback_queue,
            '',
            false,
            true,
            false,
            false,
            array(
                $this,
                'onResponse'
            )
        );


    }

    //2.1.2监听队列的回调函数
    public function onResponse($rep)
    {
        if ($rep->get('correlation_id') == $this->corr_id) {
            $this->response = $rep->body;
        }
    }

    //远程调用,发送消息至rpc队列
    public function call($n)
    {
        $this->response = null;
        $this->corr_id = uniqid();//3.生成请求的唯一标识

        //4.1.创建消息,携带请求标识、回调队列名称
        $msg = new AMQPMessage(
            (string)$n,
            array(
                'correlation_id' => $this->corr_id,
                'reply_to'       => $this->callback_queue
            )
        );
        //4.2.发送消息至rpc队列,等待服务端消费
        $this->channel->basic_publish($msg, '', 'rpc_queue');
        //5.循环判断结果
        while (!$this->response) {
            $this->channel->wait();
        }
        return intval($this->response);
    }
}

$fibonacci_rpc = new FibonacciRpcClient();//构造函数,监听回调队列reply_to
$response = $fibonacci_rpc->call(35);//发送消息至prc队列,并循环判断回调队列的处理结果。
echo ' [.] Got ', $response, "\n";//回调队列的处理结果

server端代码

$connection = new AMQPStreamConnection('localhost', 5672, 'root', 'root');
$channel = $connection->channel();
//声明队列
$channel->queue_declare('rpc_queue', false, false, false, false);
function fib($n)
{
    if ($n == 0) {
        return 0;
    }
    if ($n == 1) {
        return 1;
    }
    return fib($n-1) + fib($n-2);
}

echo " [x] Awaiting RPC requests\n";
$callback = function ($req) {
    //1.1监听rpc队列,处理client发送的消息
    $n = intval($req->body);
    echo ' [.] fib(', $n, ")\n";

    //1.2.返回处理结果,并携带请求标识
    $msg = new AMQPMessage(
        (string) fib($n),
        array('correlation_id' => $req->get('correlation_id'))
    );
    //2.发送消息至同一信道的 回调队列, 由client监听消费。
    $req->delivery_info['channel']->basic_publish(
        $msg,
        '',
        $req->get('reply_to')
    );
    //3.消息接受确认
    $req->ack();
};

//设置预加载数量,服务端worker公平调度
$channel->basic_qos(null, 1, null);
//轮训消费,监听rpc队列
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);

while ($channel->is_open()) {
    $channel->wait();
}

$channel->close();
$connection->close();

调用结果

client:
 [.] Got 9227465
server:
 [x] Awaiting RPC requests
 [.] fib(35)

6.publisher confirms模式
publisher confirms是RabbitMq实现可靠传输的扩展,用来判断publisher是否成功把消息发送到RabbitMq的broker。RabbitMq实现可靠传输的方式有两种:事务(不推荐)、publisher confirms,这两种方式互斥。publisher confirms的实现方式又可分为:同步异步

  • 6.1. 同步实现
    该模式是基于信道的,所以只要增加两个步骤即可:
    6.1.1. 信道声明为confirm模式
    6.1.2. 声明同步等待的超时时间
    代码如下:
...
$channel->confirm_select();//1.声明信道为confirm模式
...
try {
    $channel->wait_for_pending_acks($timeOut);//2.同步等待timeOut时间
}catch (Exception $exception){
    echo "exception:" . $exception->getMessage() . PHP_EOL;
}

..

  • 6.2. 异步实现
    异步实现通过注册回调的两个方法set_ack_handler和set_nack_handler。
    代码如下
$channel->confirm_select();//1.声明信道为confirm模式

//2.消息被ack后的回调
$channel->set_ack_handler(function (AMQPMessage $msg) {
    echo "ack msg" . PHP_EOL;
    file_put_contents('./ackfile.txt',json_encode($msg),FILE_APPEND);
});

//3.消息被nack'ed后的回调
$channel->set_nack_handler(function (AMQPMessage $msg) {
    echo "nack msg" . PHP_EOL;
    file_put_contents('./nackfile.txt',json_encode($msg),FILE_APPEND);
});

$channel->wait_for_pending_acks();

以上只是RabbitMq各种模式的基本使用,其他很多特性(持久化、网络分区、集群等)并未涉及,若要使用更多的特性请查阅官网文档,然后手动跑一下代码才能理解得更好。希望本文能帮助大家对RabbitMq的使用有个大致了解。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,245评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,749评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,960评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,575评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,668评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,670评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,664评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,422评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,864评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,178评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,340评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,015评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,646评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,265评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,494评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,261评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,206评论 2 352

推荐阅读更多精彩内容

  • 1,摘要 从安装环境,配置入门,到HelloWorld实操,各种类型消息传递的演示代码,原理介绍,答疑解惑,面试题...
    笔名辉哥阅读 1,941评论 0 3
  • 问题一:RabbitMQ中的 broker 是指什么?cluster 又是指什么? 答:broker是指一个或多个...
    Leslie_Lee阅读 175评论 0 0
  • # HelloWorld 简介 RabbitMQ:接受消息再传递消息,可以视为一个“邮局”。发送者和接受者通过队列...
    xncode阅读 1,436评论 0 1
  • 1.什么是消息队列 消息队列允许应用间通过消息的发送与接收的方式进行通信,当消息接收方服务忙或不可用时,其提供了一...
    zhuke阅读 4,463评论 0 12
  • RabbitMQ是一个分布式系统 一、使用rabbitmq时的系统架构图 通过路由键将交换机和队列进行绑定,从而实...
    Gem_kaili阅读 4,987评论 0 0