RxJava2.0学习笔记

序言

由于我是先学习了1.0版本接着继续学习2.0,所以本文可能不太适合没有接触过RxJava的同学,所以可以先看一下,1.0的学习笔记,传送门 http://www.jianshu.com/p/a8cedc061ab1

首先要使用RxJava2,先要添加依赖:

compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

类介绍

与RxJava1相比:

  • 相同点:
    都有Observable,Observer,subscribe()
  • 不同点:
  1. 在RxJava2的Observable中重写的方法变为subscribe(ObservableEmitter<String> emitter),其中的ObservableEmitter: Emitter是发射器的意思,就是用来发出事件的。

  2. 在RxJava2的observer重写方法中新添加了一个方法onSubscribe(Disposable d),其中Disposable是一次性用品,用完就丢弃,对与这个参数可以理解为一个拦截器,将所有发送过来的数据拦截下俩,让observer不会收到。

  3. 在RxJava2中subscribe()重载方法的参数变了。

        public final Disposable subscribe() {}
        public final Disposable subscribe(Consumer<? super T> onNext) {}
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
        public final void subscribe(Observer<? super T> observer) {}
    

在实际项目中我们一般只关心onNext(),和onError(),所以我们一般只会重载两个参数的。

然后我们用代码来理解一下上面的相同点和不同点

Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            Log.e(TAG, "subscribe: 1" );
            e.onNext(1);
            Log.e(TAG, "subscribe: 2" );
            e.onNext(2);
            Log.e(TAG, "subscribe: 3" );
            e.onNext(3);
            e.onComplete();
        }
    });

    Observer<Integer> observer = new Observer<Integer>() {
        private Disposable disposable;

        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "onSubscribe: " + d.isDisposed());
            disposable = d;
        }

        @Override
        public void onNext(Integer value) {
            Log.e(TAG, "onNext: " + value);
            if (value == 2) {
                disposable.dispose();
                Log.e(TAG, "onNext: " + disposable.isDisposed());
            }
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "onError: " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e(TAG, "onComplete: ");
        }
    };

    observable.subscribe(observer);

运行结果如下所示:

com.example.cosima.rxjava2learn E/MainActivity: onSubscribe: false
com.example.cosima.rxjava2learn E/MainActivity: subscribe: 1
com.example.cosima.rxjava2learn E/MainActivity: onNext: 1
com.example.cosima.rxjava2learn E/MainActivity: subscribe: 2
com.example.cosima.rxjava2learn E/MainActivity: onNext: 2
com.example.cosima.rxjava2learn E/MainActivity: onNext: true
com.example.cosima.rxjava2learn E/MainActivity: subscribe: 3

可以看到在disposable为true的时候,Observable可以发送数据,但是在Observer没有接收到数据。

注:与RxJava1相同,当Observable发送onComplete之后,Observable在onComplete之后的数据可以发送,Observer在接收到onComplete之后不再继续接收事件。onError与onComplete的原理一样,但是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然。

创建方式

想对于RxJava1来说,Observable的创建方式心添加了几种方式:

  • fromIterable((Iterable<? extends T> list)方式
    遍历集合,发送每个item相当于多次调用onNext().

注Collection接口是Iterable接口的子接口,所以所有Collection接口的实现类都可以作为Iterable对象直接传入fromIterable()方法。

  • fromArray(T... items)方式
    遍历集合,发送每个item相当于多次调用onNext().

  • interval(long period, TimeUnit unit)方式
    创建一个按固定时间间隔发射整数序列的Observable,可用作定时器 period:时间间隔

  • interval(long initialDelay, long period, TimeUnit unit)方式
    initialDelay:开始值,默认为0。

  • range(final int start, final int count)方式
    创建一个发射特定整数序列的Observable,第一个参数为起始值,第二个为发送的个数,如果为0则不发送,负数则抛异常。

  • timer(long delay, TimeUnit unit)方式
    一个给定的延迟后发射一个特殊的值,即表示延迟2秒后,调用onNext()方法。

用于实现观察者模式方式有很多种:
<center>

</center>

现在我们来实现一个简单的倒计时功能:

封装Observable

private Observable<Integer> initEvent2(final int time) {
    return Observable.interval(0, 1, TimeUnit.SECONDS)
            .subscribeOn(AndroidSchedulers.mainThread())
            .observeOn(AndroidSchedulers.mainThread())
            .map(new Function<Long, Integer>() {
                @Override
                public Integer apply(Long aLong) throws Exception {
                    return time - aLong.intValue();
                }
            })
            .take(time + 1);//限制循环次数
}

使用如下:

initEvent2(5).doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            Log.e(TAG, "accept: 开始计时");
        }
    })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.e(TAG, "accept: 当前计时" + integer);
                }
            });

打印结果为

在上述代码中我们使用到了map()操作符,接下来我们就一起来看下RxJava中的操作符

RxJava操作符

1.map()操作符:

把原来的Observable对象转换成另一个Observable对象
2.flatMap()操作符:


flatMap和Map的相同点就是把一个对象转化为另一个对象返回,但是不同的是flatMap()返回的是个Observable对象,并且这个对象并不是直接发送到了回调方法中,而是把这个对象激活,之后将他发送到回调方法中。
3.filter()操作符:


