有时候为了加快mysql数据处理的速度,我们会采用多线程方式来完成,但是此时再使用 @Transactional(rollbackFor = Exception.class)
就没有多大作用了,因为每个mapper对象都会使用threadlocal关联一个数据库连接,也是说如果采用多线程执行,就会出现多个connection对象,属于不同的事务。无法回滚
解决办法:
1)我们可以在每个线程执行完成后先不提交,而是等待其他线程的执行结束。
2)如果某一个线程执行失败,则把标志位设为false。
3)判断标志位是否为true,如果是说明所有的线程都执行成功了,提交每个事务;否则回滚每个事务。
以下是示例代码:
public class BatchApplicationTest { @Autowired private UserMapper userMapper; @Autowired private PlatformTransactionManager transactionManager; @Test public void test() throws Exception { List<User> userList = buildUser(); ExecutorService executorService = Executors.newFixedThreadPool(userList.size()); CountDownLatch countDownLatch = new CountDownLatch(userList.size()); //创建线程屏障对象,让所有线程执行完成后处于同一个等待状态,也可以使用 CyclicBarrier cyclicBarrier = new CyclicBarrier(userList.size()); int i = 0; //用于统计是否存在失败线程 AtomicBoolean flag = new AtomicBoolean(true); for (; i < userList.size(); i++) { final Integer tmp = i; executorService.submit(() -> { TransactionStatus status = null; try { DefaultTransactionDefinition def = new DefaultTransactionDefinition(); // 事物隔离级别,开启新事务,这样会比较安全些。 def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); status = transactionManager.getTransaction(def); //随机设置一个线程失败并抛出异常;注释掉会全部完成,否则全部回滚 // if (tmp == userList.size() - 1) { // throw new RuntimeException(); // } // 具体的数据处理类 userMapper.insert(userList.get(tmp)); } catch (Exception e) { //如果出现异常就设置失败标志位 flag.set(false); e.printStackTrace(); } finally { try { //让所有线程都处于等待状态,不管成功还是失败 cyclicBarrier.await(); System.out.println("---" + flag.get()); //根据标志位判断事务最终执行状态 if (flag.get()) { transactionManager.commit(status); } else { transactionManager.rollback(status); } } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } countDownLatch.countDown(); } }); } //主线程等待最终结果 countDownLatch.await(); System.out.println("--end--"); } }