浅谈 Apache Doris FE 处理查询 SQL 源码解析

在使用 Apache Doris 时,我们可以通过 Apache Doris FE Web 页面或者 Mysql 协议执行 SQL 语句,但是对于 Apache Doris 背后如何对 SQL 进行处理,我们无从所知。本文章内容主要讲解 Apache Doris 查询 SQL 在 FE 节点处理原理。Doris 查询语句和市面主流的数据库处理阶段都差不多,需要经过 Parse,Analyze,Optimize,Plan,Schedule,Execute 等阶段。 在 Doris 中,FE 负责查询的 Parse,Analyze,Optimize,Plan, Schedule,BE 负责执行 FE 下发 Plan Fragment

## 一、前言

在使用 Apache Doris 时,我们可以通过 Apache Doris FE Web 页面或者 Mysql 协议执行 SQL 语句,但是对于 Apache Doris 背后如何对 SQL 进行处理,我们无从所知。本文章内容主要讲解 Apache Doris 查询 SQL 在 FE 节点处理原理。Doris 查询语句和市面主流的数据库处理阶段都差不多,需要经过 Parse,Analyze,Optimize,Plan,Schedule,Execute 等阶段。 在 Doris 中,FE 负责查询的 Parse,Analyze,Optimize,Plan, Schedule,BE 负责执行 FE 下发 Plan Fragment

## 二、名词解释

* FE:Frontend,即 Doris 的前端节点。主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。

* BE:Backend,即 Doris 的后端节点。主要负责数据存储与管理、查询计划执行等工作。

* slot:计算槽,是一个资源单位, 只有给 task 分配了一个 slot 之后, 这个 task 才可以运行

* planNode : 逻辑算子

* planNodeTree: 逻辑执行计划

## 三、执行流程

在使用 Apache Doris 时,我们可以通过 Apache Doris FE Web 页面或者 Mysql 协议执行 SQL 语句,但是对于 Apache Doris 背后如何对 SQL 进行处理,我们无从所知。本文章内容主要讲解 Apache Doris 查询 SQL 在 FE 节点处理原理。Doris 查询语句和市面主流的数据库处理阶段都差不多,需要经过 Parse,Analyze,Optimize,Plan,Schedule,Execute 等阶段。 在 Doris 中,FE 负责查询的 Parse,Analyze,Optimize,Plan, Schedule,BE 负责执行 FE 下发 Plan Fragment

## 四、Apache Doris 查询原理

#### (一)SQL 接收

本文只说 mysql 协议如何接收 SQL 语句, 如果感兴趣的同学可以看看 Apache Doris FE Web 的 Rest Api。Apache Doris 兼容 Mysql 协议,用户可以通过 Mysql 客户端和其他支持 Mysql 协议的工具向 Doris 发送查询请求。MysqlServer Listener() 负责监听客户端发送来的 Mysql 连接请求,每个连接请求都被封装成一个 ConnectContext 对象,并被提交给 ConnectScheduler。ConnectScheduler 会维护一个线程池,每个 ConnectContext 会在线程池中由一个 ConnectProcessor 线程处理。

* MysqlServer 类 Listener 处理:

```

private class Listener implements Runnable {

        @Override

        public void run(){while (running && serverChannel.isOpen()) {

                SocketChannel clientChannel;

                try {clientChannel = serverChannel.accept();

                    if (clientChannel == null) {continue;}

                    // 构建 ConnectContext 对象

                    ConnectContext context = new ConnectContext(clientChannel);

                    // catelog 日志

                    context.setCatalog(Catalog.getCurrentCatalog());

                    // 向 ExecutorService 提交 new LoopHandler(context) ==>(源码)executor.submit(new LoopHandler(context))

                    if (!scheduler.submit(context)) {LOG.warn("Submit one connect request failed. Client=" + clientChannel.toString());

                        // clear up context

                        context.cleanup();}

                } catch (IOException e) {

                    // ClosedChannelException

                    // AsynchronousCloseException

                    // ClosedByInterruptException

                    // Other IOException, for example "to many open files" ...

                    LOG.warn("Query server encounter exception.", e);

                    try {Thread.sleep(100);

                    } catch (InterruptedException e1) {// Do nothing}

                } catch (Throwable e) {

                    // NotYetBoundException

                    // SecurityException

                    LOG.warn("Query server failed when calling accept.", e);

                }

            }

        }

    }

```

* ExecutorService 线程 LoopHandler 处理:

