RxJava小考题 -- Rxjava源码分析(一)

前言:

现在面试很多都会问RxJava的源码,直接讲RxJava的源码,估计大家都不太会看下去,我们先看个小考题,然后再去看相关的源码。

正文:

问题一:

//对象类
class Data {
        public String name;
        public Data(String name) {
            this.name = name;
        }
}

//比如我们使用just操作符来发送二个Data对象
Observable<Data> data = Observable.just(
      new Data("aaaa"),
      new Data( "bbbb")
);

//比如我们用一个Consumer对象来订阅
data.subscribe(new Consumer<Data>() {
      @Override
      public void accept(Data data) throws Exception {
            //里面的内容是把Observable发送过来的对象里面的name属性值改成cccc
            data.name = "cccc";
      }
});

//这时候我们在用一个新的Consumer来订阅这个Observable
data.subscribe(new Consumer<Data>() {
      @Override
      public void accept(Data data) throws Exception {
            //这时候打印data.name
            Log.v("TAG",data.name);
      }
});

问题:
我们用二个Customer分别去订阅一个发送对象的Observable,这时候我们的Log.v("TAG",data.name);输出内容是什么。


问题二:

Observable<Integer> data1 = Observable.just(
       1,2
);

data1.subscribe(new Consumer<Integer>() {
     @Override
     public void accept(Integer d) throws Exception{
            d++;
     }
});

data1.subscribe(new Consumer<Integer>() {
     @Override
     public void accept(Integer d) throws Exception{
            Log.v("TAG","d:"+d);
     }
});

问题:
这时候我们的Log.v("TAG","d:"+d);输出内容是什么。


答案是: 第一个输出是cccc,cccc;第二个是1,2。不知道大家做对了没有。

如果没有做对题目的,我们就来一起来分析代码。

问题分析:

我们先看第一个情形的代码:

Observable.just(
      new Data("aaaa"),
      new Data( "bbbb")
);

just源码 :

public static <T> Observable<T> just(T item1, T item2) {
     ObjectHelper.requireNonNull(item1, "The first item is null");
     ObjectHelper.requireNonNull(item2, "The second item is null");

     return fromArray(item1, item2);
}

前面的二行是检查是否为null。主要是第三行通过fromeArray返回了一个Observable。

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> fromArray(T... items) {
     ObjectHelper.requireNonNull(items, "items is null");
     if (items.length == 0) {
         return empty();
     } else
     if (items.length == 1) {
         return just(items[0]);
     }
     return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}

我们可以看到根据用户传的个数,返回不同的Observable,比如0个的时候返回empty();,一个的时候返回just(items[0]);,其他都返回RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));,但其实本质都差不多。为什么这么说:

empty()源码:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public static <T> Observable<T> empty() {
    return RxJavaPlugins.onAssembly((Observable<T>) ObservableEmpty.INSTANCE);
}

我们可以看到也是调用了RxJavaPlugins.onAssembly方法。

just(items[0])源码:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> just(T item) {
     ObjectHelper.requireNonNull(item, "The item is null");
     return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}

其实也是调用了RxJavaPlugins.onAssembly方法。

所以三者虽然是不同的Observable,但是都是调用RxJavaPlugins.onAssembly方法,然后传入不同的对象参数而已。

PS : 对于我们平时见到的什么Observable.create方法,或者Observable.interval方法等,都是最终调用RxJavaPlugins.onAssembly方法,只是一个调用了RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));,一个调用了RxJavaPlugins.onAssembly(new ObservableInterval(xxx,xxxx,xxxx))方法。

因为我们的情形一种是发射了二个对象,那我们就重点来看一下RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));方法:

我们可以看到RxJavaPlugins.onAssembly方法:

@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
   Function<? super Observable, ? extends Observable> f = onObservableAssembly;
   if (f != null) {
         return apply(f, source);
   }
   return source;
}

我们可以看到最后返回了传入的Observable source,所以也就是我们传入的new ObservableFromArray<T>(items)

所以最终我们拿到的Observable是new ObservableFromArray<T>(items),所以我们一般接下去就是

//本质就是传了一个 new ObservableFromArray<T>(items)
Observable observable = Observable.just(
      new Data("aaaa"),
      new Data( "bbbb")
);

//比如我们用一个Observer对象来订阅
observable .subscribe(new Observer<Data>() {
      @Override
      public void onSubscribe(Disposable d) {

      }

      @Override
      public void onNext(Data data) {

      }

      @Override
      public void onError(Throwable e) {

      }

      @Override
      public void onComplete() {

      }
});

我们先来看subscribe方法:

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

             //最终调用了Observable的subscribeActual方法
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
}

我们可以看到代码里面的加的备注,说明最终我们的
observable.subscribe(observer)最终执行了变为:observable.subscribeActual(observer);,因为我们说过我们的observable具体是ObservableFromArray的实例,所以我们直接看相关源码。

ObservableFromArray.class:

public final class ObservableFromArray<T> extends Observable<T> {
    final T[] array;
    
    //实例化对象的调用的构造函数,同时传入我们要发送的数组
    public ObservableFromArray(T[] array) {
        this.array = array;
    }

