ElasticSearch Master选举、自动Cluster发现

1 概述

ElasticSearch中Discovery负责Mater选举、Cluster发现、向Cluster中的node发布状态等。
其源码注释如下:

A pluggable module allowing to implement discovery of other nodes, publishing of the cluster state to all nodes, electing a master of the cluster that raises cluster state change events.

Discovery在ElasticSearch中有两个实现:

  1. SingleNodeDiscovery: 只有一个节点的集群。
  2. ZenDiscovery: 有多个节点的集群。

本文的重点在于介绍ZenDiscovery,介绍Master选举、Cluster发现等相关实现。

2 ZenDiscovery实例化和启动

ZenDiscoveryDiscoveryModule中获取并初始化,DiscoveryModuleNode类的构造函数中初始化。

Node.start函数会触发ZenDiscovery.doStart函数的调用:

//ZenDiscovery
@Override
protected void doStart() {
    DiscoveryNode localNode = transportService.getLocalNode();
    assert localNode != null;
    synchronized (stateMutex) {
        // set initial state
        assert committedState.get() == null;
        assert localNode != null;
        ClusterState.Builder builder = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings));
        ClusterState initialState = builder
            .blocks(ClusterBlocks.builder()
                //这里其实有个需要注意的地方,STATE_NOT_RECOVERED_BLOCK
                //表示节点刚启动没有进行集群、索引元数据等的选举、恢复
                //等,主节点在对新ClusterState响应时会触发集群、索引
                //元数据选举,完成后移除STATE_NOT_RECOVERED_BLOCK
                //具体过程后续文章会有介绍
                .addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)
                .addGlobalBlock(discoverySettings.getNoMasterBlock()))
            .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()))
            .build();
        committedState.set(initialState);
        clusterApplier.setInitialState(initialState);
        //nodesFD主要是主节点定时ping集群内其他非主节点并发现节点故障(下线)的工具类
        nodesFD.setLocalNode(localNode);
        //加入Cluster的工具类,后文具体介绍
        joinThreadControl.start();
    }
    //zenPing主要用于主节点选举时ping,以此其他节点获取其他节点状态
    zenPing.start();
}

3 集群启动时的初始join

每个节点的Node.start函数中会调用ZenDiscovery.startInitialJoin进行第一次join操作。

[pic-startInitialJoin调用轨迹]

上面列出的代码有一行为joinThreadControl.start(),这里我们先看下具体实现:

//ZenDiscovery.JoinThreadControl
public void start() {
    //设置running=true,标识已启动,后续使用JoinThreadControl进行Master选举等都会判断该标志位。
    running.set(true);
}

下面看ZenDiscovery.startInitialJoin的具体实现

//ZenDiscovery.startInitialJoin
@Override
public void startInitialJoin() {
    // start the join thread from a cluster state update. See {@link JoinThreadControl} for details.
    synchronized (stateMutex) {
        // do the join on a different thread, the caller of this method waits for 30s anyhow till it is discovered
        joinThreadControl.startNewThreadIfNotRunning();
    }
}

上面是初始join的实现,最终都是调用joinThreadControl.startNewThreadIfNotRunning();实现选主,后续如果主节点发生故障等也是使用同样的逻辑。下面具体讨论joinThreadControl.startNewThreadIfNotRunning()实现。

3.1 Master选举以及加入集群大致流程

//ZenDiscovery.JoinThreadControl
/** starts a new joining thread if there is no currently active one and join thread controlling is started */
public void startNewThreadIfNotRunning() {
    assert Thread.holdsLock(stateMutex);
    //判断是否本节点是否已经有其他线程启动了join操作,如果是的话则直接返回
    if (joinThreadActive()) {
        return;
    }
    //加入join任务
    threadPool.generic().execute(new Runnable() {
        @Override
        public void run() {
            Thread currentThread = Thread.currentThread();
            //CAS设置当前线程为当前join线程,如果成功则继续后续操作,
            //如果失败,则表示已经有其他线程抢先jion操作了,此时则直接返回
            if (!currentJoinThread.compareAndSet(null, currentThread)) {
                return;
            }
            //这里while进行循环,直到加入成功为止,后面可能因为当前当选为
//主节点等待投票超时、可选主节点个数不足等失败
            while (running.get() && joinThreadActive(currentThread)) {
                try {
                    //具体加入集群和Master选举函数
                    innerJoinCluster();
                    return;
                } catch (Exception e) {
                    ...
                }
            }
            // cleaning the current thread from currentJoinThread is done by explicit calls.
        }
    });
}

JoinThreadControlZenDiscovery内部类,innerJoinClusterZenDiscovery方法,Master选举的具体实现就在innerJoinCluster中:

