前言
大家好,这几天都在思考一个问题:像一些特定场景需要触发一些动作,如何做到代码的解耦,而不是显式的调用,这样我想起了一句话:计算机科学领域的任何问题,都可以通过添加一个中间层来解决。这里通过查阅相关的设计模式,发现观察者模式很好的解决了该问题。
首先先看看观察者模式的定义:定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。
刚刚说了,计算机的问题都可以通过一个中间层解决,Spring的观察者模式是如何通过中间层解决的,这里给出案:applicationEventMulticaster(事件广播器)。
spring的观察者模式的实现是使用事件驱动模型来实现的:ApplicationEventPublisher发布事件给中间层applicationEventMulticaster,由其来通过事件类型ApplicationEvent的判断来选择ApplicationListener,并以广播的形式(for循环)来通知(调用)真正的ApplicationListener实现的具体方法。
先来看看Spring观察者模式的类图关系:
可以看到这类图中包含了四个重要的角色:
事件的发布者(ApplicationEventPublisher):这个为被观察者对象,其void publishEvent(ApplicationEvent event)方法将事件发布出去;在Spring-boot中其实现为上下文:AnnotationConfigEmbeddedWebApplicationContext(基于注解使用的上下文)。
事件类型(ApplicationEvent):事件类型为事件发布者和事件监听者的信息传输介质,使用者继承该类,定制目标监听器能识别的信息。
事件广播器(ApplicationEventMulticaster):该类是整个观察者的核心,接收发布者的事件推送并选择适当的事件监听器进行事件的准确分发。
事件监听器(ApplicationListener):该组件为观察者对象,由用户自行实现其void onApplicationEvent(E event)方法,定制业务逻辑。
简单使用
接下来简单的讲下 如何使用(基于Springboot的web应用):
定义事件类型
package com.observer.message;
import org.springframework.context.ApplicationEvent;
public class MessageEvent extends ApplicationEvent {
public MessageEvent(Object source) {
super(source);
}
}
定义事件监听器
@Component
@Slf4j
public class MessageListener implements ApplicationListener<MessageEvent> {
@Override
public void onApplicationEvent(MessageEvent event) {
log.info("receive message!!! source = " + event.getSource());
}
}
事件发布
@Component
@Slf4j
public class MessageComponent implements InitializingBean {
/**
* 直接注入该事件发布器
*/
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
public void sendMessage() {
log.info("send message");
//创建事件对象
MessageEvent messageEvent = new MessageEvent(new Integer(666));
applicationEventPublisher.publishEvent(messageEvent);
}
/**
* 实现InitializingBean的方法,在bean初始化后调用事件发布方法
*/
@Override
public void afterPropertiesSet() {
sendMessage();
}
}
启动项目,则会在控制台收到消息:
[ restartedMain] com.observer.message.MessageComponent : send message
[ restartedMain] com.observer.message.MessageListener : receive message!!! source = 666
可以看到其调用是在线程名为restartedMain执行的(这里引入了spring-boot-devtools工具,所以线程名为restartedMain,否则为main线程),可以看到已生效。
实现原理
对该代码MessageComponent :applicationEventPublisher.publishEvent(messageEvent)打断点追踪代码
其调用核心为AbstractApplicationContext里的protected void publishEvent(Object event, ResolvableType eventType):
/**
* Publish the given event to all listeners.
* @param event the event to publish (may be an {@link ApplicationEvent}
* or a payload object to be turned into a {@link PayloadApplicationEvent})
* @param eventType the resolved event type, if known
* @since 4.2
*/
protected void publishEvent(Object event, ResolvableType eventType) {
Assert.notNull(event, "Event must not be null");
if (logger.isTraceEnabled()) {
logger.trace("Publishing event in " + getDisplayName() + ": " + event);
}
// Decorate event as an ApplicationEvent if necessary
//对事件进行装饰,若事件为ApplicationEvent类型,则将其转换为ApplicationEvent,若为特殊对象,则使用PayloadApplicationEvent对其进行包装,转换为ApplicationEvent,并获取其事件类型
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent) {
applicationEvent = (ApplicationEvent) event;
}
else {
applicationEvent = new PayloadApplicationEvent<Object>(this, event);
if (eventType == null) {
eventType = ((PayloadApplicationEvent) applicationEvent).getResolvableType();
}
}
// Multicast right now if possible - or lazily once the multicaster is initialized
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
}
else {
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}
// Publish event via parent context as well...
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
}
else {
this.parent.publishEvent(event);
}
}
}
核心代码为:getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
@Override
public void multicastEvent(final ApplicationEvent event, ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
Executor executor = getTaskExecutor();
if (executor != null) {
executor.execute(new Runnable() {
@Override
public void run() {
invokeListener(listener, event);
}
});
}
else {
invokeListener(listener, event);
}
}
}
该实现为SimpleApplicationEventMulticaster,首先获取事件的类型,并获取其适合的监听器(getApplicationListeners),若该ApplicationEventMulticaster(自定义)中配置了线程池,则使用线程池调用(异步调用),若无,则做同步调用。
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
ErrorHandler errorHandler = getErrorHandler();
if (errorHandler != null) {
try {
doInvokeListener(listener, event);
}
catch (Throwable err) {
errorHandler.handleError(err);
}
}
else {
doInvokeListener(listener, event);
}
}
@SuppressWarnings({"unchecked", "rawtypes"})
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
try {
//监听器实现的方法
listener.onApplicationEvent(event);
}
catch (ClassCastException ex) {
String msg = ex.getMessage();
if (msg == null || matchesClassCastMessage(msg, event.getClass().getName())) {
// Possibly a lambda-defined listener which we could not resolve the generic event type for
// -> let's suppress the exception and just log a debug message.
Log logger = LogFactory.getLog(getClass());
if (logger.isDebugEnabled()) {
logger.debug("Non-matching event type for listener: " + listener, ex);
}
}
else {
throw ex;
}
}
}
可以看到,最终调用则为listener.onApplicationEvent(event),该方法需要我们自己实现。
其调用栈为:
其中涉及了事件发布,事件的处理,以及事件监听三步流程。
疑点一 ApplicationEventMulticaster如何初始化以及其如何加载ApplicatonListener
核心方法为AbstractApplicationContext的refresh
@Override
public void refresh() throws BeansException, IllegalStateException {
synchronized (this.startupShutdownMonitor) {
......
// Initialize event multicaster for this context.
initApplicationEventMulticaster();
// Check for listener beans and register them.
registerListeners();
.......
}
......
}
}
这里省略了大部分的代码,我们只关注核心的两个方法:
protected void initApplicationEventMulticaster() {
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
//public static final String APPLICATION_EVENT_MULTICASTER_BEAN_NAME = "applicationEventMulticaster";
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
this.applicationEventMulticaster =
beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
if (logger.isDebugEnabled()) {
logger.debug("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
}
}
else {
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
if (logger.isDebugEnabled()) {
logger.debug("Unable to locate ApplicationEventMulticaster with name '" +
APPLICATION_EVENT_MULTICASTER_BEAN_NAME +
"': using default [" + this.applicationEventMulticaster + "]");
}
}
}
可以看到,若在启动类中我们有自己创建名字为applicationEventMulticaster类型为ApplicationEventMulticaster.class的bean则将其塞入applicationContext中,若没有,则实例化一个SimpleApplicationEventMulticaster做为applicationEventMulticaster。
/**
* Add beans that implement ApplicationListener as listeners.
* Doesn't affect other listeners, which can be added without being beans.
*/
protected void registerListeners() {
// Register statically specified listeners first.
// 把提前存储好的监听器添加到监听器容器中
for (ApplicationListener<?> listener : getApplicationListeners()) {
getApplicationEventMulticaster().addApplicationListener(listener);
}
// Do not initialize FactoryBeans here: We need to leave all regular beans
// uninitialized to let post-processors apply to them!
//获取类型是ApplicationListener的beanName集合,此处不会去实例化bean
String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
for (String listenerBeanName : listenerBeanNames) {
getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
}
// Publish early application events now that we finally have a multicaster...
// 如果存在earlyEventsToProcess,提前处理这些事件
Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
this.earlyApplicationEvents = null;
if (earlyEventsToProcess != null) {
for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
getApplicationEventMulticaster().multicastEvent(earlyEvent);
}
}
}
这里会将监听器提前注册进来(上图的15个对象),或者是将Listener的beanName先获取到,后续在实际获取Listener的时候,会从该Set中通过
ApplicationListener<?> listener = beanFactory.getBean(listenerBeanName, ApplicationListener.class)获取Listener(例如用户自己实现的Listenner-上图的messageListener)若有需要处理的事件,则会提前处理。
疑点二 如何对监听器进行异步调用
上边我们看到,applicationEventMulticaster 在进行监听器广播时,会查看其是否已经有线程池属性taskExecutor,若不为空,则使用其进行线程池调用,这里我们可以自己定义applicationEventMulticaster Bean,并传入线程池对象。
@Bean
public TaskExecutor mutiExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("muti-Executor");
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
return executor;
}
@Bean
public SimpleApplicationEventMulticaster applicationEventMulticaster(TaskExecutor mutiExecutor) {
SimpleApplicationEventMulticaster applicationEventMulticaster = new SimpleApplicationEventMulticaster();
applicationEventMulticaster.setTaskExecutor(mutiExecutor);
return applicationEventMulticaster;
}
启动工程:
2020-01-20 15:21:42.617 INFO 8484 --- [ restartedMain] com.observer.message.MessageComponent : send message
2020-01-20 15:21:42.624 INFO 8484 --- [ muti-Executor3] com.observer.message.MessageListener : receive message!!! source = 666
发现,其已是通过线程池做了异步调用。
但是,该定义有缺点:这会使得所有的事件监听都是使用异步调用,不够灵活,是否有一种方式的粒度更细,只针对想要做异步的监听才走异步调用。
Spring中提供了异步调用,其实现原理为动态代理并使用线程池执行特定的方法。
使用@EnableAsync @Async两个注解实现,其原理不在这里介绍。
@Bean
public AsyncTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("async-Executor");
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
// 设置拒绝策略
executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// .....
}
});
// 使用预定义的异常处理类
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
先定义个异步执行的线程池;
@SpringBootApplication
@EnableAsync
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
在工程启动类中添加@EnableAsync注解
@Component
@Slf4j
public class MessageListener implements ApplicationListener<MessageEvent> {
@Override
@Async
public void onApplicationEvent(MessageEvent event) {
log.info("receive message!!! source = " + event.getSource());
}
}
在需要异步调用的方法中使用@Async
查看效果:
2020-01-20 15:32:50.102 INFO 4304 --- [ restartedMain] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'taskExecutor'
2020-01-20 15:32:50.107 INFO 4304 --- [async-Executor1] com.observer.message.MessageListener : receive message!!! source = 666
可以看到确实是使用了自定义的线程池的方式,这种方式比较灵活,需要异步的监听执行就在其方法上添加注解。
疑点三 如何让监听器有一个优先级调用
实现SmartApplicationListener接口即可:
@Component
@Slf4j
public class MessageListener implements SmartApplicationListener {
@Override
public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
return eventType == MessageEvent.class;
}
@Override
public boolean supportsSourceType(Class<?> sourceType) {
return sourceType == Integer.class;
}
@Override
public void onApplicationEvent(ApplicationEvent event) {
log.info("MessageListener receive msg: " + event.getSource());
}
@Override
public int getOrder() {
return 7778;
}
}
@Component
@Slf4j
public class Message2Listener implements SmartApplicationListener {
@Override
public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
return eventType == MessageEvent.class;
}
@Override
public boolean supportsSourceType(Class<?> sourceType) {
return sourceType == Integer.class;
}
@Override
public void onApplicationEvent(ApplicationEvent event) {
log.info("Message2Listener receive msg: " + event.getSource());
}
@Override
public int getOrder() {
return 888;
}
}
这里使用两个Listener来做对比,其接受的ApplicationEvent是MessageEvent,还有其souceType为Integer类型。
其中,MessageListener的order为7778,Message2Listener的order为888,
order越小,优先级越大,可以预判,先触发Message2Listener的onApplicationEvent方法。
0-01-20 18:33:23.198 INFO 16724 --- [ restartedMain] com.observer.message.MessageComponent : send message
2020-01-20 18:33:23.202 INFO 16724 --- [ restartedMain] com.observer.message.Message2Listener : Message2Listener receive msg: 666
2020-01-20 18:33:23.202 INFO 16724 --- [ restartedMain] com.observer.message.MessageListener : MessageListener receive msg: 666
事实也是如此,现在我们看看源码是如何实现的,并调用其SmartApplicationListener方法的。
让我们回顾下,核心代码为:
@Override
public void multicastEvent(final ApplicationEvent event, ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
Executor executor = getTaskExecutor();
if (executor != null) {
executor.execute(new Runnable() {
@Override
public void run() {
invokeListener(listener, event);
}
});
}
else {
invokeListener(listener, event);
}
}
}
这边我们来分析getApplicationListeners(event, type))方法,因为该方法实际返回的是LinkedList,遍历迭代器则是按照其先后顺序访问的,这里则是获取合适的Listeners和做一个优先级排序。
protected Collection<ApplicationListener<?>> getApplicationListeners(
ApplicationEvent event, ResolvableType eventType) {
Object source = event.getSource();
Class<?> sourceType = (source != null ? source.getClass() : null);
ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);
// Quick check for existing entry on ConcurrentHashMap...
ListenerRetriever retriever = this.retrieverCache.get(cacheKey);
if (retriever != null) {
return retriever.getApplicationListeners();
}
if (this.beanClassLoader == null ||
(ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) &&
(sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader)))) {
// Fully synchronized building and caching of a ListenerRetriever
synchronized (this.retrievalMutex) {
retriever = this.retrieverCache.get(cacheKey);
if (retriever != null) {
return retriever.getApplicationListeners();
}
retriever = new ListenerRetriever(true);
Collection<ApplicationListener<?>> listeners =
retrieveApplicationListeners(eventType, sourceType, retriever);
this.retrieverCache.put(cacheKey, retriever);
return listeners;
}
}
else {
// No ListenerRetriever caching -> no synchronization necessary
return retrieveApplicationListeners(eventType, sourceType, null);
}
}
首先先通过eventType和sourceType构建一个cacheKey,
ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);
final Map<ListenerCacheKey, ListenerRetriever> retrieverCache =
new ConcurrentHashMap<ListenerCacheKey, ListenerRetriever>(64);
然后从retrieverCache中获取Listener,若不为空,则直接返回,若为空,则接着往下走,核心代码如下:
Collection<ApplicationListener<?>> listeners =
retrieveApplicationListeners(eventType, sourceType, retriever);
private Collection<ApplicationListener<?>> retrieveApplicationListeners(
ResolvableType eventType, Class<?> sourceType, ListenerRetriever retriever) {
LinkedList<ApplicationListener<?>> allListeners = new LinkedList<ApplicationListener<?>>();
Set<ApplicationListener<?>> listeners;
Set<String> listenerBeans;
synchronized (this.retrievalMutex) {
listeners = new LinkedHashSet<ApplicationListener<?>>(this.defaultRetriever.applicationListeners);
listenerBeans = new LinkedHashSet<String>(this.defaultRetriever.applicationListenerBeans);
}
for (ApplicationListener<?> listener : listeners) {
if (supportsEvent(listener, eventType, sourceType)) {
if (retriever != null) {
retriever.applicationListeners.add(listener);
}
allListeners.add(listener);
}
}
if (!listenerBeans.isEmpty()) {
BeanFactory beanFactory = getBeanFactory();
for (String listenerBeanName : listenerBeans) {
try {
Class<?> listenerType = beanFactory.getType(listenerBeanName);
if (listenerType == null || supportsEvent(listenerType, eventType)) {
ApplicationListener<?> listener =
beanFactory.getBean(listenerBeanName, ApplicationListener.class);
if (!allListeners.contains(listener) && supportsEvent(listener, eventType, sourceType)) {
if (retriever != null) {
retriever.applicationListenerBeans.add(listenerBeanName);
}
allListeners.add(listener);
}
}
}
catch (NoSuchBeanDefinitionException ex) {
// Singleton listener instance (without backing bean definition) disappeared -
// probably in the middle of the destruction phase
}
}
}
AnnotationAwareOrderComparator.sort(allListeners);
return allListeners;
}
第一个for循环则是判断第一次在refresh中获取的listener,判断其是否满足eventType和sourceType的要求:
protected boolean supportsEvent(ApplicationListener<?> listener, ResolvableType eventType, Class<?> sourceType) {
GenericApplicationListener smartListener = (listener instanceof GenericApplicationListener ?
(GenericApplicationListener) listener : new GenericApplicationListenerAdapter(listener));
return (smartListener.supportsEventType(eventType) && smartListener.supportsSourceType(sourceType));
}
由于我们的Listener是SmartApplicationListener,则需要使用GenericApplicationListenerAdapter适配器(适配器模式)进行封装,并调用相应的方法,
最终会调用SmartApplicationListener的supportsSourceType、supportsEventType方法,即为我们自己实现的方法。
if (!listenerBeans.isEmpty()) 该判断条件则是之前未实例化的Listener,若不为空,则需要一样的判断其是否满足要求并将其放入allListeners中。
最终,会调用AnnotationAwareOrderComparator.sort(allListeners)该方法,
该方法即为实现次序的方法,若Listener未能实现Ordered接口,则其优先级则为最低,否则按照其getOrder()返回的int类型进行排序,数字越小则优先级越高,则返回的list中遍历的时候越先执行。
到此,Spring的观察者模式就先介绍到这里。