手写RxJava简易框架领悟RxJava的美秒

RxJava笔记

前言

看此篇之前最好知道RxJava的使用。由于RxJava内部源码实现有点复杂,既然用拆轮子的方式来分析源码比较难啃,不如换种方式,以造轮子的方式,将源码中与性能、兼容性、扩展性有关的代码剔除,只留下核心代码,加上我个人的理解,带大家揭秘RxJava的实现原理(本文不涉及框架的使用介绍)。

一、构建观察者类

Subsribler在RxJava里面是一个抽象类,它实现了Observer接口。

public interface Observer<T> {

    void onCompleted();

    void onError(Throwable throwable);

    void onNext(T value);
}

public abstract class Subscriber<T> implements Observer<T>{

    public void onStart(){

    }
}

二、构建被观察者

Observable(被观察者)拥有很多工厂方法和各式各样的操作符。每个Observable里面都维护了一个OnSubscribe对象,并通过subscribe()里面的call(Subscriber<? super T> subscriber)方法与观察者产生联系。

public class Observable<T> {

    final OnSubscribe<T> onSubscribe;

    private  Observable(OnSubscribe<T> onSubscribe){
        this.onSubscribe = onSubscribe;
    }

    public static <T> Observable<T> create(OnSubscribe<T> onSubscribe){
        return new Observable<T>(onSubscribe);
    }

    public void subscribe(Subscriber<T> subscriber){
        subscriber.onStart();
        onSubscribe.call(subscriber);
    }

    public interface OnSubscribe<T>{
        void call(Subscriber<? super T> subscriber);
    }
}

三、RxJava的事件流雏形产生

通过上面写的观察者和被观察者,即可写出一个没有操作符和线程切换功能的简易版Rxjava。

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for(int i = 0; i < 10; i++){
                    subscriber.onNext(i);
                }
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(Integer value) {
                System.out.println("Result: "+value);
            }
        });

通过Observable.create将OnSubscribe的匿名类传给Observable,在subscribe()时回调OnSubscribe接口中的call方法,同时call方法参数即为subscribe的参数,即观察者,因此继续回调subscriber.onNext()即可完成观察者里的逻辑。

结果如下:

image.png

四、玩转RxJava里的操作符

RxJava之所以强大好用,与其拥有丰富灵活的操作符是分不开的。那么我们就试着为这个框架添加一个最常用的操作符:map。先看代码:

    public <R> Observable<R> map(final Fun1<T, R> transformer){
        return create(new OnSubscribe<R>() {
            @Override
            public void call(final Subscriber<? super R> subscriber) {
                Observable.this.onSubscribe.call(new Subscriber<T>() {
                    @Override
                    public void onCompleted() {
                        subscriber.onCompleted();
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        subscriber.onError(throwable);
                    }

                    @Override
                    public void onNext(T value) {
                        subscriber.onNext(transformer.transfer(value));
                    }
                });
            }
        });
    }

    public interface Fun1<T, R>{
        R transfer(T from);
    }

测试代码

 Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for(int i = 0; i < 10; i++){
                    subscriber.onNext(i);
                }
            }
        }).map(new Observable.Fun1<Integer, String>() {
             @Override
             public String transfer(Integer from) {
                 return String.valueOf(from)+"_Map";
             }
         }
        ).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(String value) {
                System.out.println("Result: "+value);
            }
        });

结果如下:


image.png
  • 其实RxJava每调用一次操作符的方法,就相当于在上层数据源和下层观察者之间桥接了一个新的Observable。桥接的Observable内部会实例化新的OnSuscribe和Subscriber。

  • 新建的OnSuscribe的call方法负责持有目标Subscriber,此时就可以回调subscriber的方法来完成观察的行为了。但是这是还没有数据源,想要获得数据源必须调用源Observable.OnSubscribe的subscribe方法,传入一个新的Subscriber,这样就可以在它的onNext()方法中获得数据源,并经过传入的接口处理后,发送给最终的Subscriber。

总体来说就是源Observable.OnSubscribe将Event往下发送给桥接Observable.Subscriber,最终桥接Observable.Subscriber将Event做相应处理后转发给目标Subscriber。

五、RxJava里的线程切换

RxJava中最激动人心的功能是异步处理,能够自如地切换线程。

利用subscribeOn() 结合observeOn() 来实现线程控制,让事件的产生和消费发生在不同的线程。 observeOn() 可以多次调用,Subscriber的执行线程与最后一次observeOn()的调用有关。但subscribeOn() 多次调用只有第一个subscribeOn() 起作用。

