在使用 Apache Doris 时,我们可以通过 Apache Doris FE Web 页面或者 Mysql 协议执行 SQL 语句,但是对于 Apache Doris 背后如何对 SQL 进行处理,我们无从所知。本文章内容主要讲解 Apache Doris 查询 SQL 在 FE 节点处理原理。Doris 查询语句和市面主流的数据库处理阶段都差不多,需要经过 Parse,Analyze,Optimize,Plan,Schedule,Execute 等阶段。 在 Doris 中,FE 负责查询的 Parse,Analyze,Optimize,Plan, Schedule,BE 负责执行 FE 下发 Plan Fragment
## 一、前言
在使用 Apache Doris 时,我们可以通过 Apache Doris FE Web 页面或者 Mysql 协议执行 SQL 语句,但是对于 Apache Doris 背后如何对 SQL 进行处理,我们无从所知。本文章内容主要讲解 Apache Doris 查询 SQL 在 FE 节点处理原理。Doris 查询语句和市面主流的数据库处理阶段都差不多,需要经过 Parse,Analyze,Optimize,Plan,Schedule,Execute 等阶段。 在 Doris 中,FE 负责查询的 Parse,Analyze,Optimize,Plan, Schedule,BE 负责执行 FE 下发 Plan Fragment
## 二、名词解释
* FE:Frontend,即 Doris 的前端节点。主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。
* BE:Backend,即 Doris 的后端节点。主要负责数据存储与管理、查询计划执行等工作。
* slot:计算槽,是一个资源单位, 只有给 task 分配了一个 slot 之后, 这个 task 才可以运行
* planNode : 逻辑算子
* planNodeTree: 逻辑执行计划
## 三、执行流程
在使用 Apache Doris 时,我们可以通过 Apache Doris FE Web 页面或者 Mysql 协议执行 SQL 语句,但是对于 Apache Doris 背后如何对 SQL 进行处理,我们无从所知。本文章内容主要讲解 Apache Doris 查询 SQL 在 FE 节点处理原理。Doris 查询语句和市面主流的数据库处理阶段都差不多,需要经过 Parse,Analyze,Optimize,Plan,Schedule,Execute 等阶段。 在 Doris 中,FE 负责查询的 Parse,Analyze,Optimize,Plan, Schedule,BE 负责执行 FE 下发 Plan Fragment
## 四、Apache Doris 查询原理
#### (一)SQL 接收
本文只说 mysql 协议如何接收 SQL 语句, 如果感兴趣的同学可以看看 Apache Doris FE Web 的 Rest Api。Apache Doris 兼容 Mysql 协议,用户可以通过 Mysql 客户端和其他支持 Mysql 协议的工具向 Doris 发送查询请求。MysqlServer Listener() 负责监听客户端发送来的 Mysql 连接请求,每个连接请求都被封装成一个 ConnectContext 对象,并被提交给 ConnectScheduler。ConnectScheduler 会维护一个线程池,每个 ConnectContext 会在线程池中由一个 ConnectProcessor 线程处理。
* MysqlServer 类 Listener 处理:
```
private class Listener implements Runnable {
@Override
public void run(){while (running && serverChannel.isOpen()) {
SocketChannel clientChannel;
try {clientChannel = serverChannel.accept();
if (clientChannel == null) {continue;}
// 构建 ConnectContext 对象
ConnectContext context = new ConnectContext(clientChannel);
// catelog 日志
context.setCatalog(Catalog.getCurrentCatalog());
// 向 ExecutorService 提交 new LoopHandler(context) ==>(源码)executor.submit(new LoopHandler(context))
if (!scheduler.submit(context)) {LOG.warn("Submit one connect request failed. Client=" + clientChannel.toString());
// clear up context
context.cleanup();}
} catch (IOException e) {
// ClosedChannelException
// AsynchronousCloseException
// ClosedByInterruptException
// Other IOException, for example "to many open files" ...
LOG.warn("Query server encounter exception.", e);
try {Thread.sleep(100);
} catch (InterruptedException e1) {// Do nothing}
} catch (Throwable e) {
// NotYetBoundException
// SecurityException
LOG.warn("Query server failed when calling accept.", e);
}
}
}
}
```
* ExecutorService 线程 LoopHandler 处理:
```
@Override
public void run() {
try {
// Set thread local info
context.setThreadLocalInfo();
context.setConnectScheduler(ConnectScheduler.this);
// authenticate check failed.
if (!MysqlProto.negotiate(context)) {return;}
if (registerConnection(context)) {MysqlProto.sendResponsePacket(context);
} else {context.getState().setError(ErrorCode.ERR_USER_LIMIT_REACHED, "Reach limit of connections");
MysqlProto.sendResponsePacket(context);
return;
}
context.setStartTime();
ConnectProcessor processor = new ConnectProcessor(context);
processor.loop();} catch (Exception e) {
// for unauthorized access such lvs probe request, may cause exception, just log it in debug level
if (context.getCurrentUserIdentity() != null){LOG.warn("connect processor exception because", e);
} else {LOG.debug("connect processor exception because", e);
}
} finally {unregisterConnection(context);
context.cleanup();}
}
```
* processOnce(读取 Mysql 客户端的 sql) 方法
```
// 处理 mysql 的请求
public void processOnce()throws IOException {ctx.getState().reset();
executor = null;
// 重置 MySQL 协议的序列号
final MysqlChannel channel = ctx.getMysqlChannel();
channel.setSequenceId(0);
// 从通道读取数据包 ==>SQL
try {packetBuf = channel.fetchOnePacket();
if (packetBuf == null) {LOG.warn("Null packet received from network. remote: {}", channel.getRemoteHostPortString());
throw new IOException("Error happened when receiving packet.");
}
} catch (AsynchronousCloseException e) {
// when this happened, timeout checker close this channel
// killed flag in ctx has been already set, just return
return;
}
// 下发 SQL
dispatch();
// finalize
finalizeCommand();
ctx.setCommand(MysqlCommand.COM_SLEEP);
}
```
#### (二)Parse
ConnectProcessor 接收到 SQL 之后会进行 analyze ,Apache Doris SQL 解析使用的 Parse 是 Java CUP Parser,语法规则 定义的文件在 sql_parser.cup。
> 感兴趣的同学可以详细看一下 StatementBase 类
* analyze 方法, 返回 List<StatementBase> (这里主要是语法解析)
```
// 解析 origin,返回 list<stmt>
private List<StatementBase> analyze(String originStmt) throws AnalysisException, DdlException {LOG.debug("the originStmts are: {}", originStmt);
// 使用 CUP&FLEX 生成的解析器解析语句
SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode());
SqlParser parser = new SqlParser(input);
try {return SqlParserUtils.getMultiStmts(parser);
} catch (Error e) {throw new AnalysisException("Please check your sql, we meet an error when parsing.", e);
} catch (AnalysisException | DdlException e) {String errorMessage = parser.getErrorMsg(originStmt);
LOG.debug("origin stmt: {}; Analyze error message: {}", originStmt, parser.getErrorMsg(originStmt), e);
if (errorMessage == null) {throw e;} else {throw new AnalysisException(errorMessage, e);
}
} catch (Exception e) {// TODO(lingbin): we catch 'Exception' to prevent unexpected error,
// should be removed this try-catch clause future.
throw new AnalysisException("Internal Error, maybe syntax error or this is a bug");
}
}
```
因为本文讲述的是查询语句(不同类型会转换成不通 Stmt,比如 InsertStmt, ShowStmt, SetStmt, AlterStmt, AlterTableStmt, CreateTableStmt 等),最后我们会得到 QueryStmt,originStmt 会转换成 QueryStmt,QueryStmt 通常是用 SelectList, FromClause, wherePredicate, GroupByClause, havingPredicate, OrderByElement, LimitElement 组成
#### (三)Analyze
SQL 语句被解析成 AST 之后,会被交给 StmtExecutor 。StmtExecutor 会首先对 AST 进行语法和语义分析,大概会做下面的事情:
1. 检查并绑定 Cluster, Database, Table, Column 等元信息。
2. SQL 的合法性检查:窗口函数不能 DISTINCT,HLL 和 Bitmap 列不能 sum, count, where 中不能有 grouping 操作等。
3. SQL 重写:比如将 select * 扩展成 select 所有列,count distinct 查询重写等。
4. Table 与 Column 别名处理。
5. 为 Tuple, Slot, Expr 等分配唯一 ID。
6. 函数参数的合法性检测。
7. 表达式替换。
8. 类型检查,类型转换(BIGINT 和 DECIMAL 比较,BIGINT 类型需要 Cast 成 DECIMAL)。
主要代码:
```
analyzeAndGenerateQueryPlan 方法 --> parsedStmt.analyze(analyzer);
```
#### (四)Rewrite
* analyzeAndGenerateQueryPlan 方法(部分代码,此处不做重点讲解)
StmtExecutor 在对 AST 进行语法和语义分析后,会让 ExprRewriter 根据 ExprRewriteRule 进行一次 Rewrite。目前 Doris 的重写规则比较简单,主要是进行了常量表达式的化简和谓词的简单处理。 常量表达式的化简是指 1 + 1 + 1 重写成 3,1 > 2 重写成 Flase 等。
如果重写后,有部分节点被成功改写,比如, 1 > 2 被改写成 Flase,那么就会再触发一次语法和语义分析的过程。
对于有子查询的 SQL,StmtRewriter 会进行重写,比如将 where in, where exists 重写成 semi join, where not in, where not exists 重写成 anti join。
```
if (parsedStmt instanceof QueryStmt || parsedStmt instanceof InsertStmt) {ExprRewriter rewriter = analyzer.getExprRewriter();
rewriter.reset();
if (context.getSessionVariable().isEnableFoldConstantByBe()) {parsedStmt.foldConstant(rewriter);
}
// explan 标签
ExplainOptions explainOptions = parsedStmt.getExplainOptions();
boolean reAnalyze = false;
parsedStmt.rewriteExprs(rewriter);
reAnalyze = rewriter.changed();
if (analyzer.containSubquery()) {parsedStmt = StmtRewriter.rewrite(analyzer, parsedStmt);
reAnalyze = true;
}
if (parsedStmt instanceof SelectStmt) {if (StmtRewriter.rewriteByPolicy(parsedStmt, analyzer)) {reAnalyze = true;}
}
if (parsedStmt instanceof SetOperationStmt) {List<SetOperationStmt.SetOperand> operands = ((SetOperationStmt) parsedStmt).getOperands();
for (SetOperationStmt.SetOperand operand : operands) {if (StmtRewriter.rewriteByPolicy(operand.getQueryStmt(), analyzer)){reAnalyze = true;}
}
}
if (parsedStmt instanceof InsertStmt) {QueryStmt queryStmt = ((InsertStmt) parsedStmt).getQueryStmt();
if (queryStmt != null && StmtRewriter.rewriteByPolicy(queryStmt, analyzer)) {reAnalyze = true;}
}
if (reAnalyze) {
// 对重写语句进行处理
List<Type> origResultTypes = Lists.newArrayList();
for (Expr e : parsedStmt.getResultExprs()) {origResultTypes.add(e.getType());
}
List<String> origColLabels =
Lists.newArrayList(parsedStmt.getColLabels());
// 重写语句进行 analyzer
analyzer = new Analyzer(context.getCatalog(), context);
// 重写语句 analyzer 信息
parsedStmt.reset();
parsedStmt.analyze(analyzer);
// 恢复原始结果类型和列标签
parsedStmt.castResultExprs(origResultTypes);
parsedStmt.setColLabels(origColLabels);
if (LOG.isTraceEnabled()) {LOG.trace("rewrittenStmt:" + parsedStmt.toSql());
}
if (explainOptions != null) {parsedStmt.setIsExplain(explainOptions);
}
}
}
```
#### (五)SingleNodePlan
经过 parse、Analyze、Rewrite 阶段后,AST 会生成 singleNodePlanner,源码如下:
```
singleNodePlanner = new SingleNodePlanner(plannerContext);
PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();
```
单机 Plan 由 SingleNodePlanner 执行,输入是 AST,输出是单机物理执行 Plan, Plan 中每个节点是一个 PlanNode。
SingleNodePlanner 核心任务就是根据 AST 生成 OlapScanNode, AggregationNode, HashJoinNode, SortNode, UnionNode 等。
Doris 在生成单机 Plan 的时候主要进行了以下**工作或优化** :
1. Slot 物化:指确定一个表达式对应的列需要 Scan 和计算,比如聚合节点的聚合函数表达式和 Group By 表达式需要进行物化
```
//Slot物化,处理 Base表
analyzer.materializeSlots(queryStmt.getBaseTblResultExprs());
// Slot物化 处理 where 语句的子查询
selectStmt.materializeRequiredSlots(analyzer);
```
2. 投影下推:BE 在 Scan 时只会 Scan 必须读取的列
```
projectPlanNode(resultSlotIds, root);
```
3. 谓词下推:在满足语义正确的前提下将过滤条件尽可能下推到 Scan 节点
```
pushDownPredicates(analyzer, selectStmt);
```
4. 分区,分桶裁剪:比如建表时按照 UserId 分桶,每个分区 100 个分桶,那么当不包含 or 的 Filter 条件包含 UserId ==xxx 时,Doris 就只会将查询发送 100 个分桶中的一个发送给 BE,可以大大减少不必要的数据读取
5. Join Reorder:对于 join操作,在保证结果不变的情况,通过规则计算最优(最少资源)join 操作。
```
createCheapestJoinPlan(analyzer, refPlans);
```
6. Sort + Limit 优化成 TopN(FE 进行useTopN标识,BE标识执行)
```
root = new SortNode(ctx_.getNextNodeId(), root, stmt.getSortInfo(),useTopN, limit == -1, stmt.getOffset());
```
7. MaterializedView 选择:会根据查询需要的列,过滤,排序和 Join 的列,行数,列数等因素选择最佳的 MaterializedView
```
boolean selectFailed = singleNodePlanner.selectMaterializedView(queryStmt, analyzer);
```
8. 向量化执行引擎选择:基于现代CPU的特点与火山模型的执行特点,重新设计列式存储系统的SQL执行引擎,从而提高了CPU在SQL执行时的效率,提升了SQL查询的性能。
```
if (VectorizedUtil.isVectorized()) {
singleNodePlan.convertToVectoriezd();
}
```
9. Runtime Filter Join:Doris 在进行 Hash Join 计算时会在右表构建一个哈希表,左表流式的通过右表的哈希表从而得出 Join 结果。而 RuntimeFilter 就是充分利用了右表的 Hash 表,在右表生成哈希表的时,同时生成一个基于哈希表数据的一个过滤条件,然后下推到左表的数据扫描节点
```
RuntimeFilterGenerator.generateRuntimeFilters(analyzer, rootFragment.getPlanRoot());
```
创建 **singleNodePlanner** 主要代码:**createSingleNodePlan()**
#### (六)DistributedPlan
分布式查询计划 PlanFragmentTree ,每个 PlanFragment 是由
PlanNodeTree 的子树 和 Sink 节点组成的。分布式化的目标是最小化数据移动和最大化本地 Scan。
分布式查询计划 PlanFragmentTree ,每个 PlanFragment 是由
PlanNodeTree 的子树 和 Sink 节点组成的。分布式化的目标是最小化数据移动和最大化本地 Scan。
每个 PlanFragment 由 PlanNodeTree 和 Data Sink 组成,我们从上图的 Plan Fragment 2 可以看出,由 AggregationNode、HashJoinNode 和 DataSink。Plan 分布式化的方法是增加 ExchangeNode,执行计划树会以 ExchangeNode 为边界拆分为 PlanFragment。
ExchangeNode 主要是用于 BE 之间的数据交换与共享,类似 Spark 和 MR 中的 Shuffle。
各个 Fragment 的数据流转和最终的结果发送依赖:DataSink。比如 DataStreamSink 会将一个 Fragment 的数据发送到另一个 Fragment 的 ExchangeNode,ResultSink 会将查询的结果集发送到 FE。
每个 PlanFragment 可以在每个 BE 节点生成 1 个或多个执行实例,不同执行实例处理不同的数据集,通过并发来提升查询性能。
DistributedPlanner 中最主要的工作是决定 Join 的分布式执行策略:Shuffle Join,Bucket Join,Broadcast Join,Colocate Join,和增加 Aggregation 的 Merge 阶段。
决定 Join 的分布式执行策略的逻辑如下:
如果两种表示 Colocate Join 表,且 Join 的 Key 和分桶的 Key 一致,且两张表没有正在数据 balance,就会执行 Colocate Join
如果 Join 的右表比较少,集群节点数较少,计算出的 Broadcast Join 成本较低,就会选择 Broadcast Join,否则就会选择 Shuffle Join。
如果两种表示 Colocate Join 表,且 Join 的 Key 和分桶的 Key 一致,且两张表没有正在数据 balance,就会执行 Colocate Join
如果 Join 的右表比较少,集群节点数较少,计算出的 Broadcast Join 成本较低,就会选择 Broadcast Join,否则就会选择 Shuffle Join。
#### (七)Schedule
生成了 Plan Fragment Tree 之后,Apache Doris FE 通过 Coordinator 类对 Fragment 进行分配、分发步骤,主要涉及的方法有:computeScanRangeAssignment()、computeFragmentExecParams()、sendFragment()。
生成了 Plan Fragment Tree 之后,Apache Doris FE 通过 Coordinator 类对 Fragment 进行分配、分发步骤,主要涉及的方法有:computeScanRangeAssignment()、computeFragmentExecParams()、sendFragment()。
* computeScanRangeAssignment():主要逻辑对fragment合理分配,尽可能保证每个BE节点的请求都是平均。
* computeFragmentExecParams():处理Fragment执行参数。
* sendFragment():发送Fragment至BE节点,
#### (八)Execute
Doris 的查询执行模式 Volcano 模式,不过做了 Batch 的优化,不同的 operator 之间以 RowBatch 的方式传输数据。
BE 的 BackendService 会接收 FE 的 查询请求,让 FragmentMgr 进行处理。 FragmentMgr::exec_plan_fragment 会启动一个线程由 PlanFragmentExecutor 具体执行一个 plan fragment。PlanFragmentExecutor 会根据 plan fragment 创建一个 ExecNode 树,FE 每个 PlanNode 都会对应 ExecNode 的一个子类。
PlanFragmentExecutor::get_next_internal 会驱动整个 ExecNode 树的执行,会自顶向下调用每个 ExecNode 的 get_next 方法,最终数据会从 ScanNode 节点产生,向上层节点传递,每个节点都会按照自己的逻辑处理 RowBatch。 PlanFragmentExecutor 在拿到每个 RowBatch 后,如果是中间结果,就会将数据传输给其他 BE 节点,如果是最终结果,就会将数据传输给 FE 节点。
## 五、参考献文
* Apache Doris Join原理
> https://doris.apache.org/zh-CN/advanced/join-optimization/doris-join-optimization.html#doris-shuffle-%25E6%2596%25B9%25E5%25BC%258F
* Apache Doris 存储层设计
> https://doris.apache.org/zh-CN/article/articles/doris-storage-reader-compaction.html
* Apache Doris 元数据涉及
> https://doris.apache.org/zh-CN/design/metadata-design.html#%25E5%2585%2583%25E6%2595%25B0%25E6%258D%25AE%25E7%25BB%2593%25E6%259E%2584
* Apache Doris 查询原理
> https://blog.bcmeng.com/post/apache-doris-query.html#doris-query-%25E6%2589%25A7%25E8%25A1%258C
* [Apache Doris Join原理](https://doris.apache.org/zh-CN/advanced/join-optimization/doris-join-optimization.html#doris-shuffle-%25E6%2596%25B9%25E5%25BC%258F)
* [Apache Doris 存储层设计](https://doris.apache.org/zh-CN/article/articles/doris-storage-reader-compaction.html)
* [Apache Doris 元数据涉及](https://doris.apache.org/zh-CN/design/metadata-design.html#%25E5%2585%2583%25E6%2595%25B0%25E6%258D%25AE%25E7%25BB%2593%25E6%259E%2584)
* [Apache Doris 查询原理](https://blog.bcmeng.com/post/apache-doris-query.html#doris-query-%25E6%2589%25A7%25E8%25A1%258C)
## 六、实践分享
* [Apache Doris 在网易互娱的应用实践](https://mp.weixin.qq.com/s/3gQiN6trYmmXVsuZVXNl5Q)
* [Apache Doris 在知乎用户画像与实时数据的架构与实践](https://mp.weixin.qq.com/s/i5qbiKN6ruOk2Snpyy6DBw)
* [Apache Doris 物化视图与索引在京东的典型应用](https://mp.weixin.qq.com/s/3WAdi40yg7dRt2QNWcTARw)
* [Apache Doris Join 实现与调优实践](https://mp.weixin.qq.com/s/pukjERSOW-D-BM4z1G9JlA)
## 七、总结
本文主要介绍查询 SQL 在 Apache Doris Fe 节点经历 parse、analyze、rewrite、GenerateQueryPlan、schedule、send 等阶段处理。Apache Doris Fe 的 parse、analyze、rewrite 阶段和其他数据库处理过程差不多,本文主要讲解的核心是 GenerateQueryPlan、schedule、send 阶段的原理。我们可以深度了解 Apache Doris Fe 节点对查询 SQL 的优化操作,以及未来遇到相关性能问题不会无从下手。