Spring Boot RabbitMQ实践

背景

我们现在有两个主要的系统一个是活动系统一个是奖品系统,活动系统会调用奖品系统发放奖励。

最开始两个之间只通过http直接调用,优点:开发成本低,没有多余组件引入;发放奖励实时返回;活动系统不需要管奖品是否还有剩余库存。缺点:这样就导致上游活动系统强依赖于下游的奖品系统,如果一旦奖品系统挂掉,我们活动系统也就不可用了;这里还有个bug,在调用奖品系统发放奖励,奖品系统发放成功了,但是活动系统请求超时了,就会导致提示客户的没有奖品了,但是实际奖品又发放了。

访问量上来后发直接走http肯定是不行的,所以引入了MQ将将两个系统隔离开,优点:所有发放流程异步执行,活动系统响应更快了;这两个系统就变成弱引用关系,即使奖品系统挂掉,活动系统仍能正常运行;不会出现上面说的bug了;缺点:发放奖励将会有延迟;引入MQ增加了项目复杂度,我们必须去考虑消息的丢失,重复消费等问题;活动系统需要知道奖品的库存情况。

解决方案

针对上面使用MQ发放奖励会遇到的问题,我们可以通过面的方案来解决。

消息的丢失问题

在数据库创建一张异常消息表。

  • 在发消息的时候如果出现异常,直接将消息记录到异常消息表,等待后台跑批,进行补偿发放。
  • 在发消息的时候,如果发送消息的ack回调没没有发送成功,将进行消息重发,如果重发3次还是失败,该消息就记录到异常消息表,等待后台跑批,进行补偿发放。消息的重复发送可以使用RabbitMQ的ConfirmCallback、ReturnCallback机制来实现。
  • 在消费端处理消息(调用奖品系统发放奖励)的时候,如果出现异常也将消息放到异常消息表中,等待后台跑批,进行补偿发放。如果将异常消息保存到数据库时发生了异常,则将消息放到死信队列,等待后台跑批,进行补偿发放。

这样子虽然还是不能完全杜绝消息丢失,但是绝大部分情况下是没有问题的。

重复消费问题

为每个消息生成业务流水号,将流水号和发放里的参数一起发送到奖品系统,奖品系统在发放奖励的时候先判断这个流水号是否存在,存在就表示该奖品已经发过来直接返回发放成功,如果没有就进行发放奖励操作。

活动系统需要知道奖品的库存情况。

我们在配置活动的时候会将奖品的库存放到我们活动系统,在发MQ消息之前回去判断是否有剩余库存,如果没有直接返回奖励领完了,如果有才回去发MQ消息。扣减库存可以参考基于redis实现的扣减库存

活动流程图

下面是引入MQ过后我们系统的流程图


MQ解耦系统间的依赖关系
生产者.png
消费者.png

生产者端实现

/**
 * Rabbit 发送消息
 *
 * @author yuhao.wang
 */
@Service
public class RabbitSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback, InitializingBean {
    private final Logger logger = LoggerFactory.getLogger(RabbitSender.class);

    /**
     * Rabbit MQ 客户端
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 系统配置
     */
    @Autowired
    private SystemConfig systemConfig;

    /**
     * 发送MQ消息
     *
     * @param exchangeName 交换机名称
     * @param routingKey   路由名称
     * @param message      发送消息体
     */
    public void sendMessage(String exchangeName, String routingKey, Object message) {
        Assert.notNull(message, "message 消息体不能为NULL");
        Assert.notNull(exchangeName, "exchangeName 不能为NULL");
        Assert.notNull(routingKey, "routingKey 不能为NULL");

        // 获取CorrelationData对象
        CorrelationData correlationData = this.correlationData(message);
        correlationData.setExchange(exchangeName);
        correlationData.setRoutingKey(routingKey);
        correlationData.setMessage(message);

        logger.info("发送MQ消息,消息ID:{},消息体:{}, exchangeName:{}, routingKey:{}",
                correlationData.getId(), JSON.toJSONString(message), exchangeName, routingKey);
        // 发送消息
        this.convertAndSend(exchangeName, routingKey, message, correlationData);
    }

