RxJava - 小心 observeOn 的陷阱

在 Android 中使用 RxJava 经常会用到 observeOn 这个操作符来完成线程的切换,比如网络请求之后切换到“主线程”,通常会这么写:

  Observable.observeOn(AndroidSchedulers.mainThread())

然而我遇到一个奇怪的问题,onNext 有时候会丢失,先描述一下场景。我用 Retrofit + RxJava 来进行网络请求,我可以直接拿到一个 Observable,就像下面这样:

// 定义 Retrofit 接口
public interface FeedApi {    
  @GET("feeds")    
  Observable<List<Feed>> feeds();
}
// 省略部分代码
RestApi restApi = retrofit.create(RestApi.class);

Observable<List<Feed>> observable = restApi.feeds()
      .subscribeOn(Schedulers.io())       
      .observeOn(AndroidSchedulers.mainThread());

observable.subscribe(...);

目前来看一切正常,我为了更好的用户体验,加入了缓存,当网络有问题的时候,会显示缓存数据,用户不会看到一个空白的页面。缓存的方式多种多样,比如 Realm,同样支持 RxJava,我依然可以拿到一个 Observable,然后我使用 concat 把来自缓存的 Observable 和来自网络的 Observable 合并为一个 Observable 对外提供,就像下面这样:

Observable<List<Feed>> fromNetwork = ...
Observable<List<Feed>> fromCache = ...

Observable<List<Feed>> observable = Observable.concat(fromCache, fromNetwork)
      .subscribeOn(Schedulers.io())       
      .observeOn(AndroidSchedulers.mainThread());

observable.subscribe(...);

合并之后的 Observable 继续使用 subscribeOnobserveOn 来完成线程的切换。恩,看起来很完美,我想象中运行效果是这样的:

假设已经存在缓存的情况下
网络正常的时候,fromCache 从缓存拿到数据发送给 Subscriber,显示到页面上,然后 fromNetwork 从服务器拿到最新的数据,继续发送给 Subscriber,刷新页面为最新的数据,Subscribe 会经历 onNext() -> onNext() -> onCompleted()
网络异常的情况,fromCache 从缓存拿到数据发送给 Subscriber,显示到页面上,然后 fromNetwork 发生异常,Subscriber 收到 Error,可以提示用户网络异常,但是页面上会显示缓存的数据,用户不会看到空白页面,Subscriber 会经历 onNext() -> onError()

我运行之后,在网络正常的情况下,能显示数据,当我把网络关闭的时候,依然能显示数据,数据是从缓存加载的,还不错,和我想象的一样。但是当我反复多次打开页面的时候,发现一个奇怪的现象,页面有时候会显示空白,有时候会显示缓数据,但是都会提示网络异常的信息,从 Subscriber 的角度讲,就是 Subscriber 有时候会 onNext() -> onError(),有时候只会 onError(),缓存数据的那次 onNext() 丢失了!

为了搞清楚这个问题,我做了这样的测试,在 subscribe 之前各个环节加上 doOnNext,观察会不会执行。

observable        
  .doOnNext(s -> log("1 doOnNext: " + s))                    
  .subscribeOn(Schedulers.io())  
  .doOnNext(s -> log("2 doOnNext: " + s))            
  .observeOn(AndroidSchedulers.mainThread())
  .doOnNext(s -> log("3 doOnNext: " + s))
  .subscribe(...);

// 输出结果
1 doOnNext: test
2 doOnNext: test

第三个居然没有打印,说明 observeOn 之后的 onNext() 没有执行,确定了问题范围之后,我尝试从 observeOn 的源码寻找线索。我发现 observeOn 有几个重载的方法,其中有一个参数叫 delayError,看名字是“延迟错误”。

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError);

注释对这个参数的描述是:

indicates if the onError notification may not cut ahead of onNext notification on the other side of the scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received from upstream

大概说 delayError 决定了 onError 会不会切断 onNext 通知,貌似和我的问题有些关系,为了弄清楚具体情况,还是看源码。
observeOn 基于 lift 操作符,lift 需要一个 OperatorOperator 可以将 下游 Subscriber 进行“代理”,返回一个 “代理” Subscriber 对象,“代理”对象可以收到 onNextonErroronCompleted,然后可以进行一些处理,再选择性的发送给 下游 Subscriber

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {    
  if (this instanceof ScalarSynchronousObservable) {
    return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);    
  }    
  // OperatorObserveOn 是关键
  return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

我们继续看 OperatorObserveOn

public final class OperatorObserveOn<T> implements Operator<T, T> {

