RxJava 2.x 源码分析(二) 之 FlatMap

FlatMap

  • 官方定义:把被观察者发射出去的事件转化成新的子被观察者,然后把这些发射量展开平铺后统一放到一个被观察者中。官方文档
image
  • 简单来讲就是把被观察者每次发射的事件转化成一个子被观察者,然后通过合并(Merge)所有子被观察者的事件成总的一系列的事件并发射给观察者

  • 官方文档中提及到很多语言都拥有 Merge 和 Concat 的合并操作,他们的区别是前者会顺序交错,而后者是不会破坏顺序的。

  • 所以FlatMapConcatMap的区别是合并后的事件顺序有可能是无序的,但FlatMap真的不能做到有序事件吗?本文也会探讨这个问题。

RxJava使用了观察者模式,封装了很多ObservableObserver,针对不同的操作符的调用会用对应的ObservaEbleObserver实现。

根据源码Observable发射的事件都是有序的,使用FlatMap时由事件转换的被观察者也是有序地发射自己的事件,我们可以猜测:

  • FlatMap事件无序的关键是线程,当由事件转换成的多个被观察者在不同线程中发射事件时,会导致顶层观察者接收到的事件是无序的。
  • 反之所有被观察者都在同一个线程中发射时间的话 FlatMap 的效果跟ConcatMap是相同的。

为了证实我们的猜测,我们先具体简单的例子:

public static void main(String args[]) {
    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    e.onNext(1);
                    e.onNext(2);
                    e.onNext(3);
                    e.onComplete();
                }
            })
            .flatMap(new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Integer integer) throws Exception {
                    return Observable.just(
                            "item " + integer + " sub-item " + 1 + " Observable Thread: " + Thread.currentThread().getName()
                            , "item " + integer + " sub-item " + 2 + " Observable Thread: " + Thread.currentThread().getName()
                            , "item " + integer + " sub-item " + 3 + " Observable Thread: " + Thread.currentThread().getName());
                }
            })
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(String s) {
                    System.out.println(s + " Observer Thread: " + Thread.currentThread().getName());
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onComplete() {
                }
            });
}

运行结果:

item 1 sub-item 1 Observable Thread: main Observer Thread: main
item 1 sub-item 2 Observable Thread: main Observer Thread: main
item 1 sub-item 3 Observable Thread: main Observer Thread: main
item 2 sub-item 1 Observable Thread: main Observer Thread: main
item 2 sub-item 2 Observable Thread: main Observer Thread: main
item 2 sub-item 3 Observable Thread: main Observer Thread: main
item 3 sub-item 1 Observable Thread: main Observer Thread: main
item 3 sub-item 2 Observable Thread: main Observer Thread: main
item 3 sub-item 3 Observable Thread: main Observer Thread: main

阅读源码之前我们需要知道,RxJava 内部包装了很多ObservableObserver,用FlatMap实现该例子的实现方法不止一种,使用的操作符也可以不一样,所以运行时调用到的ObservableObserver不一定相同,所以下面阅读的源码路径只是基于这个例子。

  • 首先看看Observable的创建过程:
//Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

上一篇源码分析一样,内部创建的是ObservableCreate对象,ObservableOnSubscribe是一个提供subscribe()作为约定函数的接口。

  • 接下来看看flatMap()做了什么
//Observable.java
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
        return flatMap(mapper, false);
}

mapper 代表的是我们的刚才创建的Function对象,这里有函数式编程的味道(如果用过Kotlin的同学就更加熟悉了),相当于把我们写的函数作为一个参数。

false 表示异常是否需要延迟到所有内部被观察者都结束后才抛出。

继续进下一层

//Observable.java
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
        return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
}

maxConcurrency: 最大并发数

继续进下一层

//Observable.java
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
        return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}

bufferSize :缓存所有子被观察者的事件加起来总数大小

继续进下一层

//Observable.java
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
    // 检查参数是否合法
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    if (this instanceof ScalarCallable) { //false
        @SuppressWarnings("unchecked")
        T v = ((ScalarCallable<T>)this).call();
        if (v == null) {
            return empty();
        }
        return ObservableScalarXMap.scalarXMap(v, mapper);
    }
    // ObservableCreate 没有实现 ScalarCallable,所以走这里
    return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}

这里返回一个 ObservableFlatMap 对象,实际上ObservableFlatMap包装了ObservableCreate并把ObservableCreate对象作为常量source,那么flatMap()到这里结束了。

  • 下面看看subscribe()的源码:
//Observable.java
public final void subscribe(Observer<? super T> observer) {
    // 检查观察者是否为空
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        // 调用 hook 方法
        observer = RxJavaPlugins.onSubscribe(this, observer);
        // 检查调用 hook 的观察者是否为空
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        // 实际订阅操作在这个方法里
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
     ...
    }
}

