事件驱动模型的初次使用

个人理解:事件驱动,顾名思义,当某一个事件发生时,会有其他监听该事件发生的方法伴随着该事件的执行而触发。因此,事件驱动中的组成就很明确了,一个事件、一个调用事件的事件源、多个监听者。

案例:在我的工作中,有这样的需求,一个游戏服务器需要对玩家的数据进行持久化,但是玩家的数据又会关联到很多其他的实体,例如玩家的背包、宠物等等。如果将所有的保存逻辑都放在玩家数据保存的代码块中,那么每当我新增一个关联玩家的实体时,就要添加新的代码进来,这样对于一个分模块的游戏服务器来说是很不友好的。因此,我打算将保存玩家数据作为一个事件,在我完成对玩家数据的保存操作以后就调用该事件,让监听该事件的其他方法随之触发,例如保存背包和保存宠物等。这样,只需要把这些方法注册为保存玩家数据事件的监听者,就能避免在玩家数据保存的代码块中不断的新增其余的逻辑了。

代码如下:我这是参考了Google的EventBus事件总线,自己写了一个简化版的事件驱动。

/**
 * 事件抽象类,代表基础事件
 *
 * @author Administrator
 */
public interface BaseEvent {

    /**
     * 得到事件的拥有者
     * @return
     */
    default Object getOwner() {
        return "system";
    }

}
/**
 * 利用Spring管理Bean,来对bean进行扫描,注册监听者。
 *
 * @author Administrator
 */
@Component("eventRegisterBeanProcessor")
public class EventRegisterBeanProcessor implements BeanPostProcessor {

    @Resource
    EventBus eventBus;

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        // 注册bean中存在@Subscribe注解的方法(注册消息订阅)
        eventBus.register(bean);
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

}
/**
 * 消息注册器,用于寻找符合的Bean,此处为寻找添加了@Subscribe注解的方法,并注册进入集合当中。
 * 具体的流程在代码中有注释说明。
 *
 * @author Administrator
 */
public class SubscriberRegistry {

    /**
     * 事件类型,**所有** 消息订阅者 的集合
     */
    private final Map<Class<? extends BaseEvent>, CopyOnWriteArraySet<Subscriber>> eventSubscribersMap = new ConcurrentHashMap<>();





    /**
     * 注册bean中存在@Subscribe注解的方法(注册消息订阅)
     *
     * @param bean listener
     */
    public void register(Object bean) {
        Map<Class<? extends BaseEvent>, Subscriber> eventSubscriberMap = getAllEventSubscriber(bean);
        for (Entry<Class<? extends BaseEvent>, Subscriber> subscriberEntry : eventSubscriberMap.entrySet()) {
            Class<? extends BaseEvent> event = subscriberEntry.getKey();
            eventSubscribersMap.putIfAbsent(event, new CopyOnWriteArraySet<>());
            CopyOnWriteArraySet<Subscriber> subscriberSet = eventSubscribersMap.get(event);
            subscriberSet.add(subscriberEntry.getValue());
        }
    }

    // 注意:bean(也就是一个listener)中有method 和 @Subscribe注解 和 event事件参数
    // 1.遍历bean中的method,找到有@Subscribe注解的method,得到method当中的event事件参数的 类型
    // 2.保存eventSubscribeMap =  map<event, new subscriber(bean, method)>
    // 3.遍历eventSubscribeMap的event,为每个event创建一个CopyOnWriteArraySet(如果未创建)
    // 4.将eventSubscribeMap的value保存进CopyOnWriteArraySet

    private Map<Class<? extends BaseEvent>, Subscriber> getAllEventSubscriber(Object bean) {
        Map<Class<? extends BaseEvent>, Subscriber> eventSubscribeMap = new HashMap<>();
        Method[] declaredMethods = bean.getClass().getDeclaredMethods();
        for (Method method : declaredMethods) {
            Class<?>[] parameterTypes = method.getParameterTypes();
            // 如果方法method上有@Subscribe注解,即消息处理
            if (method.isAnnotationPresent(Subscribe.class)) {
                if (parameterTypes.length != 1) {
                    throw new IllegalArgumentException();
                }
                // 如果参数是event类型
                if (BaseEvent.class.isAssignableFrom(parameterTypes[0])) {
                    // 得到event类型
                    Class<? extends BaseEvent> event = (Class<? extends BaseEvent>) parameterTypes[0];
                    // 添加进map
                    eventSubscribeMap.put(event, new Subscriber(bean, method));
                }
            }
        }
        return eventSubscribeMap;
    }