//ZenDiscovery
private void innerJoinCluster() {
    DiscoveryNode masterNode = null;
    final Thread currentThread = Thread.currentThread();
  //nodeJoinController是负责处理其他节点加入当前当选的主节点,其他节点
//加入主机点之后会触发reroute,进行shard分配
    nodeJoinController.startElectionContext();
    while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
        //选举master节点的实现所在
        masterNode = findMaster();
    }

    if (!joinThreadControl.joinThreadActive(currentThread)) {
        logger.trace("thread is no longer in currentJoinThread. Stopping.");
        return;
    }
    //如果当前节点就是此次选举出的Master节点,则等待其他节点对其选举结果
    //进行确认,即等待其他节点加入该节点。这里也属于一个投票过程
    if (transportService.getLocalNode().equals(masterNode)) {
        //根据配置获取选举成功的最少参与节点数,当加入当前节点的其
        //他节点数目大于等于此值时则此节点才会真正当选master成功
        final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
        logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
        //等待其他节点加入此节点
        nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
                new NodeJoinController.ElectionCallback() {
                    @Override
                    public void onElectedAsMaster(ClusterState state) {
                        //成功当选为master节点,标识流程成功结束
                        //markThreadAsDone会清空joinThreadControl中记录的选举线程,
                        //以后续有需要时rejoin
                        synchronized (stateMutex) {
                            joinThreadControl.markThreadAsDone(currentThread);
                        }
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        logger.trace("failed while waiting for nodes to join, rejoining", t);
                        synchronized (stateMutex) {
                            //等待投票失败,重新启动整个join逻辑
                            joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                        }
                    }
                }

        );
    } else {
        //如果当前节点不是此次选举中被选中的主节点,则
        //停止选举流程,开始加入刚选举出的主节点(即投票过程)
        // process any incoming joins (they will fail because we are not the master)
        //停止选举过程
        nodeJoinController.stopElectionContext(masterNode + " elected");

        // send join request
        //向此次选举出的master节点发起join请求,即承认其master身份
        final boolean success = joinElectedMaster(masterNode);

        synchronized (stateMutex) {
            //成功加入刚选举出的master节点
            if (success) {
                DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode();
                //没有主节点异常
                if (currentMasterNode == null) {
                    // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
                    // a valid master.
                    logger.debug("no master node is set, despite of join request completing. retrying pings.");
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                } else if (currentMasterNode.equals(masterNode) == false) {
                    // update cluster state
                    joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");
                }
                //标识加入成功
                joinThreadControl.markThreadAsDone(currentThread);
            } else {
                // failed to join. Try again...
                //加入当选的主节点失败则重新启动join过程
                joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
            }
        }
    }
}

3.1.2 Master选举实现:选举算法

Master选举主要逻辑在ZenDiscovery.findMaster中:

//ZenDiscovery
private DiscoveryNode findMaster() {
    logger.trace("starting to ping");
    //pingAndWait用于获取其他节点的状态,这里只介绍下大致实现,不再展开具体源码:
    //pingAndWait主要是使用上面介绍的ZenPing去ping配置中的所有host,具体实现逻辑可以
    //参考ZenPing的默认实现UnicastZenPing。
    //通过函数名称可以知道这是个同步调用,同步的具体实现和ElasticSearch大部分需要等待
    //远程通信返回的行为类似,采用计数器记录发送的请求个数,每次有请求响应时递减计数器,
    //当计数器递减为0时表示所有请求都得到了响应。
    List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
    if (fullPingResponses == null) {
        logger.trace("No full ping responses");
        return null;
    }
    if (logger.isTraceEnabled()) {
        StringBuilder sb = new StringBuilder();
        if (fullPingResponses.size() == 0) {
            sb.append(" {none}");
        } else {
            for (ZenPing.PingResponse pingResponse : fullPingResponses) {
                sb.append("\n\t--> ").append(pingResponse);
            }
        }
        logger.trace("full ping responses:{}", sb);
    }

    final DiscoveryNode localNode = transportService.getLocalNode();

    // add our selves
    //在获取的装填集中加入当前节点自己的状态,因为自己也需要加入选举,也可能被选举为主节点
    assert fullPingResponses.stream().map(ZenPing.PingResponse::node)
        .filter(n -> n.equals(localNode)).findAny().isPresent() == false;

    fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState()));

    // filter responses
    final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);
    //activeMasters用来记录当前已经存在的主节点
    List<DiscoveryNode> activeMasters = new ArrayList<>();
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
        // any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
        //如果返回的节点表明自己当前已经是主节点(这里可以看上面的因为注释,避免了自己选自己的情况)
        if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
            activeMasters.add(pingResponse.master());
        }
    }

    // nodes discovered during pinging
    //masterCandidates用来记录配置为可以成为主节点的候选节点
    List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
    //这里将返回节点中配置为可以作为主节点的节点加入候选节点中
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        //这里要注意isMasterNode并不是说明该节点是不是主节点,而是表明该节点能不能成为主节点
        if (pingResponse.node().isMasterNode()) {
            masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
        }
    }
    //如果当前存在的主节点列表activeMasters为空,则从候选节点列表masterCandidates中选取主节点
    if (activeMasters.isEmpty()) {
        //判断是否有足够的候选节点
        if (electMaster.hasEnoughCandidates(masterCandidates)) {
            //进行节点选举
            final ElectMasterService.MasterCandidate winner = 
            electMaster.electMaster(masterCandidates);
            logger.trace("candidate {} won election", winner);
            return winner.getNode();
        } else {
            // if we don't have enough master nodes, we bail, because there are not enough master to elect from
            logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
                        masterCandidates, electMaster.minimumMasterNodes());
            return null;
        }
    } else {
        //如果当前存在的主节点列表activeMasters不为空,则从中选取主节点
        assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
        // lets tie break between discovered nodes
        return electMaster.tieBreakActiveMasters(activeMasters);
    }
}