```

@Override

        public void run() {

            try {

                // Set thread local info

                context.setThreadLocalInfo();

                context.setConnectScheduler(ConnectScheduler.this);

                // authenticate check failed.

                if (!MysqlProto.negotiate(context)) {return;}

                if (registerConnection(context)) {MysqlProto.sendResponsePacket(context);

                } else {context.getState().setError(ErrorCode.ERR_USER_LIMIT_REACHED, "Reach limit of connections");

                    MysqlProto.sendResponsePacket(context);

                    return;

                }

                context.setStartTime();

                ConnectProcessor processor = new ConnectProcessor(context);

                processor.loop();} catch (Exception e) {

                // for unauthorized access such lvs probe request, may cause exception, just log it in debug level

                if (context.getCurrentUserIdentity() != null){LOG.warn("connect processor exception because", e);

                } else {LOG.debug("connect processor exception because", e);

                }

            } finally {unregisterConnection(context);

                context.cleanup();}

        }

```

* processOnce(读取 Mysql 客户端的 sql) 方法

```

// 处理 mysql 的请求

    public void processOnce()throws IOException {ctx.getState().reset();

        executor = null;

        // 重置 MySQL 协议的序列号

        final MysqlChannel channel = ctx.getMysqlChannel();

        channel.setSequenceId(0);

        // 从通道读取数据包 ==>SQL

        try {packetBuf = channel.fetchOnePacket();

            if (packetBuf == null) {LOG.warn("Null packet received from network. remote: {}", channel.getRemoteHostPortString());

                throw new IOException("Error happened when receiving packet.");

            }

        } catch (AsynchronousCloseException e) {

            // when this happened, timeout checker close this channel

            // killed flag in ctx has been already set, just return

            return;

        }

        // 下发 SQL

        dispatch();

        // finalize

        finalizeCommand();

        ctx.setCommand(MysqlCommand.COM_SLEEP);

    }

```

#### (二)Parse

ConnectProcessor 接收到 SQL 之后会进行 analyze ,Apache Doris SQL 解析使用的 Parse 是 Java CUP Parser,语法规则 定义的文件在 sql_parser.cup。

> 感兴趣的同学可以详细看一下 StatementBase 类

* analyze 方法, 返回 List<StatementBase> (这里主要是语法解析)

```

// 解析 origin,返回 list<stmt>

    private List<StatementBase> analyze(String originStmt) throws AnalysisException, DdlException {LOG.debug("the originStmts are: {}", originStmt);

        // 使用 CUP&FLEX 生成的解析器解析语句

        SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode());

        SqlParser parser = new SqlParser(input);

        try {return SqlParserUtils.getMultiStmts(parser);

        } catch (Error e) {throw new AnalysisException("Please check your sql, we meet an error when parsing.", e);

        } catch (AnalysisException | DdlException e) {String errorMessage = parser.getErrorMsg(originStmt);

            LOG.debug("origin stmt: {}; Analyze error message: {}", originStmt, parser.getErrorMsg(originStmt), e);

            if (errorMessage == null) {throw e;} else {throw new AnalysisException(errorMessage, e);

            }

        } catch (Exception e) {// TODO(lingbin): we catch 'Exception' to prevent unexpected error,

            // should be removed this try-catch clause future.

            throw new AnalysisException("Internal Error, maybe syntax error or this is a bug");

        }

    }

```

因为本文讲述的是查询语句(不同类型会转换成不通 Stmt,比如 InsertStmt, ShowStmt, SetStmt, AlterStmt, AlterTableStmt, CreateTableStmt 等),最后我们会得到 QueryStmt,originStmt 会转换成 QueryStmt,QueryStmt 通常是用 SelectList, FromClause, wherePredicate, GroupByClause, havingPredicate, OrderByElement, LimitElement 组成

#### (三)Analyze

SQL 语句被解析成 AST 之后,会被交给 StmtExecutor 。StmtExecutor 会首先对 AST 进行语法和语义分析,大概会做下面的事情:

1. 检查并绑定 Cluster, Database, Table, Column 等元信息。

2. SQL 的合法性检查:窗口函数不能 DISTINCT,HLL 和 Bitmap 列不能 sum, count, where 中不能有 grouping 操作等。

3. SQL 重写:比如将 select * 扩展成 select 所有列,count distinct 查询重写等。

4. Table 与 Column 别名处理。

5. 为 Tuple, Slot, Expr 等分配唯一 ID。

6. 函数参数的合法性检测。

7. 表达式替换。

8. 类型检查,类型转换(BIGINT 和 DECIMAL 比较,BIGINT 类型需要 Cast 成 DECIMAL)。

