RabbitMQ如何保证消息的可达性

一、RabbitMQ简介

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

RabbitMQ,又称为高性能分布式消息队列,它实现了AMQP标准协议。

分布式消息队列有很多应用场景,比如异步处理、应用解耦、流量削峰等。

1、异步处理

用户注册后需要发送短信和邮件,传统做法是先将用户信息写入数据库,然后发送短信、发送邮件,都完成后返回。

如果用到消息队列,可以先将用户信息写入数据库,然后将注册信息写入消息队列,发送短信、发送邮件或者还有其他的业务逻辑都订阅此消息,完成发送。

2、应用解耦

还是上面的例子,如果在一个大型分布式网站中,用户系统、短信系统、邮件系统可能都是独立的系统服务。

这时候,在用户注册成功后,你可以通过RPC远程调用不同的服务接口,但更好的做法还是通过消息队列,订阅自己感兴趣的数据,日后就算增加或者删减功能,主业务都不用变动。

3、流量削峰

一般在秒杀或者团购活动中,流量激增,应用面临压力过大。可以在应用前端加入消息队列,通过设置队列最大长度来限制活动人数。这时候,后端服务器就可以游刃有余的处理数据了。

二、消息通信

在AMQP协议中,有几个基本概念,我们必须先搞明白。

1、Virtual host

虚拟主机,每一个虚拟主机中包含所有的AMQP基本组件,用户、队里、交换器等都是在虚拟主机里面创建。典型的用法是,如果公司的多个产品只想用一个服务器,就可以把他们划分到不同的虚拟主机中,里面的任何信息都是独立存在,互不干扰。

2、Connection

连接,应用程序和服务器之间的TCP连接。

3、Channel

通道,当你的应用程序和服务器连接之后,就会创建TCP连接。一旦打开了TCP连接,就可以创建一个Channel通道,所以说Channel通道是一个TCP连接的内部逻辑单元。
这是因为,创建和销毁TCP连接是比较昂贵的开销,每一次访问都建立新的TCP连接的话,不仅是巨大浪费,而且还容易造成系统性能瓶颈。

4、Queue

队列,所有的消息最终都会被送到这里,等待着被感兴趣的人取走。

5、Exchange

