我学 rxjava 2(2)- 操作符

rxjava 的操作符可是一大亮点啊,之前就算是有相应式变成的库也没有像 rxjava 这样提供这样可以变换的操作来。rxjava 操作符神神奇的地方在于他可以把一大堆无关的,单独的异步操作编制成一条执行的链子出来,让这些异步操作按照指定的顺序执行,其中我们可以执行大量的无法想想的操作,可以上上一个异步返回的数据,加工变成一个新的数据交给下一额个异步操作;可以把几个异步做关联,一个执行不完,下一个不能执行,一个执行不过去,之后的也不能执行;可以把异步并行执行,知道最后一个异步返回数据,然后结合这几个数据,返回一个集合数据,等等这些操作再 rxjava 之前都是无法想象的,都是不敢想的,这决然都能封装出来,但是 rxjava 还就是做到了,究其原因害的归功于响应式编程的思路啊。

废话不来了,rxjava 的操作符根据目的是分为几种的,我总结分类一下,便于大家记忆:


创建操作符

使用这类操作符可以不费力的创建出 Observable 对象

变换操作符

可以把一个异步获取的数据进行操作,改变成另一个类型的数据发射,或是把一个 Observable 异步操作变换成另一个 Observable 异步操作

组合操作符

组合操作符可以把多个异步操作合并或是链接起来,这是日常我们最常用的场景了

功能操作符

  • 条件操作符:

    • takeWhile - 只发送满足条件的,一旦碰到不满足的,后面的均不会发送,complete 结束
    • takeUntil - 一直发送,直到满足条件时结束,并且,满足条件的那一个也会发送,之后发complete 结束
  • 判断操作符
    * all - 只有全部满足的时候才会返回 true
    * contains - 是否包含指定参数
    * isEmpty - 是否为空

  • 刷选操作符
    * filter - 只发射符合条件的数据,数据从头遍历到尾,碰到不符合的数据也不会中断发射
    * ofType - 只发射符合类型的数据,数据从头遍历到尾,碰到不符合的数据也不会中断发射
    * elementAt - 选择指定位置的元素,下标准许越界,如果越界,可以指定默认值
    * firstElement / lastElement - 获得第一个数据 / 获得最后一个数据
    * distinct - 过滤重复,只要出现过得就不会再出现
    * distinctUntilChanged - 过滤连续重复数据,注意必须是连续重复的才有有效
    * take / takeLast - 从头开始 / 最后开始 获取指定数目的数据,如果数据量小于指定数目,则进入 error
    * skip / skipLast - 正序 / 倒序 跳过指定 count,如果不足,则会进入 onError
    * throttleFirst / throttleLast - 指定时间间隔内取第一个 / 最后一个 数据
    * throttleWithTimeout / debounce - 2次数据间隔超过指定时间才有效,若一直没有合适的数据,默认取最后一个数据


创建操作符

just
        Observable.just("AA","BB")
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d("AA", "resylt:" + s);
                    }
                });

just 可以接受一个长度最多为 10 的可变参数

注意 just 只创建出 1 个 Observable 对象,复数的数据都是挨个发送出去,而不是创建多个 Observable 对象

    public static <T> Observable<T> just(T item1, T item2) {
        ObjectHelper.requireNonNull(item1, "The first item is null");
        ObjectHelper.requireNonNull(item2, "The second item is null");

        return fromArray(item1, item2);
    }

看源码就是返回一个 Observable 对象

    public static <T> Observable<T> fromArray(T... items) {
        ObjectHelper.requireNonNull(items, "items is null");
        if (items.length == 0) {
            return empty();
        } else
        if (items.length == 1) {
            return just(items[0]);
        }
        return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
    }
