源码解读篇 — RxJava2源码解读

本篇包含内容包括
1 RxJava事件流向的基本流程;
2 自己手写一个RxJava的基本流程。

1 RxJava事件流向的基本流程

最开始接触RxJava时,很多文章把observerobservable对象定义为观察者和被观察者。其实这样容易把人给绕晕,直接把observable看成上游产生事件者,把observer看出下游接收处理事件者。

RxJava2最简单的调用方式如下:

Observable.create(new ObservableOnSubscribe<String>() { // ①
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {

    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(String s) {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

基本的逻辑是:Observable通过create方法创建一个<? extends Observable>的类,然后通过<? extends Observable>类调用subscribe方法,并传入一个观察者observer

需要搞定的问题:

问:create方法创建的到底是什么类型的实例?

带着问题我们先来看处的create方法,主要涉及的内容如下:

// Observable 类
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));  ①
}

// RxJavaPlugins类
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;  ②
}

为避免分散精力,我们只看主流分支,搞清主流分支,所有的脉络就基本清晰。

由上处可知,返回的Observable对象就是方法onAssembly的返回值,而当onObservableAssembly为空时,返回值其实就是处传进来的ObservableCreate对象。

所以到此,我们可以回答上面的问题了:create创建的对象就是ObservableCreate。

那么正常逻辑下,我们现在肯定要找ObservableCreatesubscribe方法。

嗯嗯~可事情会如我们预期的那么顺利吗?我们在ObservableCreate类中是找不到subscribe方法。

问:subscribe在哪里?

我们来看看ObservableCreate类部分源码:

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    ......
}

由上可知ObservableCreate继承至Observable,但我们并没有找到subscribe方法。于是第一个想法就是去父类找,也就是Observable类中。

// Observable类
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

我的乖乖~,subscribe竟然是个final方法,难怪在子类中看不到这个方法,不过我们看到了另一个方法subscribeActual,点进去一看:

// Observable类
protected abstract void subscribeActual(Observer<? super T> observer);

是个抽象方法,这我们就放心了,有点类似与Android控件ViewMeasure方法中的onMeasure

于是乎,我们就可以回答上面提出的问题。

答:Observablesubscribe方法是抽象方法,所有子类继承Observable后,实现其抽象方法subscribeActual,进行实际的订阅操作。

于是我们就直接去看ObservableCreate类中的subscribeActual方法:

// ObservableCreate类
@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);   ①
    observer.onSubscribe(parent);  ②

    try {
        source.subscribe(parent);  ③
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

问:发送消息的基本原理?

①处创建发射器

②处的方法是不是看上去很眼熟,对的,没错,它就是我们观察者Observer最先被调用的方法。

③处source就是我们最开始调用的create方法中的参数ObservableOnSubscribe,也就是发射器中的参数。

还记得我们最开始那个例子吗?

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {

    }
})
......

③处传入的参数就是此处ObservableEmitter对象的实例,所以每次当
我们利用emitter发送消息时,就触发了CreateEmitter类的onNext方法。

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

    private static final long serialVersionUID = -3434801548987643227L;

    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
        }
        if (!isDisposed()) {
            // 触发observer对象的onNext方法
            observer.onNext(t);
        }
    }
  省略部分代码.....
}

2 自己手写一个RxJava的基本流程

古人云:纸上得来终觉浅,绝知此事要躬行。我们自己手写一个这个的过程比任何教程都会要记得深刻,牢固。

写我们的Observable,我把命名都加上一个perry前缀。

abstract class PerryObservable<T> {
    // 订阅开始调用的方法
    fun subscribe(observer: PerryObserver<in T>) {
        subscribeActual(observer)
    }
    
    internal abstract fun subscribeActual(observer: PerryObserver<in T>)

    companion object {
        // create构造方法
        fun <T> create(source: PerryObservableOnSubscribe<T>): PerryObservable<T> {
            return PerryObservableCreate(source)
        }
    }
}

接下来是PerryObservableCreate

class PerryObservableCreate<T>(private val source: PerryObservableOnSubscribe<T>) : PerryObservable<T>() {

    override fun subscribeActual(observer: PerryObserver<in T>) {
        val emitter = PerryCreateEmitter(observer)
        observer.onSubscribe(emitter)

        source.subscribe(emitter)
    }
    
    // 这是我们的发射器
    class PerryCreateEmitter<T> internal constructor(private val observer: PerryObserver<in T>) 
      : PerryDisposable, PerryEmitter<T> {

        override fun dispose() {

        }

        override fun isDisposed(): Boolean {
            return false
        }

        override fun onNext(value: T) {
            observer.onNext(value)
        }

        override fun onError(error: Throwable) {
            observer.onError(error)
        }

        override fun onComplete() {
            observer.onComplete()
        }
    }
}

其他全部都是一些接口,就不一一贴出来啦。

interface PerryEmitter<T> {

    fun onNext(@NonNull value: T)
    
    fun onError(@NonNull error: Throwable)
    
    fun onComplete()
}

最终调用方法如下:

PerryObservable.Companion.create(new PerryObservableOnSubscribe<String>() {
    @Override
    public void subscribe(PerryObservableCreate.PerryCreateEmitter<String> emitter) {
        emitter.onNext("hello");
        emitter.onNext("world");
    }
}).subscribe(new PerryObserver<String>() {
    @Override
    public void onSubscribe(PerryDisposable d) {

    }

    @Override
    public void onNext(String s) {
        Log.d("zp_test", s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});

打印日志:

image.png

至此,我们基本搞清其发生消息的逻辑,由于篇幅有限,其他更加高级的功能,期待下次再见!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,172评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,346评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,788评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,299评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,409评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,467评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,476评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,262评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,699评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,994评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,167评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,499评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,149评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,387评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,028评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,055评论 2 352