自己动手实现 RxJava 理解其调用链

RxJava 拥有繁多的 API 和复杂的逻辑链,学习复杂的知识,一般从整体再到具体,为了学习 RxJava 的原理,参考其源码,自己动手实现一个简化的 RxJava,旨在理解调用链

阅读本文,建议先下载代码 SimpleRx,毕竟在IDE里阅读代码比在网页上要清晰得多,也可以看下打印的日志

最主要的4个类:Observable、OnSubscribe、Operator、Subscriber

1、最简单的,创建一个Observable,然后订阅

Observable
        .create(new OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
            }
        })
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }
        });

public class Observable<T> {
    private OnSubscribe<T> onSubscribe;

    private Observable(OnSubscribe<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    public final void subscribe(Subscriber<? super T> subscriber) {
        onSubscribe.call(subscriber);
    }

    public static <T> Observable<T> create(OnSubscribe<T> onSubscribe) {
        return new Observable<>(onSubscribe);
    }
}

这里可以看出 subscribe(subscriber)-->onSubscribe.call(subscriber),所以没有订阅动作就不会触发 OnSubscribe.call()

2、map 和 lift

Observable
        .create(new OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
            }
        })
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return "map" + integer;
            }
        })
        .subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                System.out.println(s);
            }
        });

非链式的写法

OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
Observable<Integer> observable = Observable.create(onSubscribe);
Func1<Integer, String> func = new Func1<>();
Observable<String> observable2 = observable.map(func);
Subscriber<String> subscriber = new Subscriber<>();
observable2.subscribe(subscriber);

create() 跟之前一样,那么 map() 做了什么

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return lift(new OperatorMap<T, R>(func));
}

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

lift() 根据上一个 observable 的 onSubscribe 创建一个新的 OnSubscribeLift 返回一个新的 observable2,上面我们说过 subscribe(subscriber)-->onSubscribe.call(subscriber),所以我们接着看 OnSubscribeLift

public class OnSubscribeLift<T, R> implements OnSubscribe<R> {

    final OnSubscribe<T> parent;
    final Operator<? extends R, ? super T> operator;

    public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
        this.parent = parent;
        this.operator = operator;
    }

    // 先不用关心具体实现,下面讲到再看
    @Override
    public void call(Subscriber<? super R> r) {
        Subscriber<? super T> st = operator.call(r); // 这个 operator 就是 OperatorMap
        parent.call(st); // parent 就是第一个 observable 的 onSubscribe
    }
}

再看下 OperatorMap

public final class OperatorMap<T, R> implements Operator<R, T> {

    final Func1<? super T, ? extends R> transformer;

    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }

    // 先不用关心具体实现,下面讲到再看
    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        return new MapSubscriber<T, R>(o, transformer);
    }
    
    private class MapSubscriber<T, R> extends Subscriber<T> {

        private Subscriber<? super R> actual;
        private Func1<? super T, ? extends R> transformer;

        public MapSubscriber(Subscriber<? super R> o, Func1<? super T, ? extends R> transformer) {
            this.actual = o;
            this.transformer = transformer;
        }

        // 先不用关心具体实现,下面讲到再看
        @Override
        public void onNext(T t) {
            R r = transformer.call(t);
            actual.onNext(r);
        }
    }

}

我们把 map() 和 lift() 都去掉,使用最基本的类来实现

OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
Observable<Integer> observable = new Observable<>(onSubscribe);
Func1<Integer, String> func = new Func1<>();

OperatorMap<Integer, String> operatorMap = new OperatorMap<>(func);

OnSubscribeLift<Integer, String> onSubscribe2 = new OnSubscribeLift<>(onSubscribe, operatorMap);

Observable<String> observable2 = new Observable<>(onSubscribe2);

Subscriber<String> subscriber = new Subscriber<>();
observable2.subscribe(subscriber);

到这里,清楚了如何把第一个 Observable<Integer> 转成 Observable<String>,包括 OnSubscribe<Integer> onSubscribe 和 OnSubscribeLift<Integer, String> onSubscribe2 的关系

那么最终的 subscribe() 如何调用到第一个 observable.call(Subscriber<Integer>) 里面的 Subscriber<Integer>.onNext(Integer) 又如何调用到最终的订阅者 subscriber<String>().onNext(String)

