RxAndroid 2.0 学习笔记

Rxjava 2.x正式版出来已经快两个月了。在之前的项目中也在使用Rx。但却一直没有时间对整个的知识进行梳理,恰好今天抽出时间,也系统的再学习一遍RxJava/RxAndroid


RxJava的使用

一、观察者/被观察者

1、前奏:
在观察者之前就要先提下backpressure这个概念。简单来说,backpressure是在异步场景中,被观察者发送事件速度远快于观察者的处理速度时,告诉被观察者降低发送速度的策略。

2、在2.0中有以下几种观察者

  • Observable/Observer
  • Flowable/Subscriber
  • Single/SingleObserver
  • Completable/CompletableObserver
  • Maybe/MaybeObserver

依次的来看一下:

Observable

Observable
.just(1, 2, 3)
.subscribe(new Observer < Integer > () {
@Override public void onSubscribe(Disposable d) {}
@Override public void onNext(Integer value) {}
@Override public void onError(Throwable e) {}
@Override public void onComplete() {}
});

这里要提的就是onSubscribe(Disposable d),disposable用于取消订阅。
就用简单的just这个操作符来分析一下。

@SuppressWarnings("unchecked")
@SchedulerSupport(SchedulerSupport.NONE) 
public static < T > Observable < T > just(T item1, T item2, T item3, T item4) {
    ObjectHelper.requireNonNull(item1, "The first item is null");
    ObjectHelper.requireNonNull(item2, "The second item is null");
    ObjectHelper.requireNonNull(item3, "The third item is null");
    ObjectHelper.requireNonNull(item4, "The fourth item is null");

    return fromArray(item1, item2, item3, item4);
}
@SchedulerSupport(SchedulerSupport.NONE) 
public static < T > Observable < T > fromArray(T...items) {
    ObjectHelper.requireNonNull(items, "items is null");
    if (items.length == 0) {
        return empty();
    } else if (items.length == 1) {
        return just(items[0]);
    }
    return RxJavaPlugins.onAssembly(new ObservableFromArray < T > (items));
}
@Override 
public void subscribeActual(Observer < ?super T > s) {
    FromArrayDisposable < T > d = new FromArrayDisposable < T > (s, array);
    s.onSubscribe(d);
    if (d.fusionMode) {
        return;
    }
    d.run();
}

@Override 
public void dispose() {
    disposed = true;
}

@Override 
public boolean isDisposed() {
    return disposed;
}

void run() {
    T[] a = array;
    int n = a.length;
    for (int i = 0; i < n && !isDisposed(); i++) {
        T value = a[i];
        if (value == null) {
            actual.onError(new NullPointerException("The " + i + "th element is null"));
            return;
        }
        actual.onNext(value);
    }
    if (!isDisposed()) {
        actual.onComplete();
    }
}

just实际调用了fromArray方法,中创建了ObservableFromArray的实例,在这个实例中实现了Observable这个接口,在调用subscribe方法进行绑定之后,首先调用了subscribeActual方法,onSubscribe就会回调。

在取消绑定是我们可以将Disposable添加到CompositeDisposable中或者直接调用Disposable的dispose() 方法在流的任意位置取消。

此外, 为了简化代码,我使用了Consumer作为观察者(可以当成1.0时候的Action1 、ActionX)subscribe的返回值就是一个Disposable (subscribe 的返回值根据传入的参数不同,也有不同)我把这个对象添加到CompositeDisposable,并在中途取消,但发射器仍然会把所有的数据全都发射完。因为LambdaSubscriber(也就是传入Consumer 所构造的观察者)的disposeisDispose 略有不同,并不是简简单单的true/false, 说实话,我没看懂Consumer的这两个方法干了什么...........尴尬

LambdaSubscriber 瞅瞅

@Override
public void dispose() { 
cancel();
}

@Override
public boolean isDisposed() {  
  return get() == SubscriptionHelper.CANCELLED;
}

Flowable

是2.0之后用的最多的观察者了,他与上一个的区别在于支持背压,也就是说,下游会知道上游有多少数据,所以他Subscriber会是这样

