RxJava 2.x知识笔记

观察者模式的运用

传统的Java观察者模式可以参考此篇博客:Java观察者模式案例简析

RxJava 是基于Java的观察者模式开展的。构建被观察者(Observable/Flowable)、观察者(Observer/Subscriber),并通过建立两者的订阅关系实现观察,在事件的传递过程中可以对事件进行各种处理。

在rxjava 1.x、rxjava 2.x里,Observable是被观察者,Observer是观察者,正常逻辑是观察者通过subscribe订阅Observable的事件处理,当Observable发射事件时Observer接收数据。但为了保持流式API风格,观察者订阅被观察者的代码顺序设计有一些调整。
如:

Observable.subscribe(Observer);

RxJava 1.x 和RxJava 2.x的主要区别

在RxJava 2.x中的观察者模式有两种。而Flowable作为被观察者是专门支持背压的。这也是RxJava 1.x 和RxJava 2.x的主要区别。当然还有一些区别是操作符、接口的不兼容更新。

  • Observable ( 被观察者 ) / Observer ( 观察者 )
  • Flowable (被观察者)/ Subscriber (观察者)


    image.png

RxJava1.x 平滑升级到RxJava2.x

由于RxJava2.0变化较大无法直接升级,幸运的是,官方提供了RxJava2Interop这个库,可以方便地将RxJava1.x升级到RxJava2.x,或者将RxJava2.x转回RxJava1.x。
RxJava2Interop

经典流式API调用风格

  Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        })
//                ...省略很多在发射过程中的流式处理代码
                .subscribe(new Observer<Integer>() {
                    private Disposable mDisposable;

                    @Override
                    public void onSubscribe(Disposable d) {
                        mDisposable = d;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d("onNext", "" + integer);
                        //新增的Disposable可以做到切断的操作,让Observer观察者不再接收上游事件。
                        if (integer == 3) {
                            mDisposable.dispose();
                            Log.d("onNext", "已停止接收事件");
                        }
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

打印结果:

11-28 13:00:42.195 29930-29930/? D/onNext: 1
11-28 13:00:42.195 29930-29930/? D/onNext: 2
11-28 13:00:42.195 29930-29930/? D/onNext: 3
11-28 13:00:42.195 29930-29930/? D/onNext: 已停止接收事件

Rxjava 线程调度

subscribeOn() 指定的就是发射事件的线程,observerOn 指定的就是订阅者接收事件的线程。

多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用 subscribeOn() 只有第一次的有效,其余的会被忽略。
但多次指定订阅者接收线程是可以的,也就是说每调用一次 observerOn(),下游的线程就会切换一次。

Rxjava 2.x常用操作符

1,Function<T, R> ——将输入的value类型T转换成输出的value类型R。通常结合Map操作符。

/**
 * A functional interface that takes a value and returns another value, possibly with a
 * different type and allows throwing a checked exception.
 *
 * @param <T> the input value type
 * @param <R> the output value type
 */
public interface Function<T, R> {
    /**
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
     */
    @NonNull
    R apply(@NonNull T t) throws Exception;
}

2,Map——将一个Observable被观察者通过特定函数的执行,转换成另一种Observable被观察者。
在 2.x 中和 1.x 中作用几乎一致,不同点在于:2.x 将 1.x 中的 Func1 和 Func2 改为了 Function 和 BiFunction。

  /**
     * Returns an Observable that applies a specified function to each item emitted by the source ObservableSource and emits the results of these function applications.
     * 
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

3,Consumer——接收一个单独的数据,类似于一个简化版的观察者observer。

/**
 * A functional interface (callback) that accepts a single value.
 * @param <T> the value type
 */
public interface Consumer<T> {
    /**
     * Consume the given value.
     * @param t the value
     * @throws Exception on error
     */
    void accept(@NonNull T t) throws Exception;
}

4,distinct——去重操作符。即先有的数字保留,重复的数字去除并保留原先顺序的方式输出。

  Observable.just(2, 1, 2, 3, 4, 2, 3)
                .distinct()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.d("accept", "" + integer);
                    }
                });

输出

11-28 11:59:17.511 7052-7052/com.zjrb.sjzsw D/accept: 2
11-28 11:59:17.511 7052-7052/com.zjrb.sjzsw D/accept: 1
11-28 11:59:17.511 7052-7052/com.zjrb.sjzsw D/accept: 3
11-28 11:59:17.511 7052-7052/com.zjrb.sjzsw D/accept: 4

5,concat—— 可以做到不交错的发射两个甚至多个 Observable 的发射事件,并且只有前一个 Observable 终止(onComplete) 后才会订阅下一个 Observable。比如可以采用 concat 操作符先读取缓存再通过网络请求获取数据。

