RabbitMQ在分布式系统中的应用


由于之前做的项目中需要在多个节点之间可靠地通信,所以废弃了之前使用的Redis pub/sub(因为集群有单点问题,且有诸多限制),改用了RabbitMQ。
使用期间得到不少收获,也踩了不少坑,所以在此分享下心得。

1 - 怎么保证可靠性的?

RabbitMQ提供了几种特性,牺牲了一点性能代价,提供了可靠性的保证。

  • 持久化
    当RabbitMQ退出时,默认会将消息和队列都清除,所以需要在第一次声明队列和发送消息时指定其持久化属性为true
    这样RabbitMQ会将队列、消息和状态存到RabbitMQ本地的数据库,重启后会恢复。

    durable=true
    channel.queueDeclare("task_queue", durable, false, false, null);//队列 
    channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());//消息
    

    注:当声明的队列已经存在时,尝试重新定义它的durable是不生效的。

  • 接收应答
    客户端接收消息的模式默认是自动应答,通过设置autoAck为false可以让客户端主动应答消息.
    当客户端拒绝此消息或者未应答便断开连接时,就会使得此消息重新入队(2.7.0以前是重新加入到队尾,2.7.0及以后是保留在队列中的原来位置).

    autoAck = false;
    requeue = true;
    channel.basicConsume(queue, autoAck, callback);
    channel.basicAck();//应答
    channel.basicReject(deliveryTag, requeue);//拒绝
    channel.basicRecover(requeue);//恢复
    
  • 发送确认
    默认情况下,发送端不关注发出去的消息是否被消费掉了。可设置channel为confirm模式.
    所有发送的消息都会被确认一次,用户可以自行根据server发回的确认消息查看状态。详细介绍见:confirms

    channel.confirmSelect(); // 进入confirm模式
    do publish messages... // 每个消息都会被编号,从1开始
    channel.getNextPublishSeqNo() // 查看下一个要发送的消息的序号
    channel.waitForConfirms(); // 等待所有消息发送并确认
    
  • 事务
    和confirm模式不能同时使用,而且会带来大量的多余开销,导致吞吐量下降很多,故而不推荐。

    channel.txSelect();
    try {
        do something...
        channel.txCommit();
    } catch (e){
        channel.txRollback();
    }
    
  • 消息队列的高可用(主备模式)

    相比于路由和绑定,可以视为是共享于所有的节点的,消息队列默认只存在于第一次声明它的节点上, 这样一旦这个节点挂了
    这个队列中未处理的消息就没有了. 幸好,RabbitMQ提供了将它备份到其他节点的机制. 任何时候都有
    一个master负责处理请求,其他slaves负责备份,当master挂掉,会将最早创建的那个slave提升为master

    命令:rabbitmqctl set_policy ha-all “^ha\.” ‘{“ha-mode”:”all”}’
    设置所有以’ha’开头的queue在所有节点上拥有备份。详细语法点这里.也可以在界面上配置

    20160516071133333.png

    注:由于exclusive类型的队列会在client和server连接断开时被删掉,所以对它设置持久化属性和备份都是没有意义的

  • 顺序保证

直接上图好了:

2 - 一些需要注意的地方

  • 集群配置
    一个集群中多个节点共享一份.erlang.cookie文件;若是没有启用RABBITMQ_USE_LONGNAME,需要在每个节点的hosts文件中指定其他节点的地址,不然会找不到其他集群中的节点。

  • 脑裂
    RabbitMQ集群对于网络分区的处理和忍受能力不太好,推荐使用federation或者shovel插件去解决。federation详见高级->Federation。但是,情况已经发生了,怎么去解决呢?放心,还是有办法恢复的。当网络断断续续时,会使得节点之间的通信断掉,进而造成集群被分隔开的情况。这样,每个小集群之后便只处理各自本地的连接和消息,从而导致数据不同步。当重新恢复网络连接时,它们彼此都认为是对方挂了-_-||,便可以判断出有网络分区出现了。但是RabbitMQ默认是忽略掉不处理的,造成两个节点继续各自为政(路由,绑定关系,队列等可以独立地创建删除,甚至主备队列也会每一方拥有自己的master)。可以更改配置使得连接恢复时,会根据配置自动恢复。

    1. ignore:默认,不做任何处理
    2. pause-minority:断开连接时,判断当前节点是否属于少数派(节点数少于或者等于一半),如果是,则暂停直到恢复连接。
    3. {pause_if_all_down, [nodes], ignore | autoheal}:断开连接时,判断当前集群中节点是否有节点在nodes中,如果有,则继续运行,否则暂停直到恢复连接。这种策略下,当恢复连接时,可能会有多个分区存活,所以,最后一个参数决定它们怎么合并。
    4. autoheal:当恢复连接时,选择客户端连接数最多的节点状态为主,重启其他节点。

    配置:**【详见下文:集群配置】

    1. 多次ack:客户端多次应答同一条消息,会使得该客户端收不到后续消息。

3 - 结合Docker使用

集群版本的实现:详见我自己写的一个例子rabbitmq-server-cluster

4 - 消息队列中间件的比较

  • RabbitMQ:
    1. 优点:支持很多协议如:AMQP,XMPP,STMP,STOMP;灵活的路由;成熟稳定的集群方案;负载均衡;数据持久化等。
    2. 缺点:速度较慢;比较重量级,安装需要依赖Erlang环境。
  • Redis:
    1. 优点:比较轻量级,易上手
    2. 缺点:单点问题,功能单一
  • Kafka:
    1. 优点:高吞吐;分布式;快速持久化;负载均衡;轻量级
    2. 缺点:极端情况下会丢消息

最后附一张网上截取的测试结果:

5 - 几个重要的概念

  • Virtual Host:包含若干个Exchange和Queue,表示一个节点;
  • Exchange:接受客户端发送的消息,并根据Binding将消息路由给服务器中的队列,Exchange分为direct、fanout、topic三种;
  • Binding:连接Exchange和Queue,包含路由规则;
  • Queue:消息队列,存储还未被消费的消息;
  • Message:Header+Body;
  • Channel:通道,执行AMQP的命令;一个连接可创建多个通道以节省资源。

6 - Client

RabbitMQ官方实现了很多热门语言的客户端,就不一一列举啦,以java为例,直接开始正题:

  • 建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 可以加上断开重试机制:
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(10000);
  • 创建连接和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
  • 一对一:一个生产者,一个消费者

代码同上,只不过会有多个消费者,消息会轮序发给各个消费者。

如果设置了autoAck=false,那么可以实现公平分发(即对于某个特定的消费者,每次最多只发送指定条数的消息,直到其中一条消息应答后,再发送下一条)。需要在消费者中加上:

int prefetchCount = 1;
channel.basicQos(prefetchCount);
  • 广播

生产者:

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

消费者同上

  • Routing

生产者:支持通配符的Routing

String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());

消费者同上

原文链接

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,524评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,869评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,813评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,210评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,085评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,117评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,533评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,219评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,487评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,582评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,362评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,218评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,589评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,899评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,176评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,503评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,707评论 2 335

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,497评论 18 139
  • 关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时...
    预流阅读 584,163评论 51 785
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,335评论 2 34
  • 本文章翻译自http://www.rabbitmq.com/api-guide.html,并没有及时更新。 术语对...
    joyenlee阅读 7,604评论 0 3
  • 在writeToFile 方法中 atomically,参数值为BOOL类型,用于通知是否应该首先将文件内容保存在...
    执着的小蛋挞阅读 634评论 0 1