8-基于Spring的框架-事务——8-3 事务工作详情(二) 具体工作逻辑

概要

过度

我们上文介绍了Spring-tx中对类扫描的相关操作,在返回true后AOP会接受增强操作,我们需要关注的是增强操作后调用链上事务的相关处理逻辑,这也是本文要着重记录的。

内容简介

本文记录事务调用链的相关处理逻辑。

所属环节

事物具体工作逻辑

上下环节

上文:事物注册相关逻辑

下文:无

源码解析

入口

我们上文介绍Spring-tx在解析标签时向上下文注册了BeanFactoryTransactionAttributeSourceAdvisor方法,它集成了用于保存事物注解属性的TransactionAttributeSource和用来处理事物的TransactionInterceptor

我们先看BeanFactoryTransactionAttributeSourceAdvisor暴露Advice的逻辑:

public Advice getAdvice() {
  Advice advice = this.advice;
  if (advice != null) {
    return advice;
  }

  Assert.state(this.adviceBeanName != null, "'adviceBeanName' must be specified");
  Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve 'adviceBeanName'");

  if (this.beanFactory.isSingleton(this.adviceBeanName)) {
    // Rely on singleton semantics provided by the factory.
    advice = this.beanFactory.getBean(this.adviceBeanName, Advice.class);
    this.advice = advice;
    return advice;
  } else {
    // No singleton guarantees from the factory -> let's lock locally but
    // reuse the factory's singleton lock, just in case a lazy dependency
    // of our advice bean happens to trigger the singleton lock implicitly...
    synchronized (this.adviceMonitor) {
      advice = this.advice;
      if (advice == null) {
        advice = this.beanFactory.getBean(this.adviceBeanName, Advice.class);
        this.advice = advice;
      }
      return advice;
    }
  }
}

Emmm,基本没有什么特殊逻辑,我们直接看TransactionInterceptor吧。

TransactionInterceptor调用逻辑

我们先看它的继承树:

1.png

我们为了思路清晰,还是从调用的入口开始看。

invoke()

public Object invoke(MethodInvocation invocation) throws Throwable {
  // Work out the target class: may be {@code null}.
  // The TransactionAttributeSource should be passed the target class
  // as well as the method, which may be from an interface.
  // 因为后面我们拿到事务相关配置属性时需要调用目标的信息,所以我们需要先获得被增强的类、方法信息
  // 根据当前情况得到道理的目标对象。

  // 后面要根据当前情况获得该目标(方法级别)对应的事务配置(就是你的@Transactional中的属性)
  // (事务配置的获得规则前面进行了获得并缓存)
  Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

  // Adapt to TransactionAspectSupport's invokeWithinTransaction...
  return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}

我们继续看事物相关的处理:

// 一个基于 around 增强器的通用事务委托,可以处理 CallbackPreferringPlatformTransactionManager、PlatformTransactionManager
// 两种事务管理器【有时间可以看看上那两个各自的特点和适用场景】
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
                                         final InvocationCallback invocation) throws Throwable {

  // If the transaction attribute is null, the method is non-transactional.
  // 拿到前面从注解中找出来的事务属性,如果没有配置,说明没有打事务注解
  TransactionAttributeSource tas = getTransactionAttributeSource();
  // 拿到本次代理适用的事务属性配置
  final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
  // 拿到对应的事务管理器
  // TODO 这里后面二刷,争取连起来
  final PlatformTransactionManager tm = determineTransactionManager(txAttr);
  // 根据当前方法情况,生成唯一标示
  final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

  // TODO 声明式事务处理
  if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
    // Standard transaction demarcation with getTransaction and commit/rollback calls.
    // 创建 TransactionInfo
    TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
    Object retVal = null;
    try {
      // This is an around advice: Invoke the next interceptor in the chain.
      // This will normally result in a target object being invoked.
      // 继续调用拦截器链
      retVal = invocation.proceedWithInvocation();
    } catch (Throwable ex) {
      // target invocation exception
      // 异常回滚 TransactionInfo
      completeTransactionAfterThrowing(txInfo, ex);
      throw ex;
    } finally {
      // 清除 TransactionInfo 信息
      cleanupTransactionInfo(txInfo);
    }
    // 提交 TransactionInfo 【事务】
    commitTransactionAfterReturning(txInfo);
    return retVal;
  } else {
    // TODO 编程式事务处理
    final ThrowableHolder throwableHolder = new ThrowableHolder();

    // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
    try {
      Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
        TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
        try {
          return invocation.proceedWithInvocation();
        } catch (Throwable ex) {
          if (txAttr.rollbackOn(ex)) {
            // A RuntimeException: will lead to a rollback.
            if (ex instanceof RuntimeException) {
              throw (RuntimeException) ex;
            } else {
              throw new ThrowableHolderException(ex);
            }
          } else {
            // A normal return value: will lead to a commit.
            throwableHolder.throwable = ex;
            return null;
          }
        } finally {
          cleanupTransactionInfo(txInfo);
        }
      });

      // Check result state: It might indicate a Throwable to rethrow.
      if (throwableHolder.throwable != null) {
        throw throwableHolder.throwable;
      }
      return result;
    } catch (ThrowableHolderException ex) {
      throw ex.getCause();
    } catch (TransactionSystemException ex2) {
      if (throwableHolder.throwable != null) {
        logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
        ex2.initApplicationException(throwableHolder.throwable);
      }
      throw ex2;
    } catch (Throwable ex2) {
      if (throwableHolder.throwable != null) {
        logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
      }
      throw ex2;
    }
  }
}