案例说明:

  Observable observable = Observable.just(1, 2, 3, 4, 5, 6)
                .map(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(@NonNull Integer integer) throws Exception {
                        return integer + 1;
                    }
                });
        Observable.concat(Observable.just(-1, -2, -3, -4, -5, -6), observable)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.d("accept", "" + integer);
                    }
                });

打印输出:看吧,两个Observable是按照顺序依次无交错执行的。

11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -1
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -2
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -3
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -4
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -5
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -6
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: 2
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: 3
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: 4
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: 5
11-29 01:48:50.431 31564-31564/com.zjrb.sjzsw D/accept: 6
11-29 01:48:50.431 31564-31564/com.zjrb.sjzsw D/accept: 7

注:熟悉操作符的目的在于,不同场景中都能随时想到有对应的工具可用。

背压

背压:指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。
因为事件产生的速度远远快于事件消费的速度,最终导致数据积累越来越多,从而导致OOM等异常。这就是背压产生的必要性。

RxJava2.0中,Flowable是能够支持Backpressure的Observable,是对Observable的补充(而不是替代)。所以Observable被观察者支持的API,Flowable也都支持,并且Flowable的API里也都强制支持背压。

背压经典代码

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                if (e.requested() != 0) {
                    for (int i = 0; i < 10; i++) {
                        e.onNext(i + 1);
                        Log.d(TAG, "已发送" + (i + 1) + "个——剩下" + e.requested());
                    }
                    e.onComplete();
                }
            }
        }, BackpressureStrategy.LATEST)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        subscription = s;
                    }

                    @Override
                    public void onNext(Integer s) {
                        Log.d(TAG, "接收 ——" + s);
                        if (s == 9){
                            subscription.cancel();
                        }
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.d(TAG, "接收错误——" + t);
                    }

                    @Override
                    public void onComplete() {
                    }
                });

外部调用subscription请求配合配额11个。

  if (subscription != null) {
      subscription.request(11);
  }

在BackpressureStrategy.LATEST背压策略下,上游发射10个事件,下游由外部调用请求发布配额指令,当下游接收到第9个事件时暂停上游发布(此操作会清空上游事件源)。

背压策略

  • BackpressureStrategy.MISSING
    此策略下,上游发射的数据不做缓存也不丢弃,下游处理溢出的问题。简单说就是没有背压。
  • BackpressureStrategy.ERROR
    此策略下,在上游发射速度过快并超出下游接收速度时,抛出MissingBackpressureException异常。
  • BackpressureStrategy.BUFFER
    此策略下,把上游发射过来的所有数据全部缓存在缓存区,不做丢弃,待下游接收。
  • BackpressureStrategy.DROP
    此策略下,相当于一种令牌机制(或者配额机制),下游通过request请求产生令牌(配额)给上游,上游接到多少令牌,就给下游发送多少数据。当令牌数消耗到0的时候,上游开始丢弃数据。BackpressureStrategy.LATEST的策略和此类似。
  • BackpressureStrategy.LATEST
    此策略和BackpressureStrategy.DROP的策略类似,但在令牌数为0的时候有一点微妙的区别:onBackpressureDrop直接丢弃数据,不缓存任何数据;而onBackpressureLatest则缓存最新的一条数据,这样当上游接到新令牌的时候,它就先把缓存的上一条“最新”数据发送给下游。
/**
 * Represents the options for applying backpressure to a source sequence.
 */
public enum BackpressureStrategy {
    /**
     * OnNext events are written without any buffering or dropping.
     * Downstream has to deal with any overflow.
     * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
     */
    MISSING,
    /**
     * Signals a MissingBackpressureException in case the downstream can't keep up.
     */
    ERROR,
    /**
     * Buffers <em>all</em> onNext values until the downstream consumes it.
     */
    BUFFER,
    /**
     * Drops the most recent onNext value if the downstream can't keep up.
     */
    DROP,
    /**
     * Keeps only the latest onNext value, overwriting any previous value if the
     * downstream can't keep up.
     */
    LATEST
}

Flowable案例代码

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onComplete();
                e.onNext(4);
            }
        }, BackpressureStrategy.ERROR)
                //下面两行代码执行线程切换,达到异步效果
//                .subscribeOn(Schedulers.io())
//                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        subscription = s;
//                        subscription.request(1);
                    }

                    @Override
                    public void onNext(Integer s) {
//                        subscription.request(1);
                        Log.d(TAG, "接收——" + s);
//                        subscription.cancel();
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.d(TAG, "接收错误——" + t);
                    }

                    @Override
                    public void onComplete() {
                    }
                });

