RxJava2笔记(二、事件取消流程)

在上一篇文章RxJava2笔记(一、事件订阅流程)中,我们讲解了RxJava的事件订阅流程,本文我们将继续讲解RxJava的订阅取消流程。

我们对上一篇文章开始的代码做一些修改:

private Disposable disposable;
private void init() {
    Observer<Integer> observer = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.i(TAG, "onSubscribe:");
            disposable = d;
        }

        @Override
        public void onNext(Integer integer) {
            Log.i(TAG, "onNext: " + integer);
        }

        @Override
        public void onError(Throwable e) {
            Log.i(TAG, "onError: " + e.getMessage());
            e.printStackTrace();
        }

        @Override
        public void onComplete() {
            Log.i(TAG, "onComplete:");
        }
    };

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) {
            emitter.onNext(1);
            emitter.onNext(2);
            disposable.dispose();
            emitter.onNext(3);
            emitter.onComplete();
        }
    }).subscribe(observer);
}

我们在emitter.onNext(2)和emitter.onNext(3)之间添加了一行代码disposable.dispose(),我们来看下输出结果:


image.png

可以看到observer没有收到observable发送的onNext(3)和onComplete事件,原因在于observable在发送完emitter.onNext(2)后调用了disposable.dispose(),由此可以看出这个disposable应该就是用于中断事件订阅的,这个disposable则是在observer调用onSubscribe(Disposable d) 方法时获取的。
这个disposable.dispose()执行了哪些操作呢?我们点进去看下:

public interface Disposable {
    /**
     * Dispose the resource, the operation should be idempotent.
     */
    void dispose();

    /**
     * Returns true if this resource has been disposed.
     * @return true if this resource has been disposed
     */
    boolean isDisposed();
}

可以看到这是一个接口方法,其中dispose()方法就是用来中断事件订阅流程的;isDisposed()方法则是判断当前事件订阅是否被中断,具体的实现细节在其实现类中。不知道大家还有没有印象,在上一节中我们提到一个关键类ObservableCreate类,我们再看一下这个类的相关代码:


image.png

在上一篇文章中,我们在分析事件订阅流程时提到过,在source.subscribe(parent);代码执行之前会先执行observer.onSubscribe(parent);代码。也就是obsever的4个方法中,onSubscribe方法最先被执行,而这个parent就是我们前面获取的Disposable接口对象,也就是CreateEmitter,我们可以看到这个类继承了Disposable接口,因此当我们调用disposable.dispose()方法时,实际上执行的是CreateEmitter类中的实现方法,isDisposed()方法同理。那我们就来看下CreateEmitter类中这两个实现方法:

@Override
public void dispose() {
    DisposableHelper.dispose(this);
}

@Override
public boolean isDisposed() {
    return DisposableHelper.isDisposed(get());
}

可以看到,它是委托给了DisposableHelper这个类来完成的,这个DisposableHelper又是什么东西?我们点进去看下:

public enum DisposableHelper implements Disposable {
    /**
     * The singleton instance representing a terminal, disposed state, don't leak it.
     */
    DISPOSED
    ;

    /**
     * Checks if the given Disposable is the common {@link #DISPOSED} enum value.
     * @param d the disposable to check
     * @return true if d is {@link #DISPOSED}
     */
    public static boolean isDisposed(Disposable d) {
        return d == DISPOSED;
    }
    //.......代码省略

    /**
     * Atomically disposes the Disposable in the field if not already disposed.
     * @param field the target field
     * @return true if the current thread managed to dispose the Disposable
     */
    public static boolean dispose(AtomicReference<Disposable> field) {
        //......代码省略
    }
    
    //......代码省略
}

可以看到这个类是一个枚举类,它有一个枚举常量DISPOSED,这是一个事件订阅中断标识,在isDisposed(Disposable d)方法中,就是判断传入的参数对象(Disposable)是否等于该标识位从而判断该对象的订阅是否被中断。

我们要重点分析的是这个dispose(AtomicReference<Disposable> field)方法,这个方法接收一个AtomicReference类型的参数,这是因为CreateEmitter继承了AtomicReference类,这样保证了操作的原子性,防止在多线程当中出现数据错误。我们来仔细分析下这个方法:

