Hive源码概读-HQL任务提交Yarn

基于hive2.3.4, 这里以CLI方式提交Sql为例 :

  • 启动Driver
    沿着调用:
    org.apache.hadoop.hive.cli.CliDriver#main-->org.apache.hadoop.hive.cli.CliDriver#run-->org.apache.hadoop.hive.cli.CliDriver#executeDriver-->org.apache.hadoop.hive.cli.CliDriver#processLine-->org.apache.hadoop.hive.cli.CliDriver#processCmd
//这里Driver对象产生
public int processCmd(String cmd) {
......  
//对指令不同情况的判断 
 if (cmd_trimmed.toLowerCase().equals("quit") || cmd_trimmed.toLowerCase().equals("exit")) {
      ss.close();
      System.exit(0);
    } else if (tokens[0].equalsIgnoreCase("source")) {
      ......
    } else if (cmd_trimmed.startsWith("!")) {
      ......
    }  else { // local mode
      try {
        CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf);//这里得到Diver对象
        ret = processLocalCmd(cmd, proc, ss);//继续向下调用
      } catch (SQLException e) {
        console.printError("Failed processing command " + tokens[0] + " " + e.getLocalizedMessage(),
          org.apache.hadoop.util.StringUtils.stringifyException(e));
        ret = 1;
      }
    }
......
}
  • 解析&触发执行
    沿着上面的调用链:
    org.apache.hadoop.hive.cli.CliDriver#processLocalCmd-->org.apache.hadoop.hive.ql.Driver#run(java.lang.String, boolean)-->org.apache.hadoop.hive.ql.Driver#runInternal
    在这个方法里,sql得到解析和触发执行
private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled){
  ......
      if (!alreadyCompiled) {
        // compile internal will automatically reset the perf logger
        ret = compileInternal(command, true);//这里是解析sql的入口
        // then we continue to use this perf logger
        perfLogger = SessionState.getPerfLogger();
        if (ret != 0) {
          return createProcessorResponse(ret);
        }
      } else {
       ......
      }
......

    if (requiresLock()) {
        // a checkpoint to see if the thread is interrupted or not before an expensive operation
        if (isInterrupted()) {
          ret = handleInterruption("at acquiring the lock.");
        } else {
          ret = acquireLocksAndOpenTxn(startTxnImplicitly);
        }
        if (ret != 0) {
          return rollback(createProcessorResponse(ret));
        }
      }
      ret = execute(true);//这里是执行入口
      if (ret != 0) {
        //if needRequireLock is false, the release here will do nothing because there is no lock
        return rollback(createProcessorResponse(ret));
      }
......
    
}
  • 提交任务
    org.apache.hadoop.hive.ql.Driver#execute(boolean)
    先看下这个方法, 这里很清晰,不停的从队列中取出task并launch
 public int execute(boolean deferClose) throws CommandNeedRetryException {
    ......
       // Loop while you either have tasks running, or tasks queued up
      while (driverCxt.isRunning()) {
        // Launch upto maxthreads tasks
        Task<? extends Serializable> task;
        while ((task = driverCxt.getRunnable(maxthreads)) != null) {
          TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs,driverCxt);//在这里launch
          if (!runner.isRunning()) {
            break;
          }
        }
        ......
      }
  ......
}

补充一句:这里的一个task,其实就是hive里面的一个Stage,也对应mapreduce程序里的一个job, 如下图:

MapredTask.png

launch.png

继续往下看 org.apache.hadoop.hive.ql.Driver#launchTask方法

    // Launch Task
    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.EXECPARALLEL) && tsk.isMapRedTask()) {
      // Launch it in the parallel mode, as a separate thread only for MR tasks
      if (LOG.isInfoEnabled()){
        LOG.info("Starting task [" + tsk + "] in parallel");
      }
      tskRun.setOperationLog(OperationLog.getCurrentOperationLog());
      tskRun.start();
    } else {
      if (LOG.isInfoEnabled()){
        LOG.info("Starting task [" + tsk + "] in serial mode");
      }
      tskRun.runSequential();
    }