public final class ObservableFromArray<T> extends Observable<T> {
    final T[] array;
    public ObservableFromArray(T[] array) {
        this.array = array;
    }

这时最终返回一个可以保存列数数据的 Observable 类型对象 ObservableFromArray

fromArray / fromIterable

fromArray / fromIterable 和 just 什么什么区别,区别就是 fromArray / fromIterable 的数据不限数量,fromArray 和 fromIterable 接受的数据集合类不同罢了

但是必须注意,他们都是只创建一个 Observable 对象,然后遍历集合,挨个发射数据

empty / error / never
<-- empty()  -->
// 该方法创建的被观察者对象发送事件的特点:仅发送Complete事件,直接通知完成
Observable observable1=Observable.empty(); 
// 即观察者接收后会直接调用onCompleted()

<-- error()  -->
// 该方法创建的被观察者对象发送事件的特点:仅发送Error事件,直接通知异常
// 可自定义异常
Observable observable2=Observable.error(new RuntimeException())
// 即观察者接收后会直接调用onError()

<-- never()  -->
// 该方法创建的被观察者对象发送事件的特点:不发送任何事件
Observable observable3=Observable.never();
// 即观察者接收后什么都不调用
defer 延迟创建

通过 Observable工厂方法创建被观察者对象Observable,每次订阅后,都会得到一个刚创建的最新的Observable对象,这可以确保Observable对象里的数据是最新的

举个例子,一个 int 参数,我们在创建 Observable 之后修改这个 int 的值,看看可能发射的数据是修改之前的还是之后的

        index = 10;

        Observable<Integer> just = Observable.just(index);

        index = 20;

        just.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer index) throws Exception {
                        Log.d(tag, "index:" + index);
                    }
                });
Snip20171011_29.png

看结果是修改之前的数值,那我们用 defer 来看看

  index = 10;

        Observable<Integer> defer = Observable.defer(new Callable<ObservableSource<Integer>>() {
            @Override
            public ObservableSource<Integer> call() throws Exception {
                return Observable.just(index);
            }
        });

        index = 20;

        defer.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer index) throws Exception {
                        Log.d(tag, "index:" + index);
                    }
                });
Snip20171011_30.png

这次打印的是修改之后的值,哈哈,大家不必担心的,因为基础数据类型是值传递才会这样,有前后不一致的情况,引用类型就没事啊,大家放心的修改,我做过测试修改的引用类型参数可以正常显示的哦

  mBook.setName("AAA");

        Observable<Book> just = Observable.just(mBook);

        mBook.setName("BBB");

        just.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Book>() {
                    @Override
                    public void accept(Book book) throws Exception {
                        Log.d(tag, "book-name:" + book.getName());
                    }
                });
Snip20171011_31.png

说实话这个 defer 有些像 AAC 的 liveData ,只不过没有 liveData 这么友好。

timer / interval

timer 和 interval 都是延迟操作,区别是 timer 只执行一次,interval 会一直执行

需要注意的是第一次发送都是在指定的延迟时间之后进行的。

   // 
   Observable.timer(3, TimeUnit.SECONDS) / interval(3, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.d(tag, "time:" + aLong);
                    }
                });
range / rangeLong

计数发射,指定开始值,指定数据发射次数,每一次数据++,range 支持 int 类型,rangeLong 支持 long 类型

 Observable.range(0, 5)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(tag, "range:" + integer);
                    }
                });
Snip20171011_26.png
intervalRange

指定开始数值,指定发射次数,指定首次执行延迟时间

// 参数说明:
        // 参数1 = 事件序列起始点;
        // 参数2 = 事件数量;
        // 参数3 = 第1次事件延迟发送时间;
        // 参数4 = 间隔时间数字;
        // 参数5 = 时间单位
        Observable.intervalRange(3,10,2, 1, TimeUnit.SECONDS)
                // 该例子发送的事件序列特点:
                // 1. 从3开始,一共发送10个事件;
                // 2. 第1次延迟2s发送,之后每隔2秒产生1个数字(从0开始递增1,无限个)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "开始采用subscribe连接");
                    }
                    // 默认最先调用复写的 onSubscribe()

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }

                });
944365-ed225f309949bdeb.png
repeat 重复发射

repeat 顾名思义,就是重复发射,一般结合其他的操作符来使用,参数就是我们想要重复发射的次数

  Observable
                .range(0, 5)
                .repeat(2)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(tag, "range:" + integer);
                    }
                });
Snip20171011_27.png

变换操作符

map
        Observable
                .just(11)
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) throws Exception {
                        return "AA";
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {

                    }
                });