     //最终订阅的时候调用这个方法
    @Override
    public void subscribeActual(Observer<? super T> s) {

        //new 一个我们平时用来取消订阅的Disposable,这里具体是FromArrayDisposable
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);

        //也就是我们Observer复写的onSubscribe方法,并把Disposable对象传入
        s.onSubscribe(d);
        
        
        if (d.fusionMode) {
            return;
        }
        
        //然后执行了FromArrayDisposable对象的run方法
        d.run();
    }

    static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {

        final Observer<? super T> actual;

        final T[] array;

        int index;

        boolean fusionMode;

        volatile boolean disposed;
         //构造函数,传入了Observer 和我们要传的数组
        FromArrayDisposable(Observer<? super T> actual, T[] array) {
            this.actual = actual;
            this.array = array;
        }

        @Override
        public int requestFusion(int mode) {
            if ((mode & SYNC) != 0) {
                fusionMode = true;
                return SYNC;
            }
            return NONE;
        }

        @Nullable
        @Override
        public T poll() {
            int i = index;
            T[] a = array;
            if (i != a.length) {
                index = i + 1;
                return ObjectHelper.requireNonNull(a[i], "The array element is null");
            }
            return null;
        }

        @Override
        public boolean isEmpty() {
            return index == array.length;
        }

        @Override
        public void clear() {
            index = array.length;
        }

        //我们用来取消订阅的方法
        @Override
        public void dispose() {
            disposed = true;
        }

        //用来判断是否取消了订阅
        @Override
        public boolean isDisposed() {
            return disposed;
        }

        //当订阅的时候,真正执行的是Disposable的run方法。
        void run() {
            T[] a = array;
            int n = a.length;
            
            /*遍历我们要传的数组,然后并且判断isDisposed()的disposed值
            所以我们就知道了为啥我们取消订阅只要执行Disposable.dispose()方法了
            因为这时候会把disposed返回true,然后这里的for循环判断就会退出循环。
             */
            for (int i = 0; i < n && !isDisposed(); i++) {
                T value = a[i];
                /*我们知道在RxJava 1的时候我们发送一个null值是可以的,
                  但是RxJava2就不行了,因为做了一个判空操作。
                  就会执行Observer的onError方法
                */
                if (value == null) {
                    actual.onError(new NullPointerException("The " + i + "th element is null"));
                    return;
                }
                //执行Observer的onNext方法,并且把值一个个传过去
                actual.onNext(value);
            }
            /*如果用户在onNext都运行完后,并且没有执行dispose()方法,
              则if里面为true,就会执行Observer的onComplete()方法。
            */
            if (!isDisposed()) {
                actual.onComplete();
            }
        }
    }
}

我们通过代码可以看到,其实Observable在生成实例后,里面发送的数组是同一个数组,并且发送的数据也是同一个数据,所以虽然我们用多个Observer去订阅的时候,收到的Data对象是同一个,但是因为第一个Observer对这个对象里面的属性修改了,所以第二个Observer对获取同个对象的时候,获取的属性值也就变了。

简单可以这么理解:

Data data = new Data("aaaa");
Log.v("TAG",data.name);
change(data);
Log.v("TAG",data.name)

public void change(Data data){
     data.name = "bbbb";
}

其实我们可以这么理解。虽然都是打印同一个对象,但是属性被更改了。

所以我们的情形一的代码结果是不是已经能理解了呢,各位。

而情形二其实不是考验RxJava的源码基础,而是考验 Java基础。因为情形二我们发送的是(1,2);相当于:

int data = 1;
Log.v("TAG","data:"+data);
change(data);
Log.v("TAG","data:"+data);

public void change(int data){
     data = 2;
}

你就会发现其实二个Log打印的内容是一样的,都是1。

与其他语言不同,Java不允许程序员选择按值传递还是按引用传递各个参数,基本类型(byte--short--int--long--float--double--boolean--char)的变量总是按值传递。就对象而言,不是将对象本身传递给方法,而是将对象的的引用或者说对象的首地址传递给方法,引用本身是按值传递的-----------也就是说,讲引用的副本传递给方法(副本就是说明对象此时有两个引用了),通过对象的引用,方法可以直接操作该对象(当操作该对象时才能改变该对象,而操作引用时源对象是没有改变的)。

具体可以参考这篇:Java值传递以及引用的传递、数组的传递

结语:

所以本章我们更多地看了Rxjava的Observable生成及Observer订阅时候的部分源码及Java值传递等相关知识。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容

  • 转一篇文章 原地址:http://gank.io/post/560e15be2dca930e00da1083 前言...
    jack_hong阅读 911评论 0 2
  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 5,464评论 7 62
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,971评论 25 707
  • 好的爱情都是有所图谋的, 我们早已经过了耳听爱情的年纪, 要么图的是美貌,要么就是人品。 天生丽质还温柔善良的自然...
    cherish刺心阅读 262评论 0 0
  • 一 我和我老婆是在大一的时候认识的。那时候我们参加了一个共同的旅行社团——驴友团,正是这个社团我才认识了她...
    北国鲁生阅读 488评论 3 4