交换器,消息到达服务的第一站就是交换器,然后根据分发规则,匹配路由键,将消息放到对应队列中。值得注意的是,交换器的类型不止一种。

  • Direct
    直连交换器,只有在消息中的路由键和绑定关系中的键一致时,交换器才把消息发到相应队列

  • Fanout
    广播交换器,只要消息被发送到广播交换器,它会将消息发到所有的队列

  • Topic
    主题交换器,根据路由键,通配规则(*和#),将消息发到相应队列

6、Binding

绑定,交换器和队列之间的绑定关系,绑定中就包含路由键,绑定信息被保存到交换器的查询表中,交换器根据它分发消息。

了解到这些组件相关概念后,我们总结一下来看看,一条消息在RabbitMQ中是如何流转的。


RabbitMQ消息流转

三、持久化和发送方确认

1、持久化

事实上,上图所示只是一个最基本的消息流转过程,交换器和队列这些组件还有一个比较重要的属性:持久化。

默认情况下,重启RabbitMQ服务器之后,我们创建的交换器和队列都会消失不见,当然了,如果里面还有未来得及消费的数据,也将难于幸免。
持久化交换器和队列,为的是在AMQP服务器重启之后,重新创建它们并绑定关系,在RabbitMQ中,设置durable属性为true即可。

不过,除了这些还不够。虽然保证了交换器和队列是安全的,但那些还未来得及消费的数据就变得岌岌可危。所以,我们还要设置消息的投递模式为持久的。

这样,如果RabbitMQ服务器重启的话,我们的策略和相关数据才会确保无忧。所以,我们说能从AMQP服务器崩溃中恢复的消息,称之为持久化消息。那么,它必须保证以下三点:

  • 设置投递模式为持久的
  • 交换器为持久的
  • 队列为持久的

2、发送方确认

到目前为止,我们已经保证了消息的安全性。但是,还有另外一个问题。由于发布操作是不返回任何信息给生产者的,我们怎么知道服务器是否正确接收消息并持久化到硬盘上了呢?

为此,我们可以将通道设置为事务模式。事务是AMQP标准中的一部分,但RabbitMQ有更好的做法,那就是发送方确认模式,publisher confirm。如果设置了confirm模式,发布的消息会被分配一个唯一的ID号,等消息被投递给匹配的队列后,通道会发送一个发送方确认模式给生产者(包含消息的唯一ID)。

四、与Spring整合实例

废话了这么多,只是为了下面的代码部分做下铺垫。毕竟,了解到上面内容之后,代码其实已经快要跃然纸上了。

1、配置文件

配置文件中我们首先要声明RabbitMQ服务器的信息,IP地址、端口号、用户名密码等,但尤为重要的是,设置发布确认模式。

<bean id="rabbitConnectionFactory"
    class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="127.0.0.1"/>
    <property name="username" value="shiqizhen"/>
    <property name="password" value="shiqizhen"/>
    <property name="port" value="5672"></property>
    <property name="virtualHost" value="shiqizhen"></property>
    <property name="publisherConfirms" value="true"></property>
    <property name="publisherReturns" value="true"></property>
</bean>

接着,还要声明交换器和队列,记得它们是持久化的哦,durable为true。

<rabbit:admin connection-factory="rabbitConnectionFactory"/>
    
//队列的名字、持久化、不要自动删除、不是独享队列
<rabbit:queue name="userInfoQueue" durable="true" auto-delete="false" exclusive="false"/>
//交换器,类型为direct。并绑定交换器和队列的关系,路由键为10086
<rabbit:direct-exchange name="user-exchange" durable="true" auto-delete="false">
    <rabbit:bindings>
        <rabbit:binding queue="userInfoQueue" key="10086"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

最后,配置消费者和消息模板

//配置消费者 ref为bean的引用 queues指明了消费者与队列的关系
//重要的是acknowledge 确认模式为手动确认
<rabbit:listener-container connection-factory="rabbitConnectionFactory" acknowledge="manual">
        <rabbit:listener ref="consumerListener" queues="userInfoQueue" method="onMessage" />
</rabbit:listener-container>

//配置Spring RabbitMQ消息模板 
<bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <constructor-arg ref="rabbitConnectionFactory"></constructor-arg>
    <property name="confirmCallback" ref="publisherConfirm"></property>
    <property name="returnCallback" ref="returnMsgCallBack"></property>
    <property name="mandatory" value="true"></property>
</bean>

2、生产者

上面我们声明了rabbitTemplate,直接用它的send方法发送消息即可。不过它有几个参数必须先要了解下。

  • exchange
    交换器名称,消息发到哪个交换器上
  • routingKey
    路由键,交换器怎样分发消息到对应队列
  • Message
    消息体对象,它包含消息的主体和消息属性。消息属性包含很多附属信息,比如消息内容类型、消息ID、用户ID等。
  • CorrelationData
    消息相关数据,实际它只有一个ID的属性。不过很重要,在发布方确认的回调方法里,会带有这个参数。我们可以根据它很直观的看到哪条消息发送成功或失败。
@Controller
public class IndexController {
    
    @Autowired
    RabbitTemplate rabbitTemplate;

    @RequestMapping("/send_msg")
    @ResponseBody
    public User send_msg() {    
        String exchange = "user-exchange";
        String routingKey = "10086";
        
        User user = new User();
        String id = IdUtil.getId();
        user.setUid(id);
        user.setUsername("小小沙弥");
        user.setPassword("1234");
        user.setCreatetime(DateUtil.getDateTime(new Date()));
        
        CorrelationData correlation = new CorrelationData(id);
        Message message = new Message(JSONObject.toJSONBytes(user, SerializerFeature.WriteNullStringAsEmpty), new MessageProperties());
        logger.info("已发送消息到RabbitMQ服务器:{}",JSONObject.toJSONString(user));
        rabbitTemplate.send(exchange, routingKey,message,correlation);  
        return user;
    }
}

3、消费者

消费者就是上面我们配置的listener ref引用的Bean。还记得我们把确认模式设置了手动确认,所以在消费者端有个很重要的动作,就是确认消息。

  • channel.basicAck(deliveryTag, false)
    第一个参数是RabbitMQ内部产生的消息ID,第二个参数代表是否批量确认消息。通过这个指令我们告诉生产者端,消息已经被正确消费了,RabbitMQ就会将此消息在磁盘上删除。
  • channel.basicReject(deliveryTag, false)
    拒绝消息。如果消费到的消息不是我们想要的,或者处理的时候报错,我们可以将消息拒绝。但值得注意的是第二个参数。如果设置为false,说明拒绝消息并将消息从服务器上删除;如果设置为true,说明拒绝消息并将消息重新放回队列。如果你的消费者只有一个,最好不要把它设置为true,否则消息会一直重试,直到把消费者端服务器搞死。如果因为处理失败而拒绝的话,最好将消息删除,同时将消息记录到日志文件或者数据库中。
@Service
public class ConsumerListener implements ChannelAwareMessageListener{

    Logger logger = LoggerFactory.getLogger(this.getClass());
    
    public void onMessage(Message message, Channel channel) throws Exception {
        
        logger.info("消费者监听到RabbitMQ消息...");
        MessageProperties properties = message.getMessageProperties();
        String msg = new String(message.getBody(),"utf-8");
        logger.info("交换器:{},路由键:{}",properties.getReceivedExchange(),properties.getReceivedRoutingKey());
        logger.info("消息内容:{}",msg); 
        long deliveryTag = properties.getDeliveryTag();
        channel.basicAck(deliveryTag, false);//确认信息,false为不批量确认
        //channel.basicReject(deliveryTag, true);//true为重入队列 false为删除消息
    }
}

4、发送方确认

我们发送消息给RabbitMQ,第一站就是交换器。RabbitMQ是否能正确接收消息,我们就靠它来反馈。这里的CorrelationData就是在生产者端设置的,我们可以将它当成消息ID,也可以直接把消息写入这里。

@Component
public class PublisherConfirm implements ConfirmCallback{

    Logger logger = LoggerFactory.getLogger(this.getClass());
    
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            logger.info("消息投递成功!");
        }else {
            logger.warn("消息投递失败,原因:{},消息ID:{}",cause,correlationData.getId());
        }
    }
}

