作者:shihuaping0918@163.com转载请注明作者
spring通过Event来通信,说白了就是bean之间数据交换。Event承载消息内容,回调负责接收处理内容。这些回调是注册到spring框架里的,由spring框架来调用。自spring4开始,引入了一个注解来方便用户使用消息机制,这个注解就是 @EventListener,之前需要实现ApplicationListener接口。Event怎么使用不是文章要包含的内容,它的使用也很简单,网上的例子也非常多,自行搜索即可。
spring有一些内部的事件,ContextStartedEvent、ContextStoppedEvent、ContextClosedEvent、ContextRefreshedEvent,这几个Event都是和框架应用上下文有关的。其实就是spring框架生命周期相关的事件,如果用户需要在这些生命周期的时间点做一些工作,就需要响应对应的事件。
除了内部的Event,用户还可以自定义自己的Event,用来做应用内部通信,从java项目对spring的依赖来看,基本上所有的项目都要引入spring,所以用它的Event机制来做内部通信是可以的。
下面来分析一下event机制。在AbstractApplicationContext这个类中,有一个refresh方法,
@Override
public void refresh() throws BeansException, IllegalStateException {
synchronized (this.startupShutdownMonitor) {
// Prepare this context for refreshing.
prepareRefresh();
// Tell the subclass to refresh the internal bean factory.
ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();
// Prepare the bean factory for use in this context.
prepareBeanFactory(beanFactory);
try {
// Allows post-processing of the bean factory in context subclasses.
postProcessBeanFactory(beanFactory);
// Invoke factory processors registered as beans in the context.
invokeBeanFactoryPostProcessors(beanFactory);
// Register bean processors that intercept bean creation.
registerBeanPostProcessors(beanFactory);
// Initialize message source for this context.
initMessageSource();
// Initialize event multicaster for this context.
initApplicationEventMulticaster(); //这里初始化了发布器
// Initialize other special beans in specific context subclasses.
onRefresh();
// Check for listener beans and register them.
registerListeners(); //注册listener
// Instantiate all remaining (non-lazy-init) singletons.
finishBeanFactoryInitialization(beanFactory); //这里对event listener做了处理
// Last step: publish corresponding event.
finishRefresh();
}
catch (BeansException ex) {
if (logger.isWarnEnabled()) {
logger.warn("Exception encountered during context initialization - " +
"cancelling refresh attempt: " + ex);
}
// Destroy already created singletons to avoid dangling resources.
destroyBeans();
// Reset 'active' flag.
cancelRefresh(ex);
// Propagate exception to caller.
throw ex;
}
finally {
// Reset common introspection caches in Spring's core, since we
// might not ever need metadata for singleton beans anymore...
resetCommonCaches();
}
}
}
refresh方法前面讲过一部分了,前面讲的是xml文件的读取加载。现在要关注的是event相关的内容。在refresh方法里面,finishBeanFactoryInitialization这一行对event listener做了一些处理,至于是什么处理放到最后面再分析,因为内容比较多,先看一下registerListeners这一行,这一行实际上做的工作就是把event listener添加到发布器上。看一下代码:
protected void registerListeners() {
// Register statically specified listeners first.
for (ApplicationListener<?> listener : getApplicationListeners()) {
getApplicationEventMulticaster().addApplicationListener(listener); //添加
}
// Do not initialize FactoryBeans here: We need to leave all regular beans
// uninitialized to let post-processors apply to them!
String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
for (String listenerBeanName : listenerBeanNames) {
getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName); //添加
}
// Publish early application events now that we finally have a multicaster...
Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
this.earlyApplicationEvents = null;
if (earlyEventsToProcess != null) {
for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
getApplicationEventMulticaster().multicastEvent(earlyEvent);
}
}
}
可见event listener是附加到发布器上的。那么事件发布和这些发布器是什么关系,又是怎么被event listener处理的呢?先看一下最简单的publishEvent
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
Assert.notNull(event, "Event must not be null");
// Decorate event as an ApplicationEvent if necessary
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent) {
applicationEvent = (ApplicationEvent) event;
}
else {
applicationEvent = new PayloadApplicationEvent<>(this, event);
if (eventType == null) {
eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();
}
}
// Multicast right now if possible - or lazily once the multicaster is initialized
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
}
else {
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType); //发布
}
// Publish event via parent context as well...
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
}
else {
this.parent.publishEvent(event);
}
}
}
从这里看出来,发布工作实际上是转交给了发布器,我们去看一下发布器的代码。从initApplicationEventMulticaster方法去找发布器。
protected void initApplicationEventMulticaster() {
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
this.applicationEventMulticaster =
beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
if (logger.isTraceEnabled()) {
logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
}
}
else {
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory); //看它就行了
beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
if (logger.isTraceEnabled()) {
logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
"[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
}
}
}
跟到SimpleApplicationEventMulticaster类中的multicastEvent里面。
@Override
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
Executor executor = getTaskExecutor();
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
if (executor != null) {
executor.execute(() -> invokeListener(listener, event)); //调用event listener,异步,非阻塞
}
else {
invokeListener(listener, event); //调用event listener,同步,阻塞
}
}
}
再跟一下,看看invokeListener干了什么。
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
ErrorHandler errorHandler = getErrorHandler();
if (errorHandler != null) {
try {
doInvokeListener(listener, event); //看下面的方法
}
catch (Throwable err) {
errorHandler.handleError(err);
}
}
else {
doInvokeListener(listener, event); //看下面的方法
}
}
@SuppressWarnings({"rawtypes", "unchecked"})
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
try {
listener.onApplicationEvent(event); //注意这里
}
catch (ClassCastException ex) {
String msg = ex.getMessage();
if (msg == null || matchesClassCastMessage(msg, event.getClass())) {
// Possibly a lambda-defined listener which we could not resolve the generic event type for
// -> let's suppress the exception and just log a debug message.
Log logger = LogFactory.getLog(getClass());
if (logger.isTraceEnabled()) {
logger.trace("Non-matching event type for listener: " + listener, ex);
}
}
else {
throw ex;
}
}
}
可以看出来,最终是调了listener.onApplicationEvent(event),也就是调用了event listener的onApplicationEvent方法。到此为止,我们就大概了解了,编写event listener,注册event listener,还有发布event,发布以后publisher/multicaster调了event listern的onApplicationEvent。整个流程梳理i清楚了。
现在回过头来看finishBeanFactoryInitialization这个方法,这个方法里对event listener做了封装,为什么要做封装呢,因为使用注解方式的方法不能直接被spring框架使用,需要生成一个代理类才行。下面来挖一挖它的老底,到底这个封装是怎么实现的。
protected void finishBeanFactoryInitialization(ConfigurableListableBeanFactory beanFactory) {
// Initialize conversion service for this context.
if (beanFactory.containsBean(CONVERSION_SERVICE_BEAN_NAME) &&
beanFactory.isTypeMatch(CONVERSION_SERVICE_BEAN_NAME, ConversionService.class)) {
beanFactory.setConversionService(
beanFactory.getBean(CONVERSION_SERVICE_BEAN_NAME, ConversionService.class));
}
// Register a default embedded value resolver if no bean post-processor
// (such as a PropertyPlaceholderConfigurer bean) registered any before:
// at this point, primarily for resolution in annotation attribute values.
if (!beanFactory.hasEmbeddedValueResolver()) {
beanFactory.addEmbeddedValueResolver(strVal -> getEnvironment().resolvePlaceholders(strVal));
}
// Initialize LoadTimeWeaverAware beans early to allow for registering their transformers early.
String[] weaverAwareNames = beanFactory.getBeanNamesForType(LoadTimeWeaverAware.class, false, false);
for (String weaverAwareName : weaverAwareNames) {
getBean(weaverAwareName);
}
// Stop using the temporary ClassLoader for type matching.
beanFactory.setTempClassLoader(null);
// Allow for caching all bean definition metadata, not expecting further changes.
beanFactory.freezeConfiguration();
// Instantiate all remaining (non-lazy-init) singletons.
beanFactory.preInstantiateSingletons(); //跳转
}
跳到DefaultListableBeanFactory类里面,找到preInstantiateSingletons
@Override
public void preInstantiateSingletons() throws BeansException {
if (logger.isTraceEnabled()) {
logger.trace("Pre-instantiating singletons in " + this);
}
// Iterate over a copy to allow for init methods which in turn register new bean definitions.
// While this may not be part of the regular factory bootstrap, it does otherwise work fine.
List<String> beanNames = new ArrayList<>(this.beanDefinitionNames);
// Trigger initialization of all non-lazy singleton beans...
for (String beanName : beanNames) {
RootBeanDefinition bd = getMergedLocalBeanDefinition(beanName);
if (!bd.isAbstract() && bd.isSingleton() && !bd.isLazyInit()) {
if (isFactoryBean(beanName)) {
Object bean = getBean(FACTORY_BEAN_PREFIX + beanName);
if (bean instanceof FactoryBean) {
final FactoryBean<?> factory = (FactoryBean<?>) bean;
boolean isEagerInit;
if (System.getSecurityManager() != null && factory instanceof SmartFactoryBean) {
isEagerInit = AccessController.doPrivileged((PrivilegedAction<Boolean>)
((SmartFactoryBean<?>) factory)::isEagerInit,
getAccessControlContext());
}
else {
isEagerInit = (factory instanceof SmartFactoryBean &&
((SmartFactoryBean<?>) factory).isEagerInit());
}
if (isEagerInit) {
getBean(beanName);
}
}
}
else {
getBean(beanName);
}
}
}
// Trigger post-initialization callback for all applicable beans...
for (String beanName : beanNames) {
Object singletonInstance = getSingleton(beanName);
if (singletonInstance instanceof SmartInitializingSingleton) {
final SmartInitializingSingleton smartSingleton = (SmartInitializingSingleton) singletonInstance;
if (System.getSecurityManager() != null) {
AccessController.doPrivileged((PrivilegedAction<Object>) () -> {
smartSingleton.afterSingletonsInstantiated();
return null;
}, getAccessControlContext());
}
else {
smartSingleton.afterSingletonsInstantiated(); //看这里
}
}
}
}
这里的SmartInitializingSingleton是一个接口,所有实现了这个接口,并且被加载到类厂的类,都会调用afterSingletonsInstantiated方法。从代码上来看,EventListenerMethodProcessor这个类正好实现了这个接口。
public class EventListenerMethodProcessor
implements SmartInitializingSingleton, ApplicationContextAware, BeanFactoryPostProcessor {
跳到EventListenerMethodProcessor这个类里面。
@Override
public void afterSingletonsInstantiated() {
ConfigurableListableBeanFactory beanFactory = this.beanFactory;
Assert.state(this.beanFactory != null, "No ConfigurableListableBeanFactory set");
String[] beanNames = beanFactory.getBeanNamesForType(Object.class);
for (String beanName : beanNames) {
if (!ScopedProxyUtils.isScopedTarget(beanName)) {
Class<?> type = null;
try {
type = AutoProxyUtils.determineTargetClass(beanFactory, beanName);
}
catch (Throwable ex) {
// An unresolvable bean type, probably from a lazy bean - let's ignore it.
if (logger.isDebugEnabled()) {
logger.debug("Could not resolve target class for bean with name '" + beanName + "'", ex);
}
}
if (type != null) {
if (ScopedObject.class.isAssignableFrom(type)) {
try {
Class<?> targetClass = AutoProxyUtils.determineTargetClass(
beanFactory, ScopedProxyUtils.getTargetBeanName(beanName));
if (targetClass != null) {
type = targetClass;
}
}
catch (Throwable ex) {
// An invalid scoped proxy arrangement - let's ignore it.
if (logger.isDebugEnabled()) {
logger.debug("Could not resolve target bean for scoped proxy '" + beanName + "'", ex);
}
}
}
try {
processBean(beanName, type); //跳转
}
catch (Throwable ex) {
throw new BeanInitializationException("Failed to process @EventListener " +
"annotation on bean with name '" + beanName + "'", ex);
}
}
}
}
}
跳转到processBean
private void processBean(final String beanName, final Class<?> targetType) {
if (!this.nonAnnotatedClasses.contains(targetType) &&
AnnotationUtils.isCandidateClass(targetType, EventListener.class) &&
!isSpringContainerClass(targetType)) {
Map<Method, EventListener> annotatedMethods = null;
try {
annotatedMethods = MethodIntrospector.selectMethods(targetType,
(MethodIntrospector.MetadataLookup<EventListener>) method ->
AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class));
}
catch (Throwable ex) {
// An unresolvable type in a method signature, probably from a lazy bean - let's ignore it.
if (logger.isDebugEnabled()) {
logger.debug("Could not resolve methods for bean with name '" + beanName + "'", ex);
}
}
if (CollectionUtils.isEmpty(annotatedMethods)) {
this.nonAnnotatedClasses.add(targetType);
if (logger.isTraceEnabled()) {
logger.trace("No @EventListener annotations found on bean class: " + targetType.getName());
}
}
else {
// Non-empty set of methods
ConfigurableApplicationContext context = this.applicationContext;
Assert.state(context != null, "No ApplicationContext set");
List<EventListenerFactory> factories = this.eventListenerFactories;
Assert.state(factories != null, "EventListenerFactory List not initialized");
for (Method method : annotatedMethods.keySet()) {
for (EventListenerFactory factory : factories) {
if (factory.supportsMethod(method)) {
Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));
ApplicationListener<?> applicationListener =
factory.createApplicationListener(beanName, targetType, methodToUse);//注意这一行,这里对event listner做了一次封装
if (applicationListener instanceof ApplicationListenerMethodAdapter) {
((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator);
}
context.addApplicationListener(applicationListener);
break;
}
}
}
if (logger.isDebugEnabled()) {
logger.debug(annotatedMethods.size() + " @EventListener methods processed on bean '" +
beanName + "': " + annotatedMethods);
}
}
}
}
类厂DefaultEventListenerFactory,它的代码很简单。
@Override
public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
return new ApplicationListenerMethodAdapter(beanName, type, method);
}
类厂DefaultEventListenerFactory对event listener做了一次封装,封装的类是ApplicationListenerMethodAdapter,adapter设计模式。这个类是用反射对原有的event listener做了一次封装。
@Override
public void onApplicationEvent(ApplicationEvent event) {
processEvent(event);
}
这个onApplicationEvent就和前面对应上了。再看一下processEvent还有什么内容。
/**
* Process the specified {@link ApplicationEvent}, checking if the condition
* matches and handling a non-null result, if any.
*/
public void processEvent(ApplicationEvent event) {
Object[] args = resolveArguments(event);
if (shouldHandle(event, args)) {
Object result = doInvoke(args);
if (result != null) {
handleResult(result); //如果不返回空
}
else {
logger.trace("No result object given - no result to handle");
}
}
}
protected void handleResult(Object result) {
if (reactiveStreamsPresent && new ReactiveResultHandler().subscribeToPublisher(result)) {
if (logger.isTraceEnabled()) {
logger.trace("Adapted to reactive result: " + result);
}
}
else if (result instanceof CompletionStage) {
((CompletionStage<?>) result).whenComplete((event, ex) -> {
if (ex != null) {
handleAsyncError(ex);
}
else if (event != null) {
publishEvent(event); //返回了事件
}
});
}
else if (result instanceof ListenableFuture) {
((ListenableFuture<?>) result).addCallback(this::publishEvents, this::handleAsyncError);
}
else {
publishEvents(result);
}
}
private void publishEvents(Object result) {
if (result.getClass().isArray()) { //数组?
Object[] events = ObjectUtils.toObjectArray(result);
for (Object event : events) {
publishEvent(event);
}
}
else if (result instanceof Collection<?>) { //集合
Collection<?> events = (Collection<?>) result;
for (Object event : events) {
publishEvent(event);
}
}
else { //普通类
publishEvent(result);
}
}
private void publishEvent(@Nullable Object event) {
if (event != null) {
Assert.notNull(this.applicationContext, "ApplicationContext must not be null");
this.applicationContext.publishEvent(event); //继续发布event
}
}
由上面的代码可见,如果event listener返回的是事件,这个事件会被发布。这篇文章到这里就结束了,希望能让大家明白sprint中event机制是怎么运行的。event机制和refresh方法还有一定的关联。spring中注解相关的代码比较难懂,我还没有完全吃透,所以注解部分这里就没有提到了,注解的部分如果要讲的话,篇幅肯定很大,起码也要七八篇才能讲得清楚。暂时作为远期计划吧。
写在最后:接单,有后台活java/cpp/lua/go联系shihuaping0918@163.com。不上班了也要有点收入才行。