map 把一个数据类型的 Observable 转成另一个类型的 Observable

flatmap
 Observable
                .just(11)
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        return Observable.just("AA");
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String o) throws Exception {

                    }
                });

flatmap 就比 map 要复杂一些了,map 不管 数据有多少,只是一个数据类型的 Observable 转成另一个类型的 Observable

而 flatmap 会把没每一个数据都转成成一个 新的 Observable ,然后最总汇总所有的 Observable 统一生成一个最终的 Observable 。

原理:

  • 为事件序列中每个事件都创建一个 Observable 对象;
  • 将对每个 原始事件 转换后的 新事件 都放入到对应 Observable对象;
  • 将新建的每个Observable 都合并到一个 新建的、总的Observable 对象;
  • 新建的、总的Observable 对象 将 新合并的事件序列 发送给观察者(Observer)

所以 flatmap 获得的最终的 Observable 里面数据是无序的,这点是需要注意的。flatmap 过程中有大量的 Observable 对象产生销毁,很消耗资源的,我们能用 map 就尽量不要用 flatmap

ConcatMap

ConcatMap 和 flatmap 一样,flatmap 处理过的数据是无序的,ConcatMap 是有序的,就这点区别

Buffer

隔指定补偿,从缓存里循环取出指定数据,感觉这个没什么用啊,至少 android 里体会不到

// 被观察者 需要发送5个数字
        Observable.just(1, 2, 3, 4, 5)
                .buffer(3, 1) // 设置缓存区大小 & 步长
                                    // 缓存区大小 = 每次从被观察者中获取的事件数量
                                    // 步长 = 每次获取新事件的数量
                .subscribe(new Observer<List<Integer>>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(List<Integer> stringList) {
                        //
                        Log.d(TAG, " 缓存区里的事件数量 = " +  stringList.size());
                        for (Integer value : stringList) {
                            Log.d(TAG, " 事件 = " + value);
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应" );
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });
944365-f1d4e320b7c62dd9.png

组合操作符

concat 、 concatArray

observable 顺序操作符,把多个 observable 按照前后顺序串行执行,执行完一个才能执行下一个,一个 error 了直接中断,不在往下走了,另外 前一个 observable 若有多个数据,只有等前一个 observable 的复数数据全部发射完毕,才能执行下一个 observable 的数据发射任务

concat 只能就收最多4个数据,concatArray 没有数量限制

        Observable.concat(Observable.just(11, 22), Observable.just(33, 44))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d("CC", "result:" + integer);
                    }
                });
Snip20180403_7.png
merge / mergeArray

observable 同事执行操作符,把多个 observable 并行执行,所有的 observable 均同时执行,也没有前后之分了

                Observable.merge(
                        Observable.intervalRange(20, 10, 2, 1, TimeUnit.SECONDS)
                        , Observable.intervalRange(1, 10, 2, 1, TimeUnit.SECONDS))
                        .subscribe(new Consumer<Long>() {
                            @Override
                            public void accept(Long integer) throws Exception {
                                Log.d("CC", "result:" + integer);
                            }
                        });
Snip20180403_9.png
concatDelayError / mergeDelayError

用来处理 concat 、 merge 中 error 的, concat 、 merge 中若有一个数据 error 了,就会结束整个操作,这可能不是我们想要的,那么这个相应的 DelayError 就可以让 error 不阻断操作,在数据都发射后在执行 error 操作

Observable.concatArrayDelayError(
                Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
                        emitter.onError(new NullPointerException()); // 发送Error事件,因为使用了concatDelayError,所以第2个Observable将会发送事件,等发送完毕后,再发送错误事件
                        emitter.onComplete();
                    }
                }),
                Observable.just(4, 5, 6))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });
944365-804c8472fc60eb6a.png
zip

