Spring多线程事务的处理方式

有时候为了加快mysql数据处理的速度,我们会采用多线程方式来完成,但是此时再使用 @Transactional(rollbackFor = Exception.class)就没有多大作用了,因为每个mapper对象都会使用threadlocal关联一个数据库连接,也是说如果采用多线程执行,就会出现多个connection对象,属于不同的事务。无法回滚
解决办法
1)我们可以在每个线程执行完成后先不提交,而是等待其他线程的执行结束。

2)如果某一个线程执行失败,则把标志位设为false。

3)判断标志位是否为true,如果是说明所有的线程都执行成功了,提交每个事务;否则回滚每个事务。
以下是示例代码:

public class BatchApplicationTest {
   @Autowired
   private UserMapper userMapper;
   @Autowired
   private PlatformTransactionManager transactionManager;

   @Test
   public void test() throws Exception {
       List<User> userList = buildUser();

       ExecutorService executorService = Executors.newFixedThreadPool(userList.size());
       CountDownLatch countDownLatch = new CountDownLatch(userList.size());
       //创建线程屏障对象,让所有线程执行完成后处于同一个等待状态,也可以使用
       CyclicBarrier cyclicBarrier = new CyclicBarrier(userList.size());

       int i = 0;
       //用于统计是否存在失败线程
       AtomicBoolean flag = new AtomicBoolean(true);
       for (; i < userList.size(); i++) {
           final Integer tmp = i;
           executorService.submit(() -> {
               TransactionStatus status = null;
               try {
                   DefaultTransactionDefinition def = new DefaultTransactionDefinition();
                   // 事物隔离级别,开启新事务,这样会比较安全些。
                   def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); 
                   status = transactionManager.getTransaction(def); 

                     //随机设置一个线程失败并抛出异常;注释掉会全部完成,否则全部回滚
//                    if (tmp == userList.size() - 1) {
//                        throw new RuntimeException();
//                    }

                   // 具体的数据处理类
                   userMapper.insert(userList.get(tmp));
               } catch (Exception e) {
                  //如果出现异常就设置失败标志位
                   flag.set(false);
                   e.printStackTrace();
               } finally {
                   try {
                       //让所有线程都处于等待状态,不管成功还是失败
                       cyclicBarrier.await();

                       System.out.println("---" + flag.get());
                       //根据标志位判断事务最终执行状态
                       if (flag.get()) {
                           transactionManager.commit(status);
                       } else {
                           transactionManager.rollback(status);
                       }
                   } catch (InterruptedException e) {
                       e.printStackTrace();
                   } catch (BrokenBarrierException e) {
                       e.printStackTrace();
                   }
                   countDownLatch.countDown();
               }
           });
       }
       //主线程等待最终结果
       countDownLatch.await();
       System.out.println("--end--");
   }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容