【RabbitMQ-5】-生产者保证消息安全(confirm配置)

消息的不丢失,在MQ角度考虑,一般有三种途径:

  1. 生产者不丢数据
  2. MQ服务器不丢数据
  3. 消费者不丢数据
    其余的方式是根据业务来说的,比如消息落库等等,这篇咱们就研究下MQ的机制。

1. 生产者不丢失数据

1.1 开启事务模式

amqp事务仅仅适用于publish和ack,rabbitmq新增了reject的事务,其他操作不具备事务的特性。也就是说:rabbit事务只可以保证:

  1. 生产者发出的消息成功被MQ服务器收到(不保证进入queue);
  2. 消费者发出的确认消息成功的被MQ服务器收到;
    consumer端的具体消费逻辑如果需要使用事务,只能引入外部事务。

RabbitTemplate代码:

  @Bean
    @Autowired
   public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
       RabbitTemplate rabbitTemplate = new RabbitTemplate();
       rabbitTemplate.setConnectionFactory(connectionFactory);
       rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
   }

(1)生产者发送消息,若是MQ服务器挂掉,那么程序会不断尝试重试,直至broker恢复,会重新接收这个消息。
(2)若是消费者在ack之前挂掉,MQ服务器会将这条消息恢复,若是长时间没有收到consumer端的确认消息,那么会将消息从unacked状态转化为ready状态。
(3)若是消费者处理消息期间抛出异常,MQ服务器会收到一个nack或者reject,MQ服务器也会恢复这条消息。

开启事务会大幅降低消息发送及接收效率,因为当已经有一个事务存在时,后面的消息是不能被发送或者接收(对同一个consumer而言)的。

1.2 confirm模式

为了producer端知道消息是否进入queue,可以使用confirmreturn来代替事务。

confirm和return的配置:

  //消息的确认机制(confirm);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);

confirm和return的区别和联系:

  • confirmCallBack:消息从生产者到达exchange时返回ack,消息未到达exchange返回nack
  • returnCallBack:消息进入exchange但未进入queue时会被调用。

官网对confirm和return的描述:

When a rabbit template send operation completes, the channel is closed; this would preclude the reception of confirms or returns in the case when the connection factory cache is full (when there is space in the cache, the channel is not physically closed and the returns/confirms will proceed as normal). When the cache is full, the framework defers the close for up to 5 seconds, in order to allow time for the confirms/returns to be received. When using confirms, the channel will be closed when the last confirm is received. When using only returns, the channel will remain open for the full 5 seconds. It is generally recommended to set the connection factory’s channelCacheSize to a large enough value so that the channel on which a message is published is returned to the cache instead of being closed. You can monitor channel usage using the RabbitMQ management plugin; if you see channels being opened/closed rapidly you should consider increasing the cache size to reduce overhead on the server.

大概意思就是:在rabbit Template发送操作完成时,channels才会关闭。在连接工厂(ConnectionCache)满的情况下,缓存中有空间时,channel不会关闭,直到confirm/return处理完成。
使用confirm时,将在接受到ack时关闭channels;使用return时,通道会整整持续5s的时间。通常建议将Connection Cache设置足够大的值,以便发布的消息的channel可以返回连接池而不是关闭。当看到channel快速打开或者关闭时,应该考虑增加连接池大小以减少服务器的开销。

1.2.1 生产者消息丢失的情况

(1)rabbitmq由于短暂的网络异常,导致消息发送了出去,但是未到exchange,连接可以短时间恢复。
(2)rabbitmq服务器挂掉且长时间无法恢复,消息无法发送。

1.2.2 生产者消息发送失败(网络异常)

(埋坑)我们知道调用ConfirmCallback若是ack返回false的消息未必没有到达exchange(因为confirm是异步的,在ack回来之前,Connection异常中断,ConfirmCallback立刻返回false)。但是ack返回false的消息一定未到达exchange。
于是我们可以统一处理ack=false的情况,将消息再次发送一次。但是这不可避免的会导致重复消费

当消息发送失败,一般两种方式处理这个消息:

  1. 自动重发;
  2. 系统预警人工处理;

配置文件源码:

 rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
            }
        });
        rabbitTemplate.setReturnCallback(new ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
            }
        });

我们可以看到,在ReturnCallback中,返回的参数是Message对象,我们可以获取消息内容exchangeroutingKey这些信息的。

但是在ConfirmCallback中,确是没有消息信息,只有一个correlationData[ˌkɒrəˈleɪʃn] 相关性的,并且我们看到他的日志,打印出来还是null