根据上述代码可知,ElasticSearch在选取主节点时,首先使用electMaster.tieBreakActiveMasters在当前已经是主节点的列表中选取,如果该列表为空则再使用electMaster.electMaster在候选列表中选取。

electMaster.electMasterelectMaster.tieBreakActiveMasters是选举的逻辑所在,但是比较简单,展开源码可以发现最终选举过程是首先对列表中的节点做一个基数排序,排序原则有两个

  1. 先判断对比的两个节点是否配置为可当选为主节点,配置可以当选为主节点的获胜。
  2. 如果上述不能区分两个节点的顺序,则再根据节点ID进行排序,ID小的获胜。

上述说明了ZenDiscovery.innerJoinCluster定义了选举和加入集群的逻辑,在findMaster选举出主节点后,主节点则等待其他节点加入自己;其他节点发现自己不是此次选举出的主节点,则向此次选举出的主节点发送join请求,加入该节点。

electMaster.electMasterelectMaster.tieBreakActiveMasters最终都是通过调用如下函数对列表中的节点进行排序,然后取第一个节点做为主节点的:

/** master nodes go before other nodes, with a secondary sort by id **/
//ElectMasterService
private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {
    //首选判断该节点是否配置为可当选为主节点
    if (o1.isMasterNode() && !o2.isMasterNode()) {
        return -1;
    }
    if (!o1.isMasterNode() && o2.isMasterNode()) {
        return 1;
    }
    //在比较节点ID
    return o1.getId().compareTo(o2.getId());
}

3.1.3 Master选举实现:等待其他节点加入自己

当一个节点当选为主节点时,通过上面的ZenDiscovery.innerJoinCluster可知,当选为主节点的节点和落选的节点会执行不同的操作,当选为主节点的节点会等待足够数量的其他节点节点加入自己,而落选的节点则向当选为主节点的节点发送DISCOVERY_JOIN_ACTION_NAM请求加入主节点,下面我们分别看一下相关实现:

3.1.3.1 当选为主节点等待其他节点加入

当选为主节点的节点等待其他节点加入,其实就是一个投票过程,先再次看一下ZenDiscovery.innerJoinCluster实现:

//ZenDiscovery
 private void innerJoinCluster() {
    ...
    使用findMaster选举主节点
    //如果自己当选为主节点
    if (transportService.getLocalNode().equals(masterNode)) {
        //配置discovery.zen.minimum_master_nodes要求的最少加入自己的节点数
        final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
        ...
        nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
                new NodeJoinController.ElectionCallback() {
                    //当选为主节点,清空状态,以备下一次选举
                    @Override
                    public void onElectedAsMaster(ClusterState state) {
                        synchronized (stateMutex) {
                            joinThreadControl.markThreadAsDone(currentThread);
                        }
                    }
                    //此次选举失败,重新开启下一轮选举
                    @Override
                    public void onFailure(Throwable t) {
                        logger.trace("failed while waiting for nodes to join, rejoining", t);
                        synchronized (stateMutex) {
                            joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                        }
                    }
                }

        );
    } else {
        //本次选举落选的节点,首先停止选举操作
        // process any incoming joins (they will fail because we are not the master)
        nodeJoinController.stopElectionContext(masterNode + " elected");
        //向本次当选为主节点的节点发送join请求
        // send join request
        final boolean success = joinElectedMaster(masterNode);

        synchronized (stateMutex) {
            ...一些异常处理
        }
    }
}

当选为主节点的节点会调用nodeJoinController.waitToBeElectedAsMaster等待其他节点加入自己(投票)。这里不再展开其实现原理,和其他同步操作一样,采用计数器记录要求加入自己的节点个数,等待其他节点发送DISCOVERY_JOIN_ACTION_NAME请求加入自己,每成功收到一个请求即递减计数器,计算器等于0时表示满足所需的最少票数,此时才表示自己真正成为主节点。