public static boolean dispose(AtomicReference<Disposable> field) {
    //1、获取当前传递进来的对象所持有的disposable对象
    Disposable current = field.get();
    //2、终止标识
    Disposable d = DISPOSED;
    //3、当前current不等于终止标识(dispose()方法还未被调用)
    if (current != d) {
        //4、使用AtomicReference的原子方法将终止标识设置到field对象中,并返回field对象中的旧值
        //并将其保存在current中
        current = field.getAndSet(d);
        //5、current仍旧不等于终止标识
        //备注:当第一次调用dispose()方法时,此时current为空,满足这个条件,下面的current != null为false,直接返回true;
        //另外一种情况就是程序多次调用了dispose()方法,但是disposable值不等于终止标识,说明之前的设置失败了,
        //此时current不为空,再次调用dispose()方法
        if (current != d) {
            if (current != null) {
                current.dispose();
            }
            //7、返回true,表示当前线程成功的设置了终止标识
            return true;
        }
    }
    //8、之前已经调用过dispose()方法,并且已经正确设置了disposable终止标识,
    //订阅事件已经被终止了,再次调用该方法时直接返回false,表示设置终止标识失败
    return false;
}

从上面的代码分析中可以看出,dispose()方法中断订阅的方式只是向AtomicReference<Disposable>这个泛型类中设置了一个终止标识,那么这个终止标识是如何影响到onNext,onComplete等方法的呢?让我们回过头来再看下CreateEmitter中的onNext等方法:

@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}

@Override
public void onError(Throwable t) {
    if (!tryOnError(t)) {
        RxJavaPlugins.onError(t);
    }
}

@Override
public boolean tryOnError(Throwable t) {
    if (t == null) {
        t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
    }
    if (!isDisposed()) {
        try {
            observer.onError(t);
        } finally {
            dispose();
        }
        return true;
    }
    return false;
}

@Override
public void onComplete() {
    if (!isDisposed()) {
        try {
            observer.onComplete();
        } finally {
            dispose();
        }
    }
}

我们来看onNext方法,它先判断要发射的数据是否为空,为空就抛出异常。接着在if语句中判断isDisposed()返回值,如果isDisposed()返回false,表示订阅已经被终止,随后的observer.onNext(t)也就不会被执行。

随后的onComplete和onError都是通过判断isDisposed()的返回值来决定是否执行接下来的操作(即observer.onComplete和observer.onError),而且我们发现observer中的onComplete和onError之间是互斥的,也就是说这两个方法只有一个会执行,如果先执行了onComplete,onError就不会执行;反之先执行onError,onComplete就不会执行。我们通过分析源码发现不管是先执行onError还是先执行onComplete,只要当前订阅事件还未被终止,他们最终都会调用dispose()方法来终止订阅事件,最终会导致isDisposed()返回为false,自然也就不会再执行另外一个了。


到此为止,整个的事件订阅取消流程就分析完了,我们来总结下:

  • 1、在事件订阅开始时,最先执行的是observer中的onSubscribe(Disposable d)方法,也就是ObservableCreate类中subscribeActual(Observer<? super T> observer)方法里面的observer.onSubscribe(parent)这个方法,我们获取到的disposable实际上就是这个parent,也就是包装了外部传进来的observer观察者的CreateEmitter类(该类继承了Disposable接口)
  • 2、调用disposable.dispose()方法终止事件订阅流程,实际上调用的是Disposable的实现类CreateEmitter中的实现方法
  • 3、在CreateEmitter的dispose()实现方法中,委托DisposableHelper类实现具体的取消订阅流程;isDisposed()方法也是如此
  • 4、DisposableHelper类是一个枚举类,它也实现了Disposable接口,内部有一个枚举常量DISPOSED,这是一个标识位。在dispose()方法中,给传入的CreateEmitter设置该标识位,而CreateEmitter当中的onNext,onComplete以及onError方法均需要根据isDisposed()的返回值来决定接下来的操作是否执行;而isDisposed()方法正是根据传入对象所持有的标识位是否等于DISPOSED终止标识来决定其返回true还是false。至此,中断标识位就是通过这样的方式来影响事件中的订阅相关方法(即CreateEmitter中的onNext,onComplete以及onError方法),进而影响到外部观察者对象(observer)中对应的事件接收方法。

在下一章RxJava2笔记(三、订阅线程切换)中,我们将接着分析订阅线程切换时如何进行的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,163评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,301评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,089评论 0 352
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,093评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,110评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,079评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,005评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,840评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,278评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,497评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,394评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,980评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,628评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,649评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,548评论 2 352

推荐阅读更多精彩内容