【Flink SQL】Apache Calcite 架构剖析

1.简介和架构

Apache Calcite 是一个动态的数据管理框架, 可以实现 SQL 的解析、验证、优化和执行。Calcite 是模块化插件式的, 解析、验证、优化和执行的步骤都对应着一个相对独立的模块。用户可以选择使用其中的一个或多个模块,也可以对任意模型进行定制化扩展。

Calcite 的架构如下图所示,
JDBC:构建了一个独立的 Avatica 框架,可以通过标准的 JDBC 接口访问 Calcite 获取数据。
SQL ParserSQL Validator:可以进行 SQL 的解析和验证,,并将原始的 SQL 字符串解析并转化为内部的 SqlNode 树来表示。
Query Optimizer:进行查询优化,,基于在关系代数在 Calcite 内部有一种关系代数表示方法,将关系代数表示为 RelNode 树。RelNode 树不只是由 SqlNode 树转化而来,也可以通过Calcite 提供的 Expressions Builder 接口构建。

image.png

说明:Calcite 包含许多组成典型数据库管理系统的部件,但是省略了一些关键的组成部分,例如数据的存储、处理数据的算法和存储元数据的存储库等。因为对不同的数据类型有不同的存储和计算引擎,是不可能将它们统一到一个框架的,所以 Calcite 是一个统一的 SQL 接口实现数据访问框架

2.SQL 处理流程

如下图所示,Calcite 的处理流程实际上就是 SQL 的解析、校验、优化和执行。
Parser:解析 SQL,将输入的 SQL 字符串转化为抽象语法树(AST),即 SqlNode 树表示
Validator:根据元数据信息对 SqlNode 树进行验证, 其输出仍是 SqlNode
Converter:将 SqlNode 树转化为关系代数,其中 RelNode 树表示关系代数
Optimizer:对输入的关系代数进行优化,并输出优化后的 RelNode
Execute:根据优化后的 RelNode 生成执行计划

image.png

demo利用 Calcite 实现使用 SQL 访问 CSV 文件

3.案例分析

users 表的内容:

id:string,name:string,age:int
1,Jack,28
2,John,21
3,Tom,32
4,Peter,24

orders 表内容:

id:string,user_id:string,goods:string,price:double
001,1,Cola,3.5
002,1,Hat,38.9
003,2,Shoes,199.9
004,3,Book,39.9
005,4,Phone,2499.9

查询的 SQL 语句:

SELECT u.id, name, age, sum(price)
FROM users AS u join orders AS o ON u.id = o.user_id
WHERE age >= 20 AND age <= 30
GROUP BY u.id, name, age
ORDER BY u.id

3.1 SQL 解析

通过词法分析和语法分析将 SQL 字符串转化为 AST。在Calcite中, 借助 JavaCC 实现了 SQL 的解析, 并转化为 SqlNode 表示。
SqlNode 是 AST 的抽象基类,不同类型的节点有对应的实现类。下面的 SQL 语句会生成 SqlSelectSqlOrderBy 两个主要的节点。

String sql = "SELECT u.id, name, age, sum(price) " +
    "FROM users AS u join orders AS o ON u.id = o.user_id " +
    "WHERE age >= 20 AND age <= 30 " +
    "GROUP BY u.id, name, age " +
    "ORDER BY u.id";
// 创建SqlParser, 用于解析SQL字符串
SqlParser parser = SqlParser.create(sql, SqlParser.Config.DEFAULT);
// 解析SQL字符串, 生成SqlNode树
SqlNode rootSqlNode = parser.parseStmt();

上述代码中的 rootSqlNode 是 AST 的根节点。如下图所示,可以看到 rootSqlNodeSqlOrderBy 类型,其中 query 字段是一个 SqlSelect 类型,即代表原始的 SQL 语句去掉ORDER BY 部分。

image.png

3.2 SQL 校验

SQL 校验阶段一方面会借助元数据信息执行上述验证,另一方面会对 SqlNode 树进行一些改写,以转化为统一的格式。

// 创建Schema, 一个Schema中包含多个表
SimpleTable userTable = SimpleTable.newBuilder("users")
    .addField("id", SqlTypeName.VARCHAR)
    .addField("name", SqlTypeName.VARCHAR)
    .addField("age", SqlTypeName.INTEGER)
    .withFilePath("/path/to/user.csv")
    .withRowCount(10)
    .build();
