Spring/Spring boot中使用自定义事件监听

在实际开发过程中,常常遇到这种场景:
做完某一件事情以后,需要广播一些消息或者通知,告诉其他的模块进行一些事件处理,一般来说,可以一个一个发送请求去通知,但是有一种更好的方式,那就是事件监听,事件监听也是设计模式中 发布-订阅模式、观察者模式的一种实现。

观察者模式:在对象之间定义了一对多的依赖,这样一来,当一个对象改变状态,依赖它的对象会收到通知并自动更新。

Spring的事件为Bean和Bean之间的消息传递提供支持。当一个对象处理完某种任务后,通知另外的对象进行某些处理,常用的场景有进行某些操作后发送通知,消息、邮件等情况。
Spring提供5种标准的事件监听:

  1. 上下文更新事件(ContextRefreshedEvent):该事件会在ApplicationContext被初始化或者更新时发布。也可以在调用ConfigurableApplicationContext接口中的refresh()方法时被触发。
  2. 上下文开始事件(ContextStartedEvent):当容器ConfigurableApplicationContext的Start()方法开始/重新开始容器时触发该事件。
  3. 上下文停止事件(ContextStoppedEvent):当容ConfigurableApplicationContext的Stop()方法停止容器时触发该事件。
  4. 上下文关闭事件(ContextClosedEvent):当ApplicationContext被关闭时触发该事件。容器被关闭时,其管理的所有单例Bean都被销毁。
  5. 请求处理事件(RequestHandledEvent):在Web应用中,当一个http请求(request)结束触发该事件。

不过也有些实际场景并用不上框架提供的标准事件,这个时候我们就需要自定义事件监听
Spring的事件遵循的流程:

  1. 自定义事件,继承ApplicationEvent(org.springframework.context.ApplicationEvent)
  2. 定义监听事件,实现ApplicationListener(org.springframework.context.ApplicationListener)
  3. 使用容器触发事件

Spring中书写事件监听有两种方式,编程式和注解式,这里两种都进行简单的说明。因为现在正好在看boot方面的东西,这里就以boot项目为基础开启事件监听,boot也是spring全家桶的一部分,所以配置和使用和传统spring项目也没什么区别,当然,boot提倡无xml,所以这里开启监听的方式也用注解的方式进行。

一:自定义事件

package com.wzh.demo.event;

import org.springframework.context.ApplicationEvent;

/**
 * 自定义监听方法测试类
 */
public class MyEvent extends ApplicationEvent{


    private String msg;

    /**
     * 在自定义事件的构造方法中除了第一个source参数,其他参数都可以去自定义,
     * 可以根据项目实际情况进行监听传参,这里就只定义个简单的String字符串的透传
     * @param source
     * @param msg
     */
    public MyEvent(Object source,String msg) {
        super(source);
        this.msg = msg;
    }

    /**
     * 自定义监听器触发的透传打印方法
     * @param msg
     */
    public void printMsg(String msg)
    {
        System.out.println("编程事件监听:" + msg);
    }


    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }
}

二:事件监听

package com.wzh.demo.event.listener;

import com.wzh.demo.event.MyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

/**
 * 测试用自定义监听器,监听事件为MyEvent
 */
@Component
public class MyLisenter implements ApplicationListener<MyEvent>{

    /**
     * 对监听到的事件进行处理
     * @param myEvent
     */
    @Override
    public void onApplicationEvent(MyEvent myEvent) {

        /*
          这里不做处理,只对消息进行透传打印,实际情况,
          可以根据项目进行逻辑进行处理
         */
        myEvent.printMsg(myEvent.getMsg());
    }
}

三:事件发布

package com.wzh.demo.service.impl;

import com.wzh.demo.event.MyEvent;
import com.wzh.demo.service.ListenerTestService;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;


import javax.annotation.Resource;

@Service("listenerService")
public class ListenerServieImpl implements ListenerTestService{