根据自己的需求加入判断逻辑,他的返回值是true或者是false,用于表示是否需要被过滤。
4.take()操作符:


再上面的代码中已经用到过,具体含义就是限制输出次数
5.doOnNext()操作符:


在输出前可以做一个额外的操作
6.delay()操作符:


延迟Observer的输出

线程控制Scheduler

该部分与RxJava1相比没有变换,在平常使用中较多的都是以下几个:

  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
  • Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

都是通过subscribeOn()observeOn() 两个方法来对线程进行控制。
具体可见RxJava1.0学习笔记

与Retrofit结合使用

与1.0版本总体相差不多,只需要修改相应的参数就可以了,比如subcribe()方法中的参数,请求如下所示:
主要内容为红框所示

接下来讲一下RxJava2特有的了。

Flowable及Backpressure

来由

Rxjava2中,还有一个很大的变化就是Backpressure(背压),何为背压,就是观察者来不及处理被观察者发出的事件(产生事件的速度大于处理事件的速度),导致事件被无限堆积,最后产生异常。Flowable就是由此产生,专门用来处理这类问题。将原来的Observable拆分成了新的Observable和Flowable,同时其他相关部分也同时进行了拆分。

注意:处理Backpressure的策略仅仅是处理Subscriber接收事件的方式,并不影响Flowable发送事件的方法。即使采用了处理Backpressure的策略,Flowable原来以什么样的速度产生事件,现在还是什么样的速度不会变化,主要处理的是Subscriber接收事件的方式。

处理Backpressure的策略

  • 产生原因:
    生产者和消费者不在同一线程下,生产者的速度大于消费者的速度,就会产生Backpressure问题。如果生产者和消费者在同一线程下,不会产生Backpressure问题,所以可以说成同步不会产生问题,异步可能产生问题。

  • 处理策略:
    1. ERROR策略
    产生Backpressure问题的时候直接抛出异常(MissingBackpressureException)

          Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
          @Override
          public void subscribe(FlowableEmitter<String> e) throws Exception {
              e.onNext("我");
              e.onNext("爱");
              e.onNext("你");
              e.onComplete();
          }
          }, BackpressureStrategy.ERROR);
    
          Subscriber<String> subscriber = new Subscriber<String>() {
              @Override
              public void onSubscribe(Subscription s) {
                  Log.e(TAG, "onSubscribe: ");
                  s.request(Long.MAX_VALUE);
              }
    
              @Override
              public void onNext(String s) {
                  Log.e(TAG, "onNext: " + s);
              }
    
              @Override
              public void onError(Throwable t) {
                  Log.e(TAG, "onError: " + t.getMessage());
              }
    
              @Override
              public void onComplete() {
                  Log.e(TAG, "onComplete: " );
              }
          };
          flowable.subscribeOn(Schedulers.io())
                  .observeOn(AndroidSchedulers.mainThread())
                  .subscribe(subscriber);
    

上述代码中在Flowable(被观察者)添加了一个参数,并且onSubscribe(Subscription s)中传给我们的不再是Disposable了, 而是Subscription。然而Subscription也可以用于切断观察者与被观察者之间的联系,调用Subscription.cancel()方法,并在此方法中添加了s.request(long count);这个方法就是用来向生产者申请可以消费的事件数量。这样就可以根据自身的消费能力进行消费。在异步调用时,RxJava中有个缓存池,用来缓存消费者处理不了暂时缓存下来的数据,缓存池的默认大小为128,即只能缓存128个事件。无论request()中传入的数字比128大或小,缓存池中在刚开始都会存入128个事件。当然如果本身并没有这么多事件需要发送,则不会存128个事件。
在ERROR策略下,如果缓存池溢出,就会立刻抛出MissingBackpressureException异常。

注:如果不调用request表示消费能力为0。如果不限制想request()中传入任意参数,超过消费能力,也会造成资源浪费,产生OOM。

2. BUFFER策略
BUFFER就是把RxJava中默认的只能存128个事件的缓存池换成一个大的缓存池,支持存很多很多的数据。
这样,消费者通过request()即使传入一个很大的数字,生产者也会生产事件,并将处理不了的事件缓存。
但是这种方式任然比较消耗内存,除非是我们比较了解消费者的消费能力,能够把握具体情况,不会产生OOM。BUFFER要慎用
3. DROP策略
当消费者处理不了事件,就丢弃。
消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。
4. LATEST策略
LATEST与DROP功能基本一致。
消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。
唯一的区别就是LATEST总能使消费者能够接收到生产者产生的最后一个事件。
还是以上述例子展示,唯一的区别就是Flowable不再无限发事件,只发送1000000个。

参考:http://www.jianshu.com/p/d149043d103a

源码地址:https://github.com/MrMJL/RxJava2Demo
由于只是学习笔记,源码可能会有点乱,又不对或者不明白欢迎评论多多交流。推荐一个Android实习&&经验交流群:541144061

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,744评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,505评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,105评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,242评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,269评论 6 389
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,215评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,096评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,939评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,354评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,573评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,745评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,448评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,048评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,683评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,838评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,776评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,652评论 2 354

推荐阅读更多精彩内容