SimpleTable orderTable = SimpleTable.newBuilder("orders")
    .addField("id", SqlTypeName.VARCHAR)
    .addField("user_id", SqlTypeName.VARCHAR)
    .addField("goods", SqlTypeName.VARCHAR)
    .addField("price", SqlTypeName.DECIMAL)
    .withFilePath("/path/to/order.csv")
    .withRowCount(10)
    .build();
SimpleSchema schema = SimpleSchema.newBuilder("s")
    .addTable(userTable)
    .addTable(orderTable)
    .build();    
CalciteSchema rootSchema = CalciteSchema.createRootSchema(false, false);
rootSchema.add(schema.getSchemaName(), schema);

RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();

// 创建CatalogReader, 用于指示如何读取Schema信息
Prepare.CatalogReader catalogReader = new CalciteCatalogReader(
    rootSchema,
    Collections.singletonList(schema.getSchemaName()),
    typeFactory,
    config);
// 创建SqlValidator, 用于执行SQL验证
SqlValidator.Config validatorConfig = SqlValidator.Config.DEFAULT
    .withLenientOperatorLookup(config.lenientOperatorLookup())
    .withSqlConformance(config.conformance())
    .withDefaultNullCollation(config.defaultNullCollation())
    .withIdentifierExpansion(true);
SqlValidator validator = SqlValidatorUtil.newValidator(
    SqlStdOperatorTable.instance(), catalogReader, typeFactory, validatorConfig);
// 执行SQL验证
SqlNode validateSqlNode = validator.validate(node);

如下图可知,SQL 验证后的输出结果仍是 SqlNode 树。不过其内部结构发生了改变,一个明显的变化是验证后的 SqlOrderBy 节点被改写为了 SqlSelect 节点,并在其 orderBy 变量中记录了排序字段。

image.png

如果把表名或者字段写错,validator.validate(node) 运行时在就会报错。如果把验证前后的SqlNode 完全打印出来,可以发现在校验时会为每个字段加上表名限定。

-- 验证前的SqlNode树打印结果
SELECT `u`.`id`, `name`, `age`, SUM(`price`)
FROM `users` AS `u`
INNER JOIN `orders` AS `o` ON `u`.`id` = `o`.`user_id`
WHERE `age` >= 20 AND `age` <= 30
GROUP BY `u`.`id`, `name`, `age`
ORDER BY `u`.`id`

-- 验证后的SqlNode树打印结果
SELECT `u`.`id`, `u`.`name`, `u`.`age`, SUM(`o`.`price`)
FROM `s`.`users` AS `u`
INNER JOIN `s`.`orders` AS `o` ON `u`.`id` = `o`.`user_id`
WHERE `u`.`age` >= 20 AND `u`.`age` <= 30
GROUP BY `u`.`id`, `u`.`name`, `u`.`age`
ORDER BY `u`.`id`

3.3 转换为关系代数 RelNode

关系代数是 SQL 的理论基础,可以阅读 Introduction of Relational Algebra in DBMS简单了解,其中“数据库系统概念“中对关系代数有更深入的介绍。

在 Calcite 中, 关系代数由 RelNode 表示。如下代码所示,将校验后的 SqlNode 树转化为RelNode树。

// 创建VolcanoPlanner, VolcanoPlanner在后面的优化中还需要用到
VolcanoPlanner planner = new VolcanoPlanner(RelOptCostImpl.FACTORY, Contexts.of(config));
planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
// 创建SqlToRelConverter
RelOptCluster cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory));
SqlToRelConverter.Config converterConfig = SqlToRelConverter.config()
    .withTrimUnusedFields(true)
    .withExpand(false);
SqlToRelConverter converter = new SqlToRelConverter(
    null,
    validator,
    catalogReader,
    cluster,
    StandardConvertletTable.INSTANCE,
    converterConfig);
// 将SqlNode树转化为RelNode树
RelNode relNode = converter.convertQuery(validateSqlNode, false, true);

RelNode 树实质上是一个逻辑执行计划,上述 SQL 对应的逻辑执行计划如下,其中每一行都表示一个节点,即 RelNode 的实现类。