    /**
     *  上下文对象
     */
    @Resource
    private ApplicationContext applicationContext;

    @Override
    public void publish(String msg) {

        //通过上下文对象发布监听
        applicationContext.publishEvent(new MyEvent(this,msg));
        System.out.println(msg);

    }
}

junit测试类

import base.BaseJunit;
import com.wzh.demo.service.ListenerTestService;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

/**
 * 事件监听测试
 */
public class listenerTest extends BaseJunit {

    @Autowired
    @Qualifier(value = "listenerService")
    private ListenerTestService listenerTestService;

    @Test
    public void testEvent()
    {
        listenerTestService.publish("测试监听");
    }
}

image.png

我们看到,这里已经成功的监听了,当调用listenerTestService.publish方法时会触发监听,进行一些列业务操作,如果listenerTestService类中还有其他方法,不过没有调用发布监听,是不会触发监听器的。

监听器的第二种自定义方式,注解方式,第一步自定义事件的步骤是一样的,只是在事件监听的步骤不一样,发布的步骤也是一样的。

事件监听

package com.wzh.demo.event.listener;

import com.wzh.demo.event.MyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * 测试用注解监听器
 */
@Component
public class MyAnnotationListener {

    @EventListener
    public void listener1(MyEvent event)
    {
        System.out.println("注解事件监听-2:"+ event.getMsg());
    }

    @EventListener
    public void listener2(MyEvent event)
    {
        System.out.println("注解事件监听-1:"+ event.getMsg());
    }
}

运行junit测试类


image.png

从运行结果我们可以看到是先执行的注解事件监听,然后执行的编程事件监听。这个时候代码的执行顺序是线程同步的,在执行listenerTestService.publish的过程中,根据代码顺序,代码第一步是触发监听,这个时候就会执行监听,监听的方法执行完成后才会执行后面的代码。不过大多数情况,我们是希望代码异步执行,监听器和业务逻辑为两个线程执行,实现的方法很多,列如自己写个多线程,开个线程池什么的。这里也提供一个spring支持的@Async注解方式开启异步方法。这里因为用的boot,所以在boot的启动类中进行配置。boot讲究开箱即用,并不用专门去配置监听器什么的。这里只展示启动类,其他的什么application.properties,pom.xml这些配置boot项目的东西就不在这里做说明。

package com.wzh.application;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.apache.ibatis.io.VFS;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.tomcat.jdbc.pool.DataSource;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.support.SpringBootServletInitializer;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * Spring Boot 应用启动类,这里继承SpringBootServletInitializer并重写SpringApplicationBuilder方法
 * 是为了打包为war进行外部tomcat的部署
 */
@SpringBootApplication // Spring Boot 应用的标识
@EnableTransactionManagement // 启动注解事务 等同于传统Spring 项目中xml配置<tx:annotation-driven />
@EnableAsync //异步调用
@ComponentScan(basePackages = { "com.wzh"}) // 指定spring管理路径,就是那些bean 注解的路径
@MapperScan({ "com.wzh.**.mapper" }) // mapper 接口类扫描包配置,两个*为目录通配符
public class Application extends SpringBootServletInitializer{

