Flink Source Code Reading: How the SQL Client Works

Preface

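Most of you have probably used the SQL CLI. Run sql-client.sh embedded and you get an interactive shell in which each SQL statement can be executed on its own. This is very convenient when debugging a feature, and the big squirrel on the welcome screen is rather cute.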

The script

Let's start with the script itself:

#!/usr/bin/env bash
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

################################################################################
# Adopted from "flink" bash script
################################################################################

target="$0"
# For the case, the executable has been directly symlinked, figure out
# the correct bin path by following its symlink up to an upper bound.
# Note: we can't use the readlink utility here if we want to be POSIX
# compatible.
iteration=0
while [ -L "$target" ]; do
    if [ "$iteration" -gt 100 ]; then
        echo "Cannot resolve path: You have a cyclic symlink in $target."
        break
    fi
    ls=`ls -ld -- "$target"`
    target=`expr "$ls" : '.* -> \(.*\)$'`
    iteration=$((iteration + 1))
done

# Convert relative path to absolute path
bin=`dirname "$target"`

# get flink config
. "$bin"/config.sh

if [ "$FLINK_IDENT_STRING" = "" ]; then
        FLINK_IDENT_STRING="$USER"
fi

CC_CLASSPATH=`constructFlinkClassPath`

################################################################################
# SQL client specific logic
################################################################################

log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-sql-client-$HOSTNAME.log
log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)

# get path of jar in /opt if it exist
FLINK_SQL_CLIENT_JAR=$(find "$FLINK_OPT_DIR" -regex ".*flink-sql-client.*.jar")

# add flink-python jar to the classpath
if [[ ! "$CC_CLASSPATH" =~ .*flink-python.*.jar ]]; then
    FLINK_PYTHON_JAR=$(find "$FLINK_OPT_DIR" -regex ".*flink-python.*.jar")
    if [ -n "$FLINK_PYTHON_JAR" ]; then
        CC_CLASSPATH="$CC_CLASSPATH:$FLINK_PYTHON_JAR"
    fi
fi

# check if SQL client is already in classpath and must not be shipped manually
if [[ "$CC_CLASSPATH" =~ .*flink-sql-client.*.jar ]]; then

    # start client without jar
    exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.table.client.SqlClient "$@"

# check if SQL client jar is in /opt
elif [ -n "$FLINK_SQL_CLIENT_JAR" ]; then

    # start client with jar
    exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS:$FLINK_SQL_CLIENT_JAR"`" org.apache.flink.table.client.SqlClient "$@" --jar "`manglePath $FLINK_SQL_CLIENT_JAR`"

# write error message to stderr
else
    (>&2 echo "[ERROR] Flink SQL Client JAR file 'flink-sql-client*.jar' neither found in classpath nor /opt directory should be located in $FLINK_OPT_DIR.")

    # exit to force process failure
    exit 1
fi

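The script is simple. It resolves the bin directory (following symlinks), loads config.sh, builds the classpath, and adds the flink-python jar if it is missing. What really matters is the class launched by the final exec command: org.apache.flink.table.client.SqlClient.

The jar is handled in one of three ways:

  1. If the flink-sql-client jar is already on the classpath, the client starts as is.
  2. Otherwise, the jar found in the opt directory is appended to the classpath and also passed with --jar.
  3. If the jar is in neither place, the script prints an error and exits with status 1.

Now let's move on to the code.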
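To make the three branches at the end of the script explicit, here is a minimal Java re-expression of them. It is a sketch, not part of Flink. The class and method names (LauncherSketch, launchArgs) are hypothetical, and JVM options, log settings and the Hadoop classpath are left out. The regular expression is the one the script uses. Bash's =~ searches rather than matching the whole string, which is why find() is used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;

/**
 * Hypothetical Java re-expression of the three branches at the end of sql-client.sh.
 * JVM options, log settings and the Hadoop classpath are deliberately left out.
 */
final class LauncherSketch {

    static final String MAIN_CLASS = "org.apache.flink.table.client.SqlClient";

    // Same pattern as [[ "$CC_CLASSPATH" =~ .*flink-sql-client.*.jar ]];
    // bash =~ searches inside the string, hence find() below instead of matches().
    private static final Pattern SQL_CLIENT_JAR = Pattern.compile(".*flink-sql-client.*.jar");

    /**
     * Returns the main class followed by its program arguments, or empty when the
     * script would print "[ERROR] ..." and exit 1.
     *
     * @param classpath value of CC_CLASSPATH
     * @param optJar    result of searching FLINK_OPT_DIR for the SQL client jar ("" if none)
     * @param userArgs  the arguments given to sql-client.sh ("$@")
     */
    static Optional<List<String>> launchArgs(String classpath, String optJar, List<String> userArgs) {
        List<String> cmd = new ArrayList<>();
        cmd.add(MAIN_CLASS);
        cmd.addAll(userArgs);
        if (SQL_CLIENT_JAR.matcher(classpath).find()) {
            // Branch 1: the jar is already on the classpath, start without --jar.
            return Optional.of(cmd);
        }
        if (!optJar.isEmpty()) {
            // Branch 2: the jar was found in /opt. The script also appends it to the
            // classpath and ships it with --jar after the user arguments.
            cmd.add("--jar");
            cmd.add(optJar);
            return Optional.of(cmd);
        }
        // Branch 3: the jar is in neither location.
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(launchArgs("/flink/lib/flink-dist.jar", "/flink/opt/flink-sql-client.jar",
                Arrays.asList("embedded")));
    }
}
```

Returning an Optional keeps the failure branch visible to the caller, mirroring the script's exit 1.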

The code

The main method:

public static void main(String[] args) {
        if (args.length < 1) {
            CliOptionsParser.printHelpClient();
            return;
        }

        switch (args[0]) {

            case MODE_EMBEDDED:
                // remove mode
                final String[] modeArgs = Arrays.copyOfRange(args, 1, args.length);
                final CliOptions options = CliOptionsParser.parseEmbeddedModeClient(modeArgs);
                if (options.isPrintHelp()) {
                    CliOptionsParser.printHelpEmbeddedModeClient();
                } else {
                    try {
                        final SqlClient client = new SqlClient(true, options);
                        client.start();
                    } catch (SqlClientException e) {
                        // make space in terminal
                        System.out.println();
                        System.out.println();
                        LOG.error("SQL Client must stop.", e);
                        throw e;
                    } catch (Throwable t) {
                        // make space in terminal
                        System.out.println();
                        System.out.println();
                        LOG.error("SQL Client must stop. Unexpected exception. This is a bug. Please consider filing an issue.", t);
                        throw new SqlClientException("Unexpected exception. This is a bug. Please consider filing an issue.", t);
                    }
                }
                break;

            case MODE_GATEWAY:
                throw new SqlClientException("Gateway mode is not supported yet.");

            default:
                CliOptionsParser.printHelpClient();
        }
    }
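One detail of main() worth noting is how it treats failures. A SqlClientException is logged and rethrown unchanged, while any other Throwable is logged as a bug and wrapped in a new SqlClientException with the original as its cause. The following is a minimal sketch of that pattern, not Flink's code. ClientException is a hypothetical stand-in for SqlClientException, runClient is a hypothetical name, and System.err replaces the logger.

```java
/**
 * Hypothetical sketch of how main() separates expected failures from bugs.
 * ClientException stands in for SqlClientException; System.err stands in for the logger.
 */
final class ErrorWrappingSketch {

    static final class ClientException extends RuntimeException {
        ClientException(String message) {
            super(message);
        }

        ClientException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    static void runClient(Runnable client) {
        try {
            client.run();
        } catch (ClientException e) {
            // Expected failure: it already carries a meaningful message, so rethrow it unchanged.
            System.err.println("SQL Client must stop. " + e.getMessage());
            throw e;
        } catch (Throwable t) {
            // Anything else is treated as a bug and wrapped, keeping the original as the cause.
            System.err.println("SQL Client must stop. Unexpected exception.");
            throw new ClientException("Unexpected exception. This is a bug. Please consider filing an issue.", t);
        }
    }

    public static void main(String[] args) {
        try {
            runClient(() -> {
                throw new IllegalStateException("simulated bug");
            });
        } catch (ClientException e) {
            System.out.println("wrapped cause: " + e.getCause());
        }
    }
}
```

Callers therefore only ever see one exception type, but the root cause of an unexpected failure is not lost.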

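Currently only embedded mode is supported; gateway mode throws an exception. The mode can be followed by further options. main() parses those options and then starts SqlClient. The start method does the following:

  1. Collect the extra dependencies given in the options: the jars (options.getJars()) and library directories (options.getLibraryDirs()), each defaulting to an empty list.
  2. Create and start the executor. In embedded mode this is a LocalExecutor, built with the default environment (options.getDefaults()), and it does the actual work on behalf of the CLI. A separate gateway is not supported yet.
  3. Build the session. The session environment is read and the Python configuration is appended. The result is wrapped in a SessionContext, using DEFAULT_SESSION_ID when no session id was given, and the session is opened on the executor.
  4. Register a shutdown hook (EmbeddedShutdownThread, which receives the session id and the executor) so the session can be cleaned up when the program is terminated.
  5. Open the interactive CLI through openCli. The session is closed in a finally block once the CLI returns.
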
private void start() {
        if (isEmbedded) {
            // create local executor with default environment
            final List<URL> jars;
            if (options.getJars() != null) {
                jars = options.getJars();
            } else {
                jars = Collections.emptyList();
            }
            final List<URL> libDirs;
            if (options.getLibraryDirs() != null) {
                libDirs = options.getLibraryDirs();
            } else {
                libDirs = Collections.emptyList();
            }
            final Executor executor = new LocalExecutor(options.getDefaults(), jars, libDirs);
            executor.start();

            // create CLI client with session environment
            final Environment sessionEnv = readSessionEnvironment(options.getEnvironment());
            appendPythonConfig(sessionEnv, options.getPythonConfiguration());
            final SessionContext context;
            if (options.getSessionId() == null) {
                context = new SessionContext(DEFAULT_SESSION_ID, sessionEnv);
            } else {
                context = new SessionContext(options.getSessionId(), sessionEnv);
            }

            // Open an new session
            String sessionId = executor.openSession(context);
            try {
                // add shutdown hook
                Runtime.getRuntime().addShutdownHook(new EmbeddedShutdownThread(sessionId, executor));

                // do the actual work
                openCli(sessionId, executor);
            } finally {
                executor.closeSession(sessionId);
            }
        } else {
            throw new SqlClientException("Gateway mode is not supported yet.");
        }
    }
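The session lifecycle in start() (open, register a shutdown hook, run the CLI, close in finally) can be sketched without any Flink dependencies as shown below. FakeExecutor and the names inside it are hypothetical, and "default" as the fallback session id is an assumption for illustration. The sketch also assumes the shutdown hook closes the session. Because both the hook and the finally block may then close it, closeSession is written to be idempotent.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical sketch of the session handling in SqlClient#start. */
final class SessionLifecycleSketch {

    /** Stand-in for Flink's Executor, reduced to the two session methods used here. */
    static final class FakeExecutor {
        final Set<String> openSessions = new HashSet<>();
        final List<String> events = new ArrayList<>();

        synchronized String openSession(String sessionId) {
            openSessions.add(sessionId);
            events.add("open " + sessionId);
            return sessionId;
        }

        // Idempotent, because both the finally block and the shutdown hook may call it.
        synchronized void closeSession(String sessionId) {
            if (openSessions.remove(sessionId)) {
                events.add("close " + sessionId);
            }
        }
    }

    static void start(FakeExecutor executor, String requestedId, Consumer<String> openCli) {
        // Fall back to a default id when none was given (assumed value "default").
        String sessionId = executor.openSession(requestedId != null ? requestedId : "default");
        try {
            // Covers termination by a signal, where the finally block below never runs.
            Runtime.getRuntime().addShutdownHook(new Thread(() -> executor.closeSession(sessionId)));
            // The actual interactive work.
            openCli.accept(sessionId);
        } finally {
            // Normal return or an exception from the CLI.
            executor.closeSession(sessionId);
        }
    }

    public static void main(String[] args) {
        FakeExecutor executor = new FakeExecutor();
        start(executor, null, id -> executor.events.add("cli running in " + id));
        System.out.println(executor.events);
    }
}
```

The try/finally covers normal exits and exceptions from the CLI. The shutdown hook covers the case where the JVM is stopped from outside while the CLI is still running.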

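SQL statements are executed through CliClient.

openCli first decides where the command history is stored. It uses the path given in the options if there is one. Otherwise it uses .flink-sql-history in the user's home directory, or flink-sql-history on Windows.

It then does one of two things:

  1. Without an update statement on the command line, it opens the interactive shell.
  2. With one, it submits only that statement and throws a SqlClientException if the submission fails.

In both cases the CliClient is closed afterwards.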

private void openCli(String sessionId, Executor executor) {
        CliClient cli = null;
        try {
            Path historyFilePath;
            if (options.getHistoryFilePath() != null) {
                historyFilePath = Paths.get(options.getHistoryFilePath());
            } else {
                historyFilePath = Paths.get(System.getProperty("user.home"),
                        SystemUtils.IS_OS_WINDOWS ? "flink-sql-history" : ".flink-sql-history");
            }
            cli = new CliClient(sessionId, executor, historyFilePath);
            // interactive CLI mode
            if (options.getUpdateStatement() == null) {
                cli.open();
            }
            // execute single update statement
            else {
                final boolean success = cli.submitUpdate(options.getUpdateStatement());
                if (!success) {
                    throw new SqlClientException("Could not submit given SQL update statement to cluster.");
                }
            }
        } finally {
            if (cli != null) {
                cli.close();
            }
        }
    }
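The history-file choice in openCli can be isolated into a small, testable function. This is a sketch under one stated assumption. Instead of commons-lang's SystemUtils.IS_OS_WINDOWS, the operating-system check and the home directory are passed in as parameters, so the function has no dependencies and behaves the same on every platform. HistoryPathSketch and historyFile are hypothetical names.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical sketch of how openCli() picks the history file. */
final class HistoryPathSketch {

    /**
     * @param configuredPath value of options.getHistoryFilePath(), may be null
     * @param userHome       the user's home directory
     * @param isWindows      replaces SystemUtils.IS_OS_WINDOWS (assumption, for testability)
     */
    static Path historyFile(String configuredPath, String userHome, boolean isWindows) {
        if (configuredPath != null) {
            // An explicitly configured path always wins.
            return Paths.get(configuredPath);
        }
        // Hidden dot-file on Unix-like systems, plain file name on Windows.
        return Paths.get(userHome, isWindows ? "flink-sql-history" : ".flink-sql-history");
    }

    public static void main(String[] args) {
        boolean windows = System.getProperty("os.name", "").startsWith("Windows");
        System.out.println(historyFile(null, System.getProperty("user.home"), windows));
    }
}
```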

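The open method implements the interactive loop. It reads the SQL typed into the shell, and the line reader treats a semicolon as the end of a statement, so one statement may span several lines. parseCommand then uses SqlCommandParser to turn the statement into a SqlCommandCall, and callCommand handles it according to its command type. Ctrl+C discards the current input. Ctrl+D (end of input) or killing the process leaves the loop.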

public void open() {
        isRunning = true;

        // print welcome
        terminal.writer().append(CliStrings.MESSAGE_WELCOME);

        // begin reading loop
        while (isRunning) {
            // make some space to previous command
            terminal.writer().append("\n");
            terminal.flush();

            final String line;
            try {
                line = lineReader.readLine(prompt, null, (MaskingCallback) null, null);
            } catch (UserInterruptException e) {
                // user cancelled line with Ctrl+C
                continue;
            } catch (EndOfFileException | IOError e) {
                // user cancelled application with Ctrl+D or kill
                break;
            } catch (Throwable t) {
                throw new SqlClientException("Could not read from command line.", t);
            }
            if (line == null) {
                continue;
            }
            final Optional<SqlCommandCall> cmdCall = parseCommand(line);
            cmdCall.ifPresent(this::callCommand);
        }
    }
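The read-then-parse flow of open() can be modeled in a few lines. The sketch below is a deliberately simplified stand-in, not the JLine-based reader or SqlCommandParser. It collects input lines until the buffered text ends with a semicolon, and then classifies each statement by its leading keywords. Like parseCommand, it returns an empty Optional when nothing is recognized. The Kind enum, class and method names are hypothetical, and semicolons inside string literals or comments are not handled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Optional;

/** Hypothetical, much simplified model of statement reading and parsing in open(). */
final class StatementReaderSketch {

    /** Illustrative subset of command kinds; the real parser recognizes many more. */
    enum Kind { CREATE_TABLE, INSERT_INTO, INSERT_OVERWRITE, SELECT, QUIT }

    /** Joins input lines into statements; a statement is complete once it ends with ';'. */
    static List<String> readStatements(List<String> lines) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (String line : lines) {
            if (current.length() > 0) {
                current.append('\n');
            }
            current.append(line);
            String text = current.toString().trim();
            if (text.endsWith(";")) {
                // Complete statement: drop the terminating semicolon and start a new buffer.
                statements.add(text.substring(0, text.length() - 1).trim());
                current.setLength(0);
            }
        }
        // Anything left in the buffer is still being typed and is not returned.
        return statements;
    }

    static Optional<Kind> classify(String statement) {
        // Normalize whitespace and case so "insert   into" and "INSERT INTO" look the same.
        String s = statement.trim().replaceAll("\\s+", " ").toUpperCase(Locale.ROOT);
        if (s.startsWith("CREATE TABLE")) return Optional.of(Kind.CREATE_TABLE);
        if (s.startsWith("INSERT OVERWRITE")) return Optional.of(Kind.INSERT_OVERWRITE);
        if (s.startsWith("INSERT INTO")) return Optional.of(Kind.INSERT_INTO);
        if (s.startsWith("SELECT")) return Optional.of(Kind.SELECT);
        if (s.equals("QUIT") || s.equals("EXIT")) return Optional.of(Kind.QUIT);
        // Like parseCommand(), nothing is returned when the input is not recognized.
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> typed = Arrays.asList("CREATE TABLE t (", "  id INT", ");", "INSERT INTO t", "SELECT 1;");
        for (String stmt : readStatements(typed)) {
            System.out.println(classify(stmt).map(Kind::name).orElse("unrecognized") + " <- " + stmt);
        }
    }
}
```

Returning an Optional matches the shape of the original loop, where cmdCall.ifPresent(this::callCommand) simply skips input that could not be parsed.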

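The callCommand method is fairly long, but it simply dispatches each kind of statement to its own handler:

  1. CREATE TABLE is handled by callDdl(cmdCall.operands[0], CliStrings.MESSAGE_TABLE_CREATED), which ultimately calls TableEnvironment#executeSql.
  2. INSERT and INSERT OVERWRITE are handled by callInsert(cmdCall), which ultimately calls TableEnvironment#sqlUpdate. That method is already deprecated.

The other statement types work the same way; if you are interested, follow them through the code. Beyond this point the flow is the same as for any SQL program: validation → conversion → optimization → translation into Transformations → submission for execution. For details, see the article "Flink Source Code Reading: The Flink SQL Execution Flow" (Flink源码阅读之Flinksql执行流程).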
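The dispatch in callCommand can be sketched as a switch over the command type, with each handler delegating to a table environment. This is a hypothetical model, not Flink's code. Call, Kind and TableEnv are stand-ins defined here, and the success messages are placeholders rather than the real CliStrings constants. In the real client these calls also pass through the Executor (LocalExecutor) before reaching TableEnvironment; the sketch collapses that layer.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of CliClient#callCommand dispatching to per-command handlers. */
final class CallCommandSketch {

    enum Kind { CREATE_TABLE, INSERT_INTO, INSERT_OVERWRITE }

    /** Stand-in for SqlCommandCall: the command kind plus its operands (operands[0] is the SQL). */
    static final class Call {
        final Kind kind;
        final String[] operands;

        Call(Kind kind, String... operands) {
            this.kind = kind;
            this.operands = operands;
        }
    }

    /** Stand-in for the two TableEnvironment entry points named in the text. */
    interface TableEnv {
        void executeSql(String statement);

        void sqlUpdate(String statement); // deprecated in Flink, kept here to mirror the text
    }

    private final TableEnv tableEnv;
    final List<String> messages = new ArrayList<>();

    CallCommandSketch(TableEnv tableEnv) {
        this.tableEnv = tableEnv;
    }

    void callCommand(Call call) {
        switch (call.kind) {
            case CREATE_TABLE:
                callDdl(call.operands[0], "Table has been created."); // placeholder message
                break;
            case INSERT_INTO:
            case INSERT_OVERWRITE:
                callInsert(call);
                break;
            default:
                throw new IllegalArgumentException("Unsupported command: " + call.kind);
        }
    }

    private void callDdl(String ddl, String successMessage) {
        tableEnv.executeSql(ddl);
        messages.add(successMessage);
    }

    private void callInsert(Call call) {
        tableEnv.sqlUpdate(call.operands[0]);
        messages.add("Update statement submitted."); // placeholder message
    }

    public static void main(String[] args) {
        TableEnv printing = new TableEnv() {
            public void executeSql(String s) { System.out.println("executeSql: " + s); }
            public void sqlUpdate(String s) { System.out.println("sqlUpdate: " + s); }
        };
        CallCommandSketch cli = new CallCommandSketch(printing);
        cli.callCommand(new Call(Kind.CREATE_TABLE, "CREATE TABLE t (id INT)"));
        cli.callCommand(new Call(Kind.INSERT_INTO, "INSERT INTO t SELECT 1"));
    }
}
```

Keeping one small handler per command type is what makes the real callCommand long but easy to follow. Each new statement type adds a case and a handler without touching the others.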

© Copyright belongs to the author. For reprints or content collaboration, please contact the author.