数据合并,多 Observable 并发执行,组合多个 Observable 的所有单次数据返回一个综合的数据

                Observable
                        .zip(
                                Observable.intervalRange(100, 5, 2, 5, TimeUnit.SECONDS),
                                Observable.intervalRange(200, 5, 2, 5, TimeUnit.SECONDS),
                                new BiFunction<Long, Long, String>() {
                                    @Override
                                    public String apply(Long aLong, Long aLong2) throws Exception {
                                        return aLong + " / " + aLong2;
                                    }
                                })
                        .subscribe(new Consumer<String>() {
                            @Override
                            public void accept(String s) throws Exception {
                                Log.d("CC", "result: " + s);
                            }
                        });
Snip20180403_10.png

看上面的例子,每一次返回的数据都是 5秒 而不是 10秒,所以证明 zip 组合的 observable 是并发执行的,时间间隔由单次执行最慢的 observable 决定

reduce

把前2个数据交给第三个数据,后面的以此类推

Observable.just(1,2,3,4)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    // 在该复写方法中复写聚合的逻辑
                    @Override
                    public Integer apply(@NonNull Integer s1, @NonNull Integer s2) throws Exception {
                        Log.e(TAG, "本次计算的数据是: "+s1 +" 乘 "+ s2);
                        return s1 * s2;
                        // 本次聚合的逻辑是:全部数据相乘起来
                        // 原理:第1次取前2个数据相乘,之后每次获取到的数据 = 返回的数据x原始下1个数据每
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer s) throws Exception {
                Log.e(TAG, "最终计算的结果是: "+s);

            }
        });
944365-3f63477c864d7aae.png
collect

把所有发送的数据最终收集到一个集合里统一发送数据

Observable.just(1, 2, 3 ,4, 5, 6)
                .collect(
                        // 1. 创建数据结构(容器),用于收集被观察者发送的数据
                        new Callable<ArrayList<Integer>>() {
                            @Override
                            public ArrayList<Integer> call() throws Exception {
                                return new ArrayList<>();
                            }
                            // 2. 对发送的数据进行收集
                        }, new BiConsumer<ArrayList<Integer>, Integer>() {
                            @Override
                            public void accept(ArrayList<Integer> list, Integer integer)
                                    throws Exception {
                                // 参数说明:list = 容器,integer = 后者数据
                                list.add(integer);
                                // 对发送的数据进行收集
                            }
                        }).subscribe(new Consumer<ArrayList<Integer>>() {
            @Override
            public void accept(@NonNull ArrayList<Integer> s) throws Exception {
                Log.e(TAG, "本次发送的数据是: "+s);

            }
        });
944365-ab51b84d6a373330.png
startWith / startWithArray

在正式发送数据前先发送指定数据

                Observable
                        .just(1, 2, 3, 4, 5, 6)
                        .startWith(0)
                        .startWithArray(-1, -2)
                        .subscribe(new Consumer<Integer>() {
                            @Override
                            public void accept(Integer s) throws Exception {
                                Log.d("CC", "result: " + s);
                            }
                        });
Snip20180403_11.png
count

统计发送次数,这个不知道实际没有没用

// 注:返回结果 = Long类型
        Observable.just(1, 2, 3, 4)
                  .count()
                  .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.e(TAG, "发送的事件数量 =  "+aLong);

                    }
                });
944365-45d279a5773edfc2.png

功能操作符

delay

延迟操作,和 time 不同的是 delay 不能创建出 observable 对象来

        Observable
                .just(1)
                .delay(2, TimeUnit.SECONDS)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {

                    }
                });
do

