多线程事务控制

背景

      在项目中使用多线程抓取第三方数据执行数据入库时,如果某个子线程执行异常,其他子线事务全部回滚,spring对多线程无法进行事务控制,是因为多线程底层连接数据库的时候,是使用的线程变量(TheadLocal),线程之间事务隔离,每个线程有自己的连接,事务肯定不是同一个了。

解决办法

     思想就是使用两个CountDownLatch实现子线程的二段提交

    步骤:

  1、主线程将任务分发给子线程,然后使用childMonitor.await();阻塞主线程,等待所有子线程处理向数据库中插入的业务,并使用BlockingDeque存储线程的返回结果。

     2、使用childMonitor.countDown()释放子线程锁定,同时使用mainMonitor.await();阻塞子线程,将程序的控制权交还给主线程。

     3、主线程检查子线程执行任务的结果,若有失败结果出现,主线程标记状态告知子线程回滚,然后使用mainMonitor.countDown();将程序控制权再次交给子线程,子线程检测回滚标志,判断是否回滚。

代码实现

线程池工具类

publicclassThreadPoolTool {

    /**    * 多线程任务

    * @param transactionManager

    * @param data

    * @param threadCount

    * @param params

    * @param clazz

    */publicvoidexcuteTask(DataSourceTransactionManager transactionManager, List data,intthreadCount, Map params, Class clazz) {

        if(data ==null|| data.size() == 0) {

            return;

        }

        intbatch = 0;

        ExecutorService executor = Executors.newFixedThreadPool(threadCount);

        //监控子线程的任务执行CountDownLatch childMonitor =new CountDownLatch(threadCount);

        //监控主线程,是否需要回滚CountDownLatch mainMonitor =newCountDownLatch(1);

        //存储任务的返回结果,返回true表示不需要回滚,反之,则回滚BlockingDeque results =newLinkedBlockingDeque(threadCount);

        RollBack rollback =newRollBack(false);

        try {

            LinkedBlockingQueue queue = splitQueue(data, threadCount);

            while(true) {

                List list = queue.poll();

                if(list ==null) {

                    break;

                }

                batch++;

                params.put("batch", batch);

                Constructor constructor = clazz.getConstructor(newClass[]{CountDownLatch.class, CountDownLatch.class, BlockingDeque.class, RollBack.class, DataSourceTransactionManager.class, Object.class, Map.class});

                ThreadTask task = (ThreadTask) constructor.newInstance(childMonitor, mainMonitor, results, rollback, transactionManager, list, params);

                executor.execute(task);

            }

            //  1、主线程将任务分发给子线程,然后使用childMonitor.await();阻塞主线程,等待所有子线程处理向数据库中插入的业务。            childMonitor.await();

            System.out.println("主线程开始执行任务");

            //根据返回结果来确定是否回滚for(inti = 0; i < threadCount; i++) {

                Boolean result = results.take();

                if(!result) {

                    //有线程执行异常,需要回滚子线程rollback.setNeedRoolBack(true);

                }

            }

            //  3、主线程检查子线程执行任务的结果,若有失败结果出现,主线程标记状态告知子线程回滚,然后使用mainMonitor.countDown();将程序控制权再次交给子线程,子线程检测回滚标志,判断是否回滚。            mainMonitor.countDown();

        } catch (Exception e) {

            log.error(e.getMessage());

        } finally {

            //关闭线程池,释放资源            executor.shutdown();

        }

    }

    /**    * 队列拆分

    *

    * @param data 需要执行的数据集合

    * @param threadCount 核心线程数

    * @return*/privateLinkedBlockingQueue> splitQueue(List data,int threadCount) {

        LinkedBlockingQueue> queueBatch =new LinkedBlockingQueue();

        inttotal = data.size();

        intoneSize = total / threadCount;

        int start;

        int end;

        for(inti = 0; i < threadCount; i++) {

            start = i * oneSize;

            end = (i + 1) * oneSize;

            if(i < threadCount - 1) {

                queueBatch.add(data.subList(start, end));

            } else {

                queueBatch.add(data.subList(start, data.size()));

            }

        }

        return queueBatch;

    }

}


子线程任务执行类

