RxJava 源码学习之最简单的Demo

** 会看(我在这呢,X_X!!!) =》会用** =》 会说 =》会写 =》会模仿 =》**能new **

说明:本文章是依据 RxJava 1.x (1.2.1) 源码分析学习的。

这是开头

本文主要看源码分析一个RxJava最简单的使用示例。一步俩步,摩擦......

正文在这

首先,写个最简单的RxJava使用示例,如下:

//创建一个Observable 的对象
Observable<String> mObservable = Observable.create( 
   new Observable.OnSubscribe<String>(){ 
       @Override  
      public void call(Subscriber<? super String> subscriber) { 
           subscriber.onNext("Hello world!"); 
           subscriber.onCompleted(); 
       } 
   });
//创建一个Subscriber对象
Subscriber<String> mSubscriber = new Subscriber<String>() { 
   @Override 
   public void onCompleted() { 
       //完成  
  } 
   @Override  
  public void onError(Throwable e) { 
       //异常  
  }  
  @Override  
  public void onNext(String s) { 
       //下一步  
      System.out.print(s); 
   }
};
//关联 Observable 和 Subscriber
mObservable.subscribe(mSubscriber);

我们就根据上面最简单的使用示例,按下面步骤来学习一下RxJava的源码:

  • Observable(被观察者) 的创建
  • Observer(观察者)的创建
  • subscribe() 绑定

开始喽:

  • 1.Observable(被观察者) 的创建

Observable.create 入手

public static <T> Observable<T> create(OnSubscribe<T> f) { 
   return new Observable<T>(RxJavaHooks.onCreate(f));
} 
//.....
//相关代码:RxJavaHooks.onCreate(f)
//.....  
final OnSubscribe<T> onSubscribe;  
protected Observable(OnSubscribe<T> f) {
   this.onSubscribe = f;
}

create创建了一个Observable对象,并将OnSubscribe f参数赋值给了Observable 对象的成员变量 onSubscribe。当然,这中间调用了RxJavaHooks.onCreate(f),我们来看下它的源码。

/**
 * Hook to call when an Observable is created.
 * @param <T> the value type
 * @param onSubscribe the original OnSubscribe logic
 * @return the original or replacement OnSubscribe instance
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) { 
   Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate; 
   if (f != null) { 
       return f.call(onSubscribe); 
   } 
   return onSubscribe;
}

这里其实也就是返回了 onSubscribe 本身,RxJavaHooks是一个代理对象, 仅仅用作调试的时候可以插入一些测试代码,大家这样理解就行了。
Ok,上面说了这么久 Observable.OnSubscribe ,那这个 Observable.OnSubscribe 主要是干嘛的呢?,,看源码:

/**
   * Invoked when Observable.subscribe is called.
   * @param <T> the output value type
 */
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> { 
   // cover for generics insanity
}.
//Action1
public interface Action1<T> extends Action {  
   void call(T t);
}

可以看到 Observable.OnSubscribe是一个拥有 call方法的接口类而已。不过,注意到 Observable.subscribe(后面讲)被调用时,该接口方法会被调用。
好的,总结就是create()方法创建了一个Observable,且给Observable中OnSubscribe变量进行赋值。

  • 2.Observer(观察者)的创建

在文中,为了方便,所有的观察者(Observer)我将用 Subscriber(是实现Observer的接口的抽象类) 来代替,这个会在下面 subscribe()讲解中说明。先看下,Subscriber 的构造:

public abstract class Subscriber<T> implements Observer<T>, Subscription {  
    ...省略其他代码..
    protected Subscriber() {    this(null, false);}
    protected Subscriber(Subscriber<?> subscriber) {
      this(subscriber, true);
    }
    protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
      this.subscriber = subscriber;
      this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList();
    }
    ...省略其他代码...
}

Subscriber是一个抽象类,主要实现了 ObserverSubscription接口类。

public interface Observer<T> {  
    //提示观察者,当被观察者的所有事件已经结束了。
    //与 onError 互斥,
    void onCompleted();  

    //提示观察者,当有一个异常发生时。
    //与 onCompleted互斥,
    void onError(Throwable e);  

    /** 为观察者提供了一个新的项目观察
     *  这个方法可能被调用 0 ~ N 次。
     *  当 onCompleted 或 onError 被调用后,onNext 不会在被调用
    */
    void onNext(T t);
}
.....
public interface Subscription {   
    //观察者与被观察者解绑,不再对它进行观察
    void unsubscribe();
    //检查是否已经解绑    
    boolean isUnsubscribed();
}

