RxJava 1.x 源码分析之map操作符变换

使用的版本:
implementation 'io.reactivex:rxjava:1.3.8'
implementation 'io.reactivex:rxandroid:1.2.1'

map操作符简单示例

先看一个简单的例子如下:

Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onNext("123");
                    subscriber.onNext("345");
                    subscriber.onCompleted();
                }
            }
        }).map(new Func1<String, Integer>() {
            @Override
            public Integer call(String s) {
                return Integer.valueOf(s);
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError");
            }

            @Override
            public void onNext(Integer i) {
                System.out.println("onNext:" + i);
            }
        });

在这里以map操作符为例并且我假设你已经知道map操作符是干什么的。create方法和subscribe方法之前已经分析过了,直接贴出map操作符相关的源码。

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        //这里其实就是返回了一个新的Observable,
        //他的onSubscribe参数就是这里的OnSubscribeMap
        return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
    }
public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }

可以看出map方法就是创建并返回了一个新的Observable对象,他的他的onSubscribe参数就是这里的OnSubscribeMap。我们这里贴一下它的源码。

public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {

    final Observable<T> source;

    final Func1<? super T, ? extends R> transformer;

    public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }

    @Override
    public void call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);
    }

    static final class MapSubscriber<T, R> extends Subscriber<T> {

        final Subscriber<? super R> actual;

        final Func1<? super T, ? extends R> mapper;

        boolean done;

        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            R result;

            try {
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }

            actual.onNext(result);
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaHooks.onError(e);
                return;
            }
            done = true;

            actual.onError(e);
        }


        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }

        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    }

构造方法中的source参数就是原来的Observable,transformer就是变换操作的Func1。这个类中的重点就是这个call方法,在里面可以看到他把传递过来的Subscriber其实就是Observer给重新包装了一下。生成了一个新的Subscriber传递给source也就是最初的Observable,这样就都连接起来了。看一下unsafeSubscribe方法,这个跟上一章的普通调用一样了。只不过这里使用的Subscriber是之前包装的MapSubscriber。

 public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
        try {
            // new Subscriber so onStart it
            subscriber.onStart();
            // allow the hook to intercept and/or decorate
            RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // if an unhandled error occurs executing the onSubscribe we will propagate it
            try {
                subscriber.onError(RxJavaHooks.onObservableError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                // so we are unable to propagate the error correctly and will just throw
                RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                // TODO could the hook be the cause of the error in the on error handling.
                RxJavaHooks.onObservableError(r);
                // TODO why aren't we throwing the hook's return value.
                throw r; // NOPMD
            }
            return Subscriptions.unsubscribed();
        }
    }

这里我们来分析一下MapSubscriber类。他继承自Subscriber,在他的onError和onComplete方法都是直接调用实际的Observer的相应方法。比较需要注意的是onNext方法,我们看一下,他首先拿到mapper.call(t)的返回,也就是map操作符中变换的结果,然后把结果给到actual.onNext(result),到这里我们就知道Observer中onNext方法收到的就是变换后的结果。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 一、RxJava操作符概述 RxJava中的操作符就是为了提供函数式的特性,函数式最大的好处就是使得数据处理简洁易...
    无求_95dd阅读 3,173评论 0 21
  • 一、RxJava操作符概述 RxJava中的操作符就是为了提供函数式的特性,函数式最大的好处就是使得数据处理简洁易...
    测天测地测空气阅读 650评论 0 1
  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 5,570评论 7 62
  • 一、Retrofit详解 ·Retrofit的官网地址为 : http://square.github.io/re...
    余生_d630阅读 1,913评论 0 5
  • 引入依赖: implementation 'io.reactivex.rxjava2:rxandroid:2.0....
    为梦想战斗阅读 1,344评论 0 0