主要代码:

```

analyzeAndGenerateQueryPlan 方法 -->  parsedStmt.analyze(analyzer);

```

#### (四)Rewrite

* analyzeAndGenerateQueryPlan 方法(部分代码,此处不做重点讲解)

StmtExecutor 在对 AST 进行语法和语义分析后,会让 ExprRewriter 根据 ExprRewriteRule 进行一次 Rewrite。目前 Doris 的重写规则比较简单,主要是进行了常量表达式的化简和谓词的简单处理。 常量表达式的化简是指 1 + 1 + 1 重写成 3,1 > 2 重写成 Flase 等。

如果重写后,有部分节点被成功改写,比如, 1 > 2 被改写成 Flase,那么就会再触发一次语法和语义分析的过程。

对于有子查询的 SQL,StmtRewriter 会进行重写,比如将 where in, where exists 重写成 semi join, where not in, where not exists 重写成 anti join。

```

if (parsedStmt instanceof QueryStmt || parsedStmt instanceof InsertStmt) {ExprRewriter rewriter = analyzer.getExprRewriter();

            rewriter.reset();

            if (context.getSessionVariable().isEnableFoldConstantByBe()) {parsedStmt.foldConstant(rewriter);

            }

            // explan 标签

            ExplainOptions explainOptions = parsedStmt.getExplainOptions();

            boolean reAnalyze = false;

            parsedStmt.rewriteExprs(rewriter);

            reAnalyze = rewriter.changed();

            if (analyzer.containSubquery()) {parsedStmt = StmtRewriter.rewrite(analyzer, parsedStmt);

                reAnalyze = true;

            }

            if (parsedStmt instanceof SelectStmt) {if (StmtRewriter.rewriteByPolicy(parsedStmt, analyzer)) {reAnalyze = true;}

            }

            if (parsedStmt instanceof SetOperationStmt) {List<SetOperationStmt.SetOperand> operands = ((SetOperationStmt) parsedStmt).getOperands();

                for (SetOperationStmt.SetOperand operand : operands) {if (StmtRewriter.rewriteByPolicy(operand.getQueryStmt(), analyzer)){reAnalyze = true;}

                }

            }

            if (parsedStmt instanceof InsertStmt) {QueryStmt queryStmt = ((InsertStmt) parsedStmt).getQueryStmt();

                if (queryStmt != null && StmtRewriter.rewriteByPolicy(queryStmt, analyzer)) {reAnalyze = true;}

            }

            if (reAnalyze) {

                // 对重写语句进行处理

                List<Type> origResultTypes = Lists.newArrayList();

                for (Expr e : parsedStmt.getResultExprs()) {origResultTypes.add(e.getType());

                }

                List<String> origColLabels =

                        Lists.newArrayList(parsedStmt.getColLabels());

                // 重写语句进行 analyzer

                analyzer = new Analyzer(context.getCatalog(), context);

                // 重写语句 analyzer 信息

                parsedStmt.reset();

                parsedStmt.analyze(analyzer);

                // 恢复原始结果类型和列标签

                parsedStmt.castResultExprs(origResultTypes);

                parsedStmt.setColLabels(origColLabels);

                if (LOG.isTraceEnabled()) {LOG.trace("rewrittenStmt:" + parsedStmt.toSql());

                }

                if (explainOptions != null) {parsedStmt.setIsExplain(explainOptions);

                }

            }

        }

```

#### (五)SingleNodePlan

经过 parse、Analyze、Rewrite 阶段后,AST 会生成 singleNodePlanner,源码如下:

```

singleNodePlanner = new SingleNodePlanner(plannerContext);

PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();

```

单机 Plan 由 SingleNodePlanner 执行,输入是 AST,输出是单机物理执行 Plan, Plan 中每个节点是一个 PlanNode。

SingleNodePlanner 核心任务就是根据 AST 生成 OlapScanNode, AggregationNode, HashJoinNode, SortNode, UnionNode 等。

Doris 在生成单机 Plan 的时候主要进行了以下**工作或优化** :

1. Slot 物化:指确定一个表达式对应的列需要 Scan 和计算,比如聚合节点的聚合函数表达式和 Group By 表达式需要进行物化

```


//Slot物化,处理 Base表

analyzer.materializeSlots(queryStmt.getBaseTblResultExprs());

// Slot物化 处理 where 语句的子查询

selectStmt.materializeRequiredSlots(analyzer);

```

2. 投影下推:BE 在 Scan 时只会 Scan 必须读取的列

```

    projectPlanNode(resultSlotIds, root);

```

