RxJava->doOnNext()

example:

Observable
    .create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            LogUtils.log(Test01.class, "onNext()->1");
            emitter.onNext(1);
            LogUtils.log(Test01.class, "subscribe()->2");
            emitter.onNext(2);
            LogUtils.log(Test01.class, "subscribe()->3");
            emitter.onNext(3);
            LogUtils.log(Test01.class, "subscribe()->onComplete()");
                 emitter.onComplete();
            }
        })
        .doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                LogUtils.log(Test01.class, "accept()->integer:" + integer);
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable disposable) {
                sDisposable = disposable;
                LogUtils.log(Test01.class, "onSubscribe()");
            }

            @Override
            public void onNext(Integer value) {
                LogUtils.log(Test01.class, "onNext()->value:" + value);
            }

            @Override
            public void onError(Throwable e) {
                LogUtils.log(Test01.class, "onError()");
            }

           @Override
           public void onComplete() {
               LogUtils.log(Test01.class, "onComplete()");
           }
      });

doOnNext():

.doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        LogUtils.log(Test01.class, "accept()->integer:" + integer);
    }
})

public interface Consumer<T> {
    void accept(T t) throws Exception;
}

public abstract class Observable<T> implements ObservableSource<T> {
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Observable<T> doOnNext(Consumer<? super T> onNext) {
        return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
    }

    @SchedulerSupport(SchedulerSupport.NONE)
    private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) {
        return RxJavaPlugins.onAssembly(new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate));
    }
}

public final class RxJavaPlugins {
    return source;
}

class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T>;
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U>;
  • 1、doOnNext()返回了ObservableDoOnNext对象, 后边subcribe应当切换到ObservableDoOnNext中去.
new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate)

public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {

    public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext,
                              Consumer<? super Throwable> onError,
                              Action onComplete,
                              Action onAfterTerminate) {
        super(source);
    }
}

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
    protected final ObservableSource<T> source;
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }
}

什么时候能模仿着写出这种结构的代码, 什么时候就牛逼了

  • 1、subscribe被ObservableDoOnEach调用, 但是ObservableDoOnEach内部又持有ObserverCreater的引用;

subscribe():

public abstract class Observable<T> implements ObservableSource<T> {
    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        subscribeActual(observer);
    }
    protected abstract void subscribeActual(Observer<? super T> observer);
}
  • subscribeActual实际被子类ObservableDoOnEach调用;
public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
    @Override
    public void subscribeActual(Observer<? super T> t) {
        source.subscribe(new DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate));
    }
}
  • 最终还是走到了ObservableCreater里面的subscribeActual(), 而Observer是被DoOnEachObserver所实现;
static final class DoOnEachObserver<T> implements Observer<T>, Disposable {
    Disposable s;
    @Override
    public void onSubscribe(Disposable s) {
        if (DisposableHelper.validate(this.s, s)) {
            this.s = s;
            actual.onSubscribe(this);
        }
    }
}

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        source.subscribe(parent);
    }
}

这段代码应该用到的是适配器模式

  • 1、ObservableCreate持有的Observer实际为DoOnEachObserver引用;
  • 2、又通过observer.onSubscribe()将CreateEmitter传给了DoOnEachObserver中的Disposable s, 即s实际上指向的是CreateEmitter;
  • 3、source指向的是ObservableCreate, 所以source.subscribe()将内部Observer指向了DoOnEachObserver;
  • 4、actual.onSubscribe(this)将Disposable指向了DoOnEachObserver;

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        CreateEmitter(Observer<? super T> observer) {...}

        @Override
        public void onNext(T t) {...}

        @Override
        public void onError(Throwable t) {...}

        @Override
        public void onComplete() {...}

        @Override
        public void setDisposable(Disposable d) {...}

        @Override
        public void setCancellable(Cancellable c) {...}

        @Override
        public ObservableEmitter<T> serialize() {...}

        @Override
        public void dispose() {...}

        @Override
        public boolean isDisposed() {...}
}
  • 1、所以每次被观察者通过发射器emitter调用onError(), onNext(), onComplete()实际上最终都会先调用CreateEmitter的对应的方法, 然后再去调用DoOnEachObserver对应的方法;
  • 2、Disposable实际上指向的是DoOnEachObserver, 所以调用dispose(), isDisposed()时实际走的是DoOnEachObserver内部的方法, 而DoOnEachObserver内部的Disposable s又指向了CreateEmitter, 所以最终决定观察者能否收到消息的决定权还是在CreateEmitter手中;

总结:

  • 1、通过源码可以看到, onOnNext()的accept()方法仅仅只是在Observer的onXXX()方法被调用之前调用, 方且没有与Observer的调用之间没有任何关系;
  • 2、所以doOnNext()这个方法可以用来在观察者Observer:onXXX()方法被调用之前进行一些初始化操作;

试试连续调用多个doOnNext()方法:

Observable
        .create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                LogUtils.log(Test01.class, "subscribe->onNext()->1");
                emitter.onNext(1);
                LogUtils.log(Test01.class, "subscribe()->onNext()->2");
                emitter.onNext(2);
                LogUtils.log(Test01.class, "subscribe()->onNext()->3");
                emitter.onNext(3);
                LogUtils.log(Test01.class, "subscribe()->onComplete()");
                emitter.onComplete();
            }
        })
        .doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                LogUtils.log(Test01.class, "accept()_1->integer:" + integer);
            }
        })
        .doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                LogUtils.log(Test01.class, "accept()_2->integer:" + integer);
            }
        })
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable disposable) {
                sDisposable = disposable;
                LogUtils.log(Test01.class, "onSubscribe()");
            }

            @Override
            public void onNext(Integer value) {
                LogUtils.log(Test01.class, "onNext()->value:" + value);
            }

            @Override
            public void onError(Throwable e) {
                LogUtils.log(Test01.class, "onError()");
            }

            @Override
            public void onComplete() {
                LogUtils.log(Test01.class, "onComplete()");
            }
        });

打印结果如下所示:

09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->onSubscribe()
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->subscribe->onNext()->1
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->accept()_1->integer:1
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->accept()_2->integer:1
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->onNext()->value:1
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->subscribe()->onNext()->2
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->accept()_1->integer:2
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->accept()_2->integer:2
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->onNext()->value:2
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->subscribe()->onNext()->3
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->accept()_1->integer:3
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->accept()_2->integer:3
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->onNext()->value:3
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->subscribe()->onComplete()
09-14 16:50:52.980 21493-21493/hb.com V/AndroidTest: Test01->onComplete()

结合源码看看为何打印会是这种打印结果;
下面的分析可能会很绕, 也可能会让人感觉废话连篇; 这也体现了RxJava架构的复杂性;

public abstract class Observable<T> implements ObservableSource<T> {
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
}
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
}
  • 1、Observable指向ObservableCreate, ObservableCreate内部持有ObservableOnSubscribe的引用;
public abstract class Observable<T> implements ObservableSource<T> {
    public final Observable<T> doOnNext(Consumer<? super T> onNext) {
        return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
    }
    private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) {
        return RxJavaPlugins.onAssembly(new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate));
    }
}
public final class RxJavaPlugins {
    public static <T> Observable<T> onAssembly(Observable<T> source) {
        source;
    }
}
public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
    final Consumer<? super T> onNext;
    public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext,
                              Consumer<? super Throwable> onError,
                              Action onComplete,
                              Action onAfterTerminate) {
        super(source);
        this.onNext = onNext;
    }
}
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
    protected final ObservableSource<T> source;
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }
}
  • 1、第一次调用doOnNext()以后, Observable指向了ObservableDoOnEach_1, 并将当前Observable的引用传给ObservableDoOnEach_1, 即ObservableDoOnEach_1持有ObservableCreate的引用, ObservableDoOnEach_1持有Consumer_1;
  • 2、同理, 第二次调用doOnNext()以后, Observable指向了ObservableDoOnEach_2, 并将当前的Observable的引用传给了ObservableDoOnEach_2, 即 ObservableDoOnEach_2持有ObservableDoOnEach_1的引用. ObservableDoOnEach_2持有Consumer_2;

接下来看subscirbe(...)何如实现doOnNext()的连续调用:

public abstract class Observable<T> implements ObservableSource<T> {
    public final void subscribe(Observer<? super T> observer) {
        subscribeActual(observer);
    }
    protected abstract void subscribeActual(Observer<? super T> observer);
}
  • 重点就是在subscribeActual()这个方法, 后边单线程操作符的话, 就只看这个方法了;
public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
    public void subscribeActual(Observer<? super T> t) {
        source.subscribe(new DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate));
    }
}
  • 1、subscribeActual实际被ObservableDoOnEach_2调用, 而此时的source为ObservableDoOnEach_1, onNext为Consumer_2;

  • 2、通过subscribe(...)将DoOnEachObserver_02的引用付给ObservableDoOnEach_1, 然后递推, 将DoOnEachObserver_1的引用付给ObservableCreate;

  • 3、DoOnEachObserver_2持有的Observer actual实际为我们外部通过new Observer创建的引用;

  • 4、ObservableDoOnEach_1调用subscribeActual(...)时传的参数Observer实际就是ObservableDoOnEach__2调用subscribeActual(...)时所创建的DoOnEachObserver_2, 所以DoOnEachObserver_1内部Observer actual实际指向的是DoOnEachObserver_2;

  • 然后切到ObservableCreate中去:

public final class ObservableCreate<T> extends Observable<T> {
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        source.subscribe(parent);
    }
}
  • 1、CreateEmitter持有的Observer为DoOnEachObserver_1, DoOnEachObserver_1持有的Disposable为CreateEmitter, ObservableOnSubscribe持有CreateEmitter的引用;
static final class CreateEmitter<T> {
    @Override
    public void onNext(T t) {
        observer.onNext(t);
    }
}
static final class DoOnEachObserver<T> {
    @Override
    public void onNext(T t) {
        onNext.accept(t);
        actual.onNext(t);
    }
}
  • 当CreateEmitter调用一次onNext()时, DoOnEachObserver_1调用了自己的onNext()方法;
  • 而此时onNext.accept()实际为Consumer_1.accept(t);
  • actual.onNext()因为此时的acutal实际持有的是DoOnEachObserver_2的引用, 所以继续调用DoOnEachObserver_2.onNext(), DoOnEachObserver_2中的onNext()实际指向Consumer_2, 而actual实际指向我们通过new Observer创建的Observer对象;

通过几张图来对文字进行归纳总结:

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容