如果我们把交换器的名字写错,那么在这里,你就会得到以下信息:

22:57:51,635  WARN PublisherConfirm:19 - 消息投递失败,原因:
channel error; protocol method: #method<channel.close>
(reply-code=404, reply-text=NOT_FOUND - no exchange 'user-exchange_xxx' in vhost 'shiqizhen', class-id=60, method-id=40),

消息ID:516387069669408768
  22:57:51,638 ERROR CachingConnectionFactory:1278 - Channel shutdown: 
channel error; protocol method:
 #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'user-exchange_xxx' in vhost 'shiqizhen', class-id=60, method-id=40)

5、返回回调

除了设置RabbitMQ的发送方确认,在Spring中还有一个publisherReturns值的我们注意。虽然我们将消息发送到了交换器,但交换器是否能正确将消息分发到对应队列,还要打个问号。如果消息无法发送到指定的队列,那么publisherReturns就会发挥作用。记住,如果想应用这个特性,需要将mandatory设置为true。

@Component
public class ReturnMsgCallBack implements ReturnCallback{

    Logger logger = LoggerFactory.getLogger(this.getClass());
    
    public void returnedMessage(Message message, int replyCode, 
                    String replyText, String exchange, String routingKey) {
        logger.info("消息内容:{}",new String(message.getBody()));
        logger.info("回复文本:{},回复代码:{}",replyText,replyCode);
        logger.info("交换器名称:{},路由键:{}",exchange,routingKey); 
    }
}

如果我们不小心写错了路由键的名字,那就会调用到这里。

23:24:27,813  INFO ReturnMsgCallBack:16 - 消息内容:{"createtime":"2018-11-25 23:24:24","password":"1234","role":null,"uid":"516393749815754752","username":"小小沙弥"}
23:24:27,814  INFO ReturnMsgCallBack:17 - 回复文本:NO_ROUTE,回复代码:312
23:24:27,814  INFO ReturnMsgCallBack:18 - 交换器名称:user-exchange,路由键:10086_xxx

//这里是发送方确认打印的信息 说投递到交换器成功
23:24:27,814  INFO PublisherConfirm:17 - 消息投递成功!

有个问题,如同第一个例子,如果写错了路由键的名称,发送方确认会打印ack为false的异常信息,但为什么不会调用到publisherReturns呢?

如果路由键错误,说明消息压根就没有被接收到。这肯定是一个严重错误,所以RabbitMQ直接把当前通道关闭了。

Channel shutdown: 
channel error; protocol method:
reply-code=404, reply-text=NOT_FOUND - no exchange 'user-exchange_xxx' in vhost ...

五、监听RabbitMQ服务器状态

如果你的RabbitMQ服务不是一个集群,那么当网络故障或其他原因导致RabbitMQ服务停掉的时候,我们怎么做呢?当然,你可以在Send方法中加入try/catch,根据catch信息返回你的状态。但有个更好的思路,可以结合使用。
在创建RabbitMQ服务连接的时候,我们要配置一个Bean,CachingConnectionFactory它有个方法addConnectionListener,我们可以利用它来监听服务器的连接状态。

public class RabbitMQConnectionListener implements ConnectionListener{
    public void onCreate(Connection connection) {
        System.out.println("服务器已启动...");
    }
    public void onClose(Connection connection) {
        System.out.println("服务器已关闭...");
    }
}

并在合适的位置,比如Spring容器初始化方法里,加入这么一句rabbitConnectionFactory.addConnectionListener(new RabbitMQConnectionListener());

这样,我们就可以掌握RabbitMQ服务器的连接状态了,那么我们就可以根据此状态,在生产者方调用send方法的时候,判断此状态。如果未连接,可以先将消息保存到数据库或者缓存中。当连接到RabbitMQ,我们先把缓存的消息拿出来发送,再将此状态重置为已连接。

六、总结

本文简单介绍了AMQP协议标准中的相关概念,以及RabbitMQ在Spring中如何正确配置使用持久化消息、发送方模式和返回回调等机制。并在最后,介绍了在Spring中如何监听RabbitMQ的服务器连接状态。总而言之一句话,我们将要怎样使用RabbitMQ,才能保证消息不会丢失。希望本文对你使用RabbitMQ有所帮助!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,895评论 2 11
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,637评论 18 139
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,353评论 2 34
  • 利用RabbitMQ集群横向扩展能力,均衡流量压力,让消息集群的秒级服务能力达到百万,Google曾做过此类实验;...
    有货技术阅读 3,459评论 0 1
  • 最近看到的JD里面,常常默默的附带这么一句要求:文笔好。 这不,为了证明文笔好,只能裸奔给你看。 师傅我这多年没有...
    元师傅阅读 286评论 1 2