1.flink sql 解析
方法1:直接创建 flink sql parser 解析多行 sql
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-parser</artifactId>
<version>1.12.3</version>
</dependency>
// Build a standalone Flink SQL parser (Calcite-based), configured the same way
// Flink's planner configures it, and parse a multi-statement SQL script.
SqlParser sqlParser = SqlParser.create(sql, SqlParser.config()
        .withParserFactory(FlinkSqlParserImpl.FACTORY)   // Flink's extended grammar (DDL, watermark, ...)
        .withQuoting(Quoting.BACK_TICK)                  // identifiers quoted with `backticks`
        .withUnquotedCasing(Casing.TO_LOWER)             // unquoted identifiers folded to lower case
        .withQuotedCasing(Casing.UNCHANGED)
        .withConformance(FlinkSqlConformance.DEFAULT)
);
// parseStmtList() accepts several ';'-separated statements in a single call.
SqlNodeList sqlNodeList = sqlParser.parseStmtList();
List<SqlNode> sqlNodes = sqlNodeList.getList();
for (SqlNode sqlNode : sqlNodes) {
    switch (sqlNode.getKind()) {
        case INSERT:
            SqlNodeList targetColumnNodes = ((SqlInsert) sqlNode).getTargetColumnList();
            // TODO: handle the INSERT target column list
            break;
        case SELECT:
            SqlNodeList selectNodes = ((SqlSelect) sqlNode).getSelectList();
            // TODO: handle the SELECT list
            break;
        // Add further SqlKind cases (CREATE_TABLE, DROP_TABLE, ...) here as
        // needed; each case must end with `break` to avoid falling through.
        default:
            // ignore other statement kinds
            break;
    }
}
方法2:通过 table env 获取的 flink sql parser 解析单行 sql
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.12.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.12.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.12.3</version>
</dependency>
<!-- NOTE: flink-table-planner and flink-table-planner-blink should not normally
     both be on the classpath in 1.12 — prefer the blink planner (used below via
     useBlinkPlanner()) to avoid class conflicts. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.12.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.12.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.12.3</version>
</dependency>
// 1. Create the stream execution environment and a blink-planner table environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
// 2. Obtain the planner's Parser from the TableEnvironmentImpl.
Parser parserImpl = ((TableEnvironmentImpl) tableEnv).getParser();
// NOTE(review): in Flink 1.12, Parser#parse(String) returns List<Operation>,
// not a SqlNode — the original snippet would not compile. To inspect the raw
// SqlNode, use method 1 (a standalone SqlParser); here we work with Operations.
List<Operation> operations = parserImpl.parse(statement);
Operation operation = operations.get(0);
if (operation instanceof QueryOperation) {
    // ... handle a SELECT query
}
2.flink sql 校验
参考 flink sql 单条 sql 校验和上下文校验源码
// Single-statement validation (excerpt from Flink's ParserImpl)
// ParserImpl#parse
@Override
public List<Operation> parse(String statement) {
// ... omitted: lex/parse the statement into `parsed` (a SqlNode)
// Validate the single statement and convert it into an Operation;
// unsupported statements surface as a TableException.
Operation operation =
SqlToOperationConverter.convert(planner, catalogManager, parsed)
.orElseThrow(() -> new TableException("Unsupported query: " + statement));
return Collections.singletonList(operation);
}
// Context validation (excerpt from Flink's TableEnvironmentImpl)
// 1. TableEnvironmentImpl#executeSql
@Override
public TableResult executeSql(String statement) {
// parse() already performs per-statement validation (see ParserImpl#parse)
List<Operation> operations = parser.parse(statement);
if (operations.size() != 1) {
// executeSql accepts exactly one statement per call
throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG);
}
return executeOperation(operations.get(0));
}
// 2.TableEnvironmentImpl executeOperation
// 最关键是catalogManager,用于管理table,例如创建、删除等
private TableResult executeOperation(Operation operation) {
if (operation instanceof ModifyOperation) {
return executeInternal(Collections.singletonList((ModifyOperation) operation));
} else if (operation instanceof CreateTableOperation) {
CreateTableOperation createTableOperation = (CreateTableOperation) operation;
if (createTableOperation.isTemporary()) {
catalogManager.createTemporaryTable(
createTableOperation.getCatalogTable(),
createTableOperation.getTableIdentifier(),
createTableOperation.isIgnoreIfExists());
} else {
catalogManager.createTable(
createTableOperation.getCatalogTable(),
createTableOperation.getTableIdentifier(),
createTableOperation.isIgnoreIfExists());
}
return TableResultImpl.TABLE_RESULT_OK;
} else if (operation instanceof DropTableOperation) {
DropTableOperation dropTableOperation = (DropTableOperation) operation;
if (dropTableOperation.isTemporary()) {
catalogManager.dropTemporaryTable(
dropTableOperation.getTableIdentifier(), dropTableOperation.isIfExists());
} else {
catalogManager.dropTable(
dropTableOperation.getTableIdentifier(), dropTableOperation.isIfExists());
}
return TableResultImpl.TABLE_RESULT_OK;
}
...省略
}
问题1:如何获取 Planner 和 CatalogManager,进而利用 SqlToOperationConverter 校验单个 sqlNode?
问题2:如何进行 sql 上下文校验?
// 通过 table env 获取 Planner 和 CatalogManager
CatalogManager catalogManager = ((TableEnvironmentImpl) tableEnv).getCatalogManager();
StreamPlanner planner = (StreamPlanner) ((TableEnvironmentImpl) tableEnv).getPlanner();
//创建 flink planner 实例
FlinkPlannerImpl flinkPlanner = planner.createFlinkPlanner();
// 校验单条 sqlNode
Operation operation = SqlToOperationConverter.convert(planner, catalogManager, sqlNode).get()
// Context validation:
// mirror TableEnvironmentImpl#executeOperation so that DDL statements update
// the shared CatalogManager, letting later statements validate against it.
if (operation instanceof ModifyOperation) {
// ... omitted: INSERT/modify-style operations
} else if (operation instanceof CreateTableOperation) {
CreateTableOperation createTableOperation = (CreateTableOperation) operation;
if (createTableOperation.isTemporary()) {
// register the temporary table so subsequent statements can resolve it
catalogManager.createTemporaryTable(
createTableOperation.getCatalogTable(),
createTableOperation.getTableIdentifier(),
createTableOperation.isIgnoreIfExists());
} else {
catalogManager.createTable(
createTableOperation.getCatalogTable(),
createTableOperation.getTableIdentifier(),
createTableOperation.isIgnoreIfExists());
}
} else if (operation instanceof DropTableOperation) {
DropTableOperation dropTableOperation = (DropTableOperation) operation;
if (dropTableOperation.isTemporary()) {
catalogManager.dropTemporaryTable(
dropTableOperation.getTableIdentifier(), dropTableOperation.isIfExists());
} else {
catalogManager.dropTable(
dropTableOperation.getTableIdentifier(), dropTableOperation.isIfExists());
}
}
// ... omitted: other operation types