3.1.3.2 落选节点加入主节点

这里其实就是一个投票过程,落选节点通过调用joinElectedMaster加入主节点:

 private boolean joinElectedMaster(DiscoveryNode masterNode) {
    try {
        // first, make sure we can connect to the master
        //先尝试是否能够连接次轮选举中当选的主节点
        transportService.connectToNode(masterNode);
    } catch (Exception e) {
        logger.warn(() -> new ParameterizedMessage("failed to connect to master [{}], retrying...", masterNode), e);
        return false;
    }
    int joinAttempt = 0; // we retry on illegal state if the master is not yet ready
    //while循环,表示如果发送可接受的异常时,会重复发送join请求
    while (true) {
        try {
            logger.trace("joining master {}", masterNode);
            membership.sendJoinRequestBlocking(masterNode, transportService.getLocalNode(), joinTimeout);
            return true;
        } catch (Exception e) {
            final Throwable unwrap = ExceptionsHelper.unwrapCause(e);
            if (unwrap instanceof NotMasterException) {
                //统计失败次数,如果达到指定此时则不再发送
                if (++joinAttempt == this.joinRetryAttempts) {
                    logger.info("failed to send join request to master [{}], reason [{}], tried [{}] times", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt);
                    return false;
                } else {
                    logger.trace("master {} failed with [{}]. retrying... (attempts done: [{}])", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt);
                }
            } else {
                if (logger.isTraceEnabled()) {
                    logger.trace(() -> new ParameterizedMessage("failed to send join request to master [{}]", masterNode), e);
                } else {
                    logger.info("failed to send join request to master [{}], reason [{}]", masterNode, ExceptionsHelper.detailedMessage(e));
                }
                return false;
            }
        }

        try {
            //一次失败之后等待一定时间之后再次重试
            Thread.sleep(this.joinRetryDelay.millis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

上面代码中可以看出,当前节点会首先尝试连接当选的主节点,transportService.connectToNode实现原理可以参考这篇ElasticSearch 基于Nettty的通信原理,其实就是克隆客户端Bootstrap模板,注册handler,然后调用connect函数,成功则表示可以连接。

连接成功之后会发送DISCOVERY_JOIN_ACTION_NAME请求,上面代码中失败重试次数通过discovery.zen.join_retry_attempts进行配置,每次重试之间的等待时间通过discovery.zen.join_retry_delay进行配置。

MembershipAction可以看到当选为主节点的节点处理DISCOVERY_JOIN_ACTION_NAME请求的handler为JoinRequestRequestHandler,大概处理方式在3.1.3.1已经介绍过,这里不再赘述。

4 ElasticSearch运行过程中的rejoin

ElasticSearch运行过程中,主节点可能会发送故障,也可以使用api使ElasticSearch重新进行主节点选举,运行过程中的重新选举是通过调用ZenDiscovery.rejoin实现的:

protected void rejoin(String reason) {
    assert Thread.holdsLock(stateMutex);
    ClusterState clusterState = committedState.get();

    logger.warn("{}, current nodes: {}", reason, clusterState.nodes());
    //停止负责互ping的服务
    nodesFD.stop();
    masterFD.stop(reason);

    // TODO: do we want to force a new thread if we actively removed the master? this is to give a full pinging cycle
    // before a decision is made.
    //开始一轮新的选举
    joinThreadControl.startNewThreadIfNotRunning();

    if (clusterState.nodes().getMasterNodeId() != null) {
        // remove block if it already exists before adding new one
        assert clusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock().id()) == false :
            "NO_MASTER_BLOCK should only be added by ZenDiscovery";
        ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(clusterState.blocks())
            .addGlobalBlock(discoverySettings.getNoMasterBlock())
            .build();

        DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder(clusterState.nodes()).masterNodeId(null).build();
        clusterState = ClusterState.builder(clusterState)
            .blocks(clusterBlocks)
            .nodes(discoveryNodes)
            .build();

        committedState.set(clusterState);
        clusterApplier.onNewClusterState(reason, this::clusterState, (source, e) -> {}); // don't wait for state to be applied
    }
}

通过rejoin调用轨迹可以大概知道何时会触发rejoin进行新一轮的选举:


rejoin调用轨迹.png

通过上图可以看出收到主动rejoin请求、主节点失联(故障)、最小主节点数发生变化、集群状态Publish异常等都会触发rejoin。

5 自动Cluster发现

其实就是上述join的过程。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,755评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,369评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,799评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,910评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,096评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,159评论 3 411
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,917评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,360评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,673评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,814评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,509评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,156评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,123评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,641评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,728评论 2 351

推荐阅读更多精彩内容