背景
在业务中,经常会有这样的需求,在数据库事务提交之后,发送异步消息或者进行其他的事务操作。
例如当用户注册成功之后,发送激活码,如果用户注册后就执行发送激活码,但是在用户保存时出现提交事务异常,数据库进行回滚,用户实际没有注册成功,但是用户却收到了激活码,此时,正确的是应该在用户注册保存事务提交完成之后,然后发送激活码。
解决方案
1、使用注解@TransactionalEventListener
@Service
@Transactional
public class FooService {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
public User saveUser(User user) {
userDao.save(user);
// 注册事件
applicationEventPublisher.publishEvent(new SavedUserEvent(user.getId()));
}
}
// -------------------------------------
/**
* 保存用户事件
*/
public class SavedUserEvent {
private int userId;
public SavedUserEvent(int userId) {
this.userId = userId;
}
// getter and setter
}
/**
* 事件侦听,处理对应事件
*/
@Component
public class FooEventListener() {
@Autowired
private UserDao userDao;
@Autowired
private MailService mailService;
@TransactionalEventListener
public sendEmail(SavedUserEvent savedUserEvent) {
User user = userDao.get(userId);
String email = user.getEmail();
mailService.send(email);
}
}
publishEvent 底层调用了一个SimpleApplicationEventMulticaster 来发布事件,属性有一个Executor 可以用来设置异步的方式。
1.1 设置线程池
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class Config {
@Bean
public ThreadPoolTaskExecutor executor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3);//核心线程数
executor.setMaxPoolSize(10);//最大线程数
executor.setQueueCapacity(100);//队列大小
return executor;
}
@Bean
public SimpleApplicationEventMulticaster applicationEventMulticaster(ThreadPoolTaskExecutor executor){
SimpleApplicationEventMulticaster multicaste = new SimpleApplicationEventMulticaster();
multicaste.setTaskExecutor(executor);
return multicaste;
}
}
1.2 发布事件后测试
-
有2个线程来发布事件
2、使用TransactionSynchronizationManager 和 TransactionSynchronizationAdapter
@Autowired
private UserDao userDao;
@Autowired
private JmsProducer jmsProducer;
public User saveUser(User user) {
// 保存用户
userDao.save(user);
final int userId = user.getId();
// 兼容无论是否有事务
if(TransactionSynchronizationManager.isActualTransactionActive()) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
jmsProducer.sendEmail(userId);
}
});
} else {
jmsProducer.sendEmail(userId);
}
}
如果saveUser()和sendEmail()这两个方法使用了相同的事务,但是需要注意的是sendEmail()方法是在afterCommit事务提交之后执行的,此时会导致sendEmail()中的JPA数据保存最终无法提交。所以我们需要使sendEmail()进入一个新的事务中。
如果afterCommit()方法中执行的方法也包含事务,在该方法的@Transactional注解中使用propagation参数用来控制事务的传播,其默认被设置为Propagation.REQUIRED
2.1 解决办法
在@Transactional注解中propagation参数用来控制事务的传播。其默认被设置为Propagation.REQUIRED
Propagation.REQUIRED其逻辑是,如果当前没有事务,就新建一个事务,如果已经存在一个事务中,加入到这个事务中。而上面的业务中我们并不希望其加入已有的事务中,所以单介绍上面的逻辑,假如希望JPA的数据保存到数据库中,需要在事务注解修改为@Transactional(propagation = Propagation.REQUIRES_NEW)参数
然而在很多时候我们希望新加入的方法能够被同一个事务所管理,而使用Propagation.REQUIRES_NEW会导致当前操作脱离上一级事务的控制。所以在使用@Transactional(propagation = Propagation.REQUIRES_NEW)的时候一定要慎重,并且严格控制其被滥用。
在上面2的基础上扩展TransactionSynchronizationAdapter
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @Author: huangyibo
* @Date: 2023/7/21 17:12
* @Description: 线程池配置
*/
@Configuration
@EnableAsync
public class TaskPoolConfig {
/**
* 异步执行线程池
* @return ThreadPoolTaskExecutor
*/
@Bean(name = "asyncPoolTaskExecutor")
public ThreadPoolTaskExecutor asyncPoolTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//线程池维护线程的最少数量
taskExecutor.setCorePoolSize(3);
//线程池维护线程的最大数量
taskExecutor.setMaxPoolSize(6);
//线程池所使用的缓冲队列
taskExecutor.setQueueCapacity(500);
//线程池维护线程所允许的空闲时间
taskExecutor.setKeepAliveSeconds(200);
taskExecutor.setThreadNamePrefix("asyncPoolTaskExecutor--");
// 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//调度器shutdown被调用时等待当前被调度的任务完成
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
//等待时长
taskExecutor.setAwaitTerminationSeconds(60);
taskExecutor.initialize();
return taskExecutor;
}
/**
* 异步执行线程池————任务延时执行
* @return ThreadPoolTaskScheduler
*/
@Bean(name = "delayAsyncPoolTaskScheduler")
public ThreadPoolTaskScheduler delayAsyncPoolTaskScheduler() {
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
//线程池维护线程的数量
threadPoolTaskScheduler.setPoolSize(5);
// 线程名称
threadPoolTaskScheduler.setThreadNamePrefix("delayAsyncPoolTaskScheduler--");
// 等待时长
threadPoolTaskScheduler.setAwaitTerminationSeconds(60);
// 调度器shutdown被调用时等待当前被调度的任务完成
threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);
threadPoolTaskScheduler.initialize();
return threadPoolTaskScheduler;
}
}
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
/**
* @Author: huangyibo
* @Date: 2023/8/7 16:13
* @Description: 事务提交后执行器:被提交的方法将在事务提交后执行,如果调用方法不在事务中运行,则立即执行
*/
@Slf4j
@Component
public class AfterCommitExecutor extends TransactionSynchronizationAdapter implements Executor {
private static final ThreadLocal<List<Runnable>> RUNNABLES = new ThreadLocal<List<Runnable>>();
@Resource(name = "asyncPoolTaskExecutor")
private ThreadPoolTaskExecutor threadPool;
@Override
public void execute(Runnable runnable) {
log.info("AfterCommitExecutorImpl got New runnable {}", runnable);
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
log.info("Transaction is NOT ACTIVE. Executing right now {}", runnable);
runnable.run();
return;
}
List<Runnable> threadRunnableList = RUNNABLES.get();
if (CollectionUtils.isEmpty(threadRunnableList)) {
threadRunnableList = new ArrayList<>();
RUNNABLES.set(threadRunnableList);
TransactionSynchronizationManager.registerSynchronization(this);
}
threadRunnableList.add(runnable);
}
@Override
public void afterCommit() {
log.debug("事务提交完成处理 ... ");
List<Runnable> threadRunnableList = RUNNABLES.get();
log.info("Transaction successfully committed, executing {} runnables", threadRunnableList.size());
threadRunnableList.forEach(runnable -> {
log.info("Executing {}", runnable);
try {
threadPool.execute(runnable);
} catch (Exception e) {
log.error("Failed to execute {}", runnable, e);
}
});
}
@Override
public void afterCompletion(int status) {
log.info("Transaction completed with status {}",
status == STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK");
RUNNABLES.remove();
}
}
业务层使用:
@Autowired
private UserDao userDao;
@Autowired
private JmsProducer jmsProducer;
@Autowired
private AfterCommitExecutor afterCommitExecutor;
public User saveUser(User user) {
// 保存用户
userDao.save(user);
final int userId = user.getId();
// 使用AfterCommitExecutor
afterCommitExecutor.execute(new Runnable() {
@Override
public void run() {
jmsProducer.sendEmail(userId);
}
});
}
2.2 存在的问题
TransactionSynchronizationAdapter的事务提交afterCommit方法后执行,虽然这是可行的,但它需要在任何使用它的地方添加大量样板代码,这根本不是解决问题的一种非常干净的方法。
让我们看看如何创建一个注解 ( @PostCommit) 并使用 Spring AOP 围绕通知从后台驱动这一切。
- 1、构建注解
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface PostCommit {
}
- 2、构建 PostCommitAdapter
PostCommitAdapter 将提供两个功能
- 它将注册runnables到一个ThreadLocal
- 覆盖TransactionSynchronizationAdapter的AfterCommit,在事务提交时运行所有注册runnables
/**
* @Author: huangyibo
* @Date: 2023/8/7 16:13
* @Description: 事务提交后执行器:被提交的方法将在事务提交后执行,如果调用方法不在事务中运行,则立即执行
*/
@Slf4j
@Component
public class AfterCommitExecutor extends TransactionSynchronizationAdapter implements Executor {
private static final ThreadLocal<List<Runnable>> RUNNABLES = new ThreadLocal<List<Runnable>>();
@Resource(name = "asyncPoolTaskExecutor")
private ThreadPoolTaskExecutor threadPool;
@Override
public void execute(Runnable runnable) {
log.info("AfterCommitExecutorImpl got New runnable {}", runnable);
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
log.info("Transaction is NOT ACTIVE. Executing right now {}", runnable);
runnable.run();
return;
}
List<Runnable> threadRunnableList = RUNNABLES.get();
if (CollectionUtils.isEmpty(threadRunnableList)) {
threadRunnableList = new ArrayList<>();
RUNNABLES.set(threadRunnableList);
TransactionSynchronizationManager.registerSynchronization(this);
}
threadRunnableList.add(runnable);
}
@Override
public void afterCommit() {
log.debug("事务提交完成处理 ... ");
List<Runnable> threadRunnableList = RUNNABLES.get();
log.info("Transaction successfully committed, executing {} runnables", threadRunnableList.size());
threadRunnableList.forEach(runnable -> {
log.info("Executing {}", runnable);
try {
threadPool.execute(runnable);
} catch (Exception e) {
log.error("Failed to execute {}", runnable, e);
}
});
}
@Override
public void afterCompletion(int status) {
log.info("Transaction completed with status {}",
status == STATUS_COMMITTED ? "COMMITTED" : "ROLLED_BACK");
RUNNABLES.remove();
}
}
如果事务处于活动状态,execute方法是注册在ThreadLoca的runnables的方法,它只是继续并执行runnables。ThreadLocal里面的afterCommit运行所有的runnables。
- 3、使用 around 建议连接适配器和注释
为了让PostCommitAdapter的execute方法与@PostCommit注释挂钩,围绕@PostCommit创建的一个advice,每一个连接点封装runnables的执行方法,并调用PostCommitAdapter内部行execute方法:
@Aspect
@Slf4j
@AllArgsConstructor
@Configuration
public class PostCommitAnnotationAspect {
private final AfterCommitExecutor afterCommitExecutor;
@Pointcut("@annotation(com.yibo.common.annotation.PostCommit)")
private void postCommitPointCut(){}
@Around("postCommitPointCut()")
public Object aroundAdvice(final ProceedingJoinPoint pjp) {
afterCommitExecutor.execute(new PjpAfterCommitRunnable(pjp));
return null;
}
private static final class PjpAfterCommitRunnable implements Runnable {
private final ProceedingJoinPoint pjp;
public PjpAfterCommitRunnable(ProceedingJoinPoint pjp) {
this.pjp = pjp;
}
@Override
public void run() {
try {
pjp.proceed();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
}
- 4、用法
一旦编写了样板,用法就很简单了,无论应该在事务提交后执行哪种方法,都必须简单地用注释对其进行PostCommit注释。
示例:考虑具有 PostCommit 注释方法的两个类 A 和 B
Class A {
@PostCommit
void log(){
log.info("log from class A")
}
}
Class B {
@PostCommit
void log(){
log.info("log from class B")
}
}
一个驱动类调用这些方法:
Class mainClass {
@Transactional
void transactionalMethod(Entity entity){
someOperation(entity)
log.info("inside transaction");
a.log();
b.log();
save(entity);
log.info("end of method");
}
}
执行后输出:
> inside transaction
> ** saving entity
> log from class A
> log from Class B
参考:
https://blog.csdn.net/weixin_35973945/article/details/115067904
https://blog.csdn.net/qq330983778/article/details/112255441