思路如流程图所示:

2.png

我们的关注点主要是在创建事务、根据异常处理回滚信息、清理事务信息、提交事务几个阶段。

创建事务

此处是对createTransactionIfNecessary()方法的讲述

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
                                                       @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

  // If no name specified, apply method identification as transaction name.
  // 如果事务属性没有指定名称,就用前面生成的唯一标识
  if (txAttr != null && txAttr.getName() == null) {
    txAttr = new DelegatingTransactionAttribute(txAttr) {
      @Override
      public String getName() {
        return joinpointIdentification;
      }
    };
  }

  TransactionStatus status = null;
  if (txAttr != null) {
    if (tm != null) {
      // 直接获得事务状态
      status = tm.getTransaction(txAttr);
    } else {
      // 没有 manager ,就不进行事务功能处理
      if (logger.isDebugEnabled()) {
        logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                     "] because no transaction manager has been configured");
      }
    }
  }
  // 根据:
  // 1. 事务信息
  // 2. 事务管理器
  // 3. 唯一标识【这个感觉有点多余了】
  // 4. 事务状态
  // 创建事务属性【事务属性用于实际实现事务功能,其中有以上东西的引用】
  return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

和Spring函数的一贯特点相似,将功能进行了分割和委托:

  1. 先借助TransactionManager根据TransactionAttribute得到TransactionStatus对象,此对象我们叫它事务信息,里面不止有我们的事务配置,还有此次调用的事务及时属性,比如事务的配置、当前占用的Connection等等
  2. 然后借助prepareTransactionInfo()方法,得到TransactionInfo,用于在事务的Interceptor中进行事务状态的记录传递

我们先看tm.getTransaction(),我们关注主要逻辑,所以直接进入DataSourceTransactionManager

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {

  // 拿到当前线程的锁对象
  Object transaction = doGetTransaction();

  // Cache debug flag to avoid repeated checks.
  boolean debugEnabled = logger.isDebugEnabled();

  if (definition == null) {
    // Use defaults if no transaction definition given.
    definition = new DefaultTransactionDefinition();
  }

  if (isExistingTransaction(transaction)) {
    // 如果当前线程已经占有锁
    // Existing transaction found -> check propagation behavior to find out how to behave.
    return handleExistingTransaction(definition, transaction, debugEnabled);
  }

  // Check definition settings for new transaction.
  // 判断配置合理性
  if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
    throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
  }

  // No existing transaction found -> check propagation behavior to find out how to proceed.
  // 在当前没有事务的情况下,根据事务的配置决定采取的策略
  if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
    // 如果是 PROPAGATION_MANDATORY ,强制要求事前存在事务,报错
    throw new IllegalTransactionStateException(
      "No existing transaction found for transaction marked with propagation 'mandatory'");
  } else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
             definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
             definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    // 此类三种情况要求如果没有事务就新建事务

    // 阻塞空事务(拿到一个阻塞的holder,后面用来唤醒)
    SuspendedResourcesHolder suspendedResources = suspend(null);
    if (debugEnabled) {
      logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
    }
    try {
      // 如果没有强行配置不允许事务,就可以开始创建事务了
      boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
      DefaultTransactionStatus status = newTransactionStatus(
        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
      // 设置事务,如从数据源获得connection、开启同步、将配置好的信息保存至线程变量
      doBegin(transaction, definition);
      // 将事务的相关属性设置到线程变量中
      prepareSynchronization(status, definition);
      return status;
    } catch (RuntimeException | Error ex) {
      resume(null, suspendedResources);
      throw ex;
    }
  } else {
    // 适用默认的隔离状态????
    // TODO 后面根据实际适用情况看吧
    // Create "empty" transaction: no actual transaction, but potentially synchronization.
    if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
      logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                  "isolation level will effectively be ignored: " + definition);
    }
    boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
  }
}