**1) observable2.subscribe(subscriber) -->

  1. onSubscribe2.call(subscriber) 即 OnSubscribeLift.call(subscriber) -->
  2. Subscriber<Integer> st = operatorMap.call(subscriber) 即
  3. Subscriber<Integer> st = new MapSubscriber<Integer, String>(subscriber, func)
  4. parent.call(st) 即 onSubscribe.call(st) -->
  5. st.onNext(1) 即 MapSubscriber.onNext(1) -->
  6. String string = func.call(1)
  7. subscriber.onNext(string)**

至此 Observable.create().map().subscribe() 的调用链就分析完了
很多操作符本质都是 lift(),以此类推,lift() 2次

3、subscribeOn

Scheduler 内部比较繁杂,我们简化下,把 subscribeOn(Scheduler) 简化成 subscribeOnIO()

Observable
        .create(new OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
            }
        })
        .subscribeOnIO()
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }
        });

如何实现 subscribeOnIO() 让第一个 observable 的 onSubscribe 运行在子线程

public final Observable<T> subscribeOnIO() {
    return create(new OnSubscribeOnIO<T>(this));
}

public final class OnSubscribeOnIO<T> implements OnSubscribe<T> {

    private static ExecutorService threadPool = Executors.newCachedThreadPool();

    final Observable<T> source;

    public OnSubscribeOnIO(Observable<T> source) {
        this.source = source;
    }

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                source.subscribe(subscriber); // --> onSubscribe.call(subscriber) --> subscriber.onNext(1)
            }
        };
        threadPool.submit(runnable);
    }
}

从上面看出 subscribeOnIO() 新建了一个线程并加入 CachedThreadPool,在子线程里订阅上一个 Observable,后续的调用都在这个线程里完成

再考虑下复杂点的,加入 map()

Observable
        .create(new OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                System.out.println(Thread.currentThread());
                subscriber.onNext(1);
            }
        })
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                System.out.println(Thread.currentThread());
                return "map" + integer;
            }
        })
        .subscribeOnIO()
        .subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                System.out.println(Thread.currentThread());
                System.out.println(s);
            }
        });

非链式的写法

OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
Observable<Integer> observable = new Observable<>(onSubscribe);
Func1<Integer, String> func = new Func1<>();
OperatorMap<Integer, String> operatorMap = new OperatorMap<>(func);
OnSubscribeLift<Integer, String> onSubscribe2 = new OnSubscribeLift<>(onSubscribe, operatorMap);
Observable<String> observable2 = new Observable<>(onSubscribe2);

OnSubscribeOnIO<String> onSubscribe3 = new OnSubscribeOnIO(observable2);
Observable<String> observable3 = new Observable<>(onSubscribe3);

Subscriber<String> subscriber = new Subscriber<>();
observable3.subscribe(subscriber);

**1) observable3.subscribe(subscriber) -->

  1. onSubscribe3.call(subscriber) 即 OnSubscribeOnIO.call(subscriber) -->
  2. 子线程 new Runnable(){} --> observable2.subscribe(subscriber)
  3. onSubscribe2.call(subscriber) 即 OnSubscribeLift.call(subscriber) -->
  4. Subscriber<Integer> st = operatorMap.call(subscriber) 即
  5. Subscriber<Integer> st = new MapSubscriber<Integer, String>(subscriber, func)
  6. parent.call(st) 即 onSubscribe.call(st) -->
  7. st.onNext(1) 即 MapSubscriber.onNext(1) -->
  8. String string = func.call(1)
  9. subscriber.onNext(string)**

那要是把 map() 与 subscribeOnIO() 换下位置呢

OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
Observable<Integer> observable = new Observable<>(onSubscribe);

OnSubscribeOnIO<Integer> onSubscribe2 = new OnSubscribeOnIO(observable);
Observable<Integer> observable2 = new Observable<>(onSubscribe2);

Func1<Integer, String> func = new Func1<>();
OperatorMap<Integer, String> operatorMap = new OperatorMap<>(func);
OnSubscribeLift<Integer, String> onSubscribe3 = new OnSubscribeLift<>(onSubscrib2, operatorMap);
Observable<String> observable3 = new Observable<>(onSubscribe3);
Subscriber<String> subscriber = new Subscriber<>();
observable3.subscribe(subscriber);