下面看看 subscribeActual:

// Observable.java
protected abstract void subscribeActual(Observer<? super T> observer);

居然是一个抽象方法,在哪里被实现了?机智的我们反应过来了,具体实现在flatMap()返回的ObservableFlatMap中:

// ObservableFlatMap.java
public ObservableFlatMap(ObservableSource<T> source,
        Function<? super T, ? extends ObservableSource<? extends U>> mapper,
        boolean delayErrors, int maxConcurrency, int bufferSize) {
    super(source);
    this.mapper = mapper;
    this.delayErrors = delayErrors;
    this.maxConcurrency = maxConcurrency;
    this.bufferSize = bufferSize;
}
    
@Override
public void subscribeActual(Observer<? super U> t) {
    // source : ObservableCreate对象
    // t : 观察者
    // mapper : 之前我们写的 mapper 函数
    if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) { // false
        return;
    }
    // 本次运行走这里,调用 ObservableCreate 的 subscribe() 并用    
    // MergeObserver 包装了我们写的 observer
    source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}

可以说ObservableFlatMapFlatMap来说是比较重要的类,里面包含了许多重要逻辑。

调用 ObservableCreatesubscribe() 之前我们先看看 MergeObserver 的构造方法:

// ObservableFlatMap.java
MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
                boolean delayErrors, int maxConcurrency, int bufferSize) {
    this.actual = actual; // 我们写的 observer
    this.mapper = mapper; 
    this.delayErrors = delayErrors;
    this.maxConcurrency = maxConcurrency; // Integer.MAX_VALUE
    this.bufferSize = bufferSize;
    if (maxConcurrency != Integer.MAX_VALUE) {
        sources = new ArrayDeque<ObservableSource<? extends U>>(maxConcurrency);
    }
    // 创建一个原子性的内部观察者对象数组
    this.observers = new AtomicReference<InnerObserver<?, ?>[]>(EMPTY);
}

由于ObservableCreate没有覆写 subscribe(),所以实际上调用的是父类Observablesubscribe()且源码上面已经贴过,可以直接跳过进入ObservableCreatesubscribeActual()中:

// ObservableCreate.java
@Override
protected void subscribeActual(Observer<? super T> observer) {
    // 创建发射器,这里先忽略发射器内部实现,只需要知道发射器主要用来回调观察者的
    // onNext onComplete onError 方法
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    // 回调 MergeObserver 的 onSubscribe() 
    // 再由其回调我们写的 observer 的 onSubscribe()
    observer.onSubscribe(parent);

    try {
        // source : 我们创建的 ObservableOnSubscribe 对象
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

下面走到了我们写的subcribe()逻辑里,我们调用了发射器的 onNext(),看看发射器的源码:

@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    // 检测是否切断
    // 如果切断了被观察者就接收不到后续的事件了
    if (!isDisposed()) {
        // observer : MergeObserver 对象
        // 回调观察者的 onNext()
        observer.onNext(t);
    }
}

还记得我们写的 observer 对象被包装成了MergeObserver,那么进入MergeObserveronNext():

// ObservableFlatMap.java
@Override
public void onNext(T t) {
    // safeguard against misbehaving sources
    if (done) {
        return;
    }
    ObservableSource<? extends U> p;
    try {
        // 把我们的 mapper 方法里面返回的子被观察者提取出来,
        // 由于我们当初用的是 .just() 创建子被观察者,
        // 所以子被观察者是 ObservableFromArray 对象,
        // 这里先忽略 just() 内部实现
        p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        s.dispose();
        onError(e);
        return;
    }

    if (maxConcurrency != Integer.MAX_VALUE) { // false
        synchronized (this) {
            if (wip == maxConcurrency) {
                sources.offer(p);
                return;
            }
            wip++;
        }
    }
    // 进入下面的函数
    subscribeInner(p);
}
    
    
void subscribeInner(ObservableSource<? extends U> p) {
    for (;;) {
        // p : ObservableFromArray 对象
        if (p instanceof Callable) { //false
            tryEmitScalar(((Callable<? extends U>)p));

            if (maxConcurrency != Integer.MAX_VALUE) {
                synchronized (this) {
                    p = sources.poll();
                    if (p == null) {
                        wip--;
                        break;
                    }
                }
            } else {
                break;
            }
        } else {
            // ObservableFromArray 没有实现 Callable 接口,所以走这里
            InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
            if (addInner(inner)) {
                p.subscribe(inner); // 飙车入口
            }
            break;
        }
    }
}
    
// 这里主要做内部观察者对象数组的增加
// 通过创建size为原数组长度+1的新数组并作为新的内部观察者对象数组来实现
boolean addInner(InnerObserver<T, U> inner) {
    for (;;) {
        // 获取之前 MergeObserver 创建的内部观察者对象数组
        InnerObserver<?, ?>[] a = observers.get();
        if (a == CANCELLED) {
            inner.dispose();
            return false;
        }
        int n = a.length; // 0
        InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
        System.arraycopy(a, 0, b, 0, n);
        b[n] = inner;
        if (observers.compareAndSet(a, b)) {
            return true;
        }
    }
}
        

看到这里我们应该到了 p.subscribe(inner) ,又调用Observablesubscribe()?到现在我们好像已经调用了好几次了,证明这两个东西都被包装好几层了,前面我提到过RxJava包装了许多ObservableObserver,配合观察者模式一层一层地地传递事件下去,这是 RxJava 的其中一个奥妙之处。

下面整个过程我们会一直在 ObservableFromArray InnerObserver ObservableFlatMap MergeObserver的方法调用中飙车,可能会感到不适。

我们直接跳过进入从ObservableFromArraysubscribeActual()开始看,这里会有很多的跳转不方便一步步展示,所有相关代码调用顺序和注释都在下面:

// ObservableFromArray.java
public final class ObservableFromArray<T> extends Observable<T> {
    final T[] array;
    public ObservableFromArray(T[] array) {
        this.array = array;
    }
    @Override
    public void subscribeActual(Observer<? super T> s) {
        // [1]
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
        // s : InnerObserver 对象
        // array : 我们创建子被观察者时调用 .just() 方法生成的String数组
        // [4]
        s.onSubscribe(d);
    
        // [9]
        if (d.fusionMode) { // true 看到这里结束了 onNext() 所有操作
            return;
        }

        d.run();
    }

    // [2]
    static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
        
        final Observer<? super T> actual; // InnerObserver 对象
        
        final T[] array;
        
        int index;
        
        boolean fusionMode;
        
        volatile boolean disposed;
        // [3]
        FromArrayDisposable(Observer<? super T> actual, T[] array) {
        this.actual = actual;
        this.array = array;
        }
...
        // [7]
        @Override
        public int requestFusion(int mode) {
            if ((mode & SYNC) != 0) { //true
                fusionMode = true;
                return SYNC;
            }
                return NONE;
        }
    }
}

