RxJava 2.0 简单介绍

RxJava 2.0 简单介绍

一年一年有一年,RxJava也新增了2.0版本,那么为什么是新增版本而不说升级版本呢?

因为2.0版本和1.0版本两者并不兼容,2.0版本是基于Reactive-Streams规范重新设计而来;同时1.x版本和2.x版本两者会并行开发维护,但是1.x版本只维护到2018-03-31

下面我们简单介绍一下两者的不同。

0x00 依赖&包名不同

使用rxjava 1.x、2.x版本的依赖如下:

// rxjava 1.x
compile 'io.reactivex:rxjava:1.1.6'

// rxjava 2.x
compile "io.reactivex.rxjava2:rxjava:2.x.y"

包名修改如下:

// 1.x -> 2.x
rx.** -> io.reactivex.**

0x01 Observable与Flowable

Observable在2.0版本不支持backpressure,它会缓存全部的数据,一一发送给消费者,如果消费不及时,会产生OOM。于此对应,在2.x版本新增了Flowable,支持设置/自定义backpressure,同时在创建时必须制定backpressure。

 Flowable.create(new FlowableOnSubscribe<Object>() {
            @Override
            public void subscribe(FlowableEmitter<Object> e) throws Exception {
                for (int i = 0; i < 256; i++) {
                    e.onNext(i);
                }
                e.onComplete();
            }
        }, BackpressureStrategy.BUFFER).subscribe(System.out::println, Throwable::printStackTrace);


0x02 Single

当使用Single时,生产者调用onSuccess()通知订阅者,同时终止整个事件流,生产者只能发送一个success事件,订阅者也只能收到一个success事件,适用于网络请求等确定只有单个事件的事件流。对于1.x版本而言,则需要主动调用onComplete()来终止事件流。

注意: Single没有onComplete()方法;只能产生success、error两种事件。

  Single.create(s -> s.onSuccess("aaaa"))
                .subscribe(System.out::println, Throwable::printStackTrace);

0x03 Completable

当使用Completable时,生产者通过调用onComplete()终止事件流,订阅者会收到事件结束回调,适用于订阅者仅需要知道事件结束,而不需要执行结果的情形。

注意: Completable没有onSuccess()方法;只能产生complete、error两种事件。

Completable.create(new CompletableOnSubscribe() {
            @Override
            public void subscribe(CompletableEmitter e) throws Exception {
                // do something;
                e.onComplete();
            }
        }).subscribe();

0x04 Maybe

MaybeSingleCompletable的组合体,相较于Single只能发送一次item,Completable只能通知事件结束,Maybe可以发送最多一个item,也就是可以发送一个item或者直接终止事件流。

Maybe调用onSuccess()结束事件流时,订阅者收到一次success事件;当Maybe调用onComplete()结束事件流时,订阅者只能收到事件结束事件。

  • onSuccess()收到一次事件:
 Maybe.create(new MaybeOnSubscribe<Object>() {
            @Override
            public void subscribe(MaybeEmitter<Object> e) throws Exception {
                e.onSuccess("aaa");
            }
        }).subscribe(System.out::println, Throwable::printStackTrace, () -> {
            System.out.println("onCompletable...");
        });
  • onComplete()收到结束事件:
 Maybe.create(new MaybeOnSubscribe<Object>() {
            @Override
            public void subscribe(MaybeEmitter<Object> e) throws Exception {
                e.onComplete();
            }
        }).subscribe(System.out::println, Throwable::printStackTrace, () -> {
            System.out.println("onCompletable...");
        });

注意: Maybe拥有onSuccess()和onComplete()方法;可以产生success、complete、error三种事件,其中success和complete是对立的。

0x05 Null

2.0x版本不支持传递null事件,会抛出NullPointerException终止整个事件流。

Single.create(new SingleOnSubscribe<Object>() {
            @Override
            public void subscribe(SingleEmitter<Object> e) throws Exception {
                e.onSuccess(null);
            }
        }).subscribe(System.out::println, Throwable::printStackTrace);

错误日志如下:

java.lang.NullPointerException: onSuccess called with null. Null values are generally not allowed in 2.x operators and sources.

0x06 取消订阅

1. 接口改变

2.x版本由于按照Reactive-Streams规范进行开发,而在Reactive-Streams中已经定义了org.reactivestreams.Subscription接口

package org.reactivestreams;

public interface Subscription {
    void request(long var1);

    void cancel();
}

,而1.x版本也定义了一个rx.Subscription接口

package rx;

public interface Subscription {
    void unsubscribe();

    boolean isUnsubscribed();
}

2. 简单取消订阅

可以看到两个类名一样,但是接口方法并不一样,含义也不相同,所以为了避免歧义,2.x版本中干掉了旧的Subscription,同时使用Disposable接口来替代旧的Subscription。具体代码如下:

// 1.x 调用unsubscribe()方法来取消订阅
final rx.Subscription subscription = rx.Observable.just(1, 2, 3).subscribe();
subscription.unsubscribe();

// 2.x 调用dispose()方法来取消订阅
final Disposable subscriber = Flowable.just(1, 2, 3).subscribe();
subscriber.dispose();

3. 使用Subscriber取消订阅

在1.x版本中,我们调用subscribe()后会返回一个rx.Subscription,我们可以使用它进行操作;在2.x版本中,我们调用subscribe()时,如果传入的是Subscriber,那就返回值是void,需要大家自己保存引用。


// 1.x 
rx.Subscription subscription = rx.Observable.just(1, 2, 3)
  .subscribe(new rx.Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {

            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onComplete() {

            }
        });
subscription.unsubscribe();


// 2.x
ResourceSubscriber<Integer> resourceSubscriber = new ResourceSubscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {

            }

            @Override
            public void onError(Throwable t) {
              // must dispose;
                  dispose();
            }

            @Override
            public void onComplete() {
              // must dispose;
                  dispose();
            }
        };
// 注意当传入subscriber进行订阅时,返回值是void,所以需要自己保存;
Flowable.just(1, 2, 3).subscribe(resourceSubscriber);
resourceSubscriber.dispose();

4. 批量取消订阅

1.x版本使用rx.CompositeSubscription批量取消订阅;2.x版本使用io.reactivex.disposables.CompositeDisposable批量取消订阅。

0x07 Subject & Processor

按照Reactive-Streams规范,Subject是一种行为,既是消费者,同时也是生成者,最终被定义为org.reactivestreams.Processor接口,故而,在1.x版本中的subject,在2.x版本中就变成了processor,并且支持backpressure。同时2.x版本中保留了1.x版本的subject,配合Observable使用,不过也不支持backpressure。如:

// 1.x
Subject<Object, Object> subject= new SerializedSubject<>(PublishSubject.<Object>create());
subject.onNext("aaa");
subject.onError("aaa");
subject.onComplete();


// 2.x
final FlowableProcessor<Object> objectFlowableProcessor =
            PublishProcessor.create().toSerialized();
objectFlowableProcessor.onNext("aa");
objectFlowableProcessor.onError(new Throwable());
objectFlowableProcessor.onComplete();

参考

  1. RxJava 2.0
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,192评论 6 511
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,858评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,517评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,148评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,162评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,905评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,537评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,439评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,956评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,083评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,218评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,899评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,565评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,093评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,201评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,539评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,215评论 2 358

推荐阅读更多精彩内容