这里指定背压策略是BackpressureStrategy.ERROR,这种策略下执行此段代码会报如下错误。

D/rxjava2: 接收错误——io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests

因为,上下游是同步的。上游发射了事件但是下游没有接收,就会造成阻塞(即便上游的事件队列长度只有3个 < 128)。为了避免ANR,就要提示MissingBackpressureException异常。

如果恢复第12、13行处理线程切换的代码,表示上下游位于不同线程,是异步状态。此种情形下,上游发射数据后就不会报MissingBackpressureException异常,但虽然上游能正常发射数据,下游同样接收不到数据。

这里涉及到一个知识点:

Flowable默认事件队列大小为128。BackpressureStrategy.BUFFER策略下事件队列无限大,和没有采取背压的Observable ( 被观察者 ) / Observer ( 观察者 )类似了。

注:在处理同一组数据时,Observable ( 被观察者 ) / Observer ( 观察者 )比BackpressureStrategy.BUFFER策略下的Flowable (被观察者)/ Subscriber (观察者)性能更优,内存消耗更少。

在上下游异步的情况下,上游会先把事件发送到长度为128的事件队列中,待下游发送请求数据指令后从事件队列中拉取数据。这种“响应式拉取”的思想用于解决上下游流速不均衡的情况。

上述代码中,第19、24行代码是表示下游接收前、接收后发送请求配额指令给上游。也可以通过subscription.request(n);在外围调用发送n个请求配额给上游以获取数据。

API自带的被观察者的背压策略

API内的其他被观察者,API也为我们提供了背压策略方法:

  • onBackpressureBuffer()
  • onBackpressureDrop()
  • onBackpressureLatest()
    示例代码如下:
   Flowable.interval(1, TimeUnit.MICROSECONDS)
                .onBackpressureDrop()  //加上背压策略
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        subscription = s;
                        s.request(Long.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.d(TAG, "onNext: " + aLong);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

如果不加背压策略,则会报错:

D/rxjava2: onSubscribe
W/rxjava2: onError: 
           io.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests
               at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:87)
               at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:428)
               at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:278)
               at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:273)
               at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
               at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
               at java.lang.Thread.run(Thread.java:761)

响应式编程

当上下游在同一个线程中的时候,在下游调用request(n)就会直接改变上游中的requested的值,多次调用便会叠加这个值,而上游每发送一个事件之后便会去减少这个值,当这个值减少至0的时候,上游若继续发送事件便会抛异常了。

案例情景一:同步环境下,在下游发出10个请求配额情况下,上游发射130个事件。

      Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                    for (int i = 0; i < 130; i++) {
                        e.onNext(i + 1);
                        Log.d(TAG, "requested—left—" + e.requested());
                    }
//                    e.onComplete();
            }
        }, BackpressureStrategy.ERROR)
                //下面两行代码执行线程切换,达到异步效果
//                .subscribeOn(Schedulers.io())
//                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        subscription = s;
                        subscription.request(10);
                    }

                    @Override
                    public void onNext(Integer s) {
                        Log.d(TAG, "接收 ——" + s);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.d(TAG, "接收错误——" + t);
                    }

                    @Override
                    public void onComplete() {
                    }
                });

打印LOG日志:

D/rxjava2: 接收 ——1
D/rxjava2: requested—left—9
D/rxjava2: 接收 ——2
D/rxjava2: requested—left—8
D/rxjava2: 接收 ——3
D/rxjava2: requested—left—7
D/rxjava2: 接收 ——4
D/rxjava2: requested—left—6
D/rxjava2: 接收 ——5
D/rxjava2: requested—left—5
D/rxjava2: 接收 ——6
D/rxjava2: requested—left—4
D/rxjava2: 接收 ——7
D/rxjava2: requested—left—3
D/rxjava2: 接收 ——8
D/rxjava2: requested—left—2
D/rxjava2: 接收 ——9
D/rxjava2: requested—left—1
D/rxjava2: 接收 ——10
D/rxjava2: requested—left—0
D/rxjava2: 接收错误——io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
D/rxjava2: requested—left—0

下游不再发送请求配额时,上游的配额令牌就为0。此时上游还有事件强行发个的话,就会出现异常。这里可以验证上面说的结论。

案例情景二:异步环境,在下游发出10个请求配额情况下,上游发射128个事件。

只是多了切换线程的代码:

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())

打印LOG日志:

D/rxjava2: requested—left—127
D/rxjava2: requested—left—126
D/rxjava2: requested—left—125
...
D/rxjava2: requested—left—2
D/rxjava2: requested—left—1
D/rxjava2: requested—left—0
D/rxjava2: 接收 ——1
D/rxjava2: 接收 ——2
D/rxjava2: 接收 ——3
D/rxjava2: 接收 ——4
D/rxjava2: 接收 ——5
D/rxjava2: 接收 ——6
D/rxjava2: 接收 ——7
D/rxjava2: 接收 ——8
D/rxjava2: 接收 ——9
D/rxjava2: 接收 ——10

异步的情况下,上游先把事件序列内的事件发射完毕,下游才开始接收。如果上游的时间序列超过默认的128个,则上游事件发射到第129个就会报MissingBackpressureException异常,下游就接收不到事件了。这就涉及到下一个知识点了。

当上下游工作在不同的线程里时,每一个线程里都有一个requested,而我们下游调用request(1000)时,实际上改变的是下游线程中的requested,而上游中的requested的值是由RxJava内部调用request(n)去设置的,这个调用会在合适的时候自动触发。

何时自动触发呢?我们一起看下:

 Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                Log.d(TAG, "e.requested() == "+e.requested());
                    for (int i = 0; ; i++) {
                        boolean flag = false;
                        //这里做了一个循环,使发射循环处于保活状态,并在适当时机继续发射事件
                        while (e.requested() == 0){
                            if (!flag){
                                Log.d(TAG, "e.requested() == "+e.requested());
                                flag = true;
                            }
                        }
                        e.onNext(i + 1);
                        Log.d(TAG, "已发送" + (i + 1) + "个——剩下" + e.requested());
                    }
            }
        }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        subscription = s;
                    }

                    @Override
                    public void onNext(Integer s) {
                        Log.d(TAG, "接收 ——" + s);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.d(TAG, "接收错误——" + t);
                    }

                    @Override
                    public void onComplete() {
                    }
                });

外部手动调用request向上游请求发射配额

 if (subscription != null) {
     subscription.request(96);
     Log.d(TAG,"下游请求了96个");
  }

打印Log日志:

D/rxjava2: e.requested() == 128
D/rxjava2: 已发送1个——剩下127
D/rxjava2: 已发送2个——剩下126
...
D/rxjava2: 已发送127个——剩下1
D/rxjava2: 已发送128个——剩下0
D/rxjava2: e.requested() == 0
D/rxjava2: 下游请求了96个
D/rxjava2: 接收 ——1
D/rxjava2: 接收 ——2
...
D/rxjava2: 接收 ——95
D/rxjava2: 接收 ——96
D/rxjava2: 已发送129个——剩下95
...
D/rxjava2: 已发送223个——剩下1
D/rxjava2: 已发送224个——剩下0
D/rxjava2: e.requested() == 0

1,2,3,4行表示启动之后,上游自动想上游的事件序列缓存区发射128个事件。
8,9...13行表示下游手动请求96个发射配额时接收的事件。此时会自动触发上游继续发送事件,如14,15...17,18行,上游会自动再次发射96个事件(95-0+1=96)。

如果你下游请求95个发射配额的话,上游不会自动触发事件发射的(这个应该是底层设置的触发阀吧)。

因此得出结论:当下游每消费96个事件便会自动触发内部的request()去设置上游的requested的值。

在某一些场景下,可以在发送事件前先判断当前的requested的值是否大于0,若等于0则说明下游处理不过来了,则需要等待,

注意:是在onNext事件里,onComplete和onError事件不会消耗requested。

本章完~

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,444评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,421评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,363评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,460评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,502评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,511评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,280评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,736评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,014评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,190评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,848评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,531评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,159评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,411评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,067评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,078评论 2 352

推荐阅读更多精彩内容

  • 转载自:https://xiaobailong24.me/2017/03/18/Android-RxJava2.x...
    Young1657阅读 2,018评论 1 9
  • RxJava RxJava是响应式程序设计的一种实现。在响应式程序设计中,当数据到达的时候,消费者做出响应。响应式...
    Mr槑阅读 942评论 0 5
  • RXjava 实质是一个异步操作库 1. 导入 2. 概念 Observable(被观察者) : Observer...
    Lhuo阅读 2,304评论 0 0
  • 这可能是最好的RxJava 2.x 教程(完结版) 文章链接:这可能是最好的RxJava 2.x 入门教程(一)这...
    Thor_果冻阅读 551评论 0 1
  • 再怎么憧憬,我的憧憬,我一定能要他不会落空,做不到也行
    海狮会带给你伤害阅读 183评论 1 0