SOFA 源码分析— 事件总线

前言

大部分框架都是事件订阅功能,即观察者模式,或者叫事件机制。通过订阅某个事件,当触发事件时,回调某个方法。该功能非常的好用,而 SOFA 内部也设计了这个功能,并且内部大量使用了该功能。来看看是如何设计的。

源码分析

核心类有 3 个:

  • EventBus 事件总线
  • Event 事件,即被观察者
  • Subscriber 订阅者,即观察者

Subscriber 是个抽象类, 子类需要自己实现 onEvent 方法,即回调方法。还有一个是否同步执行的参数。

EventBus 类实现了注册功能,反注册功能(删除)。事件发生时通知订阅者功能。

内部使用一个“大型数据结构”保存事件和订阅者的信息。

ConcurrentHashMap<Class<? extends Event>, CopyOnWriteArraySet<Subscriber>> SUBSCRIBER_MAP

所有相关信息都保存在该数据结构中。

看看注册功能。

public static void register(Class<? extends Event> eventClass, Subscriber subscriber) {
    CopyOnWriteArraySet<Subscriber> set = SUBSCRIBER_MAP.get(eventClass);
    if (set == null) {
        set = new CopyOnWriteArraySet<Subscriber>();
        CopyOnWriteArraySet<Subscriber> old = SUBSCRIBER_MAP.putIfAbsent(eventClass, set);
        if (old != null) {
            set = old;
        }
    }
    set.add(subscriber);
}

参数为 一个事件对象,一个订阅对象。

首先从 Map 中根据事件的 Class 获取对应的订阅者集合,注意,这里都是用的并发容器。

下面的判断有点意思,考虑到并发的情况,如果第一次获取 Set 是 null,则尝试创建一个并放进 Map,这里使用的并不是 put 方法,而是 putIfAbsent 方法,该方法作用等同于:

  if (!map.containsKey(key)) 
      return map.put(key, value);
  else
       return map.get(key);

所以,这里再一次考虑并发问题,如果这个间隙有其他线程 put 了,就可以获取到那个线程 put 的 Set。很谨慎。而且性能相比较锁要好很多。虽然这个方法并发量不会很高,但也是一种性能优化。

如果发生了并发,就使用已有的 Set,然后将 Set 放置到 Map 中,完成事件和订阅者的映射。

再看看取消注册方法。

public static void unRegister(Class<? extends Event> eventClass, Subscriber subscriber) {
    CopyOnWriteArraySet<Subscriber> set = SUBSCRIBER_MAP.get(eventClass);
    if (set != null) {
        set.remove(subscriber);
    }
}

很简单,就是直接删除。

再看看通知功能:

    public static void post(final Event event) {
        if (!isEnable()) {
            return;
        }
        CopyOnWriteArraySet<Subscriber> subscribers = SUBSCRIBER_MAP.get(event.getClass());
        if (CommonUtils.isNotEmpty(subscribers)) {
            for (final Subscriber subscriber : subscribers) {
                if (subscriber.isSync()) {
                    handleEvent(subscriber, event);
                } else { // 异步
                    AsyncRuntime.getAsyncThreadPool().execute(
                        new Runnable() {
                            @Override
                            public void run() {
                                handleEvent(subscriber, event);
                            }
                        });
                }
            }
        }
    }

首先看是否开启了总线功能,在性能测试的时候,可能是关闭的。

如果开启了,就根据给定的时间找到订阅者,循环调用 handleEvent 方法(其实就是调用订阅者的 onEvent 方法)。

这里有一个是否异步的判断,如果异步的,则在异步线程池执行。

这个异步线程池 AsyncRuntime 可以看一下:

public static ThreadPoolExecutor getAsyncThreadPool(boolean build) {
    if (asyncThreadPool == null && build) {
        synchronized (AsyncRuntime.class) {
            if (asyncThreadPool == null && build) {
                // 一些系统参数,可以从配置或者注册中心获取。
                int coresize = RpcConfigs.getIntValue(RpcOptions.ASYNC_POOL_CORE);
                int maxsize = RpcConfigs.getIntValue(RpcOptions.ASYNC_POOL_MAX);
                int queuesize = RpcConfigs.getIntValue(RpcOptions.ASYNC_POOL_QUEUE);
                int keepAliveTime = RpcConfigs.getIntValue(RpcOptions.ASYNC_POOL_TIME);

                BlockingQueue<Runnable> queue = ThreadPoolUtils.buildQueue(queuesize);
                NamedThreadFactory threadFactory = new NamedThreadFactory("SOFA-RPC-CB", true);

                RejectedExecutionHandler handler = new RejectedExecutionHandler() {
                    private int i = 1;

                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        if (i++ % 7 == 0) {
                            i = 1;
                            if (LOGGER.isWarnEnabled()) {
                                LOGGER.warn("Task:{} has been reject because of threadPool exhausted!" +
                                    " pool:{}, active:{}, queue:{}, taskcnt: {}", r,
                                    executor.getPoolSize(),
                                    executor.getActiveCount(),
                                    executor.getQueue().size(),
                                    executor.getTaskCount());
                            }
                        }
                        throw new RejectedExecutionException("Callback handler thread pool has bean exhausted");
                    }
                };
                asyncThreadPool = ThreadPoolUtils.newCachedThreadPool(
                    coresize, maxsize, keepAliveTime, queue, threadFactory, handler);
            }
        }
    }
    return asyncThreadPool;
}

这里也做了双重检查锁。

默认核心线程大小 10,最大 200, 队列大小 256, 回收时间 60 秒。

因此,获取的队列就是 LinkedBlockingQueue。

这里的拒绝策略很有意思,每失败 6 次,打印详细信息,当前线程数,活动线程数量,队列 size, 任务总数,不知道为什么这么设计(6次??)。

目前框架中 Event 的实现很多,我们在之前的源码分析中也看到很多了。而订阅者目前只有一个 FaultToleranceSubscriber。用于容错处理。是 FaultToleranceModule 模块的功能。该功能也是个扩展点,当系统初始化的时候,会注册 ClientSyncReceiveEvent 事件和 ClientAsyncReceiveEvent。

总结

这个事件总线功能真是观察者模式的最佳实践,通过系统中发生的事件,能够让外部模块感知到并进行处理,比如上面介绍的容错模块。当发生订阅的事件后,外部模块能够响应,很完美。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 225,124评论 6 523
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 96,453评论 3 404
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 172,386评论 0 368
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 61,136评论 1 301
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 70,142评论 6 400
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 53,593评论 1 315
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 41,958评论 3 429
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 40,944评论 0 279
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 47,477评论 1 324
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 39,512评论 3 346
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 41,639评论 1 355
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 37,227评论 5 351
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,971评论 3 340
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 33,397评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 34,550评论 1 277
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 50,203评论 3 381
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 46,713评论 2 366

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,750评论 18 139
  • 1.ios高性能编程 (1).内层 最小的内层平均值和峰值(2).耗电量 高效的算法和数据结构(3).初始化时...
    欧辰_OSR阅读 29,437评论 8 265
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 172,456评论 25 707
  • 当append操作一个切片的时候,如果操作之后的切片没有超过原始切片的容量(cap)值时,新产生的切片与操作的切片...
    cheyongzi阅读 2,246评论 2 0
  • ccc
    menspg阅读 113评论 0 0