RabbitMQ消息确认、消息持久化等核心知识总结

本文参考石杉的架构笔记公众号, 总结了一下RabbitMQ的核心知识, 非常感谢

一、消息中间件选型

1. ActiveMQ:
  • 优点: ActiveMQ是老牌的消息中间件,国内很多公司过去运用的还是非常广泛的,功能很强大。
  • 缺点: 没法确认ActiveMQ可以支撑互联网公司的高并发、高负载以及高吞吐的复杂场景,在国内互联网公司落地较少。而且使用较多的是一些传统企业,用ActiveMQ做异步调用和系统解耦。
2. RabbitMQ:
  • 优点: 可以支撑高并发、高吞吐、性能很高,同时有非常完善便捷的后台管理界面可以使用。另外,他还支持集群化、高可用部署架构、消息高可靠支持,功能较为完善。而且经过调研,国内各大互联网公司落地大规模RabbitMQ集群支撑自身业务的case较多,国内各种中小型互联网公司使用RabbitMQ的实践也比较多。除此之外,RabbitMQ的开源社区很活跃,较高频率的迭代版本,来修复发现的bug以及进行各种优化,因此,综合对比后,RabbitMQ是一个不错的消息中间件选择。
  • 缺点: 基于erlang语言开发的,所以导致较为难以分析里面的源码,也较难进行深层次的源码定制和改造,毕竟需要较为扎实的erlang语言功底才可以。
3. RocketMQ:
  • 优点: 阿里开源的,经过阿里的生产环境的超高并发、高吞吐的考验,性能卓越,同时还支持分布式事务等特殊场景。而且RocketMQ是基于Java语言开发的,适合深入阅读源码,有需要可以站在源码层面解决线上生产问题,包括源码的二次开发和改造。
  • 缺点: 社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准JMS规范走的有些系统要迁移需要修改大量代码
4. Kafka:
  • 优点: 专为超高吞吐量的实时日志采集、实时数据同步、实时数据计算等场景来设计。在大数据领域中配合实时计算技术(比如Spark Streaming、Storm、Flink)使用的较多。
  • 缺点: 相对于以上几种中间件来说,功能较少,在传统的MQ中间件使用场景中较少采用。

二、消息中间件的常见使用场景

  • 复杂系统的解耦: 多个系统间通过中间件进行数据交互, 避免牵一发而动全身, 减少耦合, 提升系统稳定性与可扩展性
  • 复杂链路的异步调用: 某些业务场景可以通过异步执行减少同步调用的时间, 从而大大提高系统响应时间而不影响核心逻辑
  • 瞬时高峰的削峰处理: 流量高峰期, 可以将请求积压在MQ中, 服务器不用一下处理所有请求从而导致系统崩溃, 高峰期后, 消费者可以慢慢消费

三、系统架构引入消息中间件后会有哪些缺点

  • 系统可用性降低: 引入MQ,系统多了一个依赖。依赖如果出现问题,就会导致系统可用性降低。一旦引入中间件,就必须考虑这个中间件是如何部署的,如何保证高可用性
  • 系统稳定性降低: 引入MQ, 可能由于网络故障、中间件故障、消费者异常等原因导致各种各样乱七八糟的问题产生, 从而使系统稳定性下降
  • 分布式一致性问题: 多系统协同处理一个业务, 不能保证所有系统都正常处理, 有可能出现系统数据不一致的情况, 所以此时又需要使用可靠消息最终一致性的分布式事务方案来保障数据一致性。

四、消息发送确认

生产者发送消息, 先发送消息到Exchange, 然后Exchange再路由到Queue, 这中间就需要确认两个事情

  • 确认消息是否成功发送到Exchange
  • 确认消息是否从Exchange成功路由到Queue

spring提供了两个回调函数来处理这两种消息发送确认

1. 确认消息是否成功发送到Exchange

有2种方式, 一种是重量级的事务消息机制。采用类事务的机制把消息投递到MQ,可以保证消息不丢失,但是性能极差,经过测试性能会呈现几百倍的下降。

所以说现在一般是不会用这种过于重量级的机制,而是会用轻量级的confirm机制。

