RxJava 1.x 源码分析

Demo 分析

最简单的 demo

Observable.OnSubscribe sourceOnSubscribe = new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onCompleted();
    }
};
Observable sourceObservable = Observable.create(sourceOnSubscribe);
Subscriber<String> targetSubscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, s);
    }
    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }
    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};
sourceObservable.subscribe(targetSubscriber);

没有进行简写,每个中间变量都赋予了名字,方便后面说明.

demo的输出结果就是 Hello Hi Completed!

基本 demo 的函数调用链

分析上述最简单的 demo, 分析中不看 部分与性能、兼容性、扩展性有关的代码和函数调用,仅关注核心代码和调用,
在相关源码里,关键部分有相关注释说明.

先上图.

demo
demo
  1. 实例化 Observable.OnSubscribe , 记为 sourceOnSubscribe.
  2. 调用 Observable.create 静态方法
  3. create()内部会实例化 Observable 对象,需要传入sourceOnSubscribe,将其返回值记为 sourceObservable.
  4. 实例化 Subscriber , 记为 targetSubscriber.
  5. 调用 sourceObservable.subscribe(targetSubscriber) 方法,这是将观察者和观察源建立联系的地方,订阅.
  6. sourceObservable 会首先调用 targetSubscriber.onStart() 方法.
  7. sourceObservable 调用 sourceOnSubscribe 的 call(targetSubscriber)方法, 就是上面我们自定义的地方,执行到我们写的代码附近了.
  8. 执行相关逻辑,上面 demo 中什么都没做,这里需要我们自己实现具体逻辑.
  9. 调用 targetSubscriber.onNext(T) 方法,这里也是上面 demo 里自己实现的地方.
  10. targetSubscriber.onCompleted() or targetSubscriber.onError(e),整个流程跑完了.

整个调用流程其实并不复杂,跟踪下来还是很容易的,RxJava 在内部也没做太多的事.

深入使用的源码分析

RxJava 最强大的就是操作符 和 线程操作,接下来看看这部分.

操作符

  • map
Func1 mapFun=new Func1<Integer, String>() {
             @Override
             public String call(Integer number) { // 参数类型 int
                 return "number " + number; // 返回类型 String
             }
         };
Action1 action1=new Action1<String>() {
          @Override
          public void call(String str) { // 参数类型 String
              Log.i(str);
          }
      };
Observable.just(1,2,3,4,5)
.map(mapFun)
.subscribe(action1);
map
map
  1. 调用 map(mapFun)
  2. map 方法内部实例化 OnSubscribeMap ,传入 this (Observable) 和 mapFun.
  3. 调用 Observable.create 方法,生成新的 MapObservable
  4. 我们调用 subscribe 时,实际上调用的是 MapObservable.subscribe().
  5. 回调 onStart()
  6. 调用 onSubscribeMap 的 call()
  7. 生成一个新的 mapSubscriber ,之后会订阅原来的 Observable.
  8. 关联两个 subscriber 的 unsubscribe()
  9. 用新的 mapSubscriber 订阅原来的 Observable.
  10. 原来的 Observable 回调 mapSubscriber的 onStart()
  11. 调用原来的 OnSubscribe.call()
  12. OnSubscribe 内部的执行逻辑
  13. 调用 mapSubscriber 的 onNext(T)
  14. mapSubscriber 会调用 mapFun.call(T) 返回 R
  15. mapSubscriber 调用真正的 targetSubscriber.onNext(R),R 是转换后的数据

看流程有点复杂,其实也很简单,就是 map 在观察源和观察者之前做了一层转换,当发生订阅时,观察者订阅的不是真正的观察源,
而是 map 内部的'转换观察源','转换观察源'内部会再去订阅真正的观察源,然后将观察源返回的数据通过转换函数mapFun转换,
再返回给我们定义的观察者.

  • lift 变换

    new Func1<Integer, String>() {
                @Override
                public String call(Integer number) { // 参数类型 int
                    return "number " + number; // 返回类型 String
                }
            };
            Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5);
            observable.lift(new Observable.Operator<String, Integer>() {
                @Override//这个参数 subscriber,就是最终的,也就是我们使用时传入的
                public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
                    return new Subscriber<Integer>() {
                        @Override
                        public void onNext(Integer integer) {
                            subscriber.onNext("number " + integer);
                        }
    
                        @Override
                        public void onCompleted() {
    
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
                    };
                }
            }).subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    Log.d(s);
                }
            });
    

    这个例子功能和上面 map 的例子相同,都是将数字变为字符串.

    lift
    lift

    整体流程和 map 类似,都是生成一个中间 Subscriber ,去订阅原来的 Observable,
    然后在 onNext 等方法里将数据处理转换之后,回调真正的 Subscriber.

线程控制

  - subscribeOn()
    `OperatorSubscribeOn` 新建了一个 OnSubscribe,执行 call() 即产生事件.即 OperatorSubscribeOn,内部调用了线程相关.
  - observeOn()
    使用了 lift 操作符, operator 是 `OperatorObserveOn`.内部也是线程相关代码.
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,258评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,335评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,225评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,126评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,140评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,098评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,018评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,857评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,298评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,518评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,400评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,993评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,638评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,661评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352

推荐阅读更多精彩内容