RxJava源码分析(五)变换操作符Map

引言

前面我们分析了RxJava的线程调度,今天我们研究下RxJava的另外一块强大的功能-事件变换操作符。

map操作符

官方定义:transform the items emitted by an Observable by applying a function to each item

翻译过来就是就是转换发射数据的操作符,说白了就是起到事件变换的作用,下面是图示:


map操作符

map操作符示例:

        /**
         * 变换操作符
         */
        //1.map将事件通过一个函数做变换,一般用作数据类型转换
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
            //Integer转换成String
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "使用 Map变换操作符 将事件" + integer + "的参数从 整型" + integer + " 变换成 字符串类型" + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });
        //2.FlatMap/concatMap 将每个事件进行拆分和转换,再合并成一个新的事件序列,最后再发送,前者无序发送,后者有序发送
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).concatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                //事件拆分
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("我是事件 " + integer + "拆分后的子事件" + i);
                    // 通过flatMap中将被观察者生产的事件序列先进行拆分,再将每个事件转换为一个新的发送三个String事件
                    // 最终合并,再发送给被观察者
                }
                return Observable.fromIterable(list);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });

在这个例子中map操作符将整形数据变换成了String数据,交给Consumer<String>对象接收。
我们看一下map函数的源码:

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        //判空略过
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

返回ObservableMap对象:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    //调用层传入的Function,用于数据转换
    final Function<? super T, ? extends U> function;
    //source为上游,支持类型为T,U为下游的支持类型
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        //上游source订阅MapObserver,根据function和传入的观察者t构造MapObserver对象
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
       //MapObserver通过function对象mapper,将T数据转换U数据类型后,再转换后的数据交给真正的接受者actual
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                actual.onNext(null);
                return;
            }
            //转换后的数据
            U v;

            try {
               //调用mapper.apply(t)执行数据转换,交给actual
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}
  1. 它继承自AbstractObservableWithUpstream,前面我们提到过,它封装了上游的ObservableSource,是支持数据类型转换的Observable。

  2. subscribeActual订阅方法是调用上游(也就是用户定义的Observable对象)的subscribe方法,入参为Observer对象(下游)的装饰类MapObserver。

  3. ObservableMap持有Function对象和真正的观察者对象,在被订阅的onXXX方法中,通过Function将上游发送的T类型数据转换成U类型数据,然后将交给观察者对象处理。

FlatMap操作符

还是先看看官方定义:

官方定义:transform the items emitted by an Observable by applying a function to each item

大致意思是将每一个上游发射的数据从一个Observable转成为多个Observable,并将所有要发射的数据平铺为一个Observable。
下面是FlatMap的图解:


FlatMap

到这里我们总结一下:

  1. flatmap 转换是一对多的(一对一当然也支持),原来发射了几个数据,转换之后可以是更多个;
  2. flatMap 转换同样可以改变发射的数据类型;
  3. flatMap 转换后的数据,还是会逐个发射给我们的Observer来接收(就像这些数据是由一个Observable发射的一样,其实是多个Observable发射然后合并的);
    4.注意:转换后的数据发射顺序可能和上游的发射顺序不一致,为什么会这样,我们后面看源码分析。
    样例:
 Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                //事件拆分
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("我是事件 " + integer + "拆分后的子事件" + i);
                    // 通过flatMap中将被观察者生产的事件序列先进行拆分,再将每个事件转换为一个新的发送三个String事件
                    // 最终合并,再发送给被观察者
                }
                return Observable.fromIterable(list);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });

下面我们看下源码

FlatMap方法

flatMap有多个重载方法,最终调用下面的方法:

@SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
       //判空
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        //参数为正校验
        ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        if (this instanceof ScalarCallable) {
            @SuppressWarnings("unchecked")
            T v = ((ScalarCallable<T>)this).call();
            if (v == null) {
                return empty();
            }
            return ObservableScalarXMap.scalarXMap(v, mapper);
        }
        //返回一个ObservableFlatMap对象
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
    }

ObservableFlatMap

还是和之前的套路一样,看下ObservableFlatMap的订阅方法:

 @Override
    public void subscribeActual(Observer<? super U> t) {

        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
            return;
        }
       //返回包装类MergeObserver
        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }

最后返回MergeObserver,显而易见这是实现flatMap操作的核心类。

MergeObserver