另一种方式是confirm机制, 跟手动ack机制类似, 生产者将消息投递到RabbitMQ, 且将消息持久化到硬盘后, RabbitMQ会通过一个回调方法将confirm信息回传给生产端, 这样, 如果生产端的服务接收到了这个confirm消息,就知道是已经持久化到磁盘了。否则如果没有接收到confirm消息,那么就说明这条消息可能半路丢失了,此时你就可以重新投递消息到MQ去,确保消息不会丢失。

1.1 通过AMQP的事务机制可以保证消息发送确认
事务机制主要是通过对channel的设置实现

channel.txSelect();// 声明启动事务模式
channel.txComment();// 提交事务
channel.txRollback();// 回滚事务

1.2 使用confirm确认机制
实现ConfirmCallback并重写confirm回调方法, 消息发送到Broker后触发回调, 可以确认消息是否成功发送到Exchange

application.properties:

# 开启confirms回调 P -> Exchange
spring.rabbitmq.publisher-confirms=true

回调:

        // 消息是否成功发送到Exchange
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("消息成功发送到Exchange");
            } else {
                log.info("消息发送到Exchange失败: cause: {}", correlationData, cause);
            }
        });
2. 确认消息是否从Exchange成功路由到Queue

实现ReturnCallback并重写returnedMessage回调方法, 可以确认消息从EXchange路由到Queue失败, 注意: 这里的回调是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法

application.properties:

# 开启returnedMessage回调 Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# 触发returnedMessage回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
spring.rabbitmq.template.mandatory=true

回调:

        // 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
        });

五、消息接收确认

消息怎样才算消费成功?

RabbitMQ默认自动确认(ack)消息被正确消费, 即消息投递到消费者后就自动确认消息被处理完毕, 并且会将该消息删除, 即使消费者意外宕机, 或者抛出异常, 如果消费者接收到消息, 还没处理完成就down掉或者抛出异常, 那么, 这条消息就丢失了

分析一下问题出在哪, 问题出在RabbitMQ只管消息投递出去, 而不管消息是否被正确处理就自动删除消息, 所以, 只要将自动ack修改为手动ack, 消费成功才通知RabbitMQ可以删除该消息即可, 如果消费者宕机, 消费失败, 由于RabbitMQ并未收到ack通知, 且感知到该消费者状态异常(如抛出异常), 就会将该消息重新推送给其他消费者, 让其他消费者继续执行, 这样就保证消费者挂掉但消息不会丢失

消息确认模式有:

  • AcknowledgeMode.NONE:自动确认
  • AcknowledgeMode.AUTO:根据情况确认
  • AcknowledgeMode.MANUAL:手动确认

默认情况下消息消费者是自动ack(确认)消息的, 如果要手动ack(确认), 则需要修改确认模式为manual

application.properties:

# 设置手动确认(ack) Queue -> C
spring.rabbitmq.listener.simple.acknowledge-mode=manual

消费消息并手动确认:

@Component
@Slf4j
public class LogUserConsumer {

    @Autowired
    UserLogService userLogService;

    @RabbitListener(queues = "log.user.queue")
    public void logUserConsumer(Message message, Channel channel, @Header (AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {
            log.info("收到消息: {}", message.toString());
            userLogService.insert(MessageHelper.msgToObj(message, UserLog.class));
        } catch (Exception e){
            log.error("logUserConsumer error", e);
            channel.basicNack(tag, false, true);
        } finally {
            channel.basicAck(tag, false);
        }
    }

}
  • 重点在channel.basicAck(tag, false)方法, 第一个参数deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel

  • 第二个参数multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息

六、消息持久化

消息被投递到RabbitMQ的内存中, 还没投递到消费者实例之前宕机了, 消息不就丢失了?

可以进行消息持久化, 将Exchange、queue和message都持久化到硬盘, 这样, RabbitMQ重启时, 会把持久化的Exchange、queue和message从硬盘重新加载出来, 重新投递消息

1.1 Exchange的持久化, 声明交换机时指定持久化参数为true即可