protected Object doGetTransaction() {
  DataSourceTransactionObject txObject = new DataSourceTransactionObject();
  txObject.setSavepointAllowed(isNestedTransactionAllowed());
  ConnectionHolder conHolder =
    (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
  txObject.setConnectionHolder(conHolder, false);
  return txObject;
}

主要思路如下:

3.png

我们依次看这两种情况:

处理新事务

4.png

我们看一下阻塞事务的逻辑,这里可能涉及对底层API的调用

5.png

感觉就是把当前的所有配置保存到一个节点中,然后清空当前所有配置,然后返回保存的节点,由调用方进行保存、传递。

我们看一下唤醒旧事务的操作:

protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
  throws TransactionException {

  if (resourcesHolder != null) { // 保存事务和同步信息的holder不为空
    Object suspendedResources = resourcesHolder.suspendedResources;
    if (suspendedResources != null) { // 将事务的资源绑定到 ThreadLocal 中【这里就是数据源和 Connection 的映射关系】
      doResume(transaction, suspendedResources);
    }
    // 把节点中存储的配置进行还原
    List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
    if (suspendedSynchronizations != null) {
      TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
      TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
      TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
      TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
      doResumeSynchronization(suspendedSynchronizations);
    }
  }
}

protected void doResume(@Nullable Object transaction, Object suspendedResources) {
  TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}

private void doResumeSynchronization(List<TransactionSynchronization> suspendedSynchronizations) {
  TransactionSynchronizationManager.initSynchronization();
  for (TransactionSynchronization synchronization : suspendedSynchronizations) {
    synchronization.resume();
    TransactionSynchronizationManager.registerSynchronization(synchronization);
  }
}

流程如下:

6.png

总体来说就是把事务的占用的配置重新压进存储数据源和占用 ConnectionThradLocal 中,然后把所有的同步配置也都恢复至各自的 ThreadLocal

我们看一下开启新事务和设置新同步的操作:

7.png
8.png

处理已经存在的事务

本节的内容是handleExistingTransaction()方法的记录:

private TransactionStatus handleExistingTransaction(
  TransactionDefinition definition, Object transaction, boolean debugEnabled)
  throws TransactionException {

  // 不支持事务,有事务就抛出异常
  if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
    throw new IllegalTransactionStateException(
      "Existing transaction found for transaction marked with propagation 'never'");
  }
  // 不支持事务,有就阻塞
  if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
    if (debugEnabled) {
      logger.debug("Suspending current transaction");
    }
    Object suspendedResources = suspend(transaction);
    boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    return prepareTransactionStatus(
      definition, null, false, newSynchronization, debugEnabled, suspendedResources);
  }

  // 必须新的,阻塞现有的
  if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
    if (debugEnabled) {
      logger.debug("Suspending current transaction, creating new transaction with name [" +
                   definition.getName() + "]");
    }
    SuspendedResourcesHolder suspendedResources = suspend(transaction);
    try {
      boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
      DefaultTransactionStatus status = newTransactionStatus(
        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
      doBegin(transaction, definition);
      prepareSynchronization(status, definition);
      return status;
    } catch (RuntimeException | Error beginEx) {
      resumeAfterBeginException(transaction, suspendedResources, beginEx);
      throw beginEx;
    }
  }

  // 内嵌
  if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    if (!isNestedTransactionAllowed()) {
      throw new NestedTransactionNotSupportedException(
        "Transaction manager does not allow nested transactions by default - " +
        "specify 'nestedTransactionAllowed' property with value 'true'");
    }
    if (debugEnabled) {
      logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
    }
    // 默认使用保存点
    if (useSavepointForNestedTransaction()) {
      // Create savepoint within existing Spring-managed transaction,
      // through the SavepointManager API implemented by TransactionStatus.
      // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
      DefaultTransactionStatus status =
        prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
      status.createAndHoldSavepoint();
      return status;
    } else {
      // 不支持保存点,就在不阻塞现有事务的基础上直接占用新的 Connection 进行事务,
      // 这里存在配置覆盖,原有事务、同步设置到 ThreadLocal 中的东西会被覆盖。
      // 但是会将覆盖前的配置传出去,需要调用者自行在结束此事务之后恢复
      // Nested transaction through nested begin and commit/rollback calls.
      // Usually only for JTA: Spring synchronization might get activated here
      // in case of a pre-existing JTA transaction.
      boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
      DefaultTransactionStatus status = newTransactionStatus(
        definition, transaction, true, newSynchronization, debugEnabled, null);
      doBegin(transaction, definition);
      prepareSynchronization(status, definition);
      return status;
    }
  }

  // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
  // 如果当前事务存在就内嵌,不存在就创建新的
  if (debugEnabled) {
    logger.debug("Participating in existing transaction");
  }
  if (isValidateExistingTransaction()) { // 看TransactionManager是否配置了在加入当前事务前要先校验属性,默认为false
    // 事务配置的传播属性和当前事务的传播属性如果不兼容,就报错
    if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
      Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
      if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
        Constants isoConstants = DefaultTransactionDefinition.constants;
        throw new IllegalTransactionStateException("Participating transaction with definition [" +
                                                   definition + "] specifies isolation level which is incompatible with existing transaction: " +
                                                   (currentIsolationLevel != null ?
                                                    isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                                    "(unknown)"));
      }
    }
    // 事务配置的只读属性和抢断事务的只读属性不兼容,就报错【当前只读,配置的要求可以写就挂了】
    if (!definition.isReadOnly()) {
      if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
        throw new IllegalTransactionStateException("Participating transaction with definition [" +
                                                   definition + "] is not marked as read-only but existing transaction is");
      }
    }
  }
  // 校验完成,直接加入当前事务即可。即在当前事务中加入新的同步
  boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
  return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

