RxJava 从入门到放弃

叫这个题目也是因为这篇博客写了太久太久了!有段时间都觉得完全没有必要写下去的,索性终于完工了,也算是对这段时间的肯定吧!

RxJava基本概念

RxJava 有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。ObservableObserver 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer

接下来就围绕Observable创建、Observer创建、线程切换、事件类型转换、订阅和取消订阅展开。

Observerble的基本创建方式:

1、create()//最基本的

Observable<String> observable1 = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("xxixxii");
            subscriber.onCompleted();//这里必须调用该方法或者onError(),通知订阅者发送完毕,否者无法进行解除订阅。

        }
    });

2、form()//适配集合等

    ArrayList<Student> students = new ArrayList<>();
    students.add(s1);
    students.add(s2);
    students.add(s3);
    students.add(s1);
    students.add(s4);
    students.add(s5);
    students.add(s6);

    Observable.from(students)

3、just()//适配已经写好的方法

    Student s1 = new Student(19, "xiaoqiang");
    Student s2 = new Student(19, "xiaoqiang1");
    Student s3 = new Student(20, "xiaoqiang1");
    Student s4 = new Student(19, "xiaoqiang");
    Student s5 = new Student(21, "xiaoqiang");
    Student s6 = new Student(22, "xiaoqiang");

    Observable.just(s1, s2, s3, s4, s5, s6)

4、merge(o1,02)//将多个合并为一个
5、concat(o1,o2)//one by one emit!

Observer Subscriber的创建

     Subscriber<String> stringSubscriber = new Subscriber<String>() {
        @Override
        public void onStart() {
            Log.e(TAG, "onStart: ");
        }

        @Override
        public void onCompleted() {
            Log.e(TAG, "onCompleted: ");

        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "onError: ");
        }

        @Override
        public void onNext(String s) {
            Log.e(TAG, "onNext: " + s);
        }
    };

    Observer<String> stringObserver = new Observer<String>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(String s) {

        }
    };

这里需要注意:ObserverSubscriber不仅基本使用方式一样,实质上,在 RxJavasubscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。所以如果你只想使用基本功能,选择 ObserverSubscriber 是完全一样的。它们的区别对于使用者来说主要有两点:

onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法,具体可以在后面的文中看到。

unsubscribe(): 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed()先判断一下状态。 unsubscribe() 这个方法很重要,因为在 subscribe() 之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onStop()onDestory()等方法中)调用unsubscribe() 来解除引用关系,以避免内存泄露的发生。

强大的条件筛选

说了这么多没用的东西,肯定要来点儿实际的才能体会到RxJava的强大功能!

1、take( )只发送指定数量的事件。
2、filter( )过滤指定条件的事件。
3、first()只发送第一个事件。
4、distinct( )只发送不同的事件。(怎么定义为不同?!)
其实还有很多。。。

随时随地线程切换

说完创建过滤你可能觉得这也没撒嘛!那么接下来想想之前在Android开发里要切换线程需要怎么处理呢?view.post()或者使用handler.sendMessage()!而在RxJava中,线程切换不用这么搞了,Schedulers是RxJava中用来管理相关线程调度的,基于订阅和被订阅,这里有两个方法!
1、subscribeOn() 事件产生在哪个线程。

2、observeOn()事件消费在哪个线程。

3、Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
4、Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。

5、Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。其行为模式和 newThread()是差不多滴,但是区别在于io()的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io()newThread()更有效率。不要把计算工作放在io()中,可以避免创建不必要的线程。
6、Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation()中,否则 I/O 操作的等待时间会浪费 CPU。
7、AndroidSchedulers.mainThread():Android 特供,它指定的操作将在 Android 主线程运行。

transform 转换

强大的内部转换功能,让你可以做到要什么就是什么。

1、map():进行对象转换,不会创建新的Observable
2、flatMap():也是进行对象转换,会创建新的Observable
3、buffer()、:缓冲区,缓冲指定的Observable包装成新的
4、Observable发射。
5、toList():将单个的对象转换为集合。

取消订阅

爽了之后重视要记住一件事,那就是要释放相关资源!不然后果也是很严重的,尤其是在使用RxView相关的方法时会警告你需要调用Unsubscribe()来释放相关的引用。

