再续RxBus--RxJava实现事件总线

大家好,我叫石头

前言

事件总线出现的原因:为了使组件之间的通信变得简单,深度解耦!
说白了就是切断组件之间的直接联系,采用 发布/订阅 的模式(观察者模式


相信我们很多人都用过EventBus或者Otto来作为我们APP中的事件总线,所以我们会有这样的困惑,RxBus真的能替代EventBus吗?
那接下来我们就先来分析分析下:

This project is deprecated in favor of RxJava and RxAndroid. These projects permit the same event-driven programming model as Otto, but they’re more capable and offer better control of threading.
该项目已被RxJavaRxAndroid取代。Rx类项目允许与Otto类似的事件驱动编程模型,而且能力更强,操作线程更方便。

Otto已经停止开发了,所以我们只需对比EventBus和RxBus了。

对于EventBus和RxBus的比较我们要先明白 一个完美的事件总线应该具备哪些功能?

  • 容易订阅事件:事件订阅者只要声明自己就好了,当事件发生时自然会被调到。订阅和取消可以方便绑定到Activity和Fragment的生命周期上。

  • 容易发送事件:事件发送者直接发送就好了,其他的事都不管。

  • 方便的切换线程:有些事必须主线程干,有些事必须非主线程干,所以这个还是要说清楚。

  • 性能:随着应用的成长,总线可能会被重度使用,性能一定要好。

纠结到底是用EventBus还是RxBus的朋友可以参考这篇文章--RxBus真的能替代EventBus吗?


接下来我们就开始RxBus之旅了---------------

一、添加RxJava和RxAndroid依赖

//RxJava and RxAndroid
compile 'io.reactivex:rxandroid:1.1.0'
compile 'io.reactivex:rxjava:1.1.0'

顺便说下,我们是用的rxjava1.X的版本,现在也有了rxjava2.x的版本,他们之间有些区别,感兴趣的朋友可以去看看。

二、建立RxBus类

import java.util.HashMap;

import rx.Observable;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subscriptions.CompositeSubscription;

/**
 * Created by shitou on 2017/4/26.
 */

public class RxBus {
    private static volatile RxBus mInstance;
     /**
     * PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者
     */
    private SerializedSubject<Object, Object> mSubject;
    private HashMap<String, CompositeSubscription> mSubscriptionMap;

    private RxBus() {
        mSubject = new SerializedSubject<>(PublishSubject.create());
    }

    public static RxBus getInstance() {
        if (mInstance == null) {
            synchronized (RxBus.class) {
                if (mInstance == null) {
                    mInstance = new RxBus();
                }
            }
        }
        return mInstance;
    }

    /**
     * 发送事件
     */
    public void post(Object o) {
        mSubject.onNext(o);
    }

    /**
     * 是否已有观察者订阅
     */
    public boolean hasObservers() {
        return mSubject.hasObservers();
    }

    /**
     * 一个默认的订阅方法
     */
    public <T> Subscription doSubscribe(Class<T> type, Action1<T> next, Action1<Throwable> error) {
        return toObservable(type)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(next, error);
    }

    /**
     * 返回指定类型的Observable实例
     */
    public <T> Observable<T> toObservable(final Class<T> type) {
        return mSubject.ofType(type);
    }

    /**
     * 保存订阅后的subscription
     */
    public void addSubscription(Object o, Subscription subscription) {
        if (mSubscriptionMap == null) {
            mSubscriptionMap = new HashMap<>();
        }
        String key = o.getClass().getName();
        if (mSubscriptionMap.get(key) != null) {
            mSubscriptionMap.get(key).add(subscription);
        } else {
            CompositeSubscription compositeSubscription = new CompositeSubscription();
            compositeSubscription.add(subscription);
            mSubscriptionMap.put(key, compositeSubscription);
        }
    }

    /**
     * 取消订阅
     */
    public void unSubscribe(Object o) {
        if (mSubscriptionMap == null) {
            return;
        }

        String key = o.getClass().getName();
        if (!mSubscriptionMap.containsKey(key)){
            return;
        }
        if (mSubscriptionMap.get(key) != null) {
            mSubscriptionMap.get(key).unsubscribe();
        }

        mSubscriptionMap.remove(key);
    }
}

在RxJava中有个Subject类,它继承Observable类,同时实现了Observer接口,因此Subject可以同时担当订阅者和被订阅者的角色,这里我们使用Subject的子类PublishSubject来创建一个Subject对象(PublishSubject只有被订阅后才会把接收到的事件立刻发送给订阅者
Rxjava中,订阅操作会返回一个Subscription对象,以便在合适的时机取消订阅,防止内存泄漏,如果一个类产生多个Subscription对象,我们可以用一个CompositeSubscription存储起来,以进行批量的取消订阅。

由于Subject类是非线程安全的,所以我们通过它的子类SerializedSubjectPublishSubject转换成一个线程安全的Subject对象。

public <T> Observable<T> toObservable(final Class<T> type) {
        return mSubject.ofType(type);
    }

ofType()方法能过滤掉不符合条件的事件类型(比如你的type是EventType1.class,那么就只能输出EventType1.class的类型),然后将满足条件的事件类型通过cast()方法,转换成对应类型的Observable对象,这是在源码中转换的。

/**
 * 一个默认的订阅方法
 */
    public <T> Subscription doSubscribe(Class<T> type, Action1<T> next, Action1<Throwable> error) {
        return toObservable(type)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(next, error);
    }

上面的方法封装了订阅方法,并且指定了执行线程,我们只需要传入type(事件类型)next(成功的Action1)error(错误的Action1),其实你也可以根据你自己的需要封装自己的doSubscribe方法,来简化代码。

在需要发送事件的地方调用post()方法,它间接的通过mSubject.onNext(o);将事件发送给订阅者。
同时RxBus提供了addSubscription()unSubscribe()方法,分别来保存订阅时返回的`Subscription对象,以及取消订阅。

实战一

主线程(UI线程)发送String类型的事件

button点击事件代码

mButton1 = (Button) findViewById(R.id.button);
mButton1.setOnClickListener(new View.OnClickListener() {
     @Override
     public void onClick(View v) {
         //在主线程中发送String类型的事件
         RxBus.getInstance().post("hello RxBus!");
     }
});

onCreate中实现下面代码

Subscription subscription = RxBus.getInstance()
                .toObservable(String.class)  
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        mTextView.setText("接收的事件内容"+s);
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        Log.e(TAG, "error");
                    }
                });

之后我们可以把subscription对象保存到HashMap<String, CompositeSubscription>集合中去。

 RxBus.getInstance().addSubscription(this,subscription);

这样当我们点击button时,textview就收到了消息。

最后,一定要记得在生命周期结束的地方取消订阅事件,防止RxJava可能会引起的内存泄漏问题。

protected void onDestroy() {
        super.onDestroy();
        RxBus.getInstance().unSubscribe(this);
    }

实战二

在子线程中发送Integer类型的事件

button点击事件代码

mButton2 = (Button) findViewById(R.id.button2);
mButton2.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        RxBus.getInstance().post(1234);
                    }
                }).start();
            }
        });

在onCreate中实现下面代码

Subscription subscription1 = RxBus.getInstance()
                .doSubscribe(Integer.class, new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        mTextView.setText("接收的事件内容"+integer);
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        Log.e(TAG, "error");
                    }
                });

之后我们可以把subscription对象保存到HashMap<String, CompositeSubscription>集合中去。

RxBus.getInstance().addSubscription(this,subscription1);

最后,一定要记得在生命周期结束的地方取消订阅事件,防止RxJava可能会引起的内存泄漏问题。

protected void onDestroy() {
        super.onDestroy();
        RxBus.getInstance().unSubscribe(this);
    }

上面的都是发送的基本数据类型,那么我们能不能发送自己封装的类型呢?答案是:肯定行的!

实战三

创建你要发送的事件类

下面我们来创建一个学生类:StudentEvent

public class StudentEvent {
    private String id;
    private String name;

    public StudentEvent(String id, String name) {
        this.id = id;
        this.name = name;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

发送事件

RxBus.getInstance().post(new StudentEvent("110","小明"));

注册和接收事件

Subscription subscription2 = RxBus.getInstance()
                .toObservable(StudentEvent.class)
                .observeOn(Schedulers.io())
                .subscribeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<StudentEvent>() {
                    @Override
                    public void call(StudentEvent studentEvent) {
                        String id = studentEvent.getId();
                        String name = studentEvent.getName();
                        mTextView.setText("学生的id:"+id+" 名字:"+name);
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {

                    }
                });

最后,一定要记得在生命周期结束的地方取消订阅事件,防止RxJava可能会引起的内存泄漏问题。

protected void onDestroy() {
        super.onDestroy();
        RxBus.getInstance().unSubscribe(this);
    }

实战四

广播中发送事件,订阅方式按照实战一的方式。
定义一个检测网络状态的广播:

public class NetworkChangeReceiver extends BroadcastReceiver {
    @Override
    public void onReceive(Context context, Intent intent) {
        ConnectivityManager manager = (ConnectivityManager) context.getSystemService(Context.CONNECTIVITY_SERVICE);
        NetworkInfo networkInfo = manager.getActiveNetworkInfo();
        if (networkInfo != null && networkInfo.isAvailable()) {
            RxBus.getInstance().post("网络连接成功");
        } else {
            RxBus.getInstance().post("网络不可用");
        }
    }
}

在网络可用与不可用时发送提示事件,然后在onCreate()方法中注册广播:

private void registerReceiver() {
        IntentFilter intentFilter = new IntentFilter();
        intentFilter.addAction("android.net.conn.CONNECTIVITY_CHANGE");
        mReceiver = new NetworkChangeReceiver();
        registerReceiver(mReceiver, intentFilter);
    }

最后不要忘了在onDestory()中对广播进行取消注册,以及取消订阅。

protected void onDestroy() {
        super.onDestroy();
        unregisterReceiver(mReceiver);
        RxBus.getInstance().unSubscribe(this);
}

到这里我们实现了几种事件传送,但是细心的童鞋可能发现我们在上面的例子中都是先订阅 事件,然后发送 事件(因为我们是用的PublishSubject,PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者,这在前面我们提到过),如果我们反过来,先发送了事件,再进行订阅操作,怎么保证发送的事件不丢失呢?也就是EventBus中的StickyEven功能。RxBus--支持Sticky事件里面讲解了Subject的4种实现,有兴趣的朋友可以去看看。

最后推荐一些RxJava的学习资源:RxJava入门给 Android 开发者的 RxJava 详解

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,686评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,668评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,160评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,736评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,847评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,043评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,129评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,872评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,318评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,645评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,777评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,861评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,589评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,687评论 2 351

推荐阅读更多精彩内容