A look at Flink's RestartStrategies

This article takes a look at Flink's RestartStrategies.

RestartStrategies

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

@PublicEvolving
public class RestartStrategies {

    /**
     * Generates NoRestartStrategyConfiguration.
     *
     * @return NoRestartStrategyConfiguration
     */
    public static RestartStrategyConfiguration noRestart() {
        return new NoRestartStrategyConfiguration();
    }

    public static RestartStrategyConfiguration fallBackRestart() {
        return new FallbackRestartStrategyConfiguration();
    }

    /**
     * Generates a FixedDelayRestartStrategyConfiguration.
     *
     * @param restartAttempts Number of restart attempts for the FixedDelayRestartStrategy
     * @param delayBetweenAttempts Delay in-between restart attempts for the FixedDelayRestartStrategy
     * @return FixedDelayRestartStrategy
     */
    public static RestartStrategyConfiguration fixedDelayRestart(int restartAttempts, long delayBetweenAttempts) {
        return fixedDelayRestart(restartAttempts, Time.of(delayBetweenAttempts, TimeUnit.MILLISECONDS));
    }

    /**
     * Generates a FixedDelayRestartStrategyConfiguration.
     *
     * @param restartAttempts Number of restart attempts for the FixedDelayRestartStrategy
     * @param delayInterval Delay in-between restart attempts for the FixedDelayRestartStrategy
     * @return FixedDelayRestartStrategy
     */
    public static RestartStrategyConfiguration fixedDelayRestart(int restartAttempts, Time delayInterval) {
        return new FixedDelayRestartStrategyConfiguration(restartAttempts, delayInterval);
    }

    /**
     * Generates a FailureRateRestartStrategyConfiguration.
     *
     * @param failureRate Maximum number of restarts in given interval {@code failureInterval} before failing a job
     * @param failureInterval Time interval for failures
     * @param delayInterval Delay in-between restart attempts
     */
    public static FailureRateRestartStrategyConfiguration failureRateRestart(
            int failureRate, Time failureInterval, Time delayInterval) {
        return new FailureRateRestartStrategyConfiguration(failureRate, failureInterval, delayInterval);
    }

    //......
}
  • RestartStrategies provides the static factory methods noRestart, fallBackRestart, fixedDelayRestart, and failureRateRestart for building a RestartStrategyConfiguration.

RestartStrategyConfiguration

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

    public abstract static class RestartStrategyConfiguration implements Serializable {
        private static final long serialVersionUID = 6285853591578313960L;

        private RestartStrategyConfiguration() {}

        /**
         * Returns a description which is shown in the web interface.
         *
         * @return Description of the restart strategy
         */
        public abstract String getDescription();
    }
  • RestartStrategyConfiguration is an abstract class that declares the abstract method getDescription. It has four subclasses: NoRestartStrategyConfiguration, FixedDelayRestartStrategyConfiguration, FailureRateRestartStrategyConfiguration, and FallbackRestartStrategyConfiguration.

NoRestartStrategyConfiguration

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

    public static final class NoRestartStrategyConfiguration extends RestartStrategyConfiguration {
        private static final long serialVersionUID = -5894362702943349962L;

        @Override
        public String getDescription() {
            return "Restart deactivated.";
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            return o instanceof NoRestartStrategyConfiguration;
        }

        @Override
        public int hashCode() {
            return Objects.hash();
        }
    }
  • NoRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the no-restart strategy.

FixedDelayRestartStrategyConfiguration

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

    public static final class FixedDelayRestartStrategyConfiguration extends RestartStrategyConfiguration {
        private static final long serialVersionUID = 4149870149673363190L;

        private final int restartAttempts;
        private final Time delayBetweenAttemptsInterval;

        FixedDelayRestartStrategyConfiguration(int restartAttempts, Time delayBetweenAttemptsInterval) {
            this.restartAttempts = restartAttempts;
            this.delayBetweenAttemptsInterval = delayBetweenAttemptsInterval;
        }

        public int getRestartAttempts() {
            return restartAttempts;
        }

        public Time getDelayBetweenAttemptsInterval() {
            return delayBetweenAttemptsInterval;
        }

        @Override
        public int hashCode() {
            int result = restartAttempts;
            result = 31 * result + (delayBetweenAttemptsInterval != null ? delayBetweenAttemptsInterval.hashCode() : 0);
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof FixedDelayRestartStrategyConfiguration) {
                FixedDelayRestartStrategyConfiguration other = (FixedDelayRestartStrategyConfiguration) obj;

                return restartAttempts == other.restartAttempts && delayBetweenAttemptsInterval.equals(other.delayBetweenAttemptsInterval);
            } else {
                return false;
            }
        }

        @Override
        public String getDescription() {
            return "Restart with fixed delay (" + delayBetweenAttemptsInterval + "). #"
                + restartAttempts + " restart attempts.";
        }
    }
  • FixedDelayRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the fixed-delay restart strategy. It has two properties: restartAttempts and delayBetweenAttemptsInterval.

FailureRateRestartStrategyConfiguration

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

    public static final class FailureRateRestartStrategyConfiguration extends RestartStrategyConfiguration {
        private static final long serialVersionUID = 1195028697539661739L;
        private final int maxFailureRate;

        private final Time failureInterval;
        private final Time delayBetweenAttemptsInterval;

        public FailureRateRestartStrategyConfiguration(int maxFailureRate, Time failureInterval, Time delayBetweenAttemptsInterval) {
            this.maxFailureRate = maxFailureRate;
            this.failureInterval = failureInterval;
            this.delayBetweenAttemptsInterval = delayBetweenAttemptsInterval;
        }

        public int getMaxFailureRate() {
            return maxFailureRate;
        }

        public Time getFailureInterval() {
            return failureInterval;
        }

        public Time getDelayBetweenAttemptsInterval() {
            return delayBetweenAttemptsInterval;
        }

        @Override
        public String getDescription() {
            return "Failure rate restart with maximum of " + maxFailureRate + " failures within interval " + failureInterval.toString()
                    + " and fixed delay " + delayBetweenAttemptsInterval.toString();
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            FailureRateRestartStrategyConfiguration that = (FailureRateRestartStrategyConfiguration) o;
            return maxFailureRate == that.maxFailureRate &&
                Objects.equals(failureInterval, that.failureInterval) &&
                Objects.equals(delayBetweenAttemptsInterval, that.delayBetweenAttemptsInterval);
        }

        @Override
        public int hashCode() {
            return Objects.hash(maxFailureRate, failureInterval, delayBetweenAttemptsInterval);
        }
    }
  • FailureRateRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the failure-rate restart strategy. It has three properties: maxFailureRate, failureInterval, and delayBetweenAttemptsInterval.

FallbackRestartStrategyConfiguration

flink-core-1.7.1-sources.jar!/org/apache/flink/api/common/restartstrategy/RestartStrategies.java

    public static final class FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration {
        private static final long serialVersionUID = -4441787204284085544L;

        @Override
        public String getDescription() {
            return "Cluster level default restart strategy";
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            return o instanceof FallbackRestartStrategyConfiguration;
        }

        @Override
        public int hashCode() {
            return Objects.hash();
        }
    }
  • FallbackRestartStrategyConfiguration extends RestartStrategyConfiguration and represents the cluster-level default restart strategy.
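
Neither NoRestartStrategyConfiguration nor FallbackRestartStrategyConfiguration holds any fields, so the listings above treat every instance of a given class as equal. The following is a minimal, self-contained sketch of that idiom. The names StatelessConfigSketch, NoRestart, and Fallback are hypothetical stand-ins chosen for illustration, not Flink classes.

```java
import java.util.Objects;

// Hypothetical stand-ins for the two field-less configuration classes; not Flink code.
final class StatelessConfigSketch {

    static final class NoRestart {
        @Override
        public boolean equals(Object o) {
            // No fields to compare: any other NoRestart instance is equal.
            return this == o || o instanceof NoRestart;
        }

        @Override
        public int hashCode() {
            // Objects.hash() with no arguments returns the same constant on every call.
            return Objects.hash();
        }
    }

    static final class Fallback {
        @Override
        public boolean equals(Object o) {
            return this == o || o instanceof Fallback;
        }

        @Override
        public int hashCode() {
            return Objects.hash();
        }
    }

    public static void main(String[] args) {
        // Two separate instances of the same class compare as equal.
        System.out.println(new NoRestart().equals(new NoRestart()));
        // Instances of different classes do not.
        System.out.println(new NoRestart().equals(new Fallback()));
        // Equal objects report equal hash codes, as the equals/hashCode contract requires.
        System.out.println(new NoRestart().hashCode() == new NoRestart().hashCode());
    }
}
```

Since equal objects must report equal hash codes, returning a constant from hashCode is the natural counterpart to an instanceof-only equals.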

RestartStrategyResolving

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java

public final class RestartStrategyResolving {

    /**
     * Resolves which {@link RestartStrategy} to use. It should be used only on the server side.
     * The resolving strategy is as follows:
     * <ol>
     * <li>Strategy set within job graph.</li>
     * <li>Strategy set flink-conf.yaml on the server set, unless is set to {@link NoRestartStrategy} and checkpointing
     * is enabled.</li>
     * <li>If no strategy was set on client and server side and checkpointing was enabled then
     * {@link FixedDelayRestartStrategy} is used</li>
     * </ol>
     *
     * @param clientConfiguration restart configuration given within the job graph
     * @param serverStrategyFactory default server side strategy factory
     * @param isCheckpointingEnabled if checkpointing was enabled for the job
     * @return resolved strategy
     */
    public static RestartStrategy resolve(
            RestartStrategies.RestartStrategyConfiguration clientConfiguration,
            RestartStrategyFactory serverStrategyFactory,
            boolean isCheckpointingEnabled) {

        final RestartStrategy clientSideRestartStrategy =
            RestartStrategyFactory.createRestartStrategy(clientConfiguration);

        if (clientSideRestartStrategy != null) {
            return clientSideRestartStrategy;
        } else {
            if (serverStrategyFactory instanceof NoOrFixedIfCheckpointingEnabledRestartStrategyFactory) {
                return ((NoOrFixedIfCheckpointingEnabledRestartStrategyFactory) serverStrategyFactory)
                    .createRestartStrategy(isCheckpointingEnabled);
            } else {
                return serverStrategyFactory.createRestartStrategy();
            }
        }
    }

    private RestartStrategyResolving() {
    }
}
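  • RestartStrategyResolving provides a single static method, resolve. It first converts the client-side RestartStrategies.RestartStrategyConfiguration into a RestartStrategy through RestartStrategyFactory.createRestartStrategy. If that result is null, it falls back to the server-side RestartStrategyFactory, passing isCheckpointingEnabled when the factory is a NoOrFixedIfCheckpointingEnabledRestartStrategyFactory.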
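
The resolution order in resolve can be modeled without any Flink dependency. The sketch below is an illustration only: strategies are represented as plain strings, and ResolveSketch, ServerFactory, and NoOrFixedIfCheckpointing are hypothetical names. The behavior of the checkpointing-aware factory (fixed delay when checkpointing is on, no restart otherwise) is an assumption inferred from the Javadoc and the class name, since that factory's source is not shown here.

```java
// Hypothetical model of the resolution order; strategies are plain strings, not Flink objects.
final class ResolveSketch {

    // Stand-in for a server-side RestartStrategyFactory.
    interface ServerFactory {
        String create();
    }

    // Stand-in for NoOrFixedIfCheckpointingEnabledRestartStrategyFactory.
    // Assumed behavior: fixed delay if checkpointing is enabled, otherwise no restart.
    static final class NoOrFixedIfCheckpointing implements ServerFactory {
        @Override
        public String create() {
            return create(false);
        }

        String create(boolean checkpointingEnabled) {
            return checkpointingEnabled ? "fixed-delay" : "no-restart";
        }
    }

    // clientStrategy is null when the job did not configure a concrete strategy.
    static String resolve(String clientStrategy, ServerFactory serverFactory, boolean checkpointingEnabled) {
        if (clientStrategy != null) {
            // 1. A strategy set on the job always wins.
            return clientStrategy;
        }
        if (serverFactory instanceof NoOrFixedIfCheckpointing) {
            // 2. Nothing configured anywhere: the decision depends on checkpointing.
            return ((NoOrFixedIfCheckpointing) serverFactory).create(checkpointingEnabled);
        }
        // 3. Otherwise use whatever the server-side factory produces.
        return serverFactory.create();
    }

    public static void main(String[] args) {
        System.out.println(resolve("failure-rate", () -> "fixed-delay", true));
        System.out.println(resolve(null, () -> "fixed-delay", false));
        System.out.println(resolve(null, new NoOrFixedIfCheckpointing(), true));
        System.out.println(resolve(null, new NoOrFixedIfCheckpointing(), false));
    }
}
```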

RestartStrategy

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java

public interface RestartStrategy {

    /**
     * True if the restart strategy can be applied to restart the {@link ExecutionGraph}.
     *
     * @return true if restart is possible, otherwise false
     */
    boolean canRestart();

    /**
     * Called by the ExecutionGraph to eventually trigger a full recovery.
     * The recovery must be triggered on the given callback object, and may be delayed
     * with the help of the given scheduled executor.
     *
     * <p>The thread that calls this method is not supposed to block/sleep.
     *
     * @param restarter The hook to restart the ExecutionGraph
     * @param executor An scheduled executor to delay the restart
     */
    void restart(RestartCallback restarter, ScheduledExecutor executor);
}
  • RestartStrategy declares two methods, canRestart and restart. Its implementations include NoRestartStrategy, FixedDelayRestartStrategy, and FailureRateRestartStrategy.

NoRestartStrategy

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java

public class NoRestartStrategy implements RestartStrategy {

    @Override
    public boolean canRestart() {
        return false;
    }

    @Override
    public void restart(RestartCallback restarter, ScheduledExecutor executor) {
        throw new UnsupportedOperationException("NoRestartStrategy does not support restart.");
    }

    /**
     * Creates a NoRestartStrategyFactory instance.
     *
     * @param configuration Configuration object which is ignored
     * @return NoRestartStrategyFactory instance
     */
    public static NoRestartStrategyFactory createFactory(Configuration configuration) {
        return new NoRestartStrategyFactory();
    }

    @Override
    public String toString() {
        return "NoRestartStrategy";
    }

    public static class NoRestartStrategyFactory extends RestartStrategyFactory {

        private static final long serialVersionUID = -1809462525812787862L;

        @Override
        public RestartStrategy createRestartStrategy() {
            return new NoRestartStrategy();
        }
    }
}
  • NoRestartStrategy implements the RestartStrategy interface. Its canRestart method always returns false, and its restart method throws an UnsupportedOperationException.

FixedDelayRestartStrategy

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java

public class FixedDelayRestartStrategy implements RestartStrategy {

    private final int maxNumberRestartAttempts;
    private final long delayBetweenRestartAttempts;
    private int currentRestartAttempt;

    public FixedDelayRestartStrategy(
        int maxNumberRestartAttempts,
        long delayBetweenRestartAttempts) {

        Preconditions.checkArgument(maxNumberRestartAttempts >= 0, "Maximum number of restart attempts must be positive.");
        Preconditions.checkArgument(delayBetweenRestartAttempts >= 0, "Delay between restart attempts must be positive");

        this.maxNumberRestartAttempts = maxNumberRestartAttempts;
        this.delayBetweenRestartAttempts = delayBetweenRestartAttempts;
        currentRestartAttempt = 0;
    }

    public int getCurrentRestartAttempt() {
        return currentRestartAttempt;
    }

    @Override
    public boolean canRestart() {
        return currentRestartAttempt < maxNumberRestartAttempts;
    }

    @Override
    public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
        currentRestartAttempt++;

        executor.schedule(new Runnable() {
            @Override
            public void run() {
                restarter.triggerFullRecovery();
            }
        }, delayBetweenRestartAttempts, TimeUnit.MILLISECONDS);
    }

    /**
     * Creates a FixedDelayRestartStrategy from the given Configuration.
     *
     * @param configuration Configuration containing the parameter values for the restart strategy
     * @return Initialized instance of FixedDelayRestartStrategy
     * @throws Exception
     */
    public static FixedDelayRestartStrategyFactory createFactory(Configuration configuration) throws Exception {
        int maxAttempts = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);

        String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY);

        long delay;

        try {
            delay = Duration.apply(delayString).toMillis();
        } catch (NumberFormatException nfe) {
            throw new Exception("Invalid config value for " +
                    ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + ": " + delayString +
                    ". Value must be a valid duration (such as '100 milli' or '10 s')");
        }

        return new FixedDelayRestartStrategyFactory(maxAttempts, delay);
    }

    @Override
    public String toString() {
        return "FixedDelayRestartStrategy(" +
                "maxNumberRestartAttempts=" + maxNumberRestartAttempts +
                ", delayBetweenRestartAttempts=" + delayBetweenRestartAttempts +
                ')';
    }

    public static class FixedDelayRestartStrategyFactory extends RestartStrategyFactory {

        private static final long serialVersionUID = 6642934067762271950L;

        private final int maxAttempts;
        private final long delay;

        public FixedDelayRestartStrategyFactory(int maxAttempts, long delay) {
            this.maxAttempts = maxAttempts;
            this.delay = delay;
        }

        @Override
        public RestartStrategy createRestartStrategy() {
            return new FixedDelayRestartStrategy(maxAttempts, delay);
        }
    }
}
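  • FixedDelayRestartStrategy implements the RestartStrategy interface. Its canRestart method returns true while currentRestartAttempt is less than maxNumberRestartAttempts. Its restart method increments currentRestartAttempt and then calls ScheduledExecutor.schedule to run RestartCallback.triggerFullRecovery() after a delay of delayBetweenRestartAttempts milliseconds.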
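
To see the counter logic in isolation, here is a minimal sketch that mirrors the listing above using only the JDK. FixedDelaySketch is a hypothetical class, a plain Runnable stands in for RestartCallback, and java.util.concurrent.ScheduledExecutorService stands in for Flink's ScheduledExecutor.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in that mirrors the counter logic of FixedDelayRestartStrategy; not Flink code.
final class FixedDelaySketch {
    private final int maxAttempts;
    private final long delayMillis;
    private int currentAttempt;

    FixedDelaySketch(int maxAttempts, long delayMillis) {
        if (maxAttempts < 0 || delayMillis < 0) {
            throw new IllegalArgumentException("maxAttempts and delayMillis must be non-negative");
        }
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
    }

    boolean canRestart() {
        // The counter is never reset, so the budget covers the lifetime of this object.
        return currentAttempt < maxAttempts;
    }

    void restart(Runnable recovery, ScheduledExecutorService executor) {
        currentAttempt++;
        // The caller is not blocked; the recovery runs later on the executor thread.
        executor.schedule(recovery, delayMillis, TimeUnit.MILLISECONDS);
    }

    int getCurrentAttempt() {
        return currentAttempt;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        FixedDelaySketch strategy = new FixedDelaySketch(2, 10);
        while (strategy.canRestart()) {
            strategy.restart(() -> System.out.println("recovery triggered"), executor);
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("attempts used: " + strategy.getCurrentAttempt());
    }
}
```

As in the original listing, nothing decrements or resets the attempt counter, so the configured number of attempts is a total budget rather than a per-failure limit.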

FailureRateRestartStrategy

flink-runtime_2.11-1.7.1-sources.jar!/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java

public class FailureRateRestartStrategy implements RestartStrategy {

    private final Time failuresInterval;
    private final Time delayInterval;
    private final int maxFailuresPerInterval;
    private final ArrayDeque<Long> restartTimestampsDeque;

    public FailureRateRestartStrategy(int maxFailuresPerInterval, Time failuresInterval, Time delayInterval) {
        Preconditions.checkNotNull(failuresInterval, "Failures interval cannot be null.");
        Preconditions.checkNotNull(delayInterval, "Delay interval cannot be null.");
        Preconditions.checkArgument(maxFailuresPerInterval > 0, "Maximum number of restart attempts per time unit must be greater than 0.");
        Preconditions.checkArgument(failuresInterval.getSize() > 0, "Failures interval must be greater than 0 ms.");
        Preconditions.checkArgument(delayInterval.getSize() >= 0, "Delay interval must be at least 0 ms.");

        this.failuresInterval = failuresInterval;
        this.delayInterval = delayInterval;
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.restartTimestampsDeque = new ArrayDeque<>(maxFailuresPerInterval);
    }

    @Override
    public boolean canRestart() {
        if (isRestartTimestampsQueueFull()) {
            Long now = System.currentTimeMillis();
            Long earliestFailure = restartTimestampsDeque.peek();

            return (now - earliestFailure) > failuresInterval.toMilliseconds();
        } else {
            return true;
        }
    }

    @Override
    public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
        if (isRestartTimestampsQueueFull()) {
            restartTimestampsDeque.remove();
        }
        restartTimestampsDeque.add(System.currentTimeMillis());

        executor.schedule(new Runnable() {
            @Override
            public void run() {
                restarter.triggerFullRecovery();
            }
        }, delayInterval.getSize(), delayInterval.getUnit());
    }

    private boolean isRestartTimestampsQueueFull() {
        return restartTimestampsDeque.size() >= maxFailuresPerInterval;
    }

    @Override
    public String toString() {
        return "FailureRateRestartStrategy(" +
            "failuresInterval=" + failuresInterval +
            "delayInterval=" + delayInterval +
            "maxFailuresPerInterval=" + maxFailuresPerInterval +
            ")";
    }

    public static FailureRateRestartStrategyFactory createFactory(Configuration configuration) throws Exception {
        int maxFailuresPerInterval = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL, 1);
        String failuresIntervalString = configuration.getString(
                ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL, Duration.apply(1, TimeUnit.MINUTES).toString()
        );
        String timeoutString = configuration.getString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL);
        String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_DELAY, timeoutString);

        Duration failuresInterval = Duration.apply(failuresIntervalString);
        Duration delay = Duration.apply(delayString);


        return new FailureRateRestartStrategyFactory(maxFailuresPerInterval, Time.milliseconds(failuresInterval.toMillis()), Time.milliseconds(delay.toMillis()));
    }

    public static class FailureRateRestartStrategyFactory extends RestartStrategyFactory {
        private static final long serialVersionUID = -373724639430960480L;

        private final int maxFailuresPerInterval;
        private final Time failuresInterval;
        private final Time delayInterval;

        public FailureRateRestartStrategyFactory(int maxFailuresPerInterval, Time failuresInterval, Time delayInterval) {
            this.maxFailuresPerInterval = maxFailuresPerInterval;
            this.failuresInterval = Preconditions.checkNotNull(failuresInterval);
            this.delayInterval = Preconditions.checkNotNull(delayInterval);
        }

        @Override
        public RestartStrategy createRestartStrategy() {
            return new FailureRateRestartStrategy(maxFailuresPerInterval, failuresInterval, delayInterval);
        }
    }
}
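  • FailureRateRestartStrategy implements the RestartStrategy interface. Its canRestart method returns true when restartTimestampsDeque holds fewer than maxFailuresPerInterval entries. Once the deque holds maxFailuresPerInterval or more, it returns true only if the time elapsed since earliestFailure exceeds failuresInterval. Its restart method removes the oldest timestamp if the deque is full, appends the current time, and then calls ScheduledExecutor.schedule to run RestartCallback.triggerFullRecovery() after delayInterval.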
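
The sliding-window check is easier to follow with concrete timestamps. The sketch below is a simplified model, not Flink code: FailureRateSketch is a hypothetical class, the current time is passed in explicitly instead of being read from System.currentTimeMillis(), and the scheduling step is left out.

```java
import java.util.ArrayDeque;

// Hypothetical model of the sliding-window logic in FailureRateRestartStrategy; not Flink code.
final class FailureRateSketch {
    private final int maxFailuresPerInterval;
    private final long failuresIntervalMillis;
    private final ArrayDeque<Long> restartTimestamps;

    FailureRateSketch(int maxFailuresPerInterval, long failuresIntervalMillis) {
        if (maxFailuresPerInterval <= 0 || failuresIntervalMillis <= 0) {
            throw new IllegalArgumentException("both arguments must be greater than 0");
        }
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.failuresIntervalMillis = failuresIntervalMillis;
        this.restartTimestamps = new ArrayDeque<>(maxFailuresPerInterval);
    }

    // nowMillis is supplied by the caller so the behavior is deterministic.
    boolean canRestart(long nowMillis) {
        if (restartTimestamps.size() >= maxFailuresPerInterval) {
            // Window is full: allow a restart only if the oldest one has aged out.
            long earliest = restartTimestamps.peek();
            return nowMillis - earliest > failuresIntervalMillis;
        }
        return true;
    }

    void recordRestart(long nowMillis) {
        if (restartTimestamps.size() >= maxFailuresPerInterval) {
            // Drop the oldest timestamp so the deque never exceeds the limit.
            restartTimestamps.remove();
        }
        restartTimestamps.add(nowMillis);
    }

    public static void main(String[] args) {
        // At most 2 restarts per 1000 ms.
        FailureRateSketch strategy = new FailureRateSketch(2, 1000);
        strategy.recordRestart(0);
        strategy.recordRestart(100);
        System.out.println(strategy.canRestart(500));  // window full, oldest restart is 500 ms old
        System.out.println(strategy.canRestart(1001)); // oldest restart is now older than 1000 ms
    }
}
```

Because the comparison is strict, a restart requested exactly failuresInterval after the oldest recorded one is still refused in this model, which matches the `>` in the original canRestart.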

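Summary

  • RestartStrategies provides the static factory methods noRestart, fallBackRestart, fixedDelayRestart, and failureRateRestart for building a RestartStrategyConfiguration.
  • RestartStrategyConfiguration is an abstract class that declares the abstract method getDescription. It has four subclasses: NoRestartStrategyConfiguration, FixedDelayRestartStrategyConfiguration, FailureRateRestartStrategyConfiguration, and FallbackRestartStrategyConfiguration.
  • RestartStrategyResolving provides a static method, resolve, which turns a RestartStrategies.RestartStrategyConfiguration into a RestartStrategy by way of RestartStrategyFactory, falling back to the server-side factory when the client-side configuration yields no strategy. RestartStrategy declares two methods, canRestart and restart, and its implementations include NoRestartStrategy, FixedDelayRestartStrategy, and FailureRateRestartStrategy.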