这里分两种情况
条件1:如果设置了并行执行模式,即"hive.exec.parallel=true"
条件2:tsk.isMapRedTask()为true, 这里具体的判断标准,不同的org.apache.hadoop.hive.ql.exec.Task子类不同
同时满足条件1和条件2的task, 直接开启一个org.apache.hadoop.hive.ql.exec.TaskRunner线程来执行该task
否则,只能乖乖在当前线程执行

再进入TaskRunner看一下

public class TaskRunner extends Thread {
  protected Task<? extends Serializable> tsk;
  @Override
  public void run() {
    runner = Thread.currentThread();
    try {
      OperationLog.setCurrentOperationLog(operationLog);
      SessionState.start(ss);
      runSequential(); //这里还是调用了 runSequential()
    } finally {
    }
  }
  
//Launches a task, and sets its exit value in the result variable.
  public void runSequential() {
    int exitVal = -101;
    try {
      exitVal = tsk.executeTask();
    } catch (Throwable t) {
    }
    result.setExitVal(exitVal);
    if (tsk.getException() != null) {
      result.setTaskError(tsk.getException());
    }
  }
}

可以看到几点
1.它是一个线程类
2.run方法最终也是调用runSequential()
3.runSequential()中,真正和执行相关的,还是Task自己的executeTask(), TaskRunner只不过是一个容器而已

那继续往下追org.apache.hadoop.hive.ql.exec.Task#executeTask-->org.apache.hadoop.hive.ql.exec.Task#execute
这个execute方法,对于不同的Task子类,有不同的实现,这里以org.apache.hadoop.hive.ql.exec.mr.MapRedTask为例:

 @Override
  public int execute(DriverContext driverContext) {
    ......
    runningViaChild = conf.getBoolVar(HiveConf.ConfVars.SUBMITVIACHILD);

      if (!runningViaChild) {
        // since we are running the mapred task in the same jvm, we should update the job conf
        // in ExecDriver as well to have proper local properties.
        if (this.isLocalMode()) {
          // save the original job tracker
          ctx.setOriginalTracker(ShimLoader.getHadoopShims().getJobLauncherRpcAddress(job));
          // change it to local
          ShimLoader.getHadoopShims().setJobLauncherRpcAddress(job, "local");
        }
        // we are not running this mapred task via child jvm
        // so directly invoke ExecDriver
        int ret = super.execute(driverContext);//执行父类方法

        // restore the previous properties for framework name, RM address etc.
        if (this.isLocalMode()) {
          // restore the local job tracker back to original
          ctx.restoreOriginalTracker();
        }
    ......
  }

这里会先对hive.exec.submitviachild配置有个判断,意思为map/reduce Job 是否应该使用各自独立的 JVM 进行提交(Child进程),默认情况下,使用与 HQL compiler 相同的 JVM 进行提交,即false.
最后这个方法调回直接父类的execute方法,即org.apache.hadoop.hive.ql.exec.mr.ExecDriver#execute

继续看ExecDriver#execute
首先注意一下ExecDriver构造方法,可见这就是MapReduce Job的入口了

public ExecDriver() {
    super();
    console = new LogHelper(LOG);
    job = new JobConf(ExecDriver.class); //这里指定MapReduce入口类
    this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this);
  }

再关注方法本身

 @Override
  public int execute(DriverContext driverContext) {
  ......
    job.setOutputFormat(HiveOutputFormatImpl.class);
    job.setMapperClass(ExecMapper.class); //这里指定Mapper Class
    job.setMapOutputKeyClass(HiveKey.class);
    job.setMapOutputValueClass(BytesWritable.class);
     job.setNumReduceTasks(rWork != null ? rWork.getNumReduceTasks().intValue() : 0);
    job.setReducerClass(ExecReducer.class);//这里指定Reducer Class
    ......
     rj = jc.submitJob(job);
  ......
  }

看到很多熟悉的向yarn提交MapReduce-Job的代码了. 这里最终会走到org.apache.hadoop.mapred.JobClient#submitJob(org.apache.hadoop.mapred.JobConf), 任务得以提交!


收工!!!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,287评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,346评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,277评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,132评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,147评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,106评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,019评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,862评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,301评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,521评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,682评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,405评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,996评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,651评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,803评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,674评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,563评论 2 352

推荐阅读更多精彩内容