RabbitMQ 高可用优化

RabbitMQ的主要作用基本上可以用8个字概括,削峰填谷异步解耦。但是引入MQ我们也不得不考虑引入MQ后带来的一些问题,如消息丢失。

在一些业务场景不一样,处理方式也就不一样,比如发短信,日志收集我们主要看吞吐量所以对消息丢失容忍度较高,这类场景基本上不用花太多时间在消息丢失问题上。另外一种,如我们用MQ来做分布式事务,续保计算,提成的计算,这类业务对消息丢失容忍度较底,所以我们一定要考虑消息丢失的问题。这次分享的内容是怎么来最大限制的防止消息丢失,顺带提一下消息的重发和重复消费。

RabbitMQ 模型图

RabbitMQ 模型.jpg

ConfirmCallback和ReturnCallback

在这个里我们主要实现了ConfirmCallback和ReturnCallback两个接口。这两个接口主要是用来发送消息后回调的。因为rabbit发送消息是只管发,至于发没发成功,发送方法不管。

  • ConfirmCallback:当消息成功到达exchange的时候触发的ack回调。
  • ReturnCallback:当消息成功到达exchange,但是没有队列与之绑定的时候触发的ack回调。发生网络分区会出现这种情况。

在这里一定要把这两个开关打开, publisher-confirms="true" publisher-returns="true"。

生产者端使用ConfirmCallback和ReturnCallback回调机制,最大限度的保证消息不丢失,对原有CorrelationData类进行扩展,来实现消息的重发,具体请看源码。

消息的日志链路跟踪

使用MQ来解耦服务,异步化处理一些复杂耗时逻辑,但是也带来了一个问题。由于异步化以后,排查问题就很不方便了,根本不知道这个消息什么时候消费,消费的日志也很不好排查。所以引入了Slf4j MDC机制将主线程的日志链路和消息的日志链路连起来,方便MQ问题的排查。

RabbitSender

import com.alibaba.fastjson.JSON;
import com.wlqq.insurance.common.enums.MetricNameEnum;
import com.wlqq.insurance.common.enums.SystemTypeEnum;
import com.wlqq.insurance.common.log.core.FisLoggerFactory;
import com.wlqq.insurance.common.mq.CorrelationData;
import com.wlqq.insurance.common.service.AlertService;
import org.slf4j.Logger;
import org.slf4j.MDC;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import java.util.UUID;

/**
 * Rabbit 发送消息
 *
 * @author yuhao.wang
 */
public class RabbitSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback, InitializingBean {
    private final Logger logger = FisLoggerFactory.getLogger(RabbitSender.class);

    @Value("${mq.retry.count}")
    private int mqRetryCount;

    /**
     * 告警服务
     */
    @Autowired
    private AlertService alertService;

    /**
     * Rabbit MQ 客户端
     */
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送MQ消息,异步
     *
     * @param exchangeName 交换机名称
     * @param routingKey   路由名称
     * @param message      发送消息体
     */
    public void sendMessage(String exchangeName, String routingKey, com.wlqq.insurance.common.mq.message.Message message) {
        Assert.notNull(message, "message 消息体不能为NULL");
        Assert.notNull(exchangeName, "exchangeName 不能为NULL");
        Assert.notNull(routingKey, "routingKey 不能为NULL");
        // 获取CorrelationData对象
        CorrelationData correlationData = this.correlationData(message, message.getMessageId());
        correlationData.setExchange(exchangeName);
        correlationData.setRoutingKey(routingKey);
        correlationData.setMessage(message);

        logger.info("发送MQ消息,消息ID:{},消息体:{}, exchangeName:{}, routingKey:{}",
                correlationData.getId(), JSON.toJSONString(message), exchangeName, routingKey);
        // 发送消息
        this.convertAndSend(exchangeName, routingKey, message, correlationData);
    }