看关键方法onNext:

  @Override
        public void onNext(T t) {
            // safeguard against misbehaving sources
            if (done) {
                return;
            }
            ObservableSource<? extends U> p;
            try {
               //调用Function方法转换成新的Observable
                p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                s.dispose();
                onError(e);
                return;
            }

            if (maxConcurrency != Integer.MAX_VALUE) {
                synchronized (this) {
                    if (wip == maxConcurrency) {
                        sources.offer(p);
                        return;
                    }
                    wip++;
                }
            }
            //默认情况会走到这里
            subscribeInner(p);
        }

由于我们默认调用的flatmap 的 maxConcurrency 大小是 Integer.MAX_VALUE, 所以最终会调用 subscribeInner(p),注意这里我们的mapper方法以及被调用了,p就是跟我们传入的Function生成的Observable,我们再继续往下看

       @SuppressWarnings("unchecked")
        void subscribeInner(ObservableSource<? extends U> p) {
            for (;;) {
                if (p instanceof Callable) {
                    tryEmitScalar(((Callable<? extends U>)p));

                    if (maxConcurrency != Integer.MAX_VALUE) {
                        synchronized (this) {
                            p = sources.poll();
                            if (p == null) {
                                wip--;
                                break;
                            }
                        }
                    } else {
                        break;
                    }
                } else {
                     //一般情况下,返回的Observable 都不是 Callable类型的,走这里
                    //构造最终的观察者InnerObserver,它是真正接收数据的
                    InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                    addInner(inner);
                    p.subscribe(inner);
                    break;
                }
            }
        }

终于找到最后的接受者InnerObserver,接着看它的onNext方法:

        @Override
        public void onNext(U t) {
            if (fusionMode == QueueDisposable.NONE) {
                //MergeObserable的方法
                parent.tryEmit(t, this);
            } else {
                parent.drain();
            }
        }

paren为上面的MergeObserable的tryEmit方法,看字面意思是尝试发送,所以可能会引入同步机制,下面是tryEmit方法:

 void tryEmit(U value, InnerObserver<T, U> inner) {
            //引入同步机制,保证每个时刻只发射一个数据给最后的接受者
            //尝试获取cas锁
            if (get() == 0 && compareAndSet(0, 1)) {
                //最后的接受者
                actual.onNext(value);
                if (decrementAndGet() == 0) {
                    //释放锁返回
                    return;
                }
            } else {//没有拿到锁,就把数据加入队列
                SimpleQueue<U> q = inner.queue;
                if (q == null) {
                    q = new SpscLinkedArrayQueue<U>(bufferSize);
                    inner.queue = q;
                }
                q.offer(value);
                if (getAndIncrement() != 0) {
                    return;
                }
            }
            //轮询队列,取数据
            drainLoop();
        }

这里需要提一下:MergeObserver 继承了 AtomicInteger,所以这里的tryEmit方法就利用了 AtomicInteger 的同步机制,同时只会有一个 value 被 actual Observer 发射,而且这里 刚好 可以解答我们上面留下的 问题,由于 AtomicInteger CAS锁只能保证操作的原子性,并不保证锁的获取顺序,是抢占式的,所以最终数据的发射顺序并不是固定的(同一个Observable发出的数据是有序的)。
如果没有获取到锁,就会将要发射的数据放入 队列中,drainLoop 方法会循环去获取队列中的 数据,然后发射。

void drainLoop() {
            //取最终的观察者
            final Observer<? super U> child = this.actual;
            int missed = 1;
            for (;;) {
                //错误检查
                if (checkTerminate()) {
                    return;
                }
                SimplePlainQueue<U> svq = queue;

                if (svq != null) {
                    for (;;) {
                        U o;
                        for (;;) {
                            if (checkTerminate()) {
                                return;
                            }
                            //取数据
                            o = svq.poll();
                            if (o == null) {
                                break;
                            }
                            //交给最后的观察者
                            child.onNext(o);
                        }
                        if (o == null) {
                            break;
                        }
                    }
                }
                .....
                //队列数据轮询完毕,做完成或者错误处理
                if (d && (svq == null || svq.isEmpty()) && n == 0) {
                    Throwable ex = errors.get();
                    if (ex == null) {
                        child.onComplete();
                    } else {
                        child.onError(errors.terminate());
                    }
                    return;
                }
              ....
        }

这个方法就是从循环从数据队列中取数据交给最后的观察者接收。
FlatMap引入CAS机制,在onNext方法中尝试拿到锁,如果拿到则立即交给最终的观察者,否则加入等待队列。
关于map操作法我们先讲到这里,水平有限,难免有纰漏,还望多多指正!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,997评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,603评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,359评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,309评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,346评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,258评论 1 300
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,122评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,970评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,403评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,596评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,769评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,464评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,075评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,705评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,848评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,831评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,678评论 2 354