do 操作符包含游很多方法,都是在相应的方法之前执行

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
                e.onError(new Throwable("发生错误了"));
                 }
               })
                // 1. 当Observable每发送1次数据事件就会调用1次
                .doOnEach(new Consumer<Notification<Integer>>() {
                    @Override
                    public void accept(Notification<Integer> integerNotification) throws Exception {
                        Log.d(TAG, "doOnEach: " + integerNotification.getValue());
                    }
                })
                // 2. 执行Next事件前调用
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "doOnNext: " + integer);
                    }
                })
                // 3. 执行Next事件后调用
                .doAfterNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "doAfterNext: " + integer);
                    }
                })
                // 4. Observable正常发送事件完毕后调用
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.e(TAG, "doOnComplete: ");
                    }
                })
                // 5. Observable发送错误事件时调用
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.d(TAG, "doOnError: " + throwable.getMessage());
                    }
                })
                // 6. 观察者订阅时调用
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(@NonNull Disposable disposable) throws Exception {
                        Log.e(TAG, "doOnSubscribe: ");
                    }
                })
                // 7. Observable发送事件完毕后调用,无论正常发送完毕 / 异常终止
                .doAfterTerminate(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.e(TAG, "doAfterTerminate: ");
                    }
                })
                // 8. 最后执行
                .doFinally(new Action() {
                    @Override
                    public void run() throws Exception {
                        Log.e(TAG, "doFinally: ");
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });
944365-11213ae3bb321197.png
onErrorReturn

错误处理,可以先于 onError 接受错误信息,然后返回一个正常信息,不至于中断整个操作

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Throwable("发生错误了"));
                 }
               })
                .onErrorReturn(new Function<Throwable, Integer>() {
                    @Override
                    public Integer apply(@NonNull Throwable throwable) throws Exception {
                        // 捕捉错误异常
                        Log.e(TAG, "在onErrorReturn处理了错误: "+throwable.toString() );

                        return 666;
                        // 发生错误事件后,发送一个"666"事件,最终正常结束
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });
944365-53f108767f179f0b.png
onErrorResumeNext / onExceptionResumeNext

和 onErrorReturn 差不多,遇到错误会抛出一个新的 Observable 来,这2个对象一个接受 Throwable 类型的错误,一个接受 Exception 的错误

retry 、retryWhen

重试,这个比较重要了,当收到 error 决定是重新几次执行几次,什么条件执行

retry 共有5种重载方法

<-- 1. retry() -->
// 作用:出现错误时,让被观察者重新发送数据
// 注:若一直错误,则一直重新发送

<-- 2. retry(long time) -->
// 作用:出现错误时,让被观察者重新发送数据(具备重试次数限制
// 参数 = 重试次数
 
<-- 3. retry(Predicate predicate) -->
// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送& 持续遇到错误,则持续重试)
// 参数 = 判断逻辑

<--  4. retry(new BiPredicate<Integer, Throwable>) -->
// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送 & 持续遇到错误,则持续重试
// 参数 =  判断逻辑(传入当前重试次数 & 异常错误信息)

<-- 5. retry(long time,Predicate predicate) -->
// 作用:出现错误后,判断是否需要重新发送数据(具备重试次数限制
// 参数 = 设置重试次数 & 判断逻辑
<-- 1. retry() -->
// 作用:出现错误时,让被观察者重新发送数据
// 注:若一直错误,则一直重新发送

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("发生错误了"));
                e.onNext(3);
                 }
               })
                .retry() // 遇到错误时,让被观察者重新发射数据(若一直错误,则一直重新发送
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });


<-- 2. retry(long time) -->
// 作用:出现错误时,让被观察者重新发送数据(具备重试次数限制
// 参数 = 重试次数
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("发生错误了"));
                e.onNext(3);
                 }
               })
                .retry(3) // 设置重试次数 = 3次
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });

<-- 3. retry(Predicate predicate) -->
// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送& 持续遇到错误,则持续重试)
// 参数 = 判断逻辑
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("发生错误了"));
                e.onNext(3);
                 }
               })
                // 拦截错误后,判断是否需要重新发送请求
                .retry(new Predicate<Throwable>() {
                    @Override
                    public boolean test(@NonNull Throwable throwable) throws Exception {
                        // 捕获异常
                        Log.e(TAG, "retry错误: "+throwable.toString());

                        //返回false = 不重新重新发送数据 & 调用观察者的onError结束
                        //返回true = 重新发送请求(若持续遇到错误,就持续重新发送)
                        return true;
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });

<--  4. retry(new BiPredicate<Integer, Throwable>) -->
// 作用:出现错误后,判断是否需要重新发送数据(若需要重新发送 & 持续遇到错误,则持续重试
// 参数 =  判断逻辑(传入当前重试次数 & 异常错误信息)
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("发生错误了"));
                e.onNext(3);
                 }
               })

                // 拦截错误后,判断是否需要重新发送请求
                .retry(new BiPredicate<Integer, Throwable>() {
                    @Override
                    public boolean test(@NonNull Integer integer, @NonNull Throwable throwable) throws Exception {
                        // 捕获异常
                        Log.e(TAG, "异常错误 =  "+throwable.toString());

                        // 获取当前重试次数
                        Log.e(TAG, "当前重试次数 =  "+integer);

                        //返回false = 不重新重新发送数据 & 调用观察者的onError结束
                        //返回true = 重新发送请求(若持续遇到错误,就持续重新发送)
                        return true;
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });


<-- 5. retry(long time,Predicate predicate) -->
// 作用:出现错误后,判断是否需要重新发送数据(具备重试次数限制
// 参数 = 设置重试次数 & 判断逻辑
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("发生错误了"));
                e.onNext(3);
                 }
               })
                // 拦截错误后,判断是否需要重新发送请求
                .retry(3, new Predicate<Throwable>() {
                    @Override
                    public boolean test(@NonNull Throwable throwable) throws Exception {
                        // 捕获异常
                        Log.e(TAG, "retry错误: "+throwable.toString());

                        //返回false = 不重新重新发送数据 & 调用观察者的onError()结束
                        //返回true = 重新发送请求(最多重新发送3次)
                        return true;
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("发生错误了"));
                e.onNext(3);
            }
        })
                // 遇到error事件才会回调
                .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                    
                    @Override
                    public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
                        // 参数Observable<Throwable>中的泛型 = 上游操作符抛出的异常,可通过该条件来判断异常的类型
                        // 返回Observable<?> = 新的被观察者 Observable(任意类型)
                        // 此处有两种情况:
                            // 1. 若 新的被观察者 Observable发送的事件 = Error事件,那么 原始Observable则不重新发送事件:
                            // 2. 若 新的被观察者 Observable发送的事件 = Next事件 ,那么原始的Observable则重新发送事件:
                        return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                            @Override
                            public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {

                                // 1. 若返回的Observable发送的事件 = Error事件,则原始的Observable不重新发送事件
                                // 该异常错误信息可在观察者中的onError()中获得
                                 return Observable.error(new Throwable("retryWhen终止啦"));
                                
                                // 2. 若返回的Observable发送的事件 = Next事件,则原始的Observable重新发送事件(若持续遇到错误,则持续重试)
                                 // return Observable.just(1);
                            }
                        });

                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应" + e.toString());
                        // 获取异常错误信息
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });
repeat / repeatWhen

重复发射,这个也是可以决定从复发射几次,什么条件从复发射

Observable.just(1,2,4).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
            @Override
            // 在Function函数中,必须对输入的 Observable<Object>进行处理,这里我们使用的是flatMap操作符接收上游的数据
            public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
                // 将原始 Observable 停止发送事件的标识(Complete() /  Error())转换成1个 Object 类型数据传递给1个新被观察者(Observable)
                // 以此决定是否重新订阅 & 发送原来的 Observable
                // 此处有2种情况:
                // 1. 若新被观察者(Observable)返回1个Complete() /  Error()事件,则不重新订阅 & 发送原来的 Observable
                // 2. 若新被观察者(Observable)返回其余事件,则重新订阅 & 发送原来的 Observable
                return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(@NonNull Object throwable) throws Exception {

                        // 情况1:若新被观察者(Observable)返回1个Complete() /  Error()事件,则不重新订阅 & 发送原来的 Observable
                        return Observable.empty();
                        // Observable.empty() = 发送Complete事件,但不会回调观察者的onComplete()

                        // return Observable.error(new Throwable("不再重新订阅事件"));
                        // 返回Error事件 = 回调onError()事件,并接收传过去的错误信息。

                        // 情况2:若新被观察者(Observable)返回其余事件,则重新订阅 & 发送原来的 Observable
                        // return Observable.just(1);
                       // 仅仅是作为1个触发重新订阅被观察者的通知,发送的是什么数据并不重要,只要不是Complete() /  Error()事件
                    }
                });

            }
        })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "开始采用subscribe连接");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应:" + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }

                });
takeWhile

只发送满足条件的,但一旦碰到不满足的,后面的均不会发送,complete 结束