LogicalSort(sort0=[$0], dir0=[ASC])
  LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[SUM($3)])
    LogicalProject(id=[$0], name=[$1], age=[$2], price=[$6])
      LogicalFilter(condition=[AND(>=($2, 20), <=($2, 30))])
        LogicalJoin(condition=[=($0, $4)], joinType=[inner])
          LogicalTableScan(table=[[s, users]])
          LogicalTableScan(table=[[s, orders]])

3.4 查询优化

查询优化是 Calcite 的核心模块,主要有三部分组成:
Planner rules:优化规则,例如内置优化规则有谓词下推、投影下推等。用户也可定义自己的优化规则。
Metadata providers:元数据,主要用于基于成本的优化(Cost-based Optimize 即 CBO),包括表的行数、表的大小、给定列的值是否唯一等信息。
Planner engines:优化器实现,HepPlanner 用于实现基于规则的优化(Rule-based Optimize 即 RBO),VolcanoPlanner 用于实现基于成本的优化。

// 优化规则
RuleSet rules = RuleSets.ofList(
    CoreRules.FILTER_TO_CALC,
    CoreRules.PROJECT_TO_CALC,
    CoreRules.FILTER_CALC_MERGE,
    CoreRules.PROJECT_CALC_MERGE,
    CoreRules.FILTER_INTO_JOIN,     // 过滤谓词下推到Join之前
    EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE,
    EnumerableRules.ENUMERABLE_PROJECT_TO_CALC_RULE,
    EnumerableRules.ENUMERABLE_FILTER_TO_CALC_RULE,
    EnumerableRules.ENUMERABLE_JOIN_RULE,
    EnumerableRules.ENUMERABLE_SORT_RULE,
    EnumerableRules.ENUMERABLE_CALC_RULE,
    EnumerableRules.ENUMERABLE_AGGREGATE_RULE);
Program program = Programs.of(RuleSets.ofList(rules));
RelNode optimizerRelTree = program.run(
    planner,
    relNode,
    relNode.getTraitSet().plus(EnumerableConvention.INSTANCE),
    Collections.emptyList(),
    Collections.emptyList());

经过优化后的输出如下,可知所有的节点都变成了 Enumerable 开头的物理节点,其基类是EnumerableRel

EnumerableSort(sort0=[$0], dir0=[ASC])
  EnumerableAggregate(group=[{0, 1, 2}], EXPR$3=[SUM($3)])
    EnumerableCalc(expr#0..6=[{inputs}], proj#0..2=[{exprs}], price=[$t6])
      EnumerableHashJoin(condition=[=($0, $4)], joinType=[inner])
        EnumerableCalc(expr#0..2=[{inputs}], expr#3=[Sarg[[20..30]]], expr#4=[SEARCH($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])
          EnumerableTableScan(table=[[s, users]])
        EnumerableTableScan(table=[[s, orders]])

优化前后的计划:users 表的过滤位置发生了变动,从先 Join 后过滤,变成了先过滤后 Join,如下图所示。

image.png

3.5 执行计划

将物理计划转化为执行计划通常需要自定义代码。Calcite 提供了一种执行计划生成方法,如下所示,可以生成执行计划并读取CSV文件中的数据。

EnumerableRel enumerable = (EnumerableRel) optimizerRelTree;
Map<String, Object> internalParameters = new LinkedHashMap<>();
EnumerableRel.Prefer prefer = EnumerableRel.Prefer.ARRAY;
Bindable bindable = EnumerableInterpretable.toBindable(internalParameters,
                                                       null, enumerable, prefer);
Enumerable bind = bindable.bind(new SimpleDataContext(rootSchema.plus()));
Enumerator enumerator = bind.enumerator();
while (enumerator.moveNext()) {
    Object current = enumerator.current();
    Object[] values = (Object[]) current;
    StringBuilder sb = new StringBuilder();
    for (Object v : values) {
        sb.append(v).append(",");
    }
    sb.setLength(sb.length() - 1);
    System.out.println(sb);
}

执行结果:

1,Jack,28,42.40
2,John,21,199.90
4,Peter,24,2499.90

参考
[1] SQL over anything with an Optiq Adapter
[2] Apache Calcite 处理流程详解(一)
[3] 编译原理实践 - JavaCC 解析表达式并生成抽象语法树.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,047评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,807评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,501评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,839评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,951评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,117评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,188评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,929评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,372评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,679评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,837评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,536评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,168评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,886评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,129评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,665评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,739评论 2 351

推荐阅读更多精彩内容