Based on Hive 2.3.4, taking SQL submitted through the CLI as the example:
- Starting the Driver
Follow the call chain:
org.apache.hadoop.hive.cli.CliDriver#main
-->org.apache.hadoop.hive.cli.CliDriver#run
-->org.apache.hadoop.hive.cli.CliDriver#executeDriver
-->org.apache.hadoop.hive.cli.CliDriver#processLine
-->org.apache.hadoop.hive.cli.CliDriver#processCmd
// the Driver object is created in here
public int processCmd(String cmd) {
  ......
  // branch on the kind of command
  if (cmd_trimmed.toLowerCase().equals("quit") || cmd_trimmed.toLowerCase().equals("exit")) {
    ss.close();
    System.exit(0);
  } else if (tokens[0].equalsIgnoreCase("source")) {
    ......
  } else if (cmd_trimmed.startsWith("!")) {
    ......
  } else { // local mode
    try {
      CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf); // the Driver object is obtained here
      ret = processLocalCmd(cmd, proc, ss); // continue down the call chain
    } catch (SQLException e) {
      console.printError("Failed processing command " + tokens[0] + " " + e.getLocalizedMessage(),
          org.apache.hadoop.util.StringUtils.stringifyException(e));
      ret = 1;
    }
  }
  ......
}
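For context, CommandProcessorFactory.get only returns a Driver for real SQL; built-in commands such as set or dfs get their own processors. A simplified, illustrative sketch of that dispatch (not the verbatim Hive source; the real factory handles more commands and caches Driver instances per conf) could look like this:
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.processors.CommandProcessor;
import org.apache.hadoop.hive.ql.processors.DfsProcessor;
import org.apache.hadoop.hive.ql.processors.SetProcessor;

public class ProcessorDispatchSketch {
  // simplified stand-in for CommandProcessorFactory.get(String[], HiveConf)
  public static CommandProcessor pick(String[] tokens, HiveConf conf) {
    switch (tokens[0].toLowerCase()) {
      case "set":
        return new SetProcessor();     // "set k=v" style commands
      case "dfs":
        return new DfsProcessor(conf); // "dfs -ls ..." style commands
      default:
        return new Driver(conf);       // plain SQL ends up with a Driver
    }
  }
}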
- Parsing & triggering execution
Continuing along the call chain above:
org.apache.hadoop.hive.cli.CliDriver#processLocalCmd
-->org.apache.hadoop.hive.ql.Driver#run(java.lang.String, boolean)
-->org.apache.hadoop.hive.ql.Driver#runInternal
In this method the SQL gets compiled, and its execution gets triggered:
private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled) {
  ......
  if (!alreadyCompiled) {
    // compile internal will automatically reset the perf logger
    ret = compileInternal(command, true); // this is the entry point for compiling the SQL
    // then we continue to use this perf logger
    perfLogger = SessionState.getPerfLogger();
    if (ret != 0) {
      return createProcessorResponse(ret);
    }
  } else {
    ......
  }
  ......
  if (requiresLock()) {
    // a checkpoint to see if the thread is interrupted or not before an expensive operation
    if (isInterrupted()) {
      ret = handleInterruption("at acquiring the lock.");
    } else {
      ret = acquireLocksAndOpenTxn(startTxnImplicitly);
    }
    if (ret != 0) {
      return rollback(createProcessorResponse(ret));
    }
  }
  ret = execute(true); // this is the entry point for execution
  if (ret != 0) {
    // if needRequireLock is false, the release here will do nothing because there is no lock
    return rollback(createProcessorResponse(ret));
  }
  ......
}
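Before moving on to execution, it is worth a quick look at what the compile entry point actually does. Roughly (a condensed sketch from memory, not the verbatim source; the real Driver#compile also deals with hooks, authorization, locking hints, etc.), compileInternal leads to:
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ParseUtils;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;

public class CompileSketch {
  // queryState and ctx come from the surrounding Driver in the real code
  static void compile(String command, QueryState queryState, Context ctx) throws Exception {
    ASTNode tree = ParseUtils.parse(command, ctx);                     // ANTLR parser: SQL text -> AST
    BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
    sem.analyze(tree, ctx);                                            // AST -> operator tree -> Task DAG
    sem.validate();
    // Driver#compile then wraps sem's root tasks into a QueryPlan,
    // which is what execute(true) walks stage by stage later on
  }
}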
- Submitting the job
org.apache.hadoop.hive.ql.Driver#execute(boolean)
Look at this method first. It is quite clear: it keeps pulling tasks off the queue
and launching them.
public int execute(boolean deferClose) throws CommandNeedRetryException {
  ......
  // Loop while you either have tasks running, or tasks queued up
  while (driverCxt.isRunning()) {
    // Launch up to maxthreads tasks
    Task<? extends Serializable> task;
    while ((task = driverCxt.getRunnable(maxthreads)) != null) {
      TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt); // the task is launched here
      if (!runner.isRunning()) {
        break;
      }
    }
    ......
  }
  ......
}
One more note: a task here is exactly what Hive calls a Stage, and for the MapReduce
stages each one corresponds to a single MapReduce job (the stage breakdown of a query
can be seen with EXPLAIN).
Now continue into the org.apache.hadoop.hive.ql.Driver#launchTask method:
// Launch Task
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.isMapRedTask()) {
  // Launch it in the parallel mode, as a separate thread only for MR tasks
  if (LOG.isInfoEnabled()) {
    LOG.info("Starting task [" + tsk + "] in parallel");
  }
  tskRun.setOperationLog(OperationLog.getCurrentOperationLog());
  tskRun.start();
} else {
  if (LOG.isInfoEnabled()) {
    LOG.info("Starting task [" + tsk + "] in serial mode");
  }
  tskRun.runSequential();
}
Two conditions are checked here:
Condition 1: parallel execution is enabled, i.e. "hive.exec.parallel=true" (how many tasks may run at once is further capped by hive.exec.parallel.thread.number, which feeds the maxthreads seen above).
Condition 2: tsk.isMapRedTask() returns true; the exact criterion differs between the org.apache.hadoop.hive.ql.exec.Task subclasses.
A task that satisfies both conditions gets its own org.apache.hadoop.hive.ql.exec.TaskRunner thread to run in; otherwise it simply runs in the current thread.
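If you want to try this, the switch can be flipped per session with "set hive.exec.parallel=true;", or programmatically as in this small illustrative snippet (not part of the call chain being traced):
import org.apache.hadoop.hive.conf.HiveConf;

public class ParallelConfExample {
  public static void main(String[] args) {
    // equivalent to "set hive.exec.parallel=true;" in the CLI
    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.EXECPARALLEL, true);
    // hive.exec.parallel.thread.number caps how many stages run at once (the maxthreads above)
    conf.setInt("hive.exec.parallel.thread.number", 8);
  }
}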
Now step into TaskRunner:
public class TaskRunner extends Thread {
  protected Task<? extends Serializable> tsk;

  @Override
  public void run() {
    runner = Thread.currentThread();
    try {
      OperationLog.setCurrentOperationLog(operationLog);
      SessionState.start(ss);
      runSequential(); // still just calls runSequential()
    } finally {
    }
  }

  // Launches a task, and sets its exit value in the result variable.
  public void runSequential() {
    int exitVal = -101;
    try {
      exitVal = tsk.executeTask();
    } catch (Throwable t) {
    }
    result.setExitVal(exitVal);
    if (tsk.getException() != null) {
      result.setTaskError(tsk.getException());
    }
  }
}
A few things stand out:
1. TaskRunner is a Thread subclass.
2. Its run() method ultimately calls runSequential() as well.
3. Inside runSequential(), the real execution work is still the Task's own executeTask(); TaskRunner is only a container. How the driver loop later picks this exit value back up is sketched right after this list.
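For completeness, the exit value recorded by runSequential() is read back in the other half of Driver#execute, roughly as below (condensed from memory of Hive 2.x; treat the exact accessors as an approximation):
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.TaskResult;
import org.apache.hadoop.hive.ql.exec.TaskRunner;

public class ResultPollingSketch {
  // driverCxt would be the live DriverContext inside Driver#execute
  static int pollOne(DriverContext driverCxt) throws Exception {
    TaskRunner finished = driverCxt.pollFinished(); // blocks until some launched task completes
    TaskResult result = finished.getTaskResult();
    return result.getExitVal();                     // the value set by runSequential(); non-zero aborts the query
  }
}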
Keep following the chain: org.apache.hadoop.hive.ql.exec.Task#executeTask
-->org.apache.hadoop.hive.ql.exec.Task#execute
This execute method is implemented differently by each Task subclass; here we take org.apache.hadoop.hive.ql.exec.mr.MapRedTask as the example:
@Override
public int execute(DriverContext driverContext) {
  ......
  runningViaChild = conf.getBoolVar(HiveConf.ConfVars.SUBMITVIACHILD);
  if (!runningViaChild) {
    // since we are running the mapred task in the same jvm, we should update the job conf
    // in ExecDriver as well to have proper local properties.
    if (this.isLocalMode()) {
      // save the original job tracker
      ctx.setOriginalTracker(ShimLoader.getHadoopShims().getJobLauncherRpcAddress(job));
      // change it to local
      ShimLoader.getHadoopShims().setJobLauncherRpcAddress(job, "local");
    }
    // we are not running this mapred task via child jvm
    // so directly invoke ExecDriver
    int ret = super.execute(driverContext); // call the parent class implementation
    // restore the previous properties for framework name, RM address etc.
    if (this.isLocalMode()) {
      // restore the local job tracker back to original
      ctx.restoreOriginalTracker();
    }
    ......
  }
  ......
}
The method first checks the hive.exec.submitviachild setting, which decides whether map/reduce jobs should be submitted from a separate child JVM. The default is false, i.e. the job is submitted from the same JVM that runs the HQL compiler.
Finally the method calls back into the execute method of its direct parent class, org.apache.hadoop.hive.ql.exec.mr.ExecDriver#execute.
Continue with ExecDriver#execute:
First, note the ExecDriver constructor; this is where the MapReduce JobConf is created, i.e. the entry point of the MapReduce job:
public ExecDriver() {
  super();
  console = new LogHelper(LOG);
  job = new JobConf(ExecDriver.class); // create the JobConf; ExecDriver.class is used to locate the job's jar
  this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this);
}
Now look at the execute method itself:
@Override
public int execute(DriverContext driverContext) {
  ......
  job.setOutputFormat(HiveOutputFormatImpl.class);
  job.setMapperClass(ExecMapper.class); // the Mapper class is set here
  job.setMapOutputKeyClass(HiveKey.class);
  job.setMapOutputValueClass(BytesWritable.class);
  job.setNumReduceTasks(rWork != null ? rWork.getNumReduceTasks().intValue() : 0);
  job.setReducerClass(ExecReducer.class); // the Reducer class is set here
  ......
  rj = jc.submitJob(job);
  ......
}
Here we see the familiar code for submitting a MapReduce job to YARN. Eventually this reaches org.apache.hadoop.mapred.JobClient#submitJob(org.apache.hadoop.mapred.JobConf), and the job is submitted. For comparison, a minimal standalone example of the same old-API submission pattern is sketched below.
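To make the last step concrete, here is a minimal, standalone example of the old-API (org.apache.hadoop.mapred) submission pattern that ExecDriver uses above. The class and the input/output paths are made up for illustration; only the Hadoop API calls are real:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;

public class OldApiSubmitExample {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(OldApiSubmitExample.class); // same pattern as ExecDriver's constructor
    job.setJobName("old-api-submit-example");
    job.setMapperClass(IdentityMapper.class);             // ExecDriver plugs in ExecMapper here
    job.setReducerClass(IdentityReducer.class);           // ...and ExecReducer here
    job.setMapOutputKeyClass(LongWritable.class);         // TextInputFormat's default key type
    job.setMapOutputValueClass(Text.class);
    job.setNumReduceTasks(1);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    RunningJob rj = new JobClient(job).submitJob(job);    // asynchronous submit, just like rj = jc.submitJob(job)
    rj.waitForCompletion();                               // wait for the job to finish
  }
}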
Done!