3. 谓词下推:在满足语义正确的前提下将过滤条件尽可能下推到 Scan 节点

```

    pushDownPredicates(analyzer, selectStmt);

```

4. 分区,分桶裁剪:比如建表时按照 UserId 分桶,每个分区 100 个分桶,那么当不包含 or 的 Filter 条件包含 UserId ==xxx 时,Doris 就只会将查询发送 100 个分桶中的一个发送给 BE,可以大大减少不必要的数据读取

5. Join Reorder:对于 join操作,在保证结果不变的情况,通过规则计算最优(最少资源)join 操作。

```

    createCheapestJoinPlan(analyzer, refPlans);

```

6. Sort + Limit 优化成 TopN(FE 进行useTopN标识,BE标识执行)

```

    root = new SortNode(ctx_.getNextNodeId(), root, stmt.getSortInfo(),useTopN, limit == -1, stmt.getOffset());

```

7. MaterializedView 选择:会根据查询需要的列,过滤,排序和 Join 的列,行数,列数等因素选择最佳的 MaterializedView

```

    boolean selectFailed = singleNodePlanner.selectMaterializedView(queryStmt, analyzer);

```

8. 向量化执行引擎选择:基于现代CPU的特点与火山模型的执行特点,重新设计列式存储系统的SQL执行引擎,从而提高了CPU在SQL执行时的效率,提升了SQL查询的性能。

```

    if (VectorizedUtil.isVectorized()) {

            singleNodePlan.convertToVectoriezd();

    }

```

9. Runtime Filter Join:Doris 在进行 Hash Join 计算时会在右表构建一个哈希表,左表流式的通过右表的哈希表从而得出 Join 结果。而 RuntimeFilter 就是充分利用了右表的 Hash 表,在右表生成哈希表的时,同时生成一个基于哈希表数据的一个过滤条件,然后下推到左表的数据扫描节点

```

RuntimeFilterGenerator.generateRuntimeFilters(analyzer, rootFragment.getPlanRoot());

```

创建 **singleNodePlanner** 主要代码:**createSingleNodePlan()**

#### (六)DistributedPlan

分布式查询计划 PlanFragmentTree ,每个 PlanFragment 是由

PlanNodeTree 的子树 和 Sink 节点组成的。分布式化的目标是最小化数据移动和最大化本地 Scan。

分布式查询计划 PlanFragmentTree ,每个 PlanFragment 是由

PlanNodeTree 的子树 和 Sink 节点组成的。分布式化的目标是最小化数据移动和最大化本地 Scan。

每个 PlanFragment 由 PlanNodeTree 和 Data Sink 组成,我们从上图的 Plan Fragment 2 可以看出,由 AggregationNode、HashJoinNode 和 DataSink。Plan 分布式化的方法是增加 ExchangeNode,执行计划树会以 ExchangeNode 为边界拆分为 PlanFragment。

ExchangeNode 主要是用于 BE 之间的数据交换与共享,类似 Spark 和 MR 中的 Shuffle。

各个 Fragment 的数据流转和最终的结果发送依赖:DataSink。比如 DataStreamSink 会将一个 Fragment 的数据发送到另一个 Fragment 的 ExchangeNode,ResultSink 会将查询的结果集发送到 FE。

每个 PlanFragment 可以在每个 BE 节点生成 1 个或多个执行实例,不同执行实例处理不同的数据集,通过并发来提升查询性能。

DistributedPlanner 中最主要的工作是决定 Join 的分布式执行策略:Shuffle Join,Bucket Join,Broadcast Join,Colocate Join,和增加 Aggregation 的 Merge 阶段。

决定 Join 的分布式执行策略的逻辑如下:

如果两种表示 Colocate Join 表,且 Join 的 Key 和分桶的 Key 一致,且两张表没有正在数据 balance,就会执行 Colocate Join

如果 Join 的右表比较少,集群节点数较少,计算出的 Broadcast Join 成本较低,就会选择 Broadcast Join,否则就会选择 Shuffle Join。

如果两种表示 Colocate Join 表,且 Join 的 Key 和分桶的 Key 一致,且两张表没有正在数据 balance,就会执行 Colocate Join

如果 Join 的右表比较少,集群节点数较少,计算出的 Broadcast Join 成本较低,就会选择 Broadcast Join,否则就会选择 Shuffle Join。

#### (七)Schedule

生成了 Plan Fragment Tree 之后,Apache Doris FE 通过 Coordinator 类对 Fragment 进行分配、分发步骤,主要涉及的方法有:computeScanRangeAssignment()、computeFragmentExecParams()、sendFragment()。