    /**
     * 用于实现消息发送到RabbitMQ交换器后接收ack回调。
     * 如果消息发送确认失败就进行重试。
     *
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(org.springframework.amqp.rabbit.support.CorrelationData correlationData, boolean ack, String cause) {
        // 消息回调确认失败处理
        if (!ack && correlationData instanceof CorrelationData) {
            CorrelationData correlationDataExtends = (CorrelationData) correlationData;

            //消息发送失败,就进行重试,重试过后还不能成功就记录到数据库
            if (correlationDataExtends.getRetryCount() < systemConfig.getMqRetryCount()) {
                logger.info("MQ消息发送失败,消息重发,消息ID:{},重发次数:{},消息体:{}", correlationDataExtends.getId(),
                        correlationDataExtends.getRetryCount(), JSON.toJSONString(correlationDataExtends.getMessage()));

                // 将重试次数加一
                correlationDataExtends.setRetryCount(correlationDataExtends.getRetryCount() + 1);

                // 重发发消息
                this.convertAndSend(correlationDataExtends.getExchange(), correlationDataExtends.getRoutingKey(),
                        correlationDataExtends.getMessage(), correlationDataExtends);
            } else {
                //消息重试发送失败,将消息放到数据库等待补发
                logger.warn("MQ消息重发失败,消息入库,消息ID:{},消息体:{}", correlationData.getId(),
                        JSON.toJSONString(correlationDataExtends.getMessage()));

                // TODO 保存消息到数据库
            }
        } else {
            logger.info("消息发送成功,消息ID:{}", correlationData.getId());
        }
    }

    /**
     * 用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        logger.error("MQ消息发送失败,replyCode:{}, replyText:{},exchange:{},routingKey:{},消息体:{}",
                replyCode, replyText, exchange, routingKey, JSON.toJSONString(message.getBody()));

        // TODO 保存消息到数据库
    }

    /**
     * 消息相关数据(消息ID)
     *
     * @param message
     * @return
     */
    private CorrelationData correlationData(Object message) {

        return new CorrelationData(UUID.randomUUID().toString(), message);
    }

    /**
     * 发送消息
     *
     * @param exchange        交换机名称
     * @param routingKey      路由key
     * @param message         消息内容
     * @param correlationData 消息相关数据(消息ID)
     * @throws AmqpException
     */
    private void convertAndSend(String exchange, String routingKey, final Object message, CorrelationData correlationData) throws AmqpException {
        try {
            rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
        } catch (Exception e) {
            logger.error("MQ消息发送异常,消息ID:{},消息体:{}, exchangeName:{}, routingKey:{}",
                    correlationData.getId(), JSON.toJSONString(message), exchange, routingKey, e);

            // TODO 保存消息到数据库
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }
}

生产者端使用ConfirmCallback和ReturnCallback回调机制,最大限度的保证消息不丢失,对原有CorrelationData类进行扩展,来实现消息的重发,具体请看源码。

消费者端实现


/**
 * 发放优惠券的MQ处理
 *
 * @author yuhao.wang
 */
@Service
@ConditionalOnClass({RabbitTemplate.class})
public class SendMessageListener {

    private final Logger logger = LoggerFactory.getLogger(SendMessageListener.class);

    @RabbitListener(queues = RabbitConstants.QUEUE_NAME_SEND_COUPON)
    public void process(SendMessage sendMessage, Channel channel, Message message) throws Exception {
        logger.info("[{}]处理发放优惠券奖励消息队列接收数据,消息ID:{},消息体:{}", RabbitConstants.QUEUE_NAME_SEND_COUPON,
                message.getMessageProperties().getCorrelationIdString(), JSON.toJSONString(sendMessage));

        try {
            // 参数校验
            Assert.notNull(sendMessage, "sendMessage 消息体不能为NULL");

            // TODO 处理消息

            // 确认消息已经消费成功
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            logger.error("MQ消息处理异常,消息ID:{},消息体:{}", message.getMessageProperties().getCorrelationIdString(),
                    JSON.toJSONString(sendMessage), e);

            try {
                // TODO 保存消息到数据库

                // 确认消息已经消费成功
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception e1) {
                logger.error("保存异常MQ消息到数据库异常,放到死性队列,消息ID:{}", message.getMessageProperties().getCorrelationIdString());
                // 确认消息将消息放到死信队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            }
        }
    }
}

消费者端主要做了消息消费失败的容错处理。

源码

https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases

spring-boot-student-rabbitmq 工程

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,254评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,875评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,682评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,896评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,015评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,152评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,208评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,962评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,388评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,700评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,867评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,551评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,186评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,901评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,689评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,757评论 2 351

推荐阅读更多精彩内容