warn.png

释放操作其实很简单。定义一个集合维护相关的Subscription,然后在ActivityonStop()或者onDestroy()方法中释放相关资源。

    Subscription clickSubscribe = RxView.clicks(findViewById(R.id.bt))
            .throttleFirst(1, TimeUnit.SECONDS)
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    Log.e(TAG, "clicks->doOnUnsubscribe");
                }
            })
            .subscribe(new Action1<Void>() {
                @Override
                public void call(Void aVoid) {
                    methd6();
                }
            });
    //维护相关的资源引用
    subscriptions.add(clickSubscribe);

@Override
protected void onDestroy() {
    for (Subscription s : subscriptions) {
        if (!s.isUnsubscribed()) {
            s.unsubscribe();
            Log.e(TAG, "onDestroy: 取消订阅!");
        }
    }
    super.onDestroy();
}

手动create()一个Observable的话,一定要调用 onComplete()或者onError()来结束这个事件,不然资源也不会被释放的。

动手时间

练习一

统计集合中年龄大于20的学生姓名(年龄姓名一致的视为同一个!)
首先当然是创建Observable

    //初始化数据
    Student s1 = new Student(19, "xiaoqiang0");
    Student s2 = new Student(20, "xiaoqiang1");
    Student s3 = new Student(21, "xiaoqiang2");
    Student s4 = new Student(22, "xiaoqiang3");
    Student s5 = new Student(23, "xiaoqiang4");
    Student s6 = new Student(24, "xiaoqiang5");
    Student s7 = new Student(25, "xiaoqiang6");
    Student s8 = new Student(25, "xiaoqiang5");
    students = new ArrayList<>();
    students.add(s1);
    students.add(s2);
    students.add(s3);
    students.add(s1);
    students.add(s4);
    students.add(s5);
    students.add(s6);
    students.add(s7);
    students.add(s8);

        Observable.just(students)//创建Observable
            .flatMap(new Func1<ArrayList<Student>, Observable<Student>>() {
                @Override
                public Observable<Student> call(ArrayList<Student> students) {
                    //变换为新的Observable
                    return Observable.from(students);
                }
            })
            //过滤掉年龄和姓名相同的对象
            .distinct()
            //过滤掉年龄小于20的对象
            .filter(new Func1<Student, Boolean>() {
                @Override
                public Boolean call(Student student) {
                    return student.getAge() >= 20;
                }

            })
            //将事件对象由Student 转换为 String
            .map(new Func1<Student, String>() {
                @Override
                public String call(Student student) {
                    return student.getName();
                }
            })
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    Log.e(TAG, "call: 取消订阅了!!");

                }
            })
            //订阅
            .subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(String s) {
                    Log.e(TAG, "onNext: " + s);
                }
            });


E/MainActivity: onNext: xiaoqiang1
E/MainActivity: onNext: xiaoqiang2
E/MainActivity: onNext: xiaoqiang3
E/MainActivity: onNext: xiaoqiang4
E/MainActivity: onNext: xiaoqiang5
E/MainActivity: onNext: xiaoqiang6
E/MainActivity: onNext: xiaoqiang5
E/MainActivity: call: 取消订阅了!!

PS:至于这里对象的唯一性判断是通过复写equal()hashCode()来实现的!

@Override
public boolean equals(Object o) {
    return o instanceof Student && this.getAge() == ((Student) o).getAge() && this.getName().equals(((Student) o).getName());
}

@Override
public int hashCode() {
    return Arrays.hashCode(new Object[]{getAge(), getName()});
}

练习二

concat()使用:本地有缓存读取本地的数据,没有走网络请求并缓存到本地。