Flowable
.just(1, 2, 3, 4)
.subscribe(new Subscriber < Integer > () {
@Override public void onSubscribe(Subscription s) {
  s.request(Long.MAX_VALUE);
}
@Override public void onNext(Integer integer) {}
@Override public void onError(Throwable t) {}
@Override public void onComplete() {}
});

onSubscribe 这个回调传出了一个Subscription, 我们要指定他传出数据的大小, 调用他的request() 方法。如没有要求可以传入一个Long的最大数值Long.MAX_VALUE
要说明一下,request这个方法若不调用,下游的onNext与OnComplete都不会调用;若你写的数量小于,只会传你的个数,但是不会调用onComplete方法,可以看下FlowableFromArrayslowPath方法

@Override void slowPath(long r) {
    long e = 0;
    T[] arr = array;
    int f = arr.length;
    int i = index;
    Subscriber < ?super T > a = actual;
    for (;;) {
        while (e != r && i != f) {
            if (cancelled) {
                return;
            }
            T t = arr[i];
            if (t == null) {
                a.onError(new NullPointerException("array element is null"));
                return;
            } else {
                a.onNext(t);
            }
            e++;
            i++;
        }
        if (i == f) {
            if (!cancelled) {
                a.onComplete();
            }
            return;
        }
        r = get();
        if (e == r) {
            index = i;
            r = addAndGet( - e);
            if (r == 0L) {
                return;
            }
            e = 0L;
        }
    }
}
}

需要if (i == f) f 是这个数据的大小,i是当前发送数据的个数,所以不会调用onComplete

休息一下

这是几种被观察者实现的接口

  • Observable 接口 ObservableSource
  • Flowable 接口 Publisher
  • Single 接口 SingleSource
  • Completable 接口 CompletableSource
  • Maybe 接口 MaybeSource

梳理完了两个被观察者,发现Flowable支持背压,父类是Publisher;Observable不支持背压,父类是ObservableSource,他们的实现方式,与其的操作符,到最后的观察者,都有些不同,他们是完全独立开的。各自之间也互不影响。

Single

单值相应的模式: 就是只有一个值呗?

Completable

表示没有任何值但仅指示完成或异常的延迟计算。

Maybe

maybe 可以当成上面两个的合体吧!

后面的三种,就不细掰了,我就是这么不求甚解。

二、操作符

操作符基本就没有改变,但还是会发现,我擦,from没了,可以使用fromIterable
之前的actionx 也替换了Action \ Consumer \ BiConsumer
Func也跟action一样, 名字改变了Function

感觉这样是正经Rx了。

三、线程切换

当然现场切换没有发生改变,用法还是一样,但是之前没有看过源码,不知道怎样神奇的把线程切换了,难道是来自东方的神秘力量。趁着还有时间,撸一下代码。
在调用subscribeOn(Schedulers.io())之后,会创建ObservableSubscribeOn

parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
        source.subscribe(parent);
    }
}
));

在这个过程中,会把source也就是ObservableSource在线程中订阅,同时也把把传入的Observer变成SubscribeOnObserver。若指定的是io线程,可以在IoScheduler中看见对线程的管理
在调用observeOn(AndroidSchedulers.mainThread())时,会产生一个ObservableObserveOn,同时还会把Observer变成ObserveOnObserver,可以发现在HandlerScheduler,在ui线程调用了ObserveOnObserver的run方法

四、Rxjava的数据传递

Rxjava是我在工作这几个月最喜欢的框架,没有之一。完全解决了我这个有洁癖的人在打代码时的玻璃心。
虽然重复造轮轮子是没有必要的(我也造不出来),但是为了全面的了解Rxjava,我也准备简单的实现一下,数据在每个操作符之中传输的整个过程。

在实现之前先猜想一下大概的过程吧:
我的需求是在一个static方法中产生一个数值,并且通过一层层的接口传递下去,下面的操作符的人参是上一个的返回值,最后输出,我就模仿着Rx的 Maybe 的名字实现吧。

  • 首先我要一直‘点’下去的话Maybe 一定要返回自己
  • 接口要一层层的传进去,这样的话就可以在发射数据时,发原始数据传入这个一堆的接口,然后每个接口计算自己的实现。
  • 最后返回结果