    // 程序启动入口
    public static void main(String[] args) {
        // 启动嵌入式的 Tomcat 并初始化 Spring 环境及其各 Spring 组件
        SpringApplication.run(Application.class, args);
    }

    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        return application.sources(Application.class);
    }

    // 创建数据源,因为用是mybatis-spring 1.2 取消了数据源的自动注入,所以这里需要手动配置
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")// 指定数据源的前缀 ,在application.properties文件中指定
    public DataSource dataSource() {
        return new DataSource();
    }

    // 创建SqlSessionFactory
    @Bean
    public SqlSessionFactory sqlSessionFactoryBean() throws Exception {

        //解决myBatis下 不能嵌套jar文件的问题
        VFS.addImplClass(SpringBootVFS.class);
        
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(dataSource());

        PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();

        // 两个*为目录通配符
        sqlSessionFactoryBean.setMapperLocations(resolver.getResources("classpath:/mapper/**/*.xml"));

        //指定扫描别名包的路径,多个bean的扫描路径,拼接以分号隔开
        String typeAliasesPackage = "com.wzh.demo.domain;";
    
        sqlSessionFactoryBean.setTypeAliasesPackage(typeAliasesPackage);
        return sqlSessionFactoryBean.getObject();
    }

    // 创建事物管理器
    @Bean
    public PlatformTransactionManager transactionManager() {

        return new DataSourceTransactionManager(dataSource());
    }

    //自定义线程池,当配置多个executor时,被@Async("id")指定使用;也被作为线程名的前缀
    @Bean
    public AsyncTaskExecutor taskExecutor()
    {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        //线程池名字
        executor.setThreadNamePrefix("asyncExecutor");
        //最大线程数
        executor.setMaxPoolSize(50);
        //最小线程数
        executor.setCorePoolSize(3);

        // 使用预定义的异常处理类
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());


        // 自定义拒绝策略
        /*executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                //........
            }
        });*/
        return executor;
    }
}

一些参数的用法,这里是xml中配置的参数的用法,我们用的是代码的操作,调用对应方法的意义也是一样的。
id:当配置多个executor时,被@Async(“id”)指定使用;也被作为线程名的前缀。
pool-size:
core size:最小的线程数,缺省:1
max size:最大的线程数,缺省:Integer.MAX_VALUE
queue-capacity:当最小的线程数已经被占用满后,新的任务会被放进queue里面,当这个queue的capacity也被占满之后,pool里面会创建新线程处理这个任务,直到总线程数达到了max size,这时系统会拒绝这个任务并抛出TaskRejectedException异常(缺省配置的情况下,可以通过rejection-policy来决定如何处理这种情况)。缺省值为:Integer.MAX_VALUE
keep-alive:超过core size的那些线程,任务完成后,再经过这个时长(秒)会被结束掉
rejection-policy:当pool已经达到max size的时候,如何处理新任务
ABORT(缺省):抛出TaskRejectedException异常,然后不执行
DISCARD:不执行,也不抛出异常
DISCARD_OLDEST:丢弃queue中最旧的那个任务
CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行

稍微修改监听,在监听器三做个@Async,并做个sleep,方便测试。

package com.wzh.demo.event.listener;

import com.wzh.demo.event.MyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * 测试用注解监听器
 */
@Component
public class MyAnnotationListener {

    @EventListener
    public void listener1(MyEvent event)
    {
        System.out.println("注解事件监听-2:"+ event.getMsg());
    }

    @EventListener
    @Async
    public void listener2(MyEvent event)
    {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("注解事件监听-1:"+ event.getMsg());
    }
}

运行junit测试类


image.png

这里有看到报错java.lang.InterruptedException: sleep interrupted做个错误是因为代码在使用单元测试因为单元测试启动的主线程很快就结束了,而子线程确sleep3秒,使得主线程强行打断子线程的sleep,因此抛出异常,解决办法是可以在单元测试的最后加上sleep(4*1000),目的是不让主线程在子线程前结束。这里我们暂时忽略做个错误,单纯看控制台输出,我们看到,我们先咨询了注解监听2,然后执行到注解监听1方法的时候休眠3秒,因为注解监听1是个异步方法,会开启一个子线程,做个时候子线程休眠3秒,主线程继续执行编程事务监听,然后执行原本方法中的打印操作,子线程休眠时间到后执行的子线程注解监听1的打印。

到这里就完成了解了自定义事件监听的使用方式了,等有空了再研究标准的事件监听,那个多用于项目启动的时候一些常年或者权限的加载。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,794评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,050评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,587评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,861评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,901评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,898评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,832评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,617评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,077评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,349评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,483评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,199评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,824评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,442评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,632评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,474评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,393评论 2 352

推荐阅读更多精彩内容