Observable<String> netObservable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            Log.e(TAG, "走网络了!!");
            subscriber.onNext("这是缓存数据!!");
            subscriber.onCompleted();
        }
    }).doOnNext(new Action1<String>() {
        @Override
        public void call(String s) {
            Log.e(TAG, "call: 保存数据到本地");
            rxPreferences.getString("cash").asAction().call(s);

        }
    });
    Observable<String> nativeObservable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            if (TextUtils.isEmpty(rxPreferences.getString("cash").get())) {
                Log.e(TAG, "没有缓存,走网络!");
                subscriber.onCompleted();
            } else {
                Log.e(TAG, "有缓存!");
                subscriber.onNext(rxPreferences.getString("cash").get());
                subscriber.onCompleted();
            }
        }
    });

    Observable.concat(nativeObservable, netObservable)
            .first()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    Log.e(TAG, "完成了!");
                }

                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "错误了!");
                }

                @Override
                public void onNext(String s) {
                    Log.e(TAG, s);
                }
            });

   //第一次              
 E/MainActivity: 没有缓存,走网络!
 E/MainActivity: 走网络了!!
 E/MainActivity: call: 保存数据到本地
 E/MainActivity: 这是缓存数据!!
 E/MainActivity: 完成了!
 //第二次
 E/MainActivity: 有缓存!
 E/MainActivity: 这是缓存数据!!
 E/MainActivity: 完成了!

练习三

merge()使用:服务端和本地都有相关数据,汇总展示。使用merge()对应的事件顺序是无序的,谁先产生了谁就先发送!

    Observable<String> just = Observable.just("S", "O", "S")
            .subscribeOn(Schedulers.newThread())
            .doOnNext(new Action1<String>() {
                @Override
                public void call(String s) {
                    SystemClock.sleep(20);
                }
            });

    Observable<String> just1 = Observable.just("S","T","R").subscribeOn(Schedulers.newThread())
            .doOnNext(new Action1<String>() {
                @Override
                public void call(String s) {
                    SystemClock.sleep(20);
                }
            });

    Observable.merge(just1, just)
            .subscribeOn(Schedulers.newThread())
            .distinct()
            .subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    Log.e(TAG, "call: " + s);
                }
            });


 E/MainActivity: call: S
 E/MainActivity: call: T
 E/MainActivity: call: R
 E/MainActivity: call: O
//或者这样
 E/MainActivity: call: S
 E/MainActivity: call: T
 E/MainActivity: call: O
 E/MainActivity: call: R

练习四

RxView和RxCompoundButton的使用:

RxCompoundButton.checked(checkBox).call(rxPreferences.getBoolean("checked").get());
    //noinspection ConstantConditions
    Subscription checkedSubscription1 = RxCompoundButton.checkedChanges(checkBox)
            .subscribe(rxPreferences.getBoolean("checked").asAction());
    subscriptions.add(checkedSubscription1);

    Subscription checkedSubscription = rxPreferences.getBoolean("checked")
            .asObservable()
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    Log.e(TAG, "rxPreferences->doOnUnsubscribe");

                }
            })
            .subscribe(new Subscriber<Boolean>() {
                @Override
                public void onCompleted() {
                    Log.e(TAG, "rxPreferences->onNext: +onCompleted");
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(Boolean aBoolean) {
                    Log.e(TAG, "rxPreferences->onNext: " + aBoolean);
                }
            });
    subscriptions.add(checkedSubscription);

练习五

RxJava和Retrofit的搭配使用:

   Subscription beforeSubscribe = BaseDataManager.getDailyApiService()
        .getBeforeDailyStories(date)
                .subscribeOn(Schedulers.io())//事件产生在子线程
                .doOnSubscribe(new Action0() {//subscribe之后,事件发送前执行。
                    @Override
                    public void call() {
                        isLoadingMore = true;
                        Log.e("TAG", "call: true");
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<DailyStories>() {
                    @Override
                    public void call(DailyStories dailyStories) {
                        mView.onLoadMore(dailyStories);
                        mView.isLoadingMore(false);
                        Log.e("TAG", "call: false");
                        isLoadingMore = false;
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        mView.onLoadError(throwable.toString());
                        isLoadingMore = false;
                    }
                });
        subscribe(beforeSubscribe);

相关回调

对于Observable,这里有一系列的回调方法,作用在不同的时期,其中常用的是doOnSubscribe()doOnNext()
另外在Subscriber里,还有一个onStart()的方法!

callBack.png

Observable.just("L", "O", "V", "E")
            .doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    Log.e(TAG, "call: doOnSubscribe");

                }
            })
            .doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    Log.e(TAG, "call: doOnUnsubscribe");
                }
            })
            .doOnEach(new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    Log.e(TAG, "doOnEach: onCompleted");
                }

                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "doOnEach: onError");
                }

                @Override
                public void onNext(String s) {
                    Log.e(TAG, "doOnEach: onNext:"+s);
                }
            })
            .doOnNext(new Action1<String>() {
                @Override
                public void call(String s) {
                    Log.e(TAG, "call: doOnNext");
                }
            })
            .doOnRequest(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    Log.e(TAG, "call: doOnRequest");
                }
            })
            .doOnTerminate(new Action0() {
                @Override
                public void call() {
                    Log.e(TAG, "call: doOnTerminate");
                }
            })
            .doAfterTerminate(new Action0() {
                @Override
                public void call() {
                    Log.e(TAG, "call: doAfterTerminate");
                }
            })
            .subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    Log.e(TAG, "subscribe->call: onCompleted");
                }

                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "subscribe->call: onError");
                }

                @Override
                public void onNext(String s) {
                    Log.e(TAG, "subscribe->call: onNext:" + s);
                }
            });