流程图如下:

9.png

将新事务押入调用堆栈

prepareTransactionInfo()的方法,会将上面的到的事务所有信息,包括属性、状态、使用的TM等等构建成一个TransactionInfo,用来在ThreadLocal中拉链,并返回。

总结

总的来说创建事务的过程主要是处理当前事务和新事务的兼容问题,如果单纯创建新事务其实很简单,就是

从数据源占据一个Connection,然后把事务、同步属性进行设置即可。当事务出现冲突时在判断状态合理的情况下可以选择

  • 创建新的:阻塞当前事务(把当前事务占据的 Connection进行保存,然后解绑 ThreadLocal属性),保存当前同步属性。然后按照从配置中读出来的东西开启新事务、设置新的同步
  • 内嵌事务:
    • 或者使用保存点,继续占用当前事务(使用当前Connection),使用当前的同步属性,然后创建保存点,结束。
    • 不允许使用保存点,只能将就着模拟,放弃当前事务,新创建一个事务(占用一个Connection),照常设置同步、返回
  • 不支持事务:当前事务阻塞,不占用新的Connection,整一个空事务

处理异常回滚信息

本节主要处理调用抛出异常后的回滚相关操作。

protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
  if (txInfo != null && txInfo.getTransactionStatus() != null) {
    if (logger.isTraceEnabled()) {
      logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                   "] after exception: " + ex);
    }
    // 如果事务存在【事务属性配置了】且表明事务要针对此异常回滚,就回滚
    if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
      try {
        txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
      } catch (TransactionSystemException ex2) {
        logger.error("Application exception overridden by rollback exception", ex);
        ex2.initApplicationException(ex);
        throw ex2;
      } catch (RuntimeException | Error ex2) {
        logger.error("Application exception overridden by rollback exception", ex);
        throw ex2;
      }
    } else {
      // We don't roll back on this exception.
      // Will still roll back if TransactionStatus.isRollbackOnly() is true.
      try {
        txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
      } catch (TransactionSystemException ex2) {
        logger.error("Application exception overridden by commit exception", ex);
        ex2.initApplicationException(ex);
        throw ex2;
      } catch (RuntimeException | Error ex2) {
        logger.error("Application exception overridden by commit exception", ex);
        throw ex2;
      }
    }
  }
}

