基于Spring的RabbitTemplate实现消息事务

基于Spring的RabbitTemplate实现消息事务

分布式系统中常见一种情况,就是数据库操作成功之后发送MQ消息。

分布式消息常见问题

数据库操作之后发送MQ消息通常会遇到一些问题,理论上消息和事务要同时成功才算一个完整的事务,那到底该把发送MQ放到数据库事务之外还是数据库事务之内?

下面分析下可能存在的问题:

  1. 消息放到数据库事务之内
    1. 事务处理异常,回滚事务——ok
    2. 消息发送异常,回滚事务——ok
    3. 消息发送成功提交事务——ok
    4. 消息发送成功提交失败——不ok,不好处理,一般MQ也不能撤销消息,而且消费端可能已经在处理了
  2. 消息放到数据库事务之外
    1. 事务处理异常,回滚事务——ok,不用发消息
    2. 事务处理成功,发送消息成功——ok
    3. 事务处理成功,消息发送失败——不ok,消息丢失
  3. 在2的基础上增加本地消息表,放到同一个数据库,业务操作完成之后把需要发送的MQ消息插入本地消息表中
    1. 事务处理异常,回滚事务——ok,不用发消息,消息表也回滚
    2. 事务处理成功,发送消息成功
      1. 更新消息表状态成功——ok
      2. 更新消息表状态失败——ok(定时任务补偿)
    3. 事务处理成功,消息发送失败——ok(定时任务补偿)
    4. 通过定时扫描失败消息重新发送MQ
      1. 重发消息需保证幂等性——ok

分布式消息事务处理

常见的处理逻辑是本地消息表+消息重试补偿

image

RabbitTemplate配置和使用

我们使用RabbitMQ作为消息队列,因此我们可以使用spring-rabbit帮助实现mq发送(前提是已经安装了RabbitMQ了)。

      <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>${spring-amqp.version}</version>
      </dependency>

application.yml配置:

spring:
  rabbitmq:
    virtual-host: /
    username: appws
    password: xxxxxx
    addresses: 10.181.57.239:5672
    publisherConfirms: true

然后就可以注入RabbitTemplate了,代码片段如下:

// 消息队列配置
public static final String TEST_EXCHANGE = "test.exchange";
public static final String TEST_QUEUE = "test.queue";
public static final String TEST_ROUTEKEY = "test.routekey";
@Bean
public Exchange testExchange() {
    return new TopicExchange(TEST_EXCHANGE);
}
@Bean
public Queue testQueue() {
    return new Queue(TEST_QUEUE, true);
}
@Bean
public Binding testBinding(@Qualifier("testQueue") Queue queue, @Qualifier("testExchange") Exchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(TEST_ROUTEKEY).noargs();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
public void test(){
    // 一些DB操作
    rabbitTemplate.convertAndSend(TEST_EXCHANGE, TEST_ROUTEKEY, param);
    // 其他DB操作等
}

拦截器处理RabbitTemplate事务

​ 从上面的代码片段可以看出,业务方法使用了@Transactional注解使用了事务之后,rabbitTemplate.convertAndSend方法并没有专门放到事务之外,这个时候一旦有异常,可能造成消息发送成功,但是事务异常回滚的问题。要解决这个问题,需要把rabbitTemplate.convertAndSend移动到事务之外,但是通常都配置的声明式事务,不能简单的把代码移动到外面,这个需要利用Spring事务的一个特性TransactionSynchronization,注册一个同步钩子,自动把相关代码放到事务完成之后执行,我们使用拦截器拦截rabbitTemplate.convertAndSend方法,实现不用修改现有代码自动把发送MQ消息逻辑移到事务之外:

RabbitTemplateTransactionInterceptor.java代码详情:

@Aspect
@Order(50)
@Component
public class RabbitTemplateTransactionInterceptor {
    /**
     * 日志
     */
    private static final Logger logger = LoggerFactory.getLogger(RabbitTemplateTransactionInterceptor.class);
    /**
     * 代理convertAndSend方法够用
     */
    @Pointcut("execution(* org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(String, String, Object))")
    public void convertAndSend() {
        // noop
    }
    @Around("convertAndSend()")
    public void aroundMethod(ProceedingJoinPoint joinPoint) throws Throwable {
        Object[] args = joinPoint.getArgs();
        if (TransactionSynchronizationManager.isSynchronizationActive()
                && TransactionSynchronizationManager.isActualTransactionActive() // 事务开启判断
                && args.length == 3) {
            logger.info("拦截RabbitTemplate发送:{}", args);
            // 注册同步器
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
                @Override
                public void afterCommit() { // 事务提交之后执行
                    try {
                        joinPoint.proceed();
                    } catch (Throwable throwable) {
                        throw new RuntimeException(throwable);
                    }
                }
            });
        } else { // 没有开启事务或者参数不正确就直接执行,不处理
            joinPoint.proceed();
        }
    }
}

注:定时扫描和消息重试在另外的逻辑中。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,491评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,856评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,745评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,196评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,073评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,112评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,531评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,215评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,485评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,578评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,356评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,215评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,583评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,898评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,497评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,697评论 2 335

推荐阅读更多精彩内容