Rxjava2~Single~CompositeDisposable~PublishSubject(你应该懂的)~学渣带你扣rxjava2

有几种模式是你真的想要弄明白的。
Single~SingleObserver 单一发送
CompositeDisposable~DisposableObserver 防止内存泄露
Completable~CompletableObserver
PublishSubject~ConnectableObservable 热观察者
ReplaySubject !Observer重复发送
Flowable 背压
BehaviorSubject~Observer 在订阅的时候接收上一次发送的最后一个数据

~1 single
如果你发现需要用到一个 Observable 的操作符而 Single 并不支持,你可以用 toObservable 操作符把 Single<T> 转换为 Observable<T> 。
<pre>
Single.just("Amit")
.subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}

        @Override
        public void onSuccess(String value) {
            textView.append(" onNext : value : " + value);
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, " onNext value : " + value);
        }

        @Override
        public void onError(Throwable e) {
            textView.append(" onError : " + e.getMessage());
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, " onError : " + e.getMessage());
        }
    };;

</pre>

执行到
SingleJust的subscribeActual的方法
<pre>
@Override
protected void subscribeActual(SingleObserver<? super T> s) {
s.onSubscribe(Disposables.disposed());
s.onSuccess(value);
}

</pre>
我们可以看到这个只是调用一个,single单一的发送一个数据
single是一个observable ,但不是发射一系列的值,无论是从无处到无限数,它总是发出一个值或一个错误通知。

2~Completable
使用完备时我们忽略OnNext事件,只处理完全和OnError事件

Completable 本质上来说和 Observable 与 Single 不一样,因为它不发射数据。因此 Completable 的操作符也有所区别,最常用的是 andThen 。在这个操作符中你可以传任何 Observable 、 Single 、 Flowable 、 Maybe 或者其他 Completable ,它们会在原来的 Completable 结束后执行。例如。你想执行一些其他操作(Single):