    ...

    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
        // 创建一个 Subscriber “代理”
        ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
        parent.init();
        return parent;
    }

    // Subscriber 的代理,它会代理 下游的 Subscriber 收到事件
    // 然后在 Scheduler 的线程中再转发给 下游的 Subscriber,就完成了线程的切换
    static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {

        @Override
        public void onNext(final T t) {
            // 代理对象先收到 onNext
            if (isUnsubscribed() || finished) {
                return;
            }
            // 把收到的数据放到一个队列中
            if (!queue.offer(NotificationLite.next(t))) {
                onError(new MissingBackpressureException());
                return;
            }
            // 通知 Scheduler,Scheduler 会回调 call 方法
            schedule();
        }

        @Override
        public void onCompleted() {
            if (isUnsubscribed() || finished) {
                return;
            }
            // 标记 流 已经结束
            finished = true;
            // 通知 Scheduler,Scheduler 会回调 call 方法
            schedule();
        }

        @Override
        public void onError(final Throwable e) {
            if (isUnsubscribed() || finished) {
                RxJavaHooks.onError(e);
                return;
            }
            // 保存 error
            error = e;
            // 标记 流 已经结束
            finished = true;
            // 通知 Scheduler,Scheduler 会回调 call 方法
            schedule();
        }

        protected void schedule() {
            if (counter.getAndIncrement() == 0) {
                // 把 this 当做一个 callback,Scheduler 会调用 call 方法。
                recursiveScheduler.schedule(this);
            }
        }

        // only execute this from schedule()
        @Override
        public void call() {
            // 从 Scheduler 中调用,此时已经在 Scheduler 的线程中,比如 Android 的主线程中
            long missed = 1L;
            long currentEmission = emitted;

            final Queue<Object> q = this.queue; // 保存 next 数据的队列
            final Subscriber<? super T> localChild = this.child;    // 下游的 Subscriber

            for (;;) {
                long requestAmount = requested.get();

                while (requestAmount != currentEmission) {
                    boolean done = finished;
                    // 从队列中取数据,就是 onNext 的时候放到队列中的
                    Object v = q.poll();
                    // 判断是不是 null
                    boolean empty = v == null;
                    // 这里是关键,检查是否需要中断 onNext
                    if (checkTerminated(done, empty, localChild, q)) {
                        // 中断,最终的 Subscriber 不会收到 onNext
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    // 转发给 下游的 Subscriber
                    localChild.onNext(NotificationLite.<T>getValue(v));

                    ...
                }
            }
        }

        // 检查是否需要中断 onNext
        boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {
            if (a.isUnsubscribed()) {
                q.clear();
                return true;
            }

            if (done) {
                // 如果 流 已经结束,执行了 onCompleted 或者 onError
                if (delayError) {
                    // 如果是延迟错误,delayError 默认是 false
                    if (isEmpty) {
                        // 如果没有 onNext 的数据了
                        Throwable e = error;
                        try {
                            if (e != null) {
                                // 如果存在 error,直接转发给 下游的 Subscriber
                                a.onError(e);
                            } else {
                                // 如果是正常结束,也是直接转发给 下游的 Subscriber
                                a.onCompleted();
                            }
                        } finally {
                            recursiveScheduler.unsubscribe();
                        }
                    }
                } else {
                    // 如果没有延迟错误,这也是默认的情况
                    Throwable e = error;
                    if (e != null) {
                        // **** 如果已经有 error 了,把队列里的 onNext 情况,也就是 onNext 的数据丢失了!
                        q.clear();
                        try {
                            // **** 把 error 发送给 下游的 Subscriber
                            a.onError(e);
                        } finally {
                            recursiveScheduler.unsubscribe();
                        }
                        return true;
                    } else
                    if (isEmpty) {
                        // 如果没有 error,并且没有 onNext 的数据了,直接完成
                        try {
                            a.onCompleted();
                        } finally {
                            recursiveScheduler.unsubscribe();
                        }
                        return true;
                    }
                }

            }

            return false;
        }
    }
}

通过上面对源码的分析,可以确定这个问题出现的原因了,上面注释中 **** 标记的地方就是问题所在:

if (e != null) {
    // **** 如果已经有 error 了,把队列里的 onNext 情况,也就是 onNext 的数据丢失了!
    q.clear();
    try {
        // **** 把 error 发送给 下游的 Subscriber
        a.onError(e);
    } finally {
        recursiveScheduler.unsubscribe();
    }
    return true;
} 

默认 delayErrorfalse,就会走上面这段代码,如果有 error 就会把队列中还未下发给 下游 SubscriberonNext 数据清空,从而导致了 Subscriber 没有收到 onNext,直接收到了 onError