    /**
     * 根据event类型找到所有的消息执行方法
     * @param eventType
     * @return
     */
    public Set<Subscriber> getSubscribersByEvent(Class<? extends BaseEvent> eventType) {
        if (!eventSubscribersMap.containsKey(eventType)) {
            return Collections.emptySet();
        }
        return eventSubscribersMap.get(eventType);
    }

}
/**
 * @Subscribe注解,即文中提到的监听者,注解的方法会伴随着事件的执行而触发。
 * 订阅消息(行为)
 * 一个被标注了@Subscribe注解的方法就是一个订阅者。
 *
 * @author Administrator
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Subscribe {

}
/**
 * 消息订阅者(类方法)
 * 实际上就是监听者的类。
 * 可以这样理解,监听者所对应的对象。
 * 即,method对应标注了@Subscribe注解的方法。
 * listener对应其所在的类。
 * 举个例子:Test类中有一个test()方法,test()方法是被标注了@Subscribe注解的。
 * 那么,此处就会有一个Subscriber对象,listener就是Test类;method就是test()方法。
 *
 * @author Administrator
 */
public class Subscriber {

    /**
     * 消息监听器
     */
    private Object listener;

    /**
     * 消息执行者
     */
    private Method method;

    public Subscriber(Object listener, Method method) {
        this.listener = listener;
        this.method = method;
    }

    /**
     * 执行消息
     * @param event 事件
     */
    public void handleEvent(BaseEvent event) {
        try {
            method.invoke(listener, event);
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        } catch (InvocationTargetException e) {
            e.printStackTrace();
        }
    }
}
/**
 * 事件总线,用于注册监听者以及处理事件。
 *
 * @author Administrator
 */
@Slf4j
@Component("eventBus")
public class EventBus {


    private ThreadPoolExecutor threadPoolExecutor;

    /**
     * 初始化事件总线线程池
     */
    public void initEventBusPool() {
        ThreadFactory businessThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("EventBus thread-%d")
                .setUncaughtExceptionHandler((thread, exception) -> exception.printStackTrace())
                .build();
        // 创建线程数量为1、无界任务队列的线程池。
        threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), businessThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
    }

    private SubscriberRegistry registry = new SubscriberRegistry();

    /**
     * 注册的目的是,通过事件的类型,找到 所有 对应的subscriber(跟事件关联的方法)
     * @param bean
     */
    public void register(Object bean) {
        registry.register(bean);
    }


    /**
     * 处理消息
     * 当有事件发布到事件总线中,事件总线遍历所有的订阅者进行事件处理
     *
     * @param event
     */
    public void post(BaseEvent event) {
        // 得到事件类型
        Class<? extends BaseEvent> eventType = event.getClass();
        // 通过事件类型找到所有要处理的消息(事件订阅者)
        Set<Subscriber> subscribers = registry.getSubscribersByEvent(eventType);
        for (Subscriber subscriber : subscribers) {
            // 处理事件
            subscriber.handleEvent(event);
        }
    }

    /**
     * 异步处理事件
     * @param event
     */
    public void asyncPost(BaseEvent event) {
        this.threadPoolExecutor.execute(() -> post(event));
    }
}
/**
 * 一个测试Subscriber
 * @author Administrator
 */
@Slf4j
public class Test {

    /**
    * 测试案例中的保存玩家数据
    */
    @Subscribe
    public void testPlayerSaveEvent(PlayerSaveEvent playerSaveEvent) {
        Player p = playerSaveEvent.getOwner();
        log.info("=========" + p.getName() + "=========");
    }

}
/**
 * 角色数据保存事件
 *
 * @author Administrator
 */
public class PlayerSaveEvent implements BaseEvent {

    private Player player;

    public PlayerSaveEvent(Player player) {
        this.player = player;
    }

    @Override
    public Player getOwner() {
        return player;
    }
}
/**
* 玩家管理类,保存玩家数据,并调用对应的保存事件
*/
public TestSavePlayer {
    
    /**
     * 保存角色数据
     * 不管角色增加了什么,都不影响,不用修改这里的代码。
     * 保存角色是一个事件,用监听器监听保存角色,一旦监听成功,驱动保存背包、保存宠物等事件一起执行
     *
     * @param playerId
     */
    public void savePlayer(Long playerId) {
        // 通过playerId得到Player
        Player player = playerCache.getPlayerByPlayerId(playerId);
        if (player != null) {
            playerDao.save(dbPlayer);
            // 处理玩家数据保存事件
            eventBus.post(new PlayerSaveEvent(player));

        }
    }
}

代码就是这样,一旦保存了玩家数据,即调用了TestSavePlayer中的savePlayer()方法时,Test中的testPlayerSaveEvent()方法就会随之触发。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,546评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,224评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,911评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,737评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,753评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,598评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,338评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,249评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,696评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,888评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,013评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,731评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,348评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,929评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,048评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,203评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,960评论 2 355