这是因为 observeOn() 作用的是Subscriber,而subscribeOn() 作用的是OnSubscribe,这时事件还没开始发送,因此subscribeOn()的线程控制可以从事件发出的开端就造成影响。

线程调度除了桥接Observable以外,RxJava还用到一个很关键的类Scheduler(调度器)。

5.1 Scheduler核心代码如下:
public class Scheduler {
    private final static Scheduler ioScheduler
            = new Scheduler(Executors.newSingleThreadExecutor());

    Executor executor;

    public Scheduler(Executor executor){
        this.executor = executor;
    }

    public Worker createWorker(){
        return new Worker(executor);
    }

    public static class Worker {
        Executor executor1;
        public Worker(Executor executor1){
            this.executor1 = executor1;
        }

        public void schedule(Runnable runnable){
            executor1.execute(runnable);
        }
    }

    public static Scheduler io(){
        return ioScheduler;
    }
}

具体的Scheduler的实现类就不看了,但我们需要知道,能做到线程切换的关键是Worker的schedule方法,因为它会把传过来的任务放入线程池,并在新线程中执行。

5.2 实现observeOn

observeOn是作用于下层Subscriber的,需要让下层Subscriber的事件处理方法放到新线程中执行。为此,在Observable类里面,添加如下代码:

public Observable<T> observeOn(final Scheduler scheduler){
        return create(new OnSubscribe<T>() {
            @Override
            public void call(final Subscriber<? super T> subscriber) {
                subscriber.onStart();
                final Scheduler.Worker worker = scheduler.createWorker();
                Observable.this.onSubscribe.call(new Subscriber<T>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable throwable) {
                    }

                    @Override
                    public void onNext(final T value) {
                        worker.schedule(new Runnable() {
                            @Override
                            public void run() {
                                subscriber.onNext(value);
                            }
                        });
                    }
                });
            }
        });
    }

测试代码如下:

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for(int i = 0; i < 10; i++){
                    subscriber.onNext(i);
                }
            }
        }).map(new Observable.Fun1<Integer, String>() {
                   @Override
                   public String transfer(Integer from) {
                       return String.valueOf(from)+"_Map";
                   }
               }
        ).observeOn(Scheduler.io()).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(String value) {
                System.out.println("Result: "+Thread.currentThread().getName());
            }
        });

结果如下:

image.png
5.3 实现subscribeOn

subscribeOn是作用于上层OnSubscribe的,可以让OnSubscribe的call方法在新线程中执行。

因此,在Observable类里面,添加如下代码:

public Observable<T> subscribeOn(final Scheduler scheduler){
        return create(new OnSubscribe<T>() {
            @Override
            public void call(final Subscriber<? super T> subscriber) {
                scheduler.createWorker().schedule(new Runnable() {
                    @Override
                    public void run() {
                        Observable.this.onSubscribe.call(subscriber);
                    }
                });
            }
        });
    }

测试代码如下:

Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                System.out.println("Observable thread: "+Thread.currentThread().getName());
                for(int i = 0; i < 10; i++){
                    subscriber.onNext(i);
                }
            }
        }).map(new Observable.Fun1<Integer, String>() {
                   @Override
                   public String transfer(Integer from) {
                       System.out.println("Map Observable thread: "+Thread.currentThread().getName());
                       return String.valueOf(from)+"_Map";
                   }
               }
        ).observeOn(Scheduler.io()).subscribeOn(Scheduler.io()).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onNext(String value) {
//                System.out.println("Result: "+Thread.currentThread().getName());
            }
        });

结果如下:


image.png

六、总结

相信看RxJava这个简易版的设计对大家的启示,比网上的一些源码解析清晰的多,希望可以抛砖引玉。有时候我们总是认为看几篇博文貌似当时就懂了明白了,但是这种理解或者说记忆貌似不持久。过了一段时间总是还给博主了。学习还是得深入源码,从源码中学习,然后在结合其他人的博客查漏补缺,这样才是自己的东西。大家有兴趣可以把flatMap等其他操作符来自己实现一下。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,417评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,921评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,850评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,945评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,069评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,188评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,239评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,994评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,409评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,735评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,898评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,578评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,205评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,916评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,156评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,722评论 2 363
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,781评论 2 351

推荐阅读更多精彩内容