1、参考
2、前言
公司项目中有个发送站内信的功能,是使用ApplicationEvent
和ApplicationListener
实现的,对于这一块,之前未接触过,在该项目中,其具体功能如下:
-
ApplicationContext
发送ApplicationEvent
类型的站内信 -
ApplicationListener
接收到站内信信息,进行入库处理,createTime
直接写死成了new Date()
- 用户界面读取入库后的数据,根据
createTime
倒序呈现给用户
产品经理要求站内信需要按发送的顺序呈现给用户,那问题来了:
ApplicationListener
接收到Event
信息时,是否是有序的呢?
3、Demo
基于spring boot
创建一个简易的ApplicationEvent
使用demo,基于该demo执行并发测试,初步验证是否为有序接收
3.1、CustomEvent
自定义Event
,继承ApplicationEvent
,并实现构造方法
public class CustomEvent extends ApplicationEvent {
/**
* Create a new ApplicationEvent.
*
* @param source the object on which the event initially occurred (never {@code null})
*/
public CustomEvent(Object source) {
super(source);
}
}
3.2、CustomPublisher
Event
事件发布者,此处定义publish
方法作为发送入口,提供给controller
调用,方便测试
@Component
public class CustomPublisher {
@Autowired
private ApplicationContext applicationContext;
public void publish(CustomEvent customEvent) {
applicationContext.publishEvent(customEvent);
}
}
除了使用注解的方式,spring也提供了实现
ApplicationEventPublisherAware
接口的方式,具体可戳参考链接
3.3、CustomListener
Event事件监听器,接收Event事件并对其进行处理,此处进行简单的打印
@Component
public class CustomListener {
@EventListener
public void listen(CustomEvent customEvent) {
System.out.println(customEvent.getSource());
}
}
除了使用注解的方式,spring也提供了
ApplicationListener
接口的方式,具体可戳参考链接
3.4、测试
@RestController
public class RestController {
@Autowired
private CustomPublisher customPublisher;
int i = 0;
@GetMapping(value = "/publish")
public void publish() {
CustomEvent customEvent = new CustomEvent(i++);
customPublisher.publish(customEvent);
}
}
使用postman
的runner
功能,创建如下runner
,进行并发测试:
从结果来看,事件接收是有序的,查阅官网,可以看到这么一句话:
You can register as many event listeners as you wish, but note that, by default, event listeners receive events synchronously. This means that the publishEvent() method blocks until all listeners have finished processing the event.
大致意思是:event listeners
默认上同步接收events
,这意味着,publicEvent()
方法将会阻塞直至所有的listeners
处理完event
为止。
4、源码
追溯源码,查看ApplicationEvent
的发布和接收功能是如何实现的,首先,程序入口位于ApplicationContext#publishEvent
:
default void publishEvent(ApplicationEvent event) {
publishEvent((Object) event);
}
方法内调用了重载的publicEvent
,而该重载publishEvent
是一个接口方法,有以下实现:
显然,具体的实现类是AbstractApplicationContext#publishEvent
:
@Override
public void publishEvent(Object event) {
publishEvent(event, null);
}
继续调用重载方法publishEvent(Object event, @Nullable ResolvableType eventType)
:
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
Assert.notNull(event, "Event must not be null");
if (logger.isTraceEnabled()) {
logger.trace("Publishing event in " + getDisplayName() + ": " + event);
}
// Decorate event as an ApplicationEvent if necessary
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent) {
applicationEvent = (ApplicationEvent) event;
}
else {
applicationEvent = new PayloadApplicationEvent<>(this, event);
if (eventType == null) {
eventType = ((PayloadApplicationEvent) applicationEvent).getResolvableType();
}
}
// Multicast right now if possible - or lazily once the multicaster is initialized
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
}
else {
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}
// Publish event via parent context as well...
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
}
else {
this.parent.publishEvent(event);
}
}
}
逻辑可以总结为以下三点:
- 将入参
event
包装成applicationEvent
- 交由多播器进行消息发布(
multicastEvent
) - 若父级上下文不为空,则父级上下文同样需要进行消息发布
显然,核心方法在于multicastEvent
:
@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
Executor executor = getTaskExecutor();
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
}
else {
invokeListener(listener, event);
}
}
}
这里使用了线程池,调用了invokeListener
:
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
ErrorHandler errorHandler = getErrorHandler();
if (errorHandler != null) {
try {
doInvokeListener(listener, event);
}
catch (Throwable err) {
errorHandler.handleError(err);
}
}
else {
doInvokeListener(listener, event);
}
}
继续调用doInvokeListener
:
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
try {
listener.onApplicationEvent(event);
}
catch (ClassCastException ex) {
// ……
}
}
onApplicationEvent
,是ApplicationListener
接口类中的方法,在3.3节中,有提及自定义监听器可以继承ApplicationListener
接口并实现onApplicationEvent
方法进行监听,若是使用该种方法,则事件的发送和接收处理的源码就已经追溯完毕,不过本文中使用的是注解,故需要继续跟踪下去,onApplicationEvent
的实现类颇多:
通过debug验证,
ApplicationListenerMethodAdapter
才是注解类listener
的具体实现:
public void onApplicationEvent(ApplicationEvent event) {
processEvent(event);
}
查看processEvent
:
public void processEvent(ApplicationEvent event) {
Object[] args = resolveArguments(event);
if (shouldHandle(event, args)) {
Object result = doInvoke(args);
if (result != null) {
handleResult(result);
}
else {
logger.trace("No result object given - no result to handle");
}
}
}
可以看到核心在于doInvoke
:
protected Object doInvoke(Object... args) {
Object bean = getTargetBean();
ReflectionUtils.makeAccessible(this.method);
try {
return this.method.invoke(bean, args);
}
// ……
}
这里使用了反射,调用method
方法,那么,method
是什么时候赋值,具体值又是什么呢,对method
进行Find Usages
,可以看到:
public ApplicationListenerMethodAdapter(String beanName, Class<?> targetClass, Method method) {
this.beanName = beanName;
this.method = BridgeMethodResolver.findBridgedMethod(method);
// ……
}
method
是在ApplicationListenerMethodAdapter
构造方法中赋值的,debug,可以看到具体值为:
public void com.kungyu.rabbitmq.CustomListener.listen(com.kungyu.rabbitmq.CustomEvent)
也就是我们自定义的监听方法,那么,ApplicationListenerMethodAdapter
构造方法何时被调用,继续Find Usages
:
public class DefaultEventListenerFactory implements EventListenerFactory, Ordered {
// ……
@Override
public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
return new ApplicationListenerMethodAdapter(beanName, type, method);
}
}
对createApplicationListener
进行Find Usages
:
protected void processBean(
final List<EventListenerFactory> factories, final String beanName, final Class<?> targetType) {
// ……
ApplicationListener<?> applicationListener =
factory.createApplicationListener(beanName, targetType, methodToUse);
// ……
}
对processBean
进行Find Usages
,可以看到是在afterSingletonsInstantiated
进行调用的,而afterSingletonsInstantiated
是类实例化相关的内容,此处不予展开
5、总结
从测试结果来看,ApplicationListener
是有序接收消息的,从源码来看,其实现方式也不算过于复杂,是对知识点的一个很好的补充。另外,在中间件横行的时代,业务角度上貌似这一功能没怎么被使用,出于解耦的目的,应该都会用MQ实现该功能了。