//takeWhile 
Observable.just(1, 2, 3, 4, 5)
                .takeWhile(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        return integer != 4;
                    }
                })
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "integer:" + integer);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
02-11 16:37:18.896 10620-10620/... D/SplashActivity: integer:1
02-11 16:37:18.896 10620-10620/... D/SplashActivity: integer:2
02-11 16:37:18.896 10620-10620/... D/SplashActivity: integer:3
02-11 16:37:18.896 10620-10620/... D/SplashActivity: onComplete
takeUntil

一直发送,直到满足条件时结束,并且,满足条件的那一个也会发送,之后发complete 结束

Observable.just(1, 2, 3, 4, 5, 6)
                .takeUntil(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        return integer == 4;
                    }
                })
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "integer:" + integer);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

02-11 17:03:11.396 12703-12703/... D/SplashActivity: integer:1
02-11 17:03:11.396 12703-12703/... D/SplashActivity: integer:2
02-11 17:03:11.396 12703-12703/... D/SplashActivity: integer:3
02-11 17:03:11.396 12703-12703/... D/SplashActivity: integer:4
02-11 17:03:11.396 12703-12703/... D/SplashActivity: onComplete
all

只有全部满足的时候才会返回 true

Disposable subscribe = Observable.just(1, 2, 3)
                .all(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        return integer>2;
                    }
                })
                .subscribe(aBoolean -> Log.d(TAG, "aBoolean:" + aBoolean));
02-11 16:33:45.936 10199-10199/... D/SplashActivity: aBoolean:false
contains

是否包含指定参数

 Disposable subscribe = Observable.just(1, 2, 3, 4, 5)
                .contains(1)
                .subscribe(aBoolean -> Log.d(TAG, "aBoolean:" + aBoolean));
02-11 17:28:06.486 15538-15538/... D/SplashActivity: aBoolean:true
isEmpty

是否为空

Disposable subscribe = Observable.just(1, 2, 3, 4, 5)
                .isEmpty()
                .subscribe(aBoolean -> Log.d(TAG, "aBoolean:" + aBoolean));
02-11 17:31:04.306 15894-15894/... D/SplashActivity: aBoolean:false

filter

只发射符合条件的数据,数据从头遍历到尾,碰到不符合的数据也不会中断发射

Observable.just(1, 2, 3, 4, 5, 6).filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {
                return integer != 4;
            }
        })
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "integer:" + integer);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
02-11 17:46:46.206 17513-17513/... D/SplashActivity: integer:1
02-11 17:46:46.206 17513-17513/... D/SplashActivity: integer:2
02-11 17:46:46.206 17513-17513/... D/SplashActivity: integer:3
02-11 17:46:46.206 17513-17513/... D/SplashActivity: integer:5
02-11 17:46:46.206 17513-17513/... D/SplashActivity: integer:6
02-11 17:46:46.206 17513-17513/... D/SplashActivity: onComplete
ofType

只发射符合类型的数据,数据从头遍历到尾,碰到不符合的数据也不会中断发射

Observable.just(1, "one", 2, "tow", 3, "three")
                .ofType(Integer.class)
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "integer:" + integer);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
02-11 17:54:15.436 19570-19570/... D/SplashActivity: integer:1
02-11 17:54:15.436 19570-19570/... D/SplashActivity: integer:2
02-11 17:54:15.436 19570-19570/... D/SplashActivity: integer:3
02-11 17:54:15.436 19570-19570/... D/SplashActivity: onComplete
elementAt

选择指定位置的元素,下标准许越界,如果越界,可以指定默认值

有2个参数:

  • value1 - 指定数据下标
  • value2 - 默认数据
 Disposable subscribe = Observable.just(1, 2, 3, 4)
                .elementAt(5,10)
                .subscribe(integer ->
                        Log.d(TAG, "integer:" + integer));
02-12 09:37:34.093 13153-13153/... D/SplashActivity: integer:10
firstElement / lastElement
  • firstElement - 获得第一个数据
  • lastElement - 获得最后一个数据
Disposable subscribe = Observable.just(1, 2, 3, 4)
.firstElement()
.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "integer:" + integer);
            }
        });
