在业务开发过程中,业务逻辑可能非常复杂,核心业务 + 多个子业务,比如,下单之后,发送通知、监控埋点、记录日志……,如果都放在一起,代码会很臃肿。而且有两个问题,一个是业务耦合,一个是串行耗时。
还有一些业务场景不需要在一次请求中同步完成,比如邮件发送、短信发送等。对于这样的场景可以使用 MQ ,使用 MQ会增加系统设计的复杂性,还要考虑消息丢失、消息重复等问题。所以,一般在开发的时候,都会把这些操作抽象成观察者模式,也就是发布/订阅模式,下面给大家介绍一下 Spring Event。
简介
Spring Event(Application Event)其实就是一个观察者模式。观察者模式,含有主题(针对该主题的事件),发布者(发布主题或事件),订阅者(监听主题的人)。有三个部分组成,事件(ApplicationEvent)、监听器(ApplicationListener)和事件发布操作。
事件:描述发生了什么事情、比如说请求处理完成、Spring 容器刷新完毕 事件源:事件的产生者、任何一个事件都必须有一个事件源。比如请求处理完成的事件源就是 DispatcherServlet 、Spring 容器刷新完毕的事件源就是 ApplicationContext 事件广播器:事件和事件监听器的桥梁、负责把事件通知给事件监听器 事件监听器:监听事件的发生、可以在监听器中做一些处理
事件实现方式
ApplicationEvent:事件,每个实现类表示一类事件,可携带数据。 ApplicationListener:事件监听器,用于接收事件处理时间。 ApplicationEventMulticaster:事件管理者,用于事件监听器的注册和事件的广播。 ApplicationEventPublisher:事件发布者,委托ApplicationEventMulticaster完成事件发布。
事件
public abstract class ApplicationEvent extends EventObject { /** use serialVersionUID from Spring 1.2 for interoperability */ private static final long serialVersionUID = 7099057708183571937L; /** System time when the event happened */ private final long timestamp; /** * Create a new ApplicationEvent. * @param source the object on which the event initially occurred (never {@code null}) */ public ApplicationEvent(Object source) { super(source); this.timestamp = System.currentTimeMillis(); } /** * Return the system time in milliseconds when the event happened. */ public final long getTimestamp() { return this.timestamp; }}
ApplicationEvent:应用事件携带一个 Objecgt 对象,可以被发布,source表示事件源。
可以继承ApplicationEvent自定义事件。
public class TestEvent extends ApplicationEvent { private String message; public TestEvent(String message) { super(message); this.message = message; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; }}
事件监听器
事件监听器,有两种实现方式,一种是实现 ApplicationListener 接口,另一种是使用@EventListener 注解。
ApplicationListener:事件监听器,职责为处理事件广播器发布的事件。
@FunctionalInterfacepublic interface ApplicationListener<E extends ApplicationEvent> extends EventListener { void onApplicationEvent(E event);}
ApplicationListener 有两个实现接口:SmartApplicationListener和GenericApplicationListener
@Slf4j@Servicepublic class OrderLogListener implements ApplicationListener<OrderEvent> { @Override public void onApplicationEvent(PlaceOrderEvent event) { log.info("[afterPlaceOrder] log."); }}
@Log4j2@Componentpublic class AEventListener implements ApplicationListener<TestEvent> { @Async @EventListener public void listener(TestEvent event) throws InterruptedException { log.info("监听到数据:{}", event.getMessage()); }}
事件广播器
事件广播器,将EventPubsher(事件发布者)发布的event 广播给事件EventListener(事件监听器)。
Spring提供了默认的实现SimpleApplicationEventMulticaster,如果用户没有配置自定义事件广播器, 则会默认使用SimpleApplicationEventMulticaster作为事件广播器。在容器刷新的过程中会实例化、初始化事件广播器。
ApplicationContext 本来就实现了ApplicationEventPublisher接口,因此应用上下文本来就是一个事件发布者,在AbstractApplicationContext中实现了事件发布的业务。
注入ApplicationContext 发布事件
@Autowiredprivate ApplicationContext applicationContext;applicationContext.publishEvent();
注入ApplicationEventPublisher发布事件
@Autowiredprivate ApplicationEventPublisher applicationEventPublisher;applicationEventPublisher.publishEvent(myEvent)
实现ApplicationEventPublisherAware 发布事件
@Servicepublic class PublishEvent implements ApplicationEventPublisherAware { public static ApplicationEventPublisher eventPublisher = null; @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { eventPublisher.publishEvent("123"); }}
Spring Event 同步模式
定义事件
public class TestEvent extends ApplicationEvent { private String message; public TestEvent(String message) { super(message); this.message = message; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; }}
监听器
@Component@Slf4jpublic class ListenerService { @EventListener public void listener(TestEvent event) throws InterruptedException { Thread.sleep(5000); log.info("监听到数据:{}", event.getMessage()); }}
发布事件
@Servicepublic class PublishService { @Autowired private ApplicationEventPublisher applicationEventPublisher; public void publish(String message) { applicationEventPublisher.publishEvent(new TestEvent(message)); }}
@AutowiredPublishService publishService;@RequestMapping("publishMsg")public void publishMsg() { for (int i = 0; i < 5; i++) { publishService.publish("消息" + (i + 1)); }}
2022-12-07 15:45:12.825 INFO 16304 --- [nio-8080-exec-1] com.test.service.ListenerService : 监听到数据:消息1 2022-12-07 15:45:17.838 INFO 16304 --- [nio-8080-exec-1] com.test.service.ListenerService : 监听到数据:消息2 2022-12-07 15:45:22.842 INFO 16304 --- [nio-8080-exec-1] com.test.service.ListenerService : 监听到数据:消息3 2022-12-07 15:45:27.857 INFO 16304 --- [nio-8080-exec-1] com.test.service.ListenerService : 监听到数据:消息4 2022-12-07 15:45:32.870 INFO 16304 --- [nio-8080-exec-1] com.test.service.ListenerService : 监听到数据:消息5
从结果可以看出,只有处理完一个事件后才会处理下一个事件,这就是同步模式
Spring Event 异步模式
@EnableAsync@SpringBootApplicationpublic class Application { public static void main(String[] args) { SpringApplication.run(Application.class,args); }}
Listener 类需要开启异步的方法增加 @Async 注解:
@EventListener@Asyncpublic void listener(TestEvent event) throws InterruptedException { Thread.sleep(5000); log.info("监听到数据:{}", event.getMessage());}
2022-12-07 15:50:00.986 INFO 3964 --- [cTaskExecutor-5] com.test.service.ListenerService : 监听到数据:消息5 2022-12-07 15:50:00.986 INFO 3964 --- [cTaskExecutor-4] com.test.service.ListenerService : 监听到数据:消息4 2022-12-07 15:50:00.986 INFO 3964 --- [cTaskExecutor-1] com.test.service.ListenerService : 监听到数据:消息1 2022-12-07 15:50:00.986 INFO 3964 --- [cTaskExecutor-2] com.test.service.ListenerService : 监听到数据:消息2 2022-12-07 15:50:00.986 INFO 3964 --- [cTaskExecutor-3] com.test.service.ListenerService : 监听到数据:消息3
可以看到,事件监听器不会阻塞,多个事件可以同时进行。
自定义线程池
@Async默认线程池为 SimpleAsyncTaskExecutor,不是真的线程池,这个类不重用线程,默认每次调用都会创建一个新的线程。
实现接口 AsyncConfigurer
继承 AsyncConfigurerSupport
配置由自定义的 TaskExecutor 替代内置的任务执行器
@Configurationpublic class TaskPoolConfig { @Bean(name = "asyncExecutor") public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(20); executor.setQueueCapacity(200); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("asyncExecutor-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; }}
@EventListener @Async("asyncExecutor") public void listener(TestEvent event) throws InterruptedException { Thread.sleep(5000); log.info("监听到数据:{}", event.getMessage()); }
总结
使用 Spring Event 实现发布/订阅模式,可以对一些业务进行解耦。
本文使用 文章同步助手 同步