Hive文件执行的方法主要是 CliDriver 中的 processFile,语句执行的主要在 processLine 方法中,这一篇主要看这两个方法中主要做了什么?
1. processFile
processFile 中最主要的代码如下:
rc = processReader(bufferReader);
我们可以看到其实 processReader 只是把文件读进来,去掉注释语句,然后对其调用processLine 方法,所以我们主要需要分析 processLine 方法。
public int processReader(BufferedReader r) throws IOException {
String line;
StringBuilder qsb = new StringBuilder();
while ((line = r.readLine()) != null) {
// Skipping through comments
if (! line.startsWith("--")) {
qsb.append(line + "\n");
}
}
return (processLine(qsb.toString()));
}
2. processLine
2.1 整体代码结构
- processLine中的主要代码如下:
public int processLine(String line, boolean allowInterrupting) {
...............
// we can not use "split" function directly as ";" may be quoted
int lastRet = 0, ret = 0;
List<String> commands = splitSemiColon(line);
String command = "";
for (String oneCmd : commands) {
if (StringUtils.endsWith(oneCmd, "\\")) {
command += StringUtils.chop(oneCmd) + ";";
continue;
} else {
command += oneCmd;
}
if (StringUtils.isBlank(command)) {
continue;
}
ret = processCmd(command);
command = "";
lastRet = ret;
}
................
}
从以上代码可以看出,processLine 会把我们写的所有SQL语句用分号 ; 拆开然后对每一句执行 processCmd
方法,有一句不成功就会返回失败
- processCmd 方法
public int processCmd(String cmd) {
CliSessionState ss = (CliSessionState) SessionState.get();
ss.setLastCommand(cmd);
ss.updateThreadName();
// Flush the print stream, so it doesn't include output from the last command
ss.err.flush();
String cmd_trimmed = cmd.trim();
String[] tokens = tokenizeCmd(cmd_trimmed);
int ret = 0;
// 处理退出命令
if (cmd_trimmed.toLowerCase().equals("quit") || cmd_trimmed.toLowerCase().equals("exit")) {
ss.close();
System.exit(0);
} else if (tokens[0].equalsIgnoreCase("source")) { // 处理source 开头语句
String cmd_1 = getFirstCmd(cmd_trimmed, tokens[0].length());
cmd_1 = .........
.........
ret = processFile(cmd_1); // source filename; 最终回归到processFile方法
.........
} else if (cmd_trimmed.startsWith("!")) { // !开头的命令则调用ShellCmdExecutor中的 execute方法执行
String shell_cmd = cmd_trimmed.substring(1);
shell_cmd = .........
.........
ShellCmdExecutor executor = new ShellCmdExecutor(shell_cmd, ss.out, ss.err);
ret = executor.execute();
.........
} else { // local mode
..........
CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf);
ret = processLocalCmd(cmd, proc, ss); // 这个是大多数sql的主要执行方法
..........
}
从以上代码可以看到最需要关注的方法是 processLocalCmd
- processLocalCmd 方法
int processLocalCmd(String cmd, CommandProcessor proc, CliSessionState ss) {
.............
int ret = 0;
.............
if (proc != null) {
if (proc instanceof Driver) {
Driver qp = (Driver) proc;
.........
ret = qp.run(cmd).getResponseCode(); // 当 proc 属于 Driver对象的时候主要执行的方法
.........
} else {
String firstToken = tokenizeCmd(cmd.trim())[0];
String cmd_1 = getFirstCmd(cmd.trim(), firstToken.length());
.........
CommandProcessorResponse res = proc.run(cmd_1); // 当 proc 为非 Driver对象的时候主要执行的方法
.........
ret = res.getResponseCode();
}
.............
return ret;
}
从上面代码可以看出,proc这个对象有Driver类和非Driver类两种,所以重点需要关注什么时候得到的是 Driver对象,什么时候得到的是非Driver对象,才能知道什么样的sql执行Driver类中的run方法,什么样的sql执行非Driver类中的run方法。
所以接下来就具体看proc这个对象是如何获取的。
2.2 proc的获取(CommandProcessor对象)
proc的获取是通过processCmd中的如下这句代码获取的,其中tokens是一句SQL语句用空白符分隔开的字符串数组(即组成SQL的一个一个词组成的数组)
CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf);
工厂中对应的get方法如下:
public static CommandProcessor get(String[] cmd, HiveConf conf) {
CommandProcessor result = getForHiveCommand(cmd, conf);
if (result != null) return result;
.........
if (conf == null) return new Driver();
Driver drv = mapDrivers.get(conf);
if (drv == null) {
drv = new Driver();
mapDrivers.put(conf, drv);
} else {
drv.resetQueryState();
}
drv.init();
return drv;
}
}
从代码中可知除了getForHiveCommand(cmd, conf)能返回非空对象的情况外,其他情况得到的都是Driver对象,并且Driver对象可复用。
所以要看getForHiveCommand方法,这个方法中就简单调用了 getForHiveCommandInternal
方法,代码如下
public static CommandProcessor getForHiveCommandInternal(String[] cmd, HiveConf conf, boolean testOnly)
HiveCommand hiveCommand = HiveCommand.find(cmd, testOnly);
if (hiveCommand == null || isBlank(cmd[0])) {
return null;
}
Set<String> availableCommands = new HashSet<String>();
// HIVE_SECURITY_COMMAND_WHITELIST默认值:set,reset,dfs,add,list,delete,reload,compile
for (String availableCommand : conf.getVar(HiveConf.ConfVars.HIVE_SECURITY_COMMAND_WHITELIST).split(",")) {
availableCommands.add(availableCommand.toLowerCase().trim());
}
if (cmd.length > 1 && "reload".equalsIgnoreCase(cmd[0]) && "function".equalsIgnoreCase(cmd[1])) {
// special handling for SQL "reload function"
return null;
}
switch (hiveCommand) {
case SET:
return new SetProcessor();
case RESET:
return new ResetProcessor();
case DFS:
SessionState ss = SessionState.get();
return new DfsProcessor(ss.getConf());
case ADD:
return new AddResourceProcessor();
case LIST:
return new ListResourceProcessor();
case DELETE:
return new DeleteResourceProcessor();
case COMPILE:
return new CompileProcessor();
case RELOAD:
return new ReloadProcessor();
case CRYPTO:
return new CryptoProcessor(SessionState.get().getHdfsEncryptionShim(), conf);
.........
}
}
// hiveCommand的获取,HiveCommand是一个枚举类,
// 包含枚举对象 SET(),RESET(),DFS(),CRYPTO(true),ADD(),LIST(),RELOAD(),DELETE(),COMPILE();
public static HiveCommand find(String[] command, boolean findOnlyForTesting) {
if (null == command){
return null;
}
String cmd = command[0];
if (cmd != null) {
cmd = cmd.trim().toUpperCase();
if (command.length > 1 && "role".equalsIgnoreCase(command[1])) {
// special handling for set role r1 statement
return null;
} else if(command.length > 1 && "from".equalsIgnoreCase(command[1])) {
//special handling for SQL "delete from <table> where..."
return null;
} else if(command.length > 1 && "set".equalsIgnoreCase(command[0]) && "autocommit".equalsIgnoreCase(command[1])) {
return null;//don't want set autocommit true|false to get mixed with set hive.foo.bar...
} else if (COMMANDS.contains(cmd)) {
HiveCommand hiveCommand = HiveCommand.valueOf(cmd);
return hiveCommand;
}
}
return null;
}
// COMMANDS的来源
private static final Set<String> COMMANDS = new HashSet<String>();
static {
for (HiveCommand command : HiveCommand.values()) {
COMMANDS.add(command.name());
}
}
由上述代码可以看到不同SQL会得到不同的CommandProcessor对象proc,主要有如下这些情况:
SQL语句 | 返回的CommandProcessor对象 |
---|---|
set .... 并且不是 set role ..../ set autocommit... | new SetProcessor() |
reset .... | new ResetProcessor() |
dfs .... | new DfsProcessor(ss.getConf()) |
crypto .... | new CryptoProcessor(SessionState.get().getHdfsEncryptionShim(), conf) |
add .... | new AddResourceProcessor() |
list .... | new ListResourceProcessor() |
reload .... | new ReloadProcessor() |
delete .... 并且不是 delete from ...... | new DeleteResourceProcessor() |
compile .... | new CompileProcessor() |
其他语句 | Driver对象 |
3. 总结
由上面源码的分析可以得出,除了set ...、dfs ...、add ... 这种不是对表中数据进行操作的sql语句,其他都是通过Driver类对象的run方法执行。
下一节主要看Driver中的run方法如何运行sql语句。