Java微服务下的分布式事务介绍及其解决方案2

1.前言

本文将详细介绍分布式的解决方案–消息队列实现分布式事务的解决方案,需要大家对我第一篇对分布式事务的介绍来了解下,会更清楚一点哦,第一篇博客的地址分布式事务的介绍

2.业务场景介绍

我们模拟慕课网付费课程的下单,你在慕课网买了视频后,你的学习列表要订单服务要更新你支付的状态,此外,学习服务要有你的添加选课信息

3.解决方案

下面我详细介绍下,这个解决方案
1、支付成功后,订单服务向本地数据库更新订单状态并向消息表写入“添加选课消息”,通过本地数据库保证订单状态和添加选课消息的事务。。
2、定时任务扫描消息表,取出“添加选课任务“并发向MQ。
3、学习服务接收到添加选课的消息,先查询本地数据库的历史消息表是否存在消息,存在则说明已经添加选课,否则向本地数据库添加选课,并向历史消息表添加选课消息。这里选课表和历史消息表在同一个数据库,通过本地事务保证。
4、学习服务接收到添加选课的消息,通过查询消息表判断如果已经添加选课也向MQ发送“完成添加选课任务的消息”,否则则添加选课,完成后向MQ发送“完成添加选课任务的消息”,
5、订单服务接收到完成选课的消息后删除订单数据库中消息表的“添加选课消息”,为保证后期对账将消息表的消息先添加到历史消息表再删除消息,表示此消息已经完成。

3需要用到的几个表

1.待处理的任务表,在订单系统里面保存



2.已经完成的任务表:订单和学习系统都要有



3选课信息表

4.解决方案的伪代码

3.1订单服务定时发送消息

定时任务发送消息流程如下:
0、每隔1分钟扫描一次任务表。
1、定时任务扫描task表,一次取出多个任务,取出超过1分钟未处理的任务
2、考虑订单服务可能集群部署,为避免重复发送任务使用乐观锁的方式每次从任务列表取出要处理的任务
3、任务发送完毕更新任务发送时间
关于任务表的添加:
正常的流程是订单支付成功向更新订单支付状态并向任务表写入“添加选课任务”。
(没有写用户下单,直接默认有用户下单)
订单服务定时发送消息的代码(用的是SpringTask)

 //每隔一分钟扫描消息表,向mq发送消息
    @Scheduled(cron="0/3 * * * * *")
    public void sendChoosecourseTask(){
        //取出当前一分钟之前的时间
        Calendar calendar  =new GregorianCalendar();
        calendar.setTime(new Date());
        calendar.add(GregorianCalendar.MINUTE,-1);
        Date time = calendar.getTime();
        List<XcTask> taskList = taskService.findTaskList(time, 100);
        //遍历任务表,发送选课信息
        if (taskList!=null && taskList.size()>0){
            for (XcTask xcTask : taskList){
                //发送选课信息
                taskService.publish(xcTask,xcTask.getMqExchange(),xcTask.getMqRoutingkey());
                //记录日子
                LOGGER.info("send choose course task id:{}",xcTask.getId());
            }
        }
    }

此时已经检测到有人下单支付了,就要发送,取出“添加选课任务“并发向MQ,代码如下

//发送消息
    @Transactional
    public void publish(XcTask xcTask,String ex,String routingKey){
        //查询任务,如果有任务
        Optional<XcTask> optional = xcTaskRepository.findById(xcTask.getId());
        if (optional.isPresent()){
            //发送消息
            rabbitTemplate.convertAndSend(ex,routingKey,xcTask);
            //更新时间
            XcTask xcTask1 = optional.get();
            xcTask1.setUpdateTime(new Date());
            xcTaskRepository.save(xcTask1);
        }
    }

这时候订单系统前期要处理的事(监听用户下单,发送消息到mq)已经完成了,下面就需要学习系统的服务来处理事情了(接收消息,添加选课,发送添加选课成功)

添加选课的代码:
备注:向xc_learning_course添加记录,为保证不重复添加选课,先查询历史任务表,如果从历史任务表查询不到任务说明此任务还没有处理,此时则添加选课并添加历史任务。

 //添加选课
    @Transactional
    public ResponseResult addCourse(String userId, String courseId, String valid, Date startTime, Date endTime, XcTask xcTask){
        //校验参数
        //查询历史任务,如果历史任务有,说明已经添加过选课了,没有则添加
        //查询历史任务
        Optional<XcTaskHis> optional = xcTaskHisRepository.findById(xcTask.getId());
        if(optional.isPresent()){
            return new ResponseResult(CommonCode.SUCCESS);
        }
        //历史表里面没有,要添加课程
        XcLearningCourse xcLearningCourse = xcLearningCourseRepository.findXcLearningCourseByUserIdAndCourseId(userId, courseId);
        if (xcLearningCourse == null) {//没有选课记录则添加
            xcLearningCourse = new XcLearningCourse();
            xcLearningCourse.setUserId(userId);
            xcLearningCourse.setCourseId(courseId);
            xcLearningCourse.setValid(valid);
            xcLearningCourse.setStartTime(startTime);
            xcLearningCourse.setEndTime(endTime);
            xcLearningCourse.setStatus("501001");
            xcLearningCourseRepository.save(xcLearningCourse);
        } else {//有选课记录则更新日期
            xcLearningCourse.setValid(valid);
            xcLearningCourse.setStartTime(startTime);
            xcLearningCourse.setEndTime(endTime);
            xcLearningCourse.setStatus("501001");
            xcLearningCourseRepository.save(xcLearningCourse);
        }
        //向历史记录表插入记录
        Optional<XcTaskHis> optional1= xcTaskHisRepository.findById(xcTask.getId());
        if(!optional1.isPresent()){
            //添加历史任务
            XcTaskHis xcTaskHis = new XcTaskHis();
            BeanUtils.copyProperties(xcTask,xcTaskHis);
            xcTaskHisRepository.save(xcTaskHis);
        }
        return new ResponseResult(CommonCode.SUCCESS);
    }

接收选课的消息:
接收到添加选课的消息调用添加选课方法完成添加选课,并发送完成选课消息。

@RabbitListener(queues = RabbitMQConfig.XC_LEARNING_ADDCHOOSECOURSE)
    public void receiveChoosecourseTask(XcTask xcTask){
        //取出消息内容
        //取出消息的内容
        String requestBody = xcTask.getRequestBody();
        Map map = JSON.parseObject(requestBody, Map.class);
        String userId = (String) map.get("userId");
        String courseId = (String) map.get("courseId");
        //添加选课
        ResponseResult responseResult = learningService.addCourse(userId, courseId, null, null, null, xcTask);
        if (responseResult.isSuccess()){
            //添加选课成功,向mq发送完成添加选课的消息
            rabbitTemplate.convertAndSend(RabbitMQConfig.EX_LEARNING_ADDCHOOSECOURSE,RabbitMQConfig.XC_LEARNING_FINISHADDCHOOSECOURSE_KEY,xcTask);
        }
    }

到这时候,学习系统做的事(接收订单系统发过来的消息,添加课程信息,给订单信息发送已经添加课程成功的消息),接下来就要回到订单系统来接收添加课程成功的消息了,将任务从当前任务表删除,将完成的任务添加到完成任务表。

/**
     * 完成任务
     * @param xcTask
     */
    @RabbitListener(queues = RabbitMQConfig.XC_LEARNING_FINISHADDCHOOSECOURSE)
    public void receiveFinishChoosecourseTask(XcTask xcTask){
        if(xcTask!=null && StringUtils.isNotEmpty(xcTask.getId())){
            taskService.finishTask(xcTask.getId());
        }
    }



//下面是删除任务的代码
   //删除任务
    @Transactional
    public void finishTask(String taskId){
        Optional<XcTask> taskOptional = xcTaskRepository.findById(taskId);
        if (taskOptional.isPresent()){
            XcTask xcTask = taskOptional.get();
            xcTask.setDeleteTime(new Date());
            XcTaskHis xcTaskHis = new XcTaskHis();
            BeanUtils.copyProperties(xcTask,xcTaskHis);
            //把记录添加到历史任务里面,为了以前对账能够清楚点,即把任务表的数据复制到历史任务1表里面
            xcTaskHisRepository.save(xcTaskHis);
            xcTaskRepository.delete(xcTask);
        }
    }

总结:我觉得分布式事务的处理需要对业务很了解才能够做的更好,不知道我的讲解能给大家带来什么收获,我希望大家能够多看几遍我这个流程,感谢。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352