02-12 09:28:26.753 10462-10462/... D/SplashActivity: integer:1
distinct

过滤重复,只要出现过得就不会再出现

Observable.just(1, 2,3,1,3)
                .distinct()
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "integer:" + integer);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
02-11 18:00:08.836 20056-20056/... D/SplashActivity: integer:1
02-11 18:00:08.836 20056-20056/... D/SplashActivity: integer:2
02-11 18:00:08.836 20056-20056/... D/SplashActivity: integer:3
02-11 18:00:08.836 20056-20056/... D/SplashActivity: onComplete
distinctUntilChanged

过滤连续重复数据,注意必须是连续重复的才有有效

Observable.just(1, 2, 3, 1, 3, 3, 4, 4)
                .distinctUntilChanged()
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "integer:" + integer);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
02-11 18:02:59.656 20334-20334/... D/SplashActivity: integer:1
02-11 18:02:59.656 20334-20334/... D/SplashActivity: integer:2
02-11 18:02:59.656 20334-20334/... D/SplashActivity: integer:3
02-11 18:02:59.656 20334-20334/... D/SplashActivity: integer:1
02-11 18:02:59.656 20334-20334/... D/SplashActivity: integer:3
02-11 18:02:59.656 20334-20334/... D/SplashActivity: integer:4
02-11 18:02:59.656 20334-20334/... D/SplashActivity: onComplete
take / takeLast
  • take - 从头开始获取指定数目的数据,如果数据量小于指定数目,则进入 error
  • takeLast - 反向从最后开始获取数据
Observable.just(1, 2, 3, 4, 5, 6)
                .take(2)
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "integer:" + integer);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
02-11 18:35:23.196 21766-21766/... D/SplashActivity: integer:1
02-11 18:35:23.196 21766-21766/... D/SplashActivity: integer:2
02-11 18:35:23.196 21766-21766/... D/SplashActivity: onComplete
skip
  • skip - 正序跳过指定 count,如果不足,则会进入 onError
  • skipLast - 倒序跳过指定 count,如果不足,则会进入 onError
Observable.just(1, 2, 3, 4).skip(2).subscribe(new Observer<Integer>() {

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "integer:" + integer);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
02-12 09:19:47.243 9691-9691/... D/SplashActivity: integer:3
02-12 09:19:47.243 9691-9691/... D/SplashActivity: integer:4
02-12 09:19:47.243 9691-9691/... D/SplashActivity: onComplete
throttleFirst / throttleLast
  • throttleFirst - 指定时间间隔内取第一个数据
  • throttleLast - 指定时间间隔内取最后一个数据
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);

                Thread.sleep(500);
                e.onNext(2);

                Thread.sleep(1600);
                e.onNext(3);

                Thread.sleep(2100);
                e.onNext(4);
                e.onComplete();
            }
        }).throttleFirst(2, TimeUnit.SECONDS)
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "integer:" + integer);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "complete");
                    }
                });

``log
02-12 10:01:16.243 14629-14629/... D/SplashActivity: integer:1
02-12 10:01:18.343 14629-14629/... D/SplashActivity: integer:3
02-12 10:01:20.443 14629-14629/... D/SplashActivity: integer:4
02-12 10:01:20.443 14629-14629/... D/SplashActivity: complete


##### throttleWithTimeout / debounce

throttleWithTimeout 同 debounce

2次数据间隔超过指定时间才有效,若一直没有合适的数据,默认取最后一个数据

```java
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);

                Thread.sleep(500);
                e.onNext(2);

                Thread.sleep(500);
                e.onNext(3);

                Thread.sleep(500);
                e.onNext(4);

                Thread.sleep(500);
                e.onNext(5);

                Thread.sleep(500);
                e.onNext(6);
                e.onComplete();
            }
        }).throttleWithTimeout(1, TimeUnit.SECONDS)
                .subscribe(new Observer<Integer>() {

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "integer:" + integer);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
02-12 10:15:56.573 16357-16357/... D/SplashActivity: integer:6
02-12 10:15:56.573 16357-16357/... D/SplashActivity: onComplete
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容