<pre>
apiClient.updateMyData(myUpdatedData)
.andThen(performOtherOperation()) // a Single<OtherResult>
.subscribe(new Consumer<OtherResult>() {
@Override
public void accept(OtherResult result) throws Exception {
// handle otherResult
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception{
// handle error
}
});
</pre>
跟 Single 不同的是 RxJava 不允许直接把 Observable 转换为 Completable,因为没办法知道一个 Observable 什么时候 complete。但是你可以把 Single 转换为 Completable,因为 Single 保证 onComplete 会被调用,这个操作符是 toCompletable 。

3~Flowable 专门处理背压

<pre>
Flowable<Integer> observable = Flowable.just(1, 2, 3, 4);

    observable.reduce(50, new BiFunction<Integer, Integer, Integer>() {
        @Override
        public Integer apply(Integer t1, Integer t2) {
            return t1 + t2;
        }
    }).subscribe(getObserver());

</pre>

3~CompositeDisposable
这个作用其实主要是节省内存用
它提供了了add,clear等方法。


protected void onDestroy() {
super.onDestroy();
disposables.clear(); // do not send event after activity has been destroyed
}

防止内存溢出

4PublishSubject

Paste_Image.png

先说一点介绍
一旦订阅了,将所有随后观察到的项发送给订阅方。 如同字面解释一样。 下面是官方的解释
<pre>
PublishSubject<Object> subject = PublishSubject.create();
// observer1 will receive all onNext and onComplete events
subject.subscribe(observer1);
subject.onNext("one");
subject.onNext("two");
// observer2 will only receive "three" and onComplete
subject.subscribe(observer2);
subject.onNext("three");
subject.onComplete();

这个PublishSubject就是我们说的热订阅者。他在订阅了之后就会一直订阅数据。第二次订阅了之后不会从头开始接受数据。而是从后面开始。
</pre>

然后说说这个ConnectableObservable
就是一种特殊的Observable对象,并不是Subscrib的时候就发射数据,而是只有对其应用connect操作符的时候才开始发射数据,所以可以用来更灵活的控制数据发射的时机
需要注意的是如果发射数据已经开始了再进行订阅只能接收以后发射的数据。

<pre>
PublishSubject<Integer> source = PublishSubject.create();
ConnectableObservable<Integer> connectableObservable = source.replay(2); // bufferSize = 3 to retain 3 values to replay
connectableObservable.connect(); // connecting the connectableObservable

    connectableObservable.subscribe(getFirstObserver());

    source.onNext(1);
    source.onNext(2);
    source.onNext(3);
    source.onNext(4);
    source.onComplete();

    /*
     * it will emit 2, 3, 4 as (count = 3), retains the 3 values for replay
     */
    connectableObservable.subscribe(getSecondObserver());

</pre>

Replay操作符返回一个Connectable Observable 对象并且可以缓存其发射过的数据,这样即使有订阅者在其发射数据之后进行订阅也能收到其之前发射过的数据。不过使用Replay操作符我们最好还是限定其缓存的大小,否则缓存的数据太多了可会占用很大的一块内存。对缓存的控制可以从空间和时间两个方面来实现。
这个就是还存了两个数据

5ReplaySubject

/* ReplaySubject emits to any observer all of the items that were emitted
* by the source Observable, regardless of when the observer subscribes.
*/

Paste_Image.png

从这个图中我们应该很好理解,当你订阅的时候就会发送数据

<pre>
ReplaySubject<Integer> source = ReplaySubject.create();

    source.subscribe(getFirstObserver()); // it will get 1, 2, 3, 4

    source.onNext(1);
    source.onNext(2);
    source.onNext(3);
    source.onNext(4);
    source.onComplete();

    /*
     * it will emit 1, 2, 3, 4 for second observer also as we have used replay
     */
    source.subscribe(getSecondObserver());

</pre>

因为这个是重复发送数据。当你订阅的时候。就会重复发送

7BehaviorSubject

Paste_Image.png

<pre>
BehaviorSubject<Integer> source = BehaviorSubject.create();

    source.subscribe(getFirstObserver()); // it will get 1, 2, 3, 4 and onComplete

    source.onNext(1);
    source.onNext(2);
    source.onNext(3);

    /*
     * it will emit 3(last emitted), 4 and onComplete for second observer also.
     */
    source.subscribe(getSecondObserver());

    source.onNext(4);
    source.onComplete()

</pre>

<pre>
// observer will receive all 4 events (including "default").
BehaviorSubject<Object> subject = BehaviorSubject.createDefault("default");
subject.subscribe(observer);
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");

// observer will receive the "one", "two" and "three" events, but not "zero"
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.subscribe(observer);
subject.onNext("two");
subject.onNext("three");

// observer will receive only onComplete
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.onComplete();
subject.subscribe(observer);

// observer will receive only onError
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.onError(new RuntimeException("error"));
subject.subscribe(observer);
上面试官网的一些例子。 从这里可以看到是BehaviorSubject的一些特性。他接受最后一个
</pre>

重点
completes方法之后,才开始接受最后一个发送的数据
<pre>
AsyncSubject<Integer> source = AsyncSubject.create();

    source.subscribe(getFirstObserver()); // it will emit only 4 and onComplete

    source.onNext(1);
    source.onNext(2);
    source.onNext(3);

    /*
     * it will emit 4 and onComplete for second observer also.
     */
    source.subscribe(getSecondObserver());

    source.onNext(4);
    source.onComplete();

</pre>

4~reduce可以相互转化

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 创建操作 用于创建Observable的操作符Create通过调用观察者的方法从头创建一个ObservableEm...
    rkua阅读 1,872评论 0 1
  • 本篇文章介主要绍RxJava中操作符是以函数作为基本单位,与响应式编程作为结合使用的,对什么是操作、操作符都有哪些...
    嘎啦果安卓兽阅读 2,898评论 0 10
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,993评论 19 139
  • 前言 如果你对RxJava1.x还不是了解,可以参考下面文章。 1. RxJava使用介绍 【视频教程】2. Rx...
    jdsjlzx阅读 21,245评论 3 78
  • 响应式编程简介 响应式编程是一种基于异步数据流概念的编程模式。数据流就像一条河:它可以被观测,被过滤,被操作,或者...
    说码解字阅读 3,126评论 0 5