这里的思路比较明白,如果事务存在且根据配置项要对此异常回滚,就回滚;否则提交。

我们继续看回滚的逻辑,后面会专门看提交 :

public final void rollback(TransactionStatus status) throws TransactionException {
  if (status.isCompleted()) {
    throw new IllegalTransactionStateException(
      "Transaction is already completed - do not call commit or rollback more than once per transaction");
  }

  DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
  processRollback(defStatus, false);
}

这里主要做了一个入参合理性判断:已经完成的事务不能再次进行回滚、提交。将实际的回滚逻辑继续做了委托:

private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
  try {
    boolean unexpectedRollback = unexpected;

    try {
      // 如果是新的同步快结束的话,调用同步的钩子
      triggerBeforeCompletion(status);

      if (status.hasSavepoint()) {
        // 如果有保存点,就回滚至保存点
        if (status.isDebug()) {
          logger.debug("Rolling back transaction to savepoint");
        }
        status.rollbackToHeldSavepoint();
      } else if (status.isNewTransaction()) {
        // 如果是新事务,直接回滚
        if (status.isDebug()) {
          logger.debug("Initiating transaction rollback");
        }
        doRollback(status);
      } else {
        // Participating in larger transaction
        // 是更大事务的一部分【从逻辑上来说存在嵌套,但是不支持保存点和创建嵌套那种】
        if (status.hasTransaction()) {
          // status 设置了"设置回滚标志位" 或者全局设置了"设置回滚标志位"
          if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
            if (status.isDebug()) {
              logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
            }
            doSetRollbackOnly(status);
          } else {
            if (status.isDebug()) {
              logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
            }
          }
        } else {
          logger.debug("Should roll back transaction but cannot - no transaction available");
        }
        // Unexpected rollback only matters here if we're asked to fail early
        if (!isFailEarlyOnGlobalRollbackOnly()) {
          unexpectedRollback = false;
        }
      }
    } catch (RuntimeException | Error ex) {
      triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
      throw ex;
    }

    // 调用同步的结束后的钩子
    triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

    // Raise UnexpectedRollbackException if we had a global rollback-only marker
    if (unexpectedRollback) {
      throw new UnexpectedRollbackException(
        "Transaction rolled back because it has been marked as rollback-only");
    }
  } finally {
    cleanupAfterCompletion(status);
  }
}

我们不在乎什么钩子的调用和钩子预留位,这里其实就是一个if-else

  • 如果有保存点,说明是嵌套事务,直接控制回滚至保存点即可
  • 如果是新事务,直接回滚即可
  • 如果是更大事务的一部分,且没有使用保存点,设置回滚标志位,等待外层函数完成后进行回滚

当然,最后调用了cleanupAfterCompletion(),对现场进行了清理:

private void cleanupAfterCompletion(DefaultTransactionStatus status) {
  // 设置事务状态为结束
  status.setCompleted();
  // 如果是新的同步,就清理同步属性设置
  if (status.isNewSynchronization()) {
    TransactionSynchronizationManager.clear();
  }
  // 如果是新的事务 ,就清理事务设置【归还 Connection】
  if (status.isNewTransaction()) {
    doCleanupAfterCompletion(status.getTransaction());
  }

  // 如果当前事务的创建涉及另一个事务的阻塞,就唤醒阻塞的事务
  if (status.getSuspendedResources() != null) {
    if (status.isDebug()) {
      logger.debug("Resuming suspended transaction after completion of inner transaction");
    }
    Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
    resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
  }
}

清理事务信息

我们在“创建事务”中将新事务的TransactionInfo压入了调用堆栈,这里进行出栈,其他的没有啥东西

提交事务

我们先看commitTransactionAfterReturning():

protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
  if (txInfo != null && txInfo.getTransactionStatus() != null) {
    if (logger.isTraceEnabled()) {
      logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
    }
    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
  }
}

这里照例只进行了参数的判断,干掉了一些可以快速结束的情况,将具体的提交逻辑进行了委托:

public final void commit(TransactionStatus status) throws TransactionException {
  if (status.isCompleted()) {
    throw new IllegalTransactionStateException(
      "Transaction is already completed - do not call commit or rollback more than once per transaction");
  }

  DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
  // 之前基于此事务的逻辑失败了,并标记了回滚
  if (defStatus.isLocalRollbackOnly()) {
    if (defStatus.isDebug()) {
      logger.debug("Transactional code has requested rollback");
    }
    processRollback(defStatus, false);
    return;
  }

  if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
    if (defStatus.isDebug()) {
      logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
    }
    processRollback(defStatus, true);
    return;
  }

  processCommit(defStatus);
}

