Spring 框架事件收发功能使用(二)事务事件监听
1. 概述
系列博客(一)中,讲述了如何使用 Spring 框架来实现事件收发机制,包含了实现接口和注解两种实现方式,以及包括了同步和异步两种运行方式。
在接下来的内容里,我们会介绍如何将事件的监听与事务进行关联。
2. 基于事务的事件收发的使用
首先,我们还是依照惯例,
从 Spring 的官方文档来看看怎么使用它。
在 Spring 4.2 开始,事件的监听器可以被绑定到事务的阶段上。
典型的示例就是它可以处理当事务成功完成后的事件。
我们可以使事件更加灵活的使用在关心事务结果的监听器上。
我们可以使用 @EventListener 注解注册一个常规的事件监听器。
当你需要将它绑定到事务上,则需要使用 @TransactionalEventListener。
这个监听器默认会被绑定到事务的提交阶段。
接下的示例会展示这个概念。假设一个组件 publish 了一个 订单创建 事件,并且我们想定义一个监听器,它只处理事务提交成功后发出的事件。下面便是如何实现这个事件监听器的代码例子:
@Component
public class MyComponent {
@TransactionalEventListener
public void handleOrderCreatedEvent(CreationEvent<Order> creationEvent) {
//do something
}
}
@TransactionalEventListener 注解 暴露了一个事务阶段的属性,用来指定监控器绑定的事务阶段。可以使用的阶段有这4个 BEFORE_COMMIT, AFTER_COMMIT (default), AFTER_ROLLBACK, AFTER_COMPLETION ,包含了事务的完成和回滚。
如果没有事务在运行,那这个监听器不会被触发,直到我们提供了事务。不过我们也可以通过改变 fallbackExecution 的设置来使监听器在没有事务的情况下仍然能够被触发。
3. @TransactionEventListener 的实现原理
Talking is cheap, show the Spring framework source code.
//EventListenerMethodProcessor.processBean(148);
/*
* 由于@TransactionalEventListener 继承了 @EventListener ,
* 所以,通过 findMergedAnnotation 同样能够获取其所注解的方法
*/
annotatedMethods = MethodIntrospector.selectMethods(targetType,
(MethodIntrospector.MetadataLookup<EventListener>) method ->
AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class));
//EventListenerMethodProcessor.processBean(176);
/*
* @TranscationalEventListener 由 ApplicationListenerMethodTransactionalAdapter 所负责实现功能
* 通过解读他的 Adapter 便能够了解其功能是如何实现的
*/
if (applicationListener instanceof ApplicationListenerMethodAdapter) {
((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator);
}
class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter {
private final TransactionalEventListener annotation;
/*
* 根据对应的 beanName Class 以及方法名,初始化 Adapter
*/
public ApplicationListenerMethodTransactionalAdapter(String beanName, Class<?> targetClass, Method method) {
super(beanName, targetClass, method);
TransactionalEventListener ann = AnnotatedElementUtils.findMergedAnnotation(method, TransactionalEventListener.class);
if (ann == null) {
throw new IllegalStateException("No TransactionalEventListener annotation found on method: " + method);
}
this.annotation = ann;
}
@Override
public void onApplicationEvent(ApplicationEvent event) {
if (TransactionSynchronizationManager.isSynchronizationActive()) {//判断是否开启了事务
TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);//创建一个新的事务同步器
TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);//将刚刚创建的事务同步器,注册到这个线程的事务同步器列表中
}
else if (this.annotation.fallbackExecution()) {//如果当前线程内没有开启事务,并且注解的 fallbackExecution 属性设置为 true
if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK && logger.isWarnEnabled()) {
logger.warn("Processing " + event + " as a fallback execution on AFTER_ROLLBACK phase");//在符合 if 条件后输出警告
}
processEvent(event);//处理事件
}
else {
// No transactional event execution at all
if (logger.isDebugEnabled()) {
logger.debug("No transaction is active - skipping " + event);//本线程没有开启事务,fallbackExecution 又设置为 false
}
}
}
private TransactionSynchronization createTransactionSynchronization(ApplicationEvent event) {
return new TransactionSynchronizationEventAdapter(this, event, this.annotation.phase());
}
/*
* 这个内部类实现了事件监听器能根据 4 个不同的事务阶段进行处理事件的功能
*/
private static class TransactionSynchronizationEventAdapter extends TransactionSynchronizationAdapter {
private final ApplicationListenerMethodAdapter listener;
private final ApplicationEvent event;
private final TransactionPhase phase;
public TransactionSynchronizationEventAdapter(ApplicationListenerMethodAdapter listener,
ApplicationEvent event, TransactionPhase phase) {
this.listener = listener;
this.event = event;
this.phase = phase;
}
@Override
public int getOrder() {
return this.listener.getOrder();
}
// 如果是 beforeCommit 的这个阶段,需要单独判断
@Override
public void beforeCommit(boolean readOnly) {
if (this.phase == TransactionPhase.BEFORE_COMMIT) {
processEvent();
}
}
// 其余 3 个事务阶段在事务提交后进行判断和处理
@Override
public void afterCompletion(int status) {
if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
processEvent();
}
else if (this.phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
processEvent();
}
else if (this.phase == TransactionPhase.AFTER_COMPLETION) {
processEvent();
}
}
protected void processEvent() {
this.listener.processEvent(this.event);
}
}
}
4. @TransactionEventListener的异步陷阱
目前为止,我们看到的都是强大的 Spring 框架所提供的事务事件监听器的美好,不过任何美好都需要我们小心的维护。
接下来,我们就看看 @TransactionalEventListener 的美好背后隐藏着什么需要注意的。
如果,看过第一部分内容的读者应该还记得,我们将 @EventListner 变为异步会有两种方式。
一种是使用 @Async 注解,并且 EnableAsync;另一种是通过给我们的 Adatper 配置需要的线程池。
如果使用第一种方式来让我们的 @TransactionalEventListener 不会有问题。
但是,使用第二种的方式会导致我们的监听器找不到事务而失败。
那我们依然还是遵循从功能中来到源码里去的思想,到源码中找找问题出在哪里。
//我们再回到所有 ApplicationEvent 类型时间分发的地方--> SimpleApplicationEventMulticaster类。
//我们会发现他是通过新建线程的方式来进行触发监听器消费事件。
executor.execute(() -> invokeListener(listener, event));
// 在 Adapter 中,我们通过这个方法的调用来判断是否开启了事务
TransactionSynchronizationManager.isSynchronizationActive();
// 接下来我们进入 Manager看看这个方法的注释。注释表明,这个方法会返回当前线程活跃的事务同步器。
// Bingo,是不是觉得有什么不对了。我们刚刚在 invokeListener 的时候是通过新建线程的方式进行异步的。
// 如果,我们再去获取当前线程的事务,结果非常明显,一定是 False。
// 所以,如果我们通过给 Multicaster 配置线程池的方式来实现异步看样子是不行的。
/**
* Return if transaction synchronization is active for the current thread.
* Can be called before register to avoid unnecessary instance creation.
* @see #registerSynchronization
*/
public static boolean isSynchronizationActive() {
return (synchronizations.get() != null);
}
5.小结
本小节中,主要围绕 @TransactionalEventListener 来讲述下面 3 个内容:
- 如何使用 Spring 框架实现事务事件监听
- @TransactionalEventListener 是怎么实现绑定到事务上的事件监听的
- 在异步场景下,@TransactionalEventListener 的陷阱。
本系列的第三部分给出如何能让 @TransactionalEventListener 异步执行,又不在每个地方都加上 @Sync 注解的办法。