**1) observable3.subscribe(subscriber) -->

  1. onSubscribe3.call(subscriber) 即 OnSubscribeLift.call(subscriber) -->
  2. Subscriber<Integer> st = operatorMap.call(subscriber) 即
  3. Subscriber<Integer> st = new MapSubscriber<Integer, String>(subscriber, func)
  4. parent.call(st) 即 onSubscribe2.call(st) 即 OnSubscribeOnIO.call(st) -->
  5. 子线程 new Runnable(){} --> observable.subscribe(st) -->
  6. onSubscribe.call(st) -->
  7. st.onNext(1) 即 MapSubscriber.onNext(1) -->
  8. String string = func.call(1)
  9. subscriber.onNext(string)**

看得出来,不管 subscribeOnIO() 在哪,第一个 onSubscribe.call() 总是运行在子线程

4、observeOn

先看下 demo 最终写法

Handler handler = new Handler();

Observable
        .create(new OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(1);
            }
        })
        .observeOn(handler)
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return "map" + integer;
            }
        })
        .subscribeOnIO()
        .subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                System.out.println(s);
            }
        });

handler.loop(); //队列没有消息时会挂起当前线程,直到收到新的消息

同样我们也自己实现一个简单的可以切换回主线程的 observeOn(Handler)

public class Observable<T> {
    ...
    public final Observable<T> observeOn(Handler handler) {
        return lift(new OperatorObserveOn<T>(handler));
    }
}

OperatorObserveOn

public final class OperatorObserveOn<T> implements Operator<T, T> {
    private Handler handler;

    public OperatorObserveOn(Handler handler) {
        this.handler = handler;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
        Subscriber<T> s = new Subscriber<T>() {
            @Override
            public void onNext(final T t) {
                handler.post(new Runnable() {
                    @Override
                    public void run() {
                        subscriber.onNext(t);
                    }
                });
            }
        };
        return s;
    }
}

自定义Handler

public class Handler {

    private ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10);

    public void loop() {
        for (; ; ) {
            Runnable runnable;
            try {
                runnable = queue.take();// 没有数据则一直阻塞,直到有数据自动唤醒
            } catch (InterruptedException e) {
                return;
            }
            if (runnable == null) {
                return;
            }
            runnable.run();
        }
    }

    public void post(Runnable runnable) {
        try {
            queue.put(runnable);// 没有空间则一直阻塞,直到有空间
        } catch (InterruptedException e) {
            return;
        }
    }
}

非链式写法

OnSubscribe<Integer> onSubscribe = new OnSubscribe<>();
Observable<Integer> observable = new Observable<>(onSubscribe);

OperatorObserveOn<Integer> operatorObserveOn = new OperatorObserveOn(handler);
OnSubscribeLift<Integer, String> onSubscribe2 = new OnSubscribeLift<>(onSubscribe, operatorObserveOn);

Func1<Integer, String> func = new Func1<>();
OperatorMap<Integer, String> operatorMap = new OperatorMap<>(func);
OnSubscribeLift<Integer, String> onSubscribe3 = new OnSubscribeLift<>(onSubscribe2, operatorMap);
Observable<String> observable2 = new Observable<>(onSubscribe3);
OnSubscribeOnIO<String> onSubscribe4 = new OnSubscribeOnIO(observable2);
Observable<String> observable3 = new Observable<>(onSubscribe4);
Subscriber<String> subscriber = new Subscriber<>();
observable3.subscribe(subscriber);

**1) observable3.subscribe(subscriber) -->

  1. onSubscribe4.call(subscriber) 即 OnSubscribeOnIO.call(subscriber) -->
  2. 子线程 new Runnable(){} --> observable2.subscribe(subscriber)
  3. onSubscribe3.call(subscriber) 即 OnSubscribeLift.call(subscriber) -->
  4. Subscriber<Integer> st = operatorMap.call(subscriber) 即
  5. Subscriber<Integer> st = new MapSubscriber<Integer, String>(subscriber, func)
  6. parent.call(st) 即 onSubscribe2.call(st) 即 OnSubscribeLift.call(st)-->
  7. Subscriber<Integer> st2 = operatorObserveOn.call(st) -->
  8. parent.call(st2) 即 onSubscribe.call(st2) -->
  9. st2.onNext(1) --> // onNext()里面切换到Handler所在线程
  10. st.onNext(1) -->
  11. String string = func.call(1)
  12. subscriber.onNext(string)**

5、其他

总的来说,调用链确实有点复杂,不过也还是可以接受的,一个调用链花点时间想想还是能清楚,只是每碰到一个调用链都要花点时间才能想清楚,还没能力能在几秒内就能想清楚,只能是多想多锻炼了。比如想想上面的,如果把 observeOn(handler) 放在 map() 后面呢

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容