    @Bean
    public DirectExchange logUserExchange() {
        return new DirectExchange("log.user.exchange", true, false);
    }

第二个参数durable: 是否持久化, 第三个参数autoDelete: 当所有绑定队列都不再使用时, 是否自动删除交换器, true: 删除, false: 不删除

1.2 queue的持久化, 声明队列时指定持久化参数为true即可

    @Bean
    public Queue logUserQueue() {
        return new Queue("log.user.queue.name", true);
    }

第二个参数durable, 是否持久化

1.3 message的持久化, 是通过配置deliveryMode实现的, 生产者投递时, 指定deliveryModeMessageDeliveryMode.PERSISTENT即可实现消息的持久化, 投递和消费都需要通过Message对象进行交互, 为了不每次都写配置转换的代码, 我们写一个消息帮助类MessageHelper:

public class MessageHelper {

    public static Message objToMsg(Object obj) {
        if (null == obj) {
            return null;
        }

        Message message = MessageBuilder.withBody(JsonUtil.objToStr(obj).getBytes()).build();
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 消息持久化
        message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON);

        return message;
    }

    public static <T> T msgToObj(Message message, Class<T> clazz) {
        if (null == message || null == clazz) {
            return null;
        }

        String str = new String(message.getBody());
        T obj = JsonUtil.strToObj(str, clazz);

        return obj;
    }

}

消息投递时:

rabbitTemplate.convertAndSend("log.user.exchange.name", "log.user.routing.key.name", MessageHelper.objToMsg(userLog));

消息消费时(参考五、消息接收确认):

UserLog userLog = MessageHelper.msgToObj(message, UserLog.class);

如果不需要消息持久化, 则不需要通过Message进行转换, 可以直接通过字符串或者对象投递和消费

七、unack消息的积压问题

什么叫unack消息的积压问题, 简单来说就是消费者处理能力有限, 无法一下将MQ投递过来的所有消息消费完, 如果MQ推送消息过多, 比如可能有几千上万条消息积压在某个消费者实例内存中, 此时这些积压的消息就处于unack状态, 如果一直积压, 就有可能导致消费者服务实例内存溢出、内存消耗过大、甚至内存泄露

所以, RabbitMQ是必须要考虑一下消费者服务的处理能力的。

如何解决?

RabbitMQ基于一个prefetch count来控制这个unack message的数量。

你可以通过 “channel.basicQos(10)” 这个方法来设置当前channel的prefetch count。也可以通过配置文件设置: spring.rabbitmq.listener.simple.prefetch=10

举个例子,比如你要是设置为10的话,那么意味着当前这个channel里,unack message的数量不能超过10个,以此来避免消费者服务实例积压unack message过多。

这样的话,就意味着RabbitMQ正在投递到channel过程中的unack message,以及消费者服务在处理中的unack message,以及异步ack之后还没完成ack的unack message,所有这些message加起来,一个channel也不能超过10个。

如果你要简单粗浅的理解的话,也大致可以理解为这个prefetch count就代表了一个消费者服务同时最多可以获取多少个message来处理。

prefetch就是预抓取的意思,就意味着你的消费者服务实例预抓取多少条message过来处理,但是最多只能同时处理这么多消息。

如果一个channel里的unack message超过了prefetch count指定的数量,此时RabbitMQ就会停止给这个channel投递消息了,必须要等待已经投递过去的消息被ack了,此时才能继续投递下一个消息。

设置多大合理?

RabbitMQ官方给出的建议是prefetch count一般设置在100 - 300之间。也就是一个消费者服务最多接收到100 - 300个message来处理,允许处于unack状态。

这个状态下可以兼顾吞吐量也很高,同时也不容易造成内存溢出的问题。

八、总结

  • 配置汇总
# rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 开启confirms回调 P -> Exchange
spring.rabbitmq.publisher-confirms=true
# 开启returnedMessage回调 Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# 触发returnedMessage回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
spring.rabbitmq.template.mandatory=true
# 设置手动确认(ack) Queue -> C
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=100
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,928评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,192评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,468评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,186评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,295评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,374评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,403评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,186评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,610评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,906评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,075评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,755评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,393评论 3 320
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,079评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,313评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,934评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,963评论 2 351

推荐阅读更多精彩内容