为什么这个问题不是必现的呢,因为这有一个条件,就是出现 onError 的时候,onNext 的数据还没有被消费完,队列中还有数据的情况下才会丢失,也就是 Scheduler 的处理速度跟不上生产速度,这很好理解,因为我们用的是 AndroidSchedulers.mainThread() ,Android 的主线程也就是 Main Lopper,是一个单线程模型,只能一个个的处理,不能并行,而且 Android 中的各种事件,包括 View 的绘制都要通过 Main Looper 来完成,如果 Looper 中积压的消息太多,那么新消息就不会被及时的处理,这时候我们的 fromCache 把缓存数据发送给 Subscriber,就会由于 Looper 中积压的消息过多,Scheduler 不能立刻执行 ObserveOnSubscribercall 方法,从而不能立刻被 下游的 Subscriber 接收到,与此同时 fromNetowrk 发生了 error,当 Looper 处理到消息的时候,也就是 ObserveOnSubscribercall 方法被调用的时候,这时候队列中存在一个 fromCache 的数据,并且 error 也存在,就发生了上面源码分析的情况,导致队列被清空,onNext 没有执行,直接执行了 onError

怎么解决这个问题?答案已经明确了,就是显式的传递 delayErrortrue

Observable.observeOn(AndroidSchedulers.mainThread(), true)

这个问题之前有人在 GitHub 上提过 Issue,并且在这个 Pull 3544 中解决,也就是增加了 delayError,在此之前是没有这个参数的。

明白了原因之后,这个问题其实和 concat 没有什么关系,不用 concat 依然可能出现 onNext 丢失,由于 Main Looper 我们不好去控制,可以通过一个简单的 Java 程序来模拟这种情况。

public static void main(String[] args) {

    // 用一个 单线程的线程池 来模拟 Android 的主线程
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Scheduler androidMainThread = Schedulers.from(executor);

    Observable<String> observable = Observable.create(subscriber -> {
        // 发送数据
        subscriber.onNext("text");
        // 一秒后发送 error
        sleep(1000);
        subscriber.onError(new RuntimeException());
    });

    // 完成线程切换,然后订阅
    observable
            .subscribeOn(Schedulers.io())
            .observeOn(androidMainThread)
            .subscribe(s -> {
                // 收到 onNext 数据
                System.out.println("onNext: " + s);
            }, throwable -> {
                // 收到 onError
                System.out.println("onError: " + throwable);
            });
}

static void sleep(int millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

输出结果:

onNext: text
onError: java.lang.RuntimeException

输出结果是正常的,既收到了 onNext 又收到了 onError,因为 androidMainThread 中没有其他的任务要处理,Observable 发送 onNext 之后会 sleep 一秒钟之后发送 onErrorandroidMainThread 在这一秒钟之内完全可以把 onNext 的数据消费掉,下面模拟 androidMainThread 中有任务处理的情况。

// 主线程有其他任务处理,会阻塞 两秒
executor.execute(() -> sleep(2000));

// 完成线程切换,然后订阅
observable
        .subscribeOn(Schedulers.io())
        .observeOn(androidMainThread)
        .subscribe(s -> {
            // 收到 onNext 数据
            System.out.println("onNext: " + s);
        }, throwable -> {
            // 收到 onError
            System.out.println("onError: " + throwable);
        });

输出结果:

onError: java.lang.RuntimeException

会发现只有 onErroronNext 丢失了,因为 Observable 只是 sleep 了一秒就发送了 onError,而 androidMainThread 的 线程 正在执行一个需要耗时两秒的任务,执行完之后,ObserveOnSubscriber中的队列存在一个 onNext 数据和一个 error,这就会发生之前分析的情况,队列被清空,直接发送了 onError。我们再试试 显式 传递 delayError 的情况。

// 主线程有其他任务处理,会阻塞 两秒
executor.execute(() -> sleep(2000));

// 完成线程切换,然后订阅
observable
        .subscribeOn(Schedulers.io())
        .observeOn(androidMainThread, true) // delayError 为 true
        .subscribe(s -> {
            // 收到 onNext 数据
            System.out.println("onNext: " + s);
        }, throwable -> {
            // 收到 onError
            System.out.println("onError: " + throwable);
        });

输出结果:

onNext: text
onError: java.lang.RuntimeException

会发现 onNext 收到了,这也证明了上面源码分析的逻辑是正确的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,923评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,154评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,775评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,960评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,976评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,972评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,893评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,709评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,159评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,400评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,552评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,265评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,876评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,528评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,701评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,552评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,451评论 2 352

推荐阅读更多精彩内容