map()原理分析_RxJava2.0操作符理解篇

我们知道map操作符是Rxjava中很常见的一个操作符,它可以实现单类型转换,那么它的这个转换内部原理是怎么实现呢,我们通过源代码一步步看一下:
先写个简单的例子,例如我想实现简单的整形转字符串:

Observer endObserver = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.print("onSubscribe");
            }

            @Override
            public void onNext(String s) {
                System.out.print("onNext:"+s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };
Observable.just(200)
                .map(new Function<Integer, String>() {

                    @Override
                    public String apply(Integer integer) throws Exception {
                        return integer+"";
                    }
                }).subscribe(endObserver);

为了便于理解,我们上面的写法可以拆分成这样写:

Observable observable_just = Observable.just(200);
Observable observable_map = observable_just.map(new Function<Integer,String>(){

            @Override
            public String apply(Integer integer) throws Exception {
                return integer+"";
            }
        });
observable_map.subscribe(endObserver);
    }

这样我们看到有两个observable,但这两个observable是不是同一个还不知道,我们通过查看map方法的源代码看一看:

   public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

这里看到map方法返回的是RxJavaPlugins类的onAssembly方法的返回值,而onAssembly返回的其实就是方法里传入的参数,即这里的new ObservableMap<T, R>(this, mapper)。
重点:这里看到它new了一个ObservableMap,那ObservableMap又是什么呢? 其实看它继承关系就知道它继承自Observable。那么这里就能回答我们之前例子里的疑问:observable_just 和 observable_map其实不是同一个Observable,因为observable_just具体是ObservableJust,而observable_map刚才看到它是一个ObservableMap。

ObservableMap

我们看到ObservableMap的构造函数接收两个参数(this,mapper),this就是ObservableJust对象,mapper就是map方法里我们写的Function;
到这里我们知道map返回的是新创建的一个Observable,那么执行subscribe订阅操作的就是map返回的这个被观察者。那么就有问题了,这个新的被观察者去执行订阅了,我的原始被观察者observable_just怎么把数据发出去呢,什么时候发呢?下面我们通过看subscribe这个方法的代码看能不能找到里面隐藏的逻辑:

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer); //实际执行订阅的是这个方法
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } 
    }

subscribe方法里面实际执行的是subscribeActual方法,而subscribeActual又是一个抽象方法,具体逻辑实在子类实现的,那么我们就得看ObservableMap的subscribeActual方法实现了:

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

这里看到subscribeActual方法内部是调用了source的subscribe,并且这里又new了一个MapObserver,这里我们先不管这个MapObserver是什么,后面再来分析。我们先看这个source是什么:

还记得之前map方法里new 的ObservableMap吗:

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

我们再看一下ObservableMap的代码:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

看到这里清楚了,原来source就是我们new ObservableMap 时传的第一个参数this,即就是ObservableJust对象;

ObservableJust

那么我们是不是就应该看ObservableJust类里的subscrib方法了

public interface ObservableSource<T> {

    /**
     * Subscribes the given Observer to this ObservableSource instance.
     * @param observer the Observer, not null
     * @throws NullPointerException if {@code observer} is null
     */
    void subscribe(@NonNull Observer<? super T> observer);
}

发现是一个接口的方法,那我们通过看ObservableJust的类结构发现,ObservableJust时继承自Ovservable,实现了ObservableSource接口, 而Ovservable的subscribe方法实际上最终调用的是抽象方法subscribeActual,那我们就要看子类ObservableJust里的subscribeActual是怎么实现的:

    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(Observer<? super T> s) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        s.onSubscribe(sd);
        sd.run();
    }

这里终于看到s.onSubscribe(sd)调用了,说明从调用subscribe到订阅成功回调这套闭环走通了。那么

ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value); 
sd.run();

这两段代码又是做什么呢

ScalarDisposable
public static final class ScalarDisposable<T>
    extends AtomicInteger
    implements QueueDisposable<T>, Runnable {

看类结构发现ScalarDisposable实现了Runnable,是一个线程类,那么它的run方法里做了什么:

        @Override
        public void run() {
            if (get() == START && compareAndSet(START, ON_NEXT)) {
                observer.onNext(value);
                if (get() == ON_NEXT) {
                    lazySet(ON_COMPLETE);
                    observer.onComplete();
                }
            }
        }

看,这里调用了observer.onNext(value),说明数据就是从这里发送给观察者的。
咦,想想,这个observer是不是我们自己写那个观察者endObserver呢?
还得回到之前的ObservableMap这里看传的是什么:

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

这里传的是new的一个MapObserver,点进去看看MapObserver是什么,发现MapObserver实现了Observer接口,就是一个观察者,而它接收了两个参数(t,function),这个t就是我们自己写的那个endObserver:

observable_map.subscribe(endObserver);

而这个function也正是我们传给map的那个Function。

MapObserver

那么既然这两个都是我们自己传的,那MapObserver是怎么执行function以及如何发生数据给我的endObserver呢,我们看MapObserver类的代码:

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

MapObserver父类:

    public BasicFuseableObserver(Observer<? super R> actual) {
        this.actual = actual;
    }
    public final void onSubscribe(Disposable s) {
        if (DisposableHelper.validate(this.s, s)) {

            this.s = s;
            if (s instanceof QueueDisposable) {
                this.qs = (QueueDisposable<T>)s;
            }

            if (beforeDownstream()) {

                actual.onSubscribe(this);

                afterDownstream();
            }

        }
    }

看到这是不是就明白了,原来MapObserver只是对我们自己写的观察者endObserver作了一层封装,当subscribe执行到ObservableJust类的subscribeActual方法时,调用的onSubscribe和onNext都是调用的MapObserver的onSubscribe和onNext,然后MapObserver的这两个方法里有调用了endObserver的onSubscribe和onNext,同时在onNext方法里执行了我们写的Function的apply转换,然后发送数据到endObserver的onNext中

至此,整个例子的流程算走完了,最后用一个类图总结一下整个的流程。


Observable.PNG
Observer.PNG

整个流程就是从ObservableMap的subscribe方法开始,到ObservableJust的subscribeActual,然后到MapObserver的onNext方法,这里面会执行Function的apply方法拿到转换后的数据,最后传递给endObserver的onNext方法。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 230,362评论 6 544
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 99,577评论 3 429
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 178,486评论 0 383
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 63,852评论 1 317
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 72,600评论 6 412
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 55,944评论 1 328
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 43,944评论 3 447
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 43,108评论 0 290
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 49,652评论 1 336
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 41,385评论 3 358
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 43,616评论 1 374
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 39,111评论 5 364
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 44,798评论 3 350
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 35,205评论 0 28
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 36,537评论 1 295
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 52,334评论 3 400
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 48,570评论 2 379

推荐阅读更多精彩内容