InnerObserver:

// ObservableFlatMap.java
static final class InnerObserver<T, U> extends AtomicReference<Disposable>
    implements Observer<U> {

    private static final long serialVersionUID = -4606175640614850599L;
    final long id;
    final MergeObserver<T, U> parent;

    volatile boolean done;
    volatile SimpleQueue<U> queue;

    int fusionMode;

    InnerObserver(MergeObserver<T, U> parent, long id) {
        this.id = id;
        this.parent = parent;
    }
    
    // [5]
    @Override
    public void onSubscribe(Disposable s) {
        if (DisposableHelper.setOnce(this, s)) { // true
            if (s instanceof QueueDisposable) {  // true
                @SuppressWarnings("unchecked")
                // s : FromArrayDisposable 对象
                QueueDisposable<U> qd = (QueueDisposable<U>) s;
                // 获取合并的标记 这里返回同步标记 SYNC
                // [6]
                int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
                if (m == QueueDisposable.SYNC) { // true
                    fusionMode = m;
                    queue = qd;
                    done = true;
                    // parent: MergeObserver
                    // 把 MergeObserver 的所有事件都发送完毕
                    // [8]
                    parent.drain();
                    return;
                }
                if (m == QueueDisposable.ASYNC) {
                    fusionMode = m;
                    queue = qd;
                }
            }
        }
    }        
    ...
}

MergeObserver:

// [9]
void drain() {
    if (getAndIncrement() == 0) {
        drainLoop();
    }
}
// [10]
// 这里利用之前创建的具有原子性的内部观察者数组发射子被观察者的所有事件
void drainLoop() {
 ...
}

从上面的源码主要过程是被观察者的单次调用onNext()发射的事件变成一个子被观察者且将其事件都发射给观察者,然后执行下一个onNext()重新走一遍上述代码或者进入其他回调方法,所以整个过程都在同一个线程中且同步执行的,事件的顺序是有序的。

  • 到此我们见证了FlatMap发射有序事件的全过程

无序事件

修改下flatMap()代码,实现无序事件:

  • 在创建子被观察者的时候调用subscribeOn()指定发射事件在新的子线程中进行,或者使用delay()也可以