生成了 Plan Fragment Tree 之后,Apache Doris FE 通过 Coordinator 类对 Fragment 进行分配、分发步骤,主要涉及的方法有:computeScanRangeAssignment()、computeFragmentExecParams()、sendFragment()。

* computeScanRangeAssignment():主要逻辑对fragment合理分配,尽可能保证每个BE节点的请求都是平均。

* computeFragmentExecParams():处理Fragment执行参数。

* sendFragment():发送Fragment至BE节点,

#### (八)Execute

Doris 的查询执行模式 Volcano 模式,不过做了 Batch 的优化,不同的 operator 之间以 RowBatch 的方式传输数据。

BE 的 BackendService 会接收 FE 的 查询请求,让 FragmentMgr 进行处理。 FragmentMgr::exec_plan_fragment 会启动一个线程由 PlanFragmentExecutor 具体执行一个 plan fragment。PlanFragmentExecutor 会根据 plan fragment 创建一个 ExecNode 树,FE 每个 PlanNode 都会对应 ExecNode 的一个子类。

PlanFragmentExecutor::get_next_internal 会驱动整个 ExecNode 树的执行,会自顶向下调用每个 ExecNode 的 get_next 方法,最终数据会从 ScanNode 节点产生,向上层节点传递,每个节点都会按照自己的逻辑处理 RowBatch。 PlanFragmentExecutor 在拿到每个 RowBatch 后,如果是中间结果,就会将数据传输给其他 BE 节点,如果是最终结果,就会将数据传输给 FE 节点。

## 五、参考献文

* Apache Doris Join原理

  > https://doris.apache.org/zh-CN/advanced/join-optimization/doris-join-optimization.html#doris-shuffle-%25E6%2596%25B9%25E5%25BC%258F

* Apache Doris 存储层设计

  > https://doris.apache.org/zh-CN/article/articles/doris-storage-reader-compaction.html


* Apache Doris 元数据涉及

  > https://doris.apache.org/zh-CN/design/metadata-design.html#%25E5%2585%2583%25E6%2595%25B0%25E6%258D%25AE%25E7%25BB%2593%25E6%259E%2584


* Apache Doris 查询原理

  > https://blog.bcmeng.com/post/apache-doris-query.html#doris-query-%25E6%2589%25A7%25E8%25A1%258C



* [Apache Doris Join原理](https://doris.apache.org/zh-CN/advanced/join-optimization/doris-join-optimization.html#doris-shuffle-%25E6%2596%25B9%25E5%25BC%258F)

* [Apache Doris 存储层设计](https://doris.apache.org/zh-CN/article/articles/doris-storage-reader-compaction.html)

* [Apache Doris 元数据涉及](https://doris.apache.org/zh-CN/design/metadata-design.html#%25E5%2585%2583%25E6%2595%25B0%25E6%258D%25AE%25E7%25BB%2593%25E6%259E%2584)

* [Apache Doris 查询原理](https://blog.bcmeng.com/post/apache-doris-query.html#doris-query-%25E6%2589%25A7%25E8%25A1%258C)

## 六、实践分享

* [Apache Doris 在网易互娱的应用实践](https://mp.weixin.qq.com/s/3gQiN6trYmmXVsuZVXNl5Q)

* [Apache Doris 在知乎用户画像与实时数据的架构与实践](https://mp.weixin.qq.com/s/i5qbiKN6ruOk2Snpyy6DBw)

* [Apache Doris 物化视图与索引在京东的典型应用](https://mp.weixin.qq.com/s/3WAdi40yg7dRt2QNWcTARw)

* [Apache Doris Join 实现与调优实践](https://mp.weixin.qq.com/s/pukjERSOW-D-BM4z1G9JlA)

## 七、总结

本文主要介绍查询 SQL 在 Apache Doris Fe 节点经历 parse、analyze、rewrite、GenerateQueryPlan、schedule、send 等阶段处理。Apache Doris Fe 的 parse、analyze、rewrite 阶段和其他数据库处理过程差不多,本文主要讲解的核心是 GenerateQueryPlan、schedule、send 阶段的原理。我们可以深度了解 Apache Doris Fe 节点对查询 SQL 的优化操作,以及未来遇到相关性能问题不会无从下手。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,100评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,308评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,718评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,275评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,376评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,454评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,464评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,248评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,686评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,974评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,150评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,817评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,484评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,140评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,374评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,012评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,041评论 2 351

推荐阅读更多精彩内容