之后就是仿造源码完成这段需求了,当然这些方法也都简单写了,就是为了弄清楚思路:

1、创建一个MaybeSource,我们的Maybe 和 各个操作符都会实现它。

public interface MaybeSource {
     void subscribe(MaybeObserver observer);
}

2、创建一个MaybeObserver, 这就是最后绑定的时候的接口

public interface MaybeObserver {
    void onSuccess(int value);
}

3、创建Function, 这个在操作符中用于实现

public interface Function {
    int apply(int t);
}

4、当然少不了Maybe, 这里就实现just和map两个方法吧

public abstract class Maybe implements MaybeSource {
    public static Maybe just(int item) {
        return new MaybeJust(item);
    }

    public final Maybe map(Function mapper) {
        return new MaybeMap(this, mapper);
    }
}

5、just实际返回的对象是MaybeJust,他的父类是Maybe

public class MaybeJust extends Maybe {
    final int value;

    public MaybeJust(int value) {
        this.value = value;
    }

    @Override
    public void subscribe(MaybeObserver observer) {
        observer.onSuccess(value);
    }
}

6、map实际返回的对象是MaybeMap,他的父类是Maybe

public class MaybeMap extends Maybe {
    final Function mapper;
    final MaybeSource source;

    public MaybeMap(MaybeSource source, Function mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    public void subscribe(MaybeObserver observer) {
        source.subscribe(new MapMaybeObserver(observer, mapper));
    }

    static final class MapMaybeObserver implements MaybeObserver {
        final MaybeObserver actual;

        final Function mapper;

        MapMaybeObserver(MaybeObserver actual, Function mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onSuccess(int value) {
            this.actual.onSuccess(this.mapper.apply(value));
        }
    }
}

7、在main中可以这么运行

Maybe
.just(1)
.map(new Function() {

    @Override
    public int apply(int t) {
        return t + 1;
    }
}).map(new Function() {

    @Override
    public int apply(int t) {
        return t * 4;
    }
}).subscribe(new MaybeObserver() {

    @Override
    public void onSuccess(int value) {
        System.out.println(value);
    }
});

8、运行结果,传入1,先+1, 在 * 4,最后结果应该是8

Paste_Image.png

得到了期望的结果


RxJava 2.0 + Retrofit 2 .0

之前做过一个项目,没用什么架构,也没什么封装。但对我帮助最大的是,之前是不能接受这样的代码的,感觉看上去脑袋都大了。但看习惯了, 也就习惯了。
但平时自己弄个小项目还是使用mvp,自己的洁癖可能更加强烈一点

在Retrofit 中选择了Flowable作为返回值,支持背压,在2.0之后应该最为常用

@GET("/")  
Flowable<ResponseBody> getText();

在RxJava 2.0 中使用CompositeDisposable做解除绑定的操作, Consumer 回调中使用了三个Consumer,作为成功、失败、完成的回调

    public <T> void addSubscription(Flowable flowable,
        final RxSubscriber<T> subscriber) {
        if (mCompositeDisposable == null) {
            mCompositeDisposable = new CompositeDisposable();
        }

        if (subscriber == null) {
            Log.e(TAG, "rx callback is null");

            return;
        }

        Disposable disposable = flowable.subscribeOn(Schedulers.io())
                                        .observeOn(AndroidSchedulers.mainThread())
                                        .subscribe(new Consumer<T>() {
                    @Override
                    public void accept(T o) throws Exception {
                        subscriber.onNext(o);
                    }
                },
                new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable)
                        throws Exception {
                        subscriber.onError(throwable);
                    }
                },
                new Action() {
                    @Override
                    public void run() throws Exception {
                        subscriber.onComplete();
                    }
                });

此外,之前的项目后台接口也是奇葩,同一个人写的接口,接口的返回格式更是多种多样,还不改,没办法,客户端只能将就着服务端,谁叫我们是新来的呢。遇到这种问题,就不直接转成对象格式了,先转成ResponseBody得到Body,再拿出string来。
okhttp中response的body对象就是这个ResponseBody,他的string() 方法就可以获得整个body,然后再做json解析吧

Paste_Image.png
Paste_Image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容