E/MainActivity: subscribe->call: onStart:
E/MainActivity: call: doOnRequest
E/MainActivity: call: doOnSubscribe
E/MainActivity: doOnEach: onNext:L
E/MainActivity: call: doOnNext
E/MainActivity: subscribe->call: onNext:L
E/MainActivity: doOnEach: onNext:O
E/MainActivity: call: doOnNext
E/MainActivity: subscribe->call: onNext:O
E/MainActivity: doOnEach: onNext:V
E/MainActivity: call: doOnNext
E/MainActivity: subscribe->call: onNext:V
E/MainActivity: doOnEach: onNext:E
E/MainActivity: call: doOnNext
E/MainActivity: subscribe->call: onNext:E
E/MainActivity: doOnEach: onCompleted
E/MainActivity: call: doOnTerminate
E/MainActivity: subscribe->call: onCompleted
E/MainActivity: call: doOnUnsubscribe
E/MainActivity: call: doAfterTerminate

通过Log可以看到一个事件的订阅过程。
首先回调Subscriber.onStart()表明ObservableSubscriber已经建立了连接,但是这个时候事件还没有开始发射。
然后是 doOnRequest()doOnSubscribe(),这个时候事件也没有开始发射。
接着是doOnEach()doOnNext()onNext(),这个时候事件正式发射了。
最后发射完了,就是onCompleteddoOnTerminate(),然后取消订阅释放资源doOnUnsubscribe()

Rx全家桶

欢迎加入Rx全家桶豪华套餐,只有你想不到的,没有它做不到的!

Rx全家桶

小结

总的来说,使用RxJava之后,可以让我们的代码更加清晰一目了然,而且线程的切换也非常的简单,不用自己维护相关线程和写那该死的Runnable,内部强大的事件转换和筛选过滤也是为我们开发省去了不少的工作。

参考文档

1、给 Android 开发者的 RxJava 详解
2、RxJava wiki
3、RxJava使用场景小结
4、Demo下载

---- Edit By Joe ----

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,254评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,875评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,682评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,896评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,015评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,152评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,208评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,962评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,388评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,700评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,867评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,551评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,186评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,901评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,689评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,757评论 2 351

推荐阅读更多精彩内容

  • 作者寄语 很久之前就想写一个专题,专写Android开发框架,专题的名字叫 XXX 从入门到放弃 ,沉淀了这么久,...
    戴定康阅读 7,620评论 13 85
  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 5,464评论 7 62
  • 前言我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard...
    占导zqq阅读 9,159评论 6 151
  • 最近项目里面有用到Rxjava框架,感觉很强大的巨作,所以在网上搜了很多相关文章,发现一片文章很不错,今天把这篇文...
    Scus阅读 6,869评论 2 50
  • 日复一日的重复工作很容易就消磨人的热情,在20几岁的年纪,正是迷茫为未来打拼的时候。我很怕就这样过日子下去,我最终...
    林家筱筱阅读 221评论 0 0