消息队列(MQ),很多场景都有它的身影,MQ的主要功能包括应用解耦、流量削峰、异步处理。本文主要讲解RabbitMq的原理及应用实例,将参考官网文档重点介绍RabbitMq基本概念、work queue模式、fanout模式、direct模式、topic模式、RPC实现、publisher confirms机制,从而达到快速入门的目的。
0.RabbitMq基本概念
- vhost,虚拟主机,提供了完全隔离独立的环境,包括exchange、queue等,可通过插件web管理后台或者rabbitmqctl命令设置user的vhost权限。
- connection,要使用rabbitmq必然要与服务器建立连接了,AMQP协议是基于TCP连接的应用层协议。
- channel,信道用于复用connection,减少TCP连接带来的资源开销,当访问量大的时候则需要开辟多个connection,并分摊到chennel。
- routing_key,路由键在pub/sub模式下作为exchange匹配binding到queue的条件;在work queue模式下,可视为队列名称发送消息。
- exchange,交换机在信道内,负责接受并转发消息。根据交换机的类型,有不同的匹配方式。
- binding_key,绑定值可视为exchange与queue之间的映射关系值,绑定值与queue之间的关系是n:n,当一个queue对应exchange的多个binding_key时,exchange只会发送一次到该queue。
- queue,消息队列。
- message,消息是要传递及处理的数据,通过RabbitMq指定的类来构造,可配置消息的参数属性,如correlation_id(请求标识),delivery_mode(投递模式)等。
- producer/publisher,消息的生产者/发布者,携带routing_key和msg。
- consumer/subscriber,消息的消费者/订阅者,按照不同的模式处理队列中的消息。
1.work queues模式
常规的消息队列模式,不涉及交换机exchange和队列绑定queue_binding,执行过程:生产者发送消息至队列,消费者从队列中取数据消费。
producer代码示例(PHP)
//1.建立连接
$connection = new AMQPStreamConnection('localhost', 5672, 'root', 'root');
//2.信道
$channel = $connection->channel();
//3.信道中声明队列
$queue_name='task_queue';
$channel->queue_declare($queue_name, false, true, false, false);
$message = "Hello Task";
//4.生成amqp消息
$msg = new AMQPMessage($message, [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);//投递模式设置为消息持久化
//5.发布消息
$channel->basic_publish($msg, '', $queue_name);
echo "publisher Sent '{$message}!'\n";
$channel->close();
$connection->close();
consumer代码示例
$connection = new AMQPStreamConnection('localhost', 5672, 'root', 'root');
$channel = $connection->channel();
$queue_name='task_queue';
$channel->queue_declare($queue_name, false, true, false, false);
echo " [*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
echo "consumer received : " . $msg->body . PHP_EOL;
sleep(1);
echo "Done" . PHP_EOL;
//确认消息
$msg->ack();
};
//公平调度, 设置预加载个数
$channel->basic_qos(null, 1, null);
//持续监听,回调处理消息
$channel->basic_consume($queue_name, '', false, false, false, false, $callback);
while ($channel->is_open()) {
$channel->wait();
}
下面介绍publish/subscribe模式,并引入exchange和queue_binding。该模式根据exchange的不同类型有不同的转发规则,exchange的类型主要有fanout、direct、topic。
2.fanout模式
该模式引入exchange、queue_binding,但不涉及routing_key和binding_key,因为publisher把消息投递给exchange后,所有绑定在该交换机上的队列都能接收到消息。
publisher代码
...
//通用连接部分参考上面,后面代码同理,只展示核心变更部分;完整代码可看官网
//该模式不用声明队列,只需声明exchange
$channel->exchange_declare('fanout_logs', 'fanout', false, true, false);//1.fanout交换机
..
//消息投递到交换机
$channel->basic_publish($msg, 'fanout_logs');//2.fanout模式
subscriber代码
...
$channel->exchange_declare('fanout_logs', 'fanout', false, true, false);//1.声明交换机
...
$channel->queue_bind($queue_name, 'fanout_logs');//2.队列绑定交换机
...
比起work queue,该模式更灵活,利用exchange可将消息转发到多个queue中。
3.direct模式
如果在pub/sub模式下,只想将交换机的消息转发给指定的队列,fanout模式显然无法满足。此时可以利用direct模式,该模式将exchange和queue通过binding_key绑定在一起;exchange在接收publisher消息时依据routing_key和binding_key是否完全匹配,决定是否转发到对应queue。
publisher代码
$channel->exchange_declare('direct_logs', 'direct', false, true, false);//1.direct交换机
$routing_key = 'black';
$channel->basic_publish($msg, 'direct_logs', $routing_key);//2.发布消息至交换机,携带routing_key
subscriber代码
...
$channel->exchange_declare('direct_logs', 'direct', false, true, false);
$bindingKey = 'black';
$channel->queue_bind($queue_name, 'direct_logs', $bindingKey);//队列绑定交换机,声明binding_key
...
4.topic模式
topic模式在direct模式基础上升级,routing_key和binding_key非完全匹配,支持更灵活的匹配规则;routing_key/binding_key可以通过word1.word2.wordn方式进行灵活扩展。【符号*代表1个word,符号#可代表0或n个words】
publisher代码
$channel->exchange_declare('topic_logs', 'topic', false, false, false); //3.1.topics路由
$routing_key = 'black.tall.big';
$channel->basic_publish($msg, 'topic_logs', $routing_key);//2.发布消息至交换机,携带routing_key
subscriber代码
$channel->exchange_declare('topic_logs', 'topic', false, true, false);//topic模式
$bindingKey = '#';//相当于全部消息都能接收
$channel->queue_bind($queue_name, 'topic_logs', $bindingKey);//队列绑定交换机,声明binding_key
bindingKey的举🌰
成功:black.#
,自动匹配2个words、'black.tall.*'匹配1个word,占位匹配时必须要有点号.
失败:black.short.*
失败-错误使用符号:black#
5.RPC模式
RPC, 全称remote procedure call即远程程序调用,比起常规的远程调用,基于RabbitMq的RPC优点有:1.异步调用;2.方便扩展提升服务端性能(开启多个server)
5.1.实现原理�
- 服务端和客户端,通过两个队列进行通信,RPC队列rpc_queue和回调队列reply_to_queue。
- 客户端携带请求标识correlation_id和reply_to_queue回调队列信息,发送请求至rpc_queue,服务端监听rpc_queue,消费消息并发送消息至指定回调队列reply_to_queue。
- 客户端监听回调队列reply_to_queue并通过correlation_id获取请求处理结果。
下面以计算斐波那契数为作为RPC示例。
client端代码
class FibonacciRpcClient
{
private $connection;
private $channel;
private $callback_queue;
private $response;
private $corr_id;
//构造函数,监听回调队列,处理
public function __construct()
{
$this->connection = new AMQPStreamConnection(
'localhost',
5672,
'root',
'root'
);
$this->channel = $this->connection->channel();
//1.生成回调队列
$this->callback_queue = 'reply_to';
$this->channel->queue_declare($this->callback_queue, false, true, false, false);
//2.1.轮训消费
$this->channel->basic_consume(
$this->callback_queue,
'',
false,
true,
false,
false,
array(
$this,
'onResponse'
)
);
}
//2.1.2监听队列的回调函数
public function onResponse($rep)
{
if ($rep->get('correlation_id') == $this->corr_id) {
$this->response = $rep->body;
}
}
//远程调用,发送消息至rpc队列
public function call($n)
{
$this->response = null;
$this->corr_id = uniqid();//3.生成请求的唯一标识
//4.1.创建消息,携带请求标识、回调队列名称
$msg = new AMQPMessage(
(string)$n,
array(
'correlation_id' => $this->corr_id,
'reply_to' => $this->callback_queue
)
);
//4.2.发送消息至rpc队列,等待服务端消费
$this->channel->basic_publish($msg, '', 'rpc_queue');
//5.循环判断结果
while (!$this->response) {
$this->channel->wait();
}
return intval($this->response);
}
}
$fibonacci_rpc = new FibonacciRpcClient();//构造函数,监听回调队列reply_to
$response = $fibonacci_rpc->call(35);//发送消息至prc队列,并循环判断回调队列的处理结果。
echo ' [.] Got ', $response, "\n";//回调队列的处理结果
server端代码
$connection = new AMQPStreamConnection('localhost', 5672, 'root', 'root');
$channel = $connection->channel();
//声明队列
$channel->queue_declare('rpc_queue', false, false, false, false);
function fib($n)
{
if ($n == 0) {
return 0;
}
if ($n == 1) {
return 1;
}
return fib($n-1) + fib($n-2);
}
echo " [x] Awaiting RPC requests\n";
$callback = function ($req) {
//1.1监听rpc队列,处理client发送的消息
$n = intval($req->body);
echo ' [.] fib(', $n, ")\n";
//1.2.返回处理结果,并携带请求标识
$msg = new AMQPMessage(
(string) fib($n),
array('correlation_id' => $req->get('correlation_id'))
);
//2.发送消息至同一信道的 回调队列, 由client监听消费。
$req->delivery_info['channel']->basic_publish(
$msg,
'',
$req->get('reply_to')
);
//3.消息接受确认
$req->ack();
};
//设置预加载数量,服务端worker公平调度
$channel->basic_qos(null, 1, null);
//轮训消费,监听rpc队列
$channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);
while ($channel->is_open()) {
$channel->wait();
}
$channel->close();
$connection->close();
调用结果
client:
[.] Got 9227465
server:
[x] Awaiting RPC requests
[.] fib(35)
6.publisher confirms模式
publisher confirms是RabbitMq实现可靠传输的扩展,用来判断publisher是否成功把消息发送到RabbitMq的broker。RabbitMq实现可靠传输的方式有两种:事务(不推荐)、publisher confirms,这两种方式互斥。publisher confirms的实现方式又可分为:同步、异步。
- 6.1. 同步实现
该模式是基于信道的,所以只要增加两个步骤即可:
6.1.1. 信道声明为confirm模式
6.1.2. 声明同步等待的超时时间
代码如下:
...
$channel->confirm_select();//1.声明信道为confirm模式
...
try {
$channel->wait_for_pending_acks($timeOut);//2.同步等待timeOut时间
}catch (Exception $exception){
echo "exception:" . $exception->getMessage() . PHP_EOL;
}
..
- 6.2. 异步实现
异步实现通过注册回调的两个方法set_ack_handler和set_nack_handler。
代码如下
$channel->confirm_select();//1.声明信道为confirm模式
//2.消息被ack后的回调
$channel->set_ack_handler(function (AMQPMessage $msg) {
echo "ack msg" . PHP_EOL;
file_put_contents('./ackfile.txt',json_encode($msg),FILE_APPEND);
});
//3.消息被nack'ed后的回调
$channel->set_nack_handler(function (AMQPMessage $msg) {
echo "nack msg" . PHP_EOL;
file_put_contents('./nackfile.txt',json_encode($msg),FILE_APPEND);
});
$channel->wait_for_pending_acks();
以上只是RabbitMq各种模式的基本使用,其他很多特性(持久化、网络分区、集群等)并未涉及,若要使用更多的特性请查阅官网文档,然后手动跑一下代码才能理解得更好。希望本文能帮助大家对RabbitMq的使用有个大致了解。