这里进行了一个判断:如果前面有嵌套事务给自己设置了回滚标志位,这里就回滚,否则照常提交,对提交事务API的调用过程,这里进行了委托:

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
  try {
    boolean beforeCompletionInvoked = false;

    try {
      boolean unexpectedRollback = false;
      // 预留
      prepareForCommit(status);
      // 调用钩子
      triggerBeforeCommit(status);
      triggerBeforeCompletion(status);
      beforeCompletionInvoked = true;

      if (status.hasSavepoint()) {
        if (status.isDebug()) {
          logger.debug("Releasing transaction savepoint");
        }
        unexpectedRollback = status.isGlobalRollbackOnly();
        // 有保存点,就清除保存点【此事务为内嵌事务,交给外部事务进行提交即可,此处不再提交】
        status.releaseHeldSavepoint();
      } else if (status.isNewTransaction()) {
        if (status.isDebug()) {
          logger.debug("Initiating transaction commit");
        }
        unexpectedRollback = status.isGlobalRollbackOnly();
        doCommit(status);
      } else if (isFailEarlyOnGlobalRollbackOnly()) {
        unexpectedRollback = status.isGlobalRollbackOnly();
      }

      // Throw UnexpectedRollbackException if we have a global rollback-only
      // marker but still didn't get a corresponding exception from commit.
      if (unexpectedRollback) {
        throw new UnexpectedRollbackException(
          "Transaction silently rolled back because it has been marked as rollback-only");
      }
    } catch (UnexpectedRollbackException ex) {
      // can only be caused by doCommit
      triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
      throw ex;
    } catch (TransactionException ex) {
      // can only be caused by doCommit
      if (isRollbackOnCommitFailure()) {
        doRollbackOnCommitException(status, ex);
      } else {
        triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
      }
      throw ex;
    } catch (RuntimeException | Error ex) {
      if (!beforeCompletionInvoked) {
        triggerBeforeCompletion(status);
      }
      doRollbackOnCommitException(status, ex);
      throw ex;
    }

    // Trigger afterCommit callbacks, with an exception thrown there
    // propagated to callers but the transaction still considered as committed.
    try {
      triggerAfterCommit(status);
    } finally {
      triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
    }

  } finally {
    cleanupAfterCompletion(status);
  }
}

这里主要有以下几步:

  1. 调用钩子
  2. 如果有SavePoint,说明此事务不是最外层事务,释放保存点退出即可,外部事务会帮助提交
  3. 如果是新事务,就提交
  4. 调用钩子、清理现场

具体的提交事务API的调用在doCommit()中:

protected void doCommit(DefaultTransactionStatus status) {
  DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
  Connection con = txObject.getConnectionHolder().getConnection();
  if (status.isDebug()) {
    logger.debug("Committing JDBC transaction on Connection [" + con + "]");
  }
  try {
    con.commit();
  } catch (SQLException ex) {
    throw new TransactionSystemException("Could not commit JDBC transaction", ex);
  }
}

总结

上面算是把事务的相关东西过了一遍,忽略了很多东西,注意几点即可:

  1. 主要看看针对事务配置的传播方式的实现逻辑即可,毕竟学主要是为了用,别死钻
  2. 关注一下嵌套事务的回滚点、回滚标志位的相关逻辑,了解即可
  3. 当前环节的事务栈以ThreadLocal<TransactionInfo> transactionInfoHolder为准,那些杂乱的ThreadLocal在某些特殊情况下会丢东西。

在实用方面的一些经验:

  1. 很多时候我们出错都抛出RuntimeException,然后一直上抛的,所以事务的传递机制很多时候是用的默认的REQUIRE_NEW,需要继续就catch,不需要停止就不管,分别对应了两种嵌套的策略。很多时候也没有针对事务配置做那么多东西
  2. 事务这里只是针对jdbc,对一些dubbo、redis之类的设置不生效

扩展

问题遗留

参考文献

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,417评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,921评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,850评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,945评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,069评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,188评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,239评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,994评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,409评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,735评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,898评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,578评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,205评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,916评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,156评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,722评论 2 363
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,781评论 2 351

推荐阅读更多精彩内容