由于之前做的项目中需要在多个节点之间可靠地通信,所以废弃了之前使用的Redis pub/sub(因为集群有单点问题,且有诸多限制),改用了RabbitMQ。
使用期间得到不少收获,也踩了不少坑,所以在此分享下心得。
1 - 怎么保证可靠性的?
RabbitMQ提供了几种特性,牺牲了一点性能代价,提供了可靠性的保证。
-
持久化
当RabbitMQ退出时,默认会将消息和队列都清除,所以需要在第一次声明队列和发送消息时指定其持久化属性为true
,
这样RabbitMQ会将队列、消息和状态存到RabbitMQ本地的数据库,重启后会恢复。durable=true channel.queueDeclare("task_queue", durable, false, false, null);//队列 channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());//消息
注:当声明的队列已经存在时,尝试重新定义它的durable是不生效的。
-
接收应答
客户端接收消息的模式默认是自动应答
,通过设置autoAck为false
可以让客户端主动应答
消息.
当客户端拒绝
此消息或者未应答
便断开连接时,就会使得此消息重新入队(2.7.0以前是重新加入到队尾,2.7.0及以后是保留在队列中的原来位置).autoAck = false; requeue = true; channel.basicConsume(queue, autoAck, callback); channel.basicAck();//应答 channel.basicReject(deliveryTag, requeue);//拒绝 channel.basicRecover(requeue);//恢复
-
发送确认
默认情况下,发送端不关注
发出去的消息是否
被消费掉了。可设置channel为confirm
模式.
所有发送的消息都会被确认一次,用户可以自行根据server发回的确认消息查看状态。详细介绍见:confirmschannel.confirmSelect(); // 进入confirm模式 do publish messages... // 每个消息都会被编号,从1开始 channel.getNextPublishSeqNo() // 查看下一个要发送的消息的序号 channel.waitForConfirms(); // 等待所有消息发送并确认
-
事务
和confirm模式不能同时使用,而且会带来大量的多余开销,导致吞吐量下降很多,故而不推荐。channel.txSelect(); try { do something... channel.txCommit(); } catch (e){ channel.txRollback(); }
-
消息队列的高可用(主备模式)
相比于路由和绑定,可以视为是共享于所有的节点的,消息队列默认只存在于第一次声明它的节点上, 这样一旦这个节点挂了
这个队列中未处理的消息就没有了. 幸好,RabbitMQ提供了将它备份到其他节点的机制. 任何时候都有
一个master
负责处理请求,其他slaves负责备份,当master挂掉,会将最早创建的那个slave提升为master命令:
rabbitmqctl set_policy ha-all “^ha\.” ‘{“ha-mode”:”all”}’
:
设置所有以’ha’开头的queue在所有节点上拥有备份。详细语法点这里.也可以在界面上配置
20160516071133333.png注:由于exclusive类型的队列会在client和server连接断开时被删掉,所以对它设置持久化属性和备份都是没有意义的
顺序保证
直接上图好了:
2 - 一些需要注意的地方
集群配置
一个集群中多个节点共享一份.erlang.cookie文件;若是没有启用RABBITMQ_USE_LONGNAME,需要在每个节点的hosts文件中指定其他节点的地址,不然会找不到其他集群中的节点。-
脑裂
RabbitMQ集群对于网络分区的处理和忍受能力不太好,推荐使用federation
或者shovel
插件去解决。federation详见高级->Federation。但是,情况已经发生了,怎么去解决呢?放心,还是有办法恢复的。当网络断断续续时,会使得节点之间的通信断掉,进而造成集群被分隔开的情况。这样,每个小集群之后便只处理各自本地的连接和消息,从而导致数据不同步。当重新恢复网络连接时,它们彼此都认为是对方挂了-_-||,便可以判断出有网络分区出现了。但是RabbitMQ默认是忽略掉不处理的,造成两个节点继续各自为政(路由,绑定关系,队列等可以独立地创建删除,甚至主备队列也会每一方拥有自己的master)。可以更改配置使得连接恢复时,会根据配置自动恢复。- ignore:默认,不做任何处理
- pause-minority:断开连接时,判断当前节点是否属于少数派(节点数少于或者等于一半),如果是,则暂停直到恢复连接。
- {pause_if_all_down, [nodes], ignore | autoheal}:断开连接时,判断当前集群中节点是否有节点在nodes中,如果有,则继续运行,否则暂停直到恢复连接。这种策略下,当恢复连接时,可能会有多个分区存活,所以,最后一个参数决定它们怎么合并。
- autoheal:当恢复连接时,选择客户端连接数最多的节点状态为主,重启其他节点。
配置:**【详见下文:集群配置】
- 多次ack:客户端多次应答同一条消息,会使得该客户端收不到后续消息。
3 - 结合Docker使用
集群版本的实现:详见我自己写的一个例子rabbitmq-server-cluster
4 - 消息队列中间件的比较
- RabbitMQ:
- 优点:支持很多协议如:AMQP,XMPP,STMP,STOMP;灵活的路由;成熟稳定的集群方案;负载均衡;数据持久化等。
- 缺点:速度较慢;比较重量级,安装需要依赖Erlang环境。
- Redis:
- 优点:比较轻量级,易上手
- 缺点:单点问题,功能单一
- Kafka:
- 优点:高吞吐;分布式;快速持久化;负载均衡;轻量级
- 缺点:极端情况下会丢消息
最后附一张网上截取的测试结果:
5 - 几个重要的概念
- Virtual Host:包含若干个Exchange和Queue,表示一个节点;
- Exchange:接受客户端发送的消息,并根据Binding将消息路由给服务器中的队列,Exchange分为direct、fanout、topic三种;
- Binding:连接Exchange和Queue,包含路由规则;
- Queue:消息队列,存储还未被消费的消息;
- Message:Header+Body;
- Channel:通道,执行AMQP的命令;一个连接可创建多个通道以节省资源。
6 - Client
RabbitMQ官方实现了很多热门语言的客户端,就不一一列举啦,以java为例,直接开始正题:
- 建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 可以加上断开重试机制:
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(10000);
- 创建连接和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
- 一对一:一个生产者,一个消费者
代码同上,只不过会有多个消费者,消息会轮序发给各个消费者。
如果设置了autoAck=false,那么可以实现公平分发(即对于某个特定的消费者,每次最多只发送指定条数的消息,直到其中一条消息应答后,再发送下一条)。需要在消费者中加上:
int prefetchCount = 1;
channel.basicQos(prefetchCount);
- 广播
生产者:
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
消费者同上
- Routing
生产者:支持通配符的Routing
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
消费者同上