    /**
     * RPC方式,发送MQ消息
     *
     * @param exchangeName 交换机名称
     * @param routingKey   路由名称
     * @param message      发送消息体
     */
    public void sendAndReceiveMessage(String exchangeName, String routingKey, com.wlqq.insurance.common.mq.message.Message message) {
        Assert.notNull(message, "message 消息体不能为NULL");
        Assert.notNull(exchangeName, "exchangeName 不能为NULL");
        Assert.notNull(routingKey, "routingKey 不能为NULL");
        // 获取CorrelationData对象
        CorrelationData correlationData = this.correlationData(message, message.getMessageId());
        correlationData.setExchange(exchangeName);
        correlationData.setRoutingKey(routingKey);
        correlationData.setMessage(message);

        logger.info("发送MQ消息,消息ID:{},消息体:{}, exchangeName:{}, routingKey:{}",
                correlationData.getId(), JSON.toJSONString(message), exchangeName, routingKey);

        rabbitTemplate.convertSendAndReceive(exchangeName, routingKey, message);
    }

    /**
     * 用于实现消息发送到RabbitMQ交换器后接收ack回调。
     * 如果消息发送确认失败就进行重试。
     *
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(org.springframework.amqp.rabbit.support.CorrelationData correlationData, boolean ack, String cause) {
        CorrelationData correlationDataExtends = null;
        if (correlationData instanceof CorrelationData) {
            correlationDataExtends = (CorrelationData) correlationData;
            if (correlationDataExtends.getMdcContainer() != null) {
                // 日志链路跟踪
                MDC.setContextMap(correlationDataExtends.getMdcContainer());
            }
        }

        // 消息回调确认失败处理
        if (!ack) {
            if (correlationDataExtends != null) {
                //消息发送失败,就进行重试,重试过后还不能成功就记录到数据库
                if (correlationDataExtends.getRetryCount() < mqRetryCount) {
                    logger.info("MQ消息发送失败,消息重发,消息ID:{},重发次数:{},消息体:{}", correlationDataExtends.getId(),
                            correlationDataExtends.getRetryCount(), JSON.toJSONString(correlationDataExtends.getMessage()));

                    // 将重试次数加一
                    correlationDataExtends.setRetryCount(correlationDataExtends.getRetryCount() + 1);

                    // 重发发消息
                    this.convertAndSend(correlationDataExtends.getExchange(), correlationDataExtends.getRoutingKey(),
                            correlationDataExtends.getMessage(), correlationDataExtends);
                } else {
                    //消息重试发送失败,将消息放到数据库等待补发
                    logger.error("MQ消息重发失败,消息ID:{},消息体:{}", correlationData.getId(),
                            JSON.toJSONString(correlationDataExtends.getMessage()));

                    alertService.postAlert(MetricNameEnum.SYSTEM_INTERNAL_EXCEPTION, SystemTypeEnum.MQ.name(),
                            correlationDataExtends.getExchange(), null);
                }
            }
        } else {
            logger.info("消息发送成功,消息ID:{}", correlationData.getId());
        }
    }

    /**
     * 用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。
     * 在脑裂的情况下会出现这种情况。
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // 反序列化消息
        Object msg = rabbitTemplate.getMessageConverter().fromMessage(message);
        if (msg instanceof com.wlqq.insurance.common.mq.message.Message) {
            // 日志链路跟踪
            MDC.setContextMap(((com.wlqq.insurance.common.mq.message.Message) msg).getMdcContainer());
        }

        logger.error("MQ消息发送失败,replyCode:{}, replyText:{},exchange:{},routingKey:{},消息体:{}",
                replyCode, replyText, exchange, routingKey, JSON.toJSONString(msg));

        alertService.postAlert(MetricNameEnum.SYSTEM_INTERNAL_EXCEPTION, SystemTypeEnum.MQ.name(), exchange, null);
    }

    /**
     * 消息相关数据(消息ID)
     *
     * @param message   消息体
     * @param messageId 消息ID
     * @return
     */
    private CorrelationData correlationData(Object message, String messageId) {
        // 消息ID默认使用UUID
        if (StringUtils.isEmpty(messageId)) {
            messageId = UUID.randomUUID().toString();
        }
        return new CorrelationData(messageId, message);
    }

