RxJava学习笔记(解读源码)

写在前面的话

看了源码也不少,但是每次看源码都仅仅,也就是仅仅跟着作者一步步跟进ta的方法,点进去,再点进去,再再点进去,渐渐的感觉自己当时看的太肤浅,有很多问题都会冒出来,最多的就是为什么要这样写呢,一追究这个问题就会发现自己在知识的海洋里是如此的渺小。写了这么多,希望能够在以后眼光放高一些,看大局,看别人是如何排兵布阵!

rxjava执行原理

先从调用rxjava功能的代码入手,首先最简单的调用就是:

Observable.create(new Observable.OnSubscribe<Integer>() {//创建

    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        subscriber.onNext(1);
        subscriber.onCompleted();
    }

}).subscribe(new Observer<Integer>() {

    @Override
    public void onCompleted() {
        System.out.println("Completed");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Error");
    }

    @Override
    public void onNext(Integer integer) {
        System.out.println("i = " + integer);
    }
});

Observable.create()

  • 首先调用Observable.create()创建一个生产者Observable,同时创建了一个OnSubscribe作为其参数传给Observable

  • OnSubscribeObservable的一个内部接口,从它的注释"* Invoked when Observable.subscribe is called."可以看出当生产者被消费者订阅的时候,它将会被激活。

  • 再看方法内部:

      public static <T> Observable<T> create(OnSubscribe<T> f) {
          return new Observable<T>(RxJavaHooks.onCreate(f));
      }
    
  • 接着我们看到创建的OnSubscribe又传入了RxJavaHooks.onCreate(f)中:

      public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
          Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
          if (f != null) {
              return f.call(onSubscribe);
          }
          return onSubscribe;
      }
    
  • 这里判断了RxJavaHooks中的成员变量onObservableCreate是否为空,在RxjavaHooks的内部有一个静态方法initCreate()可以对其进行初始化,但是我们并没有调用它,所有我们最后返回的onSubscribe依然还是我们自己创建的onSubscribe

  • 倒回至Observable的构造函数,最后将这个onSubscribe赋值给了Observable的成员变量。

      protected Observable(OnSubscribe<T> f) {
          this.onSubscribe = f;
      }
    

Subscriber

创建了生产者Observable,那么肯定还要创建消费者ObserverSubscriber就是Observer接口的一个实现类,同时还实现了Subscription接口,这个接口的方法unsubscribe()用于取消订阅,还有一个isUnsubscribed()方法判断订阅的状态,unsubscribe() 这个方法很关键,因为在 subscribe()之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。

subscribe()

  • 接下来就是subscribe()方法:

      static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
          // new Subscriber so onStart it
          subscriber.onStart();
    
          if (!(subscriber instanceof SafeSubscriber)) {
              subscriber = new SafeSubscriber<T>(subscriber);
          }
    
          try {
              RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
              return RxJavaHooks.onObservableReturn(subscriber);
          } catch (Throwable e) {
                  ...
              }
              return Subscriptions.unsubscribed();
          }
      }
    
  • 首先subscriber = new SafeSubscriber<T>(subscriber)会将我们自己写的subscriber进行包装,其实也就是代理的设计模式,在我们写的代码中通过代理进行一些安全校验,这里就保证了onCompleted()onError()只会有一个执行切只执行一次。

  • 接着看RxJavaHooks.onObservableStart(observable, observable.onSubscribe)就会发现,同样该方法返回的就是我们创建的onSubscribe,之后还调用了它的call()方法,也就成了
    调用我们自己写的call()方法。

  • call()方法中的参数subscriber其实就是我们在调用subscribe()订阅时,作为参数传进来的subscriber,这样也就让我们在生产者这里调用到了消费者的方法,这样也就达到了观察者的目的。

rxjava变换原理

rxjava的变换虽然功能各有不同,但实质上都是针对时间序列的处理和再发送,这里我们就通过map()来了解其中的原理,一下就是利用map的一段代码:

Observable.just(1,2,3)
        .map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return "i = " + integer;
            }
        })
        .subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {
                System.out.println("s : " + s);
            }
        });

接着看map()方法:

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
    return create(new OnSubscribeMap<T, R>(this, func));
}

是不是感觉很熟悉,返回的就是创建Observable的方法,也就是说它将我们map()中的func作为onSubscribeMap的构造参数,那么onSubscribeMap又是什么,它其实就是OnSubscribe的一个实现类,先看其构造方法:

public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
    this.source = source;
    this.transformer = transformer;
}

这里通过内部的成员变量保存了传递进来的ObservableFunc1,记住,这里也就是说在外头调用该方法的Observable,也就是源Observable保存在了source,而我们在map()中写的Func1()保存在了transformer中,在往后看.subscribe(),注意调用订阅方法的已经不是源Observable,而是通过map()内部自己创建返回的一个新Observable,也就是说新的Observable持有了Subscriber的对象,那么,订阅了之后自然就会激活Observable发射数据,也就是onSubscribe中的call()方法开始执行,在这里就是onSubscribeMapcall()方法。

public void call(final Subscriber<? super R> o) {
    MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
    o.add(parent);
    source.unsafeSubscribe(parent);
}

代码开头就创建了一个MapSubscriber,它是Subscriber的一个子类,构造函数如下:

public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
    this.actual = actual;
    this.mapper = mapper;
}

联系上头代码,可以发现它将传入的Subscriber包装成了MapSubscriber,同时还讲源SubscriberFunc1保存在了成员变量中,之后执行的时候肯定要执行onNext()方法,直接看onNext()

public void onNext(T t) {
    R result;

    try {
        result = mapper.call(t);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        unsubscribe();
        onError(OnErrorThrowable.addValueAsLastCause(ex, t));
        return;
    }

    actual.onNext(result);
}

这里有一个泛型R,也就是返回值类型,是通过构造函数传入的,最重要的代码result = mapper.call(t),这里的mapper就是我们在map()中写的Func1,接着传入参数,通过我们自己的方法得到我们想要的结果返回。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容