输出日志:

2019-03-10-00-52 [AMQP Connection 127.0.0.1:5672] [com.Configuration.MyAMQPConfig]
 [INFO] - 消息发送成功:correlationData(null),ack(false),cause
(channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no 
exchange 'exchange_direct_no' in vhost '/', class-id=60, method-id=40))

于是我们打开correlationData的源码:

可以看到里面的属性只有一个id

public class CorrelationData implements Correlation {
    private volatile String id;

    public CorrelationData() {
    }

    public CorrelationData(String id) {
        this.id = id;
    }

    public String getId() {
        return this.id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String toString() {
        return "CorrelationData [id=" + this.id + "]";
    }
}

于是我们打开RabbitTemplate send/convertAndSend方法的源码:

发现里面含有CorrelationData对象,很显然我们在发送消息的时候,消息信息和correnlationData.id属性进行了绑定,我们若是可以根据id拿到消息,那么就可以进行“重试”或者“预警”等操作了。

于是我们扩展correlationData类,将id和消息属性绑定起来。

public class CorrelationData extends 
org.springframework.amqp.rabbit.support.CorrelationData {
    //消息体
    private volatile Object message;
    //交换机
    private String exchange;
    //路由键
    private String routingKey;
    //重试次数
    private int retryCount = 0;

我们在发送消息的时候,可以发送correlationData扩展对象,在我们confirmack=false的情况下,于是我们就可以拿到消息主体了。

 @Test
    public void contextLoads() {
        Map<String, Object> map = new HashMap<>();
        Book book = new Book("西游记", "120.00");
        //使用继承扩展的CorrelationData 、id消息流水号
        CorrelationData correlationData = 
               new CorrelationData(UUID.randomUUID().toString()); 
        correlationData.setMessage(book);
        correlationData.setExchange("exchange_direct_no");
        correlationData.setRoutingKey("ord");
        try {
            rabbitTemplate.convertAndSend("exchange_direct_no", "ord", book, correlationData);
        } catch (AmqpConnectException e) {
            System.out.println("保存信息编号:" + correlationData);
        }
    }

现在我们可以拿到消息主体,也可以拿到rabbitTemplate,那么我们是否可以在confirm回调方法中再次重试?

ConfirmCallback回调函数:

public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback {
    private static final Logger logger = LoggerFactory.getLogger(MessageConfirmCallback.class);
    private RabbitTemplate rabbitTemplate;

    public MessageConfirmCallback(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack == true) {
            logger.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
        } else {
            if (correlationData instanceof com.Configuration.CorrelationData) {
                com.Configuration.CorrelationData messageCorrelationData = (com.Configuration.CorrelationData) correlationData;
                String exchange = messageCorrelationData.getExchange();
                Object message = messageCorrelationData.getMessage();
                String routingKey = messageCorrelationData.getRoutingKey();
                int retryCount = messageCorrelationData.getRetryCount();
                //重试次数+1
                ((com.Configuration.CorrelationData) correlationData).setRetryCount(retryCount + 1);
                rabbitTemplate.convertSendAndReceive(exchange, routingKey, message, correlationData);
            }
        }
    }
}

注意事项:
但是在主线程发送消息的过程中,rabbitMQf服务器关闭,这时候主程序和ConfirmCallback线程都会等待Connection恢复,然后重新启动rabbitmq,当应用程序重新建立connection之后,两个线程都会死锁

解决方案:
ack=fasle的情况下,可以将消息存到缓存中,定时发起任务重发。

1.2.2 生产者消息发送失败(MQ服务器异常)

对于这种情况,confirm是没有发送出去的,但是消息丢失怎么处理,但是会抛出AmqpConnectException异常,我们可以捕获该异常,然后将msgId也就是CorrelationData对象保存即可。

 @Test
    public void contextLoads() {
        Map<String, Object> map = new HashMap<>();
        Book book = new Book("西游记", "120.00");
        CorrelationData msgId=new CorrelationData();
        try {
            rabbitTemplate.convertAndSend("exchange_direct_no", "ord", book,msgId);
        }catch (AmqpConnectException e){
            System.out.println("保存信息编号:"+msgId);
        }

推荐阅读:
https://www.jianshu.com/p/9aec19a910b1
https://blog.csdn.net/qq315737546/article/details/66475103
https://www.colabug.com/2325507.html
https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,734评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,931评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,133评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,532评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,585评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,462评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,262评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,153评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,587评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,792评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,919评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,635评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,237评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,855评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,983评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,048评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,864评论 2 354

推荐阅读更多精彩内容