.flatMap(new Function<Integer, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(Integer integer) throws Exception {
        final Integer i = integer;
        Observable observable_create = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext(i + "-" + 1 + " Observable: " + Thread.currentThread().getName());
                e.onNext(i + "-" + 2 + " Observable: " + Thread.currentThread().getName());
                e.onNext(i + "-" + 3 + " Observable: " + Thread.currentThread().getName());
                e.onComplete();
            }
        });
        Observable observable_just = Observable.just(
                integer + "-" + 1 + " Observable: " + Thread.currentThread().getName(),
                integer + "-" + 2 + " Observable: " + Thread.currentThread().getName(),
                integer + "-" + 3 + " Observable: " + Thread.currentThread().getName());

        return observable_create
                .subscribeOn(Schedulers.newThread());
                // .delay((int(Math.random()*1000),TimeUnit.MILLISECONDS);
    }
}) 

运行结果:

03-16 10:56:09.255 12781-12799/com.example.myapplication E/RxJava: 1-1 Observable: RxCachedThreadScheduler-1 Observer :RxCachedThreadScheduler-1
03-16 10:56:09.255 12781-12799/com.example.myapplication E/RxJava: 1-2 Observable: RxCachedThreadScheduler-1 Observer :RxCachedThreadScheduler-1
03-16 10:56:09.255 12781-12799/com.example.myapplication E/RxJava: 1-3 Observable: RxCachedThreadScheduler-1 Observer :RxCachedThreadScheduler-1
03-16 10:56:09.275 12781-12800/com.example.myapplication E/RxJava: 2-1 Observable: RxCachedThreadScheduler-2 Observer :RxCachedThreadScheduler-2
03-16 10:56:09.305 12781-12800/com.example.myapplication E/RxJava: 2-2 Observable: RxCachedThreadScheduler-2 Observer :RxCachedThreadScheduler-2
03-16 10:56:09.305 12781-12800/com.example.myapplication E/RxJava: 3-1 Observable: RxCachedThreadScheduler-3 Observer :RxCachedThreadScheduler-2
03-16 10:56:09.305 12781-12800/com.example.myapplication E/RxJava: 3-2 Observable: RxCachedThreadScheduler-3 Observer :RxCachedThreadScheduler-2
03-16 10:56:09.305 12781-12800/com.example.myapplication E/RxJava: 3-3 Observable: RxCachedThreadScheduler-3 Observer :RxCachedThreadScheduler-2
03-16 10:56:09.305 12781-12800/com.example.myapplication E/RxJava: 2-3 Observable: RxCachedThreadScheduler-2 Observer :RxCachedThreadScheduler-2

无序事件的子被观察者不再由.just()方法创建,而是.create()代替,原因是由于ObservableFromArray实现逻辑不能在日志中直接明确的显示子被观察者发送事件是在子线程进行的。

observable_just 作为子被观察者的运行结果:

03-16 11:10:55.381 13418-13437/? E/RxJava: 1-1 Observable: main Observer :RxCachedThreadScheduler-1
03-16 11:10:55.381 13418-13437/? E/RxJava: 1-2 Observable: main Observer :RxCachedThreadScheduler-1
03-16 11:10:55.381 13418-13437/? E/RxJava: 1-3 Observable: main Observer :RxCachedThreadScheduler-1
03-16 11:10:55.391 13418-13437/? E/RxJava: 3-1 Observable: main Observer :RxCachedThreadScheduler-1
03-16 11:10:55.391 13418-13437/? E/RxJava: 3-2 Observable: main Observer :RxCachedThreadScheduler-1
03-16 11:10:55.391 13418-13437/? E/RxJava: 3-3 Observable: main Observer :RxCachedThreadScheduler-1
03-16 11:10:55.401 13418-13438/? E/RxJava: 2-1 Observable: main Observer :RxCachedThreadScheduler-2
03-16 11:10:55.401 13418-13438/? E/RxJava: 2-2 Observable: main Observer :RxCachedThreadScheduler-2
03-16 11:10:55.401 13418-13438/? E/RxJava: 2-3 Observable: main Observer :RxCachedThreadScheduler-2

由于调用just()的时候已经在当前线程(默认主线程)把事件都准备好了,再在子线程中发射出去,所以日志上打印的是主线程


总结:

  • FlatMap把每个发射的事件都包装成新的子被观察者,然后这些子被观察者再把子事件发送出去

  • 无序事件每个子被观察者发射的所有事件都运行在同一个线程内发射且顺序按照代码的调用顺序

  • 子被观察者都不指定子线程而是在当前线程的时候,flatMap作用跟concatMap相同

无序事件涉及到 RxJava 切换线程的操作,具体实现以及源码分析将会留到下一章中介绍。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,923评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,154评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,775评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,960评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,976评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,972评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,893评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,709评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,159评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,400评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,552评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,265评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,876评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,528评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,701评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,552评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,451评论 2 352

推荐阅读更多精彩内容