    /**
     * 发送消息
     *
     * @param exchange        交换机名称
     * @param routingKey      路由key
     * @param message         消息内容
     * @param correlationData 消息相关数据(消息ID)
     * @throws AmqpException
     */
    private void convertAndSend(String exchange, String routingKey, final Object message, CorrelationData correlationData) {
        try {
            rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
        } catch (Exception e) {
            logger.error("MQ消息发送异常,消息ID:{},消息体:{}, exchangeName:{}, routingKey:{}",
                    correlationData.getId(), JSON.toJSONString(message), exchange, routingKey, e);

            alertService.postAlert(MetricNameEnum.SYSTEM_INTERNAL_EXCEPTION, SystemTypeEnum.MQ.name(), exchange, null);
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
}

CorrelationData

import lombok.Data;
import org.slf4j.MDC;

import java.util.Map;

/**
 * 发送消息的相关数据
 *
 * @author yuhao.wang
 */
@Data
public class CorrelationData extends org.springframework.amqp.rabbit.support.CorrelationData {


    /**
     * MDC容器
     * 获取父线程MDC中的内容,做日志链路
     */
    private Map<String, String> mdcContainer = MDC.getCopyOfContextMap();

    /**
     * 消息体
     */
    private volatile Object message;

    /**
     * 交换机名称
     */
    private String exchange;

    /**
     * 路由key
     */
    private String routingKey;

    /**
     * 重试次数
     */
    private int retryCount = 0;

    public CorrelationData(String id) {
        super(id);
    }

    public CorrelationData(String id, Object data) {
        this(id);
        this.message = data;
    }
}

Message

/**
 * MQ消息的父类消息体
 *
 * @author yuhao.wang
 */
@Data
public class Message implements Serializable {
    private static final long serialVersionUID = -4731326195678504565L;

    /**
     * MDC容器
     * 获取父线程MDC中的内容,做日志链路
     */
    private Map<String, String> mdcContainer = MDC.getCopyOfContextMap();

    /**
     * 消息ID(消息的唯一标示)
     */
    private String messageId;
}

AbstractConsumer

/**
 * 默认消费者
 *
 * @author yuhao.wang3
 */
public abstract class AbstractConsumer implements MessageListener {
    private static final Logger LOGGER = FisLoggerFactory.getLogger(AbstractConsumer.class);

    @Override
    public void onMessage(Message msg) {
        String body = null;

        try {
            // 日志链路跟踪逻辑
            body = new String(msg.getBody(), "utf-8");
            DefaultMessage message = JSON.parseObject(body, DefaultMessage.class);
            Map<String, String> container = message.getMdcContainer();
            if (container != null) {
                // 日志链路跟踪
                MDC.setContextMap(message.getMdcContainer());
            }
        } catch (Exception e) {
            LOGGER.warn("没有找到MQ消息日志链路数据,无法做日志链路追踪");
        }

        try {
            // 处理消息逻辑
            doMessage(msg);
            LOGGER.info("成功处理MQ消息, 消息体:{}", body);
        } catch (Exception e) {
            LOGGER.error("处理MQ消息异常 {}, 消息体:{}", JSON.toJSONString(msg), body, e);
        }
    }

    /**
     * 处理消息的实现方法
     *
     * @param msg
     */
    public abstract void doMessage(Message msg);
}

源码

https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases

spring-boot-student-rabbitmq 工程

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352

推荐阅读更多精彩内容

  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,357评论 2 34
  • 姓名:周小蓬 16019110037 转载自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw阅读 34,721评论 13 425
  • “ 消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列...
    落羽成霜丶阅读 3,984评论 1 41
  • 消息队列设计精要 消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终...
    meng_philip123阅读 1,511评论 1 25
  • 从A市做完采访,我独自驱车回家的时候,已是黄昏。 太阳收敛起了灼热的光芒,藏匿到了山的另一头。天空呈现出灰暗的蓝色...
    胡岱阅读 883评论 7 6