RxBus学习之旅--从入门到提高

在公司的技术分享会上,做了关于RxBus的学习分享,记录如下:

一.RxBus与RxJava

一次RxJava调用过程可以划分为以下环节:

  • 创建观察内容 (片段1)
  • 数据处理/映射(片段2)
  • 选择线程(片段3)
  • 订阅(片段4,片段5)
  • 完成/错误处理(片段6)

示例代码:

Observable
                // 片段1
            .create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    subscriber.onStart();
                    String trim = mainEd.getText().toString().trim();
                    subscriber.onNext(trim);
                    subscriber.onError(new Throwable());
                    subscriber.onCompleted();
                }
            }) 
            // 片段2
            .map(new Func1<String, String>() {
                @Override
                public String call(String s) {
                    return s + " sugarya";
                }
            })
            // 片段3
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())  
            // 片段4
            .subscribe(
                    // 片段5
                    new Subscriber<String>() {

                @Override
                public void onStart() {
                    super.onStart();
                }

                //片段6
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(String s) {
                    mainTv2.setText(s);
                }
            });

二.RxBus与接口回调

一次完整的接口回调,包括四个步骤:

  • 接口定义
  • 接口调用
  • 接口实现
  • 接口注入

RxBus的使用过程,就是一个接口回调的过程。

接口定义,在RxJava定义好了。

上面示例代码片段1里的suscriber.onNext(),onStart(),onError(),onComplete()对应接口调用。

代码片段5,片段6这些是接口实现

注入的过程是调用suscribe()方法订阅的过程

三.RxBus源码分析

RxBus的代码实现如下:

public class RxBus {

private static volatile RxBus instance;
private final Subject<Object, Object> _bus;


private RxBus() {
    _bus = new SerializedSubject<>(PublishSubject.create());
}

public static RxBus getInstance() {
    if (null == instance) {
        synchronized (RxBus.class) {
            if (null == instance) {
                instance = new RxBus();
            }
        }
    }
    return instance;
}

public void send(Object object) {
    try{
        _bus.onNext(object);
    }catch (Exception e){
        e.printStackTrace();
    }
}

public boolean hasObservers() {
    return _bus.hasObservers();
}



private <T> Observable<T> toObservable(final Class<T> type) {
    return _bus.ofType(type);//filter + cast
}







public <T> Subscription toSubscription(final Class<T> type, Observer<T> observer) {
    return toObservable(type).subscribe(observer);
}

public <T> Subscription toSubscription(final Class<T> type, Action1<T> action1) {
    return toObservable(type).subscribe(action1);
}

public <T> Subscription toSubscription(final Class<T> type, Action1<T> action1, Action1<Throwable> errorAction1) {
    return toObservable(type).subscribe(action1,errorAction1);
}
}

接下来对上述代码做些简要分析:

volatile

  • 保证instance可见性
  • 禁止指令重排

这里涉及到Java内存模型,相关资料: 传送门

SerializedSubject

  • SerializedSubject extends Subject extends Observable implements Observer,既是观察内容,又是观察者,起到桥梁/数据转发的作用

  • 保证多线程安全,Subject 当作一个 Subscriber 使用,从多个线程中调用它的onNext方法(包括其它的on系列方法)

PublishSubject

主题,RxJava里有四种主题

  • PublishSubject
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

PublishSubject的含义是:在订阅者订阅的时间点之后的数据发送给观察者

ofType操作符 = filter操作符 + cast操作符

  • filter只有符合过滤条件的数据才会被“发射”
  • cast将一个Observable转换成指定类型的Observable

CompositeSubscription

该对象作为subscription的容器,方便统一取消订阅

四.RxBus异常处理

当RxBus在执行过程中,任意环节发生了错误异常,订阅关系就会被取消。之后再次发送,将无法执行订阅后的回调。

做了一个数组越界的错误来演示

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);
    ButterKnife.bind(this);
    onRxBus();
}


private void onRxBus() {
    int[] array = new int[2];
    RxBus.getInstance().toSubscription(Integer.class, new Subscriber<Integer>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {
            //onRxBus();
            Log.e(TAG, "mSubscriber17 onError: " + e.toString());
        }

        @Override
        public void onNext(Integer integer) {
            array[integer] = integer;
            Log.e(TAG, "mSubscriber17 onNext:" + integer);
        }
    });
}

@OnClick(R.id.btn_main17)
void onClick17() {
    RxBus.getInstance().send(2);
}

这时候点击button按钮,数组越界异常,第二次再点击Button发送消息,没有响应了。这里,RxBus,确切的说是RxJava捕获到错误异常,就会取消订阅关系。

E/MainActivity: mSubscriber17 onError: java.lang.ArrayIndexOutOfBoundsException: length=2; index=2

解决的思路:

  • 使用try-catch捕获异常,不让异常被RxJava捕获
  • 在onError里重新订阅。

接下来说说第二种方法,具体怎么操作,其实就是在onError方法里,重新执行一遍订阅,执行上述注释掉的代码onRxBus,就解决问题了。

五.小结

上述其实是这次技术分享大纲,技术分享准备功课前,刷了下面的文章,要对RxBus有更细致的学习和了解,可以阅读:(推荐)

谢三弟系列

RxBus简单实现

RxBus深入源码解析

RxJava里onError异常处理

Yokey系列

RxJava实现事件总线

RxBus异常处理

RxBus实现Sticky事件(粘性订阅)

其他

RxBus从基础实现到升级——打造属于自己的RxBus

EventBus和RxBus实现和性能比较

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容