publicabstractclassThreadTaskimplements Runnable {

    /**    * 监控子任务的执行

    */private CountDownLatch childMonitor;

    /**    * 监控主线程

    */private CountDownLatch mainMonitor;

    /**    * 存储线程的返回结果

    */privateBlockingDeque resultList;

    /**    * 回滚类

    */private RollBack rollback;

    privateMap params;

    protected Object obj;

    protected DataSourceTransactionManager transactionManager;

    protected TransactionStatus status;

    publicThreadTask(CountDownLatch childCountDown, CountDownLatch mainCountDown, BlockingDeque result, RollBack rollback, DataSourceTransactionManager transactionManager, Object obj,Map params) {

        this.childMonitor = childCountDown;

        this.mainMonitor = mainCountDown;

        this.resultList = result;

        this.rollback = rollback;

        this.transactionManager = transactionManager;

        this.obj = obj;

        this.params = params;

        initParam();

    }

    /**    * 事务回滚

    */privatevoid rollBack() {

        System.out.println(Thread.currentThread().getName()+"开始回滚");

        transactionManager.rollback(status);

    }

    /**    * 事务提交

    */privatevoid submit() {

        System.out.println(Thread.currentThread().getName()+"提交事务");

        transactionManager.commit(status);

    }

    protected Object getParam(String key){

        return params.get(key);

    }

    publicabstractvoid initParam();

    /**    * 执行任务,返回false表示任务执行错误,需要回滚

    * @return*/publicabstractboolean processTask();

    @Override

    publicvoid run() {

        System.out.println(Thread.currentThread().getName()+"子线程开始执行任务");

        DefaultTransactionDefinition def =new DefaultTransactionDefinition();

        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);

        status = transactionManager.getTransaction(def);

        Boolean result = processTask();

        //向队列中添加处理结果        resultList.add(result);

        //2、使用childMonitor.countDown()释放子线程锁定,同时使用mainMonitor.await();阻塞子线程,将程序的控制权交还给主线程。        childMonitor.countDown();

        try {

            //等待主线程的判断逻辑执行完,执行下面的是否回滚逻辑            mainMonitor.await();

        } catch (Exception e) {

          log.error(e.getMessage());

        }

        System.out.println(Thread.currentThread().getName()+"子线程执行剩下的任务");

        //3、主线程检查子线程执行任务的结果,若有失败结果出现,主线程标记状态告知子线程回滚,然后使用mainMonitor.countDown();将程序控制权再次交给子线程,子线程检测回滚标志,判断是否回滚。if (rollback.isNeedRoolBack()) {

            rollBack();

        }else{

            //事务提交            submit();

        }

    }


回滚标记类


@Datapublicclass RollBack {

    publicRollBack(boolean needRoolBack) {

        this.needRoolBack = needRoolBack;

    }

    privateboolean needRoolBack;

}

 使用线程池工具:

  1,首先建立自己的任务执行类 并且 extends ThreadTask ,实现initParam()和processTask()方法

/** * 多线程处理任务类

*/publicclassTestTaskextends ThreadTask {

    /**      分批处理的数据

    */privateList objectList;

    /**    * 可能需要注入的某些服务

    */private TestService testService;

    publicTestTask(CountDownLatch childCountDown, CountDownLatch mainCountDown, BlockingDeque result, RollBack rollback, DataSourceTransactionManager transactionManager, Object obj, Map params) {

        super(childCountDown, mainCountDown, result, rollback, transactionManager, obj, params);

    }

    @Override

    publicvoid initParam() {

        this.objectList = (List) getParam("objectList");

        this.testService = (TestService) getParam("testService");

    }

    /**    * 执行任务,返回false表示任务执行错误,需要回滚

    * @return*/    @Override

    publicboolean processTask() {

        try {

            for (Object o : objectList) {

                testService.list();

                System.out.println(o.toString()+"执行自己的多线程任务逻辑");

            }

            returntrue;

        } catch (Exception e) {

            returnfalse;

        }

    }

}

2,编写主任务执行方法

/**    * 执行多线程任务方法

    */publicvoid testThreadTask() {

        try {

                intthreadCount = 5;

                //需要分批处理的数据List objectList =newArrayList<>();

                Map params =newHashMap<>();

                params.put("objectList",objectList);

                params.put("testService",testService);

                //调用多线程工具方法threadPoolTool.excuteTask(transactionManager,objectList,threadCount,params, TestTask.class);

        }catch (Exception e){

            thrownew RuntimeException(e.getMessage());

        }

    }

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容