上面的代码注释已经很清楚了写明了 Subscriber 的相关方法作用。

3.subscribe() 绑定

subscribe() 实现 被观察者Observable 和 观察者**Subscriber ** 的绑定关系,也是代码开始执行的地方。现在来看下这个方法的源码,我们先找到 subscribe(final Observer<? super T> observer)看下,如下:

public final Subscription subscribe(final Observer<? super T> observer) {  
  if (observer instanceof Subscriber) {  
      return subscribe((Subscriber<? super T>)observer);  
  }  
  return subscribe(new ObserverSubscriber<T>(observer));
}
.....
//再查看下 ObserverSubscriber这个类
public final class ObserverSubscriber<T> extends Subscriber<T> { 
   final Observer<? super T> observer; 
   public ObserverSubscriber(Observer<? super T> observer) {   
     this.observer = observer;  
  }    
  @Override  
  public void onNext(T t) {   
     observer.onNext(t);  
  }   
  @Override  
  public void onError(Throwable e) {    
    observer.onError(e);  
  }     
 @Override  
  public void onCompleted() {   
     observer.onCompleted();  
  }
}

从上面代码中,我们看到在 subscribe()中最终就是将 Observer 转成了 Subscriber 进行后续代码执行的。这也就说明了之前的用Subsciber代替Observer的原因。
好的,我们现在继续查看 subscribe()方法的源码:

public final Subscription subscribe(Subscriber<? super T> subscriber) { 
   return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { 
 ....省略无关代码...
 subscriber.onStart();
 ...省略无关代码.....
 hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);  
 return hook.onSubscribeReturn(subscriber); 
}

subscriber.onStart(); (观察者Subscriber的onStart 方法可以做些预先操作,比如发起请求前,显示进度条等),最主要的是 执行了这一句hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);,ok,我们先看下 hook.onSubscribeStart的实现:

public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {  
  // pass through by default 
  return onSubscribe;
}

竟然,就是直接返回了 onSubscribe这个对象,什么事也没干,而我们知道这个 onSubscribe就是我们创建 Observable时传进入的参数,所以 subscribe()简化如下

//subscriber 就是我们的观察者
onSubscribe.call(subscriber);

然后,就是在 call() 方法中执行 Subscriber的 onNext(),onCompleted(),或 onError()相关方法了。

Observable<String> mObservable = Observable.create( 
   new Observable.OnSubscribe<String>(){ 
       @Override  
      public void call(Subscriber<? super String> subscriber) { 
          //调用 subscriber 的相关方法
           subscriber.onNext("Hello world!"); 
           subscriber.onCompleted(); 
       } 
   });

我们来画一个流程图,如下:


Paste_Image.png

结合图,我们整理下思路:

  • 首先,通过 create() 方法创建ObservableA对象,并赋值其变量OnSubscribeA。
  • 创建观察者对象Subscriber。
  • 然后,通过 subscribe() 绑定 ObservableA 和 Subsciber,并调用 OnSubcribeA.call()。
  • 最后,就是在 call()方法中,执行调用 Subscriber 的 onNext / onCompleted / onError 的对应方法。

尾巴呢?

到这里,我们算是把RxJava最简单的部分弄清楚了,如果文中有什么差错或者不懂的地方,欢迎大家指正与交流!

接下来呢,我们将向高级部分冲刺,比如 各种操作符lift的原理Scheduler线程控制的原理 等等,我们将一一探索其源码的奥秘,(●'◡'●)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351

推荐阅读更多精彩内容

  • 文章转自:http://gank.io/post/560e15be2dca930e00da1083作者:扔物线在正...
    xpengb阅读 7,027评论 9 73
  • 前言我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard...
    占导zqq阅读 9,160评论 6 151
  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 5,464评论 7 62
  • 几个遗留的问题: 迭代器没加进来,还在研究中 ArrayList 继承关系没说明,后期专门写一篇博客
    VictorBXv阅读 157评论 0 1
  • 新东方的面试通知让我等了很久,久到我都想要冲到新东方总部去质问他们了。要是原来我就这样淡淡地悲伤,然后难过地再去学...
    寅颖阅读 182评论 0 2