1.2.2.1Job的提交(源码解读)

总目录:https://www.jianshu.com/p/e406a9bc93a9

Hadoop - 子目录:https://www.jianshu.com/p/9428e443b7fd

Job的实例化

Configuration con = new Configuration();  //读取各项配置

Job job = Job.getInstance(con);//将配置加载到job作业里面。

job.setJarByClass(WordCountMain.class);//指定主类

Configuration做为Hadoop的一个基础功能承担着重要的责任,为Yarn、HSFS、MapReduce、NFS、调度器等提供参数的配置、配置文件的分布式传输(实现了Writable接口)等重要功能。

从上到下的实现的功能依次是

LOG

--记录日志

quietmode

--boolean类型,配置信息加载过程中是否静默,即有一些信息不会被记录,默认是true

resources

--ArrayList<Resource>类型,Resource是Configuration的内部类,有两个属性Object resource和String name;resources是一个对象数组,用于存储有关包含配置信息的对象

finalParameter

--Set<String>类型,所有被声明为final的变量集合,声明为final就表示不能被后续覆盖

loadDefault

--boolean类型,是否加载默认配置;

REGISTRY

--WeakHashMap<Configuration,Object>类型,用于多个对象的相关配置的注册及对它们进行管理,记录了所有的Configuration

defaultResources

--CopyOnWriteArrayList<String>类型,用于存储默认的配置资源名或路径

{…}

--加载默认的配置资源

properties

--java内置的Properties类型,存储所有配置信息,KV值

overlay

--Properties类型,是用户设置的而不是通过对资源解析得到的  

classLoader

--ClassLoader类型,主要用于加载指定的类或者加载相关资源

varPat

--静态Pattern类型,对含有环境变量的值的进行转换的正则表达式

MAX_SUBST

--静态int类型,默认值是20,MAX_SUBST是设定对带有环境变量的值所能够深入解析的层次数,超出这个最大的层数的值将不能够解析。


文件的输入输出

FileInputFormat.addInputPath(job, new Path("输入路径"));

FileOutputFormat.setOutputPath(job, new Path("输出路径"));

先说输入路径

Ctrl+左键

进入 org.apache.hadoop.mapreduce.lib.input.FileInputFormat 类中

addinputpath

这是一个三目运算符,追加所有输入的目录,目录间用逗号分隔。

注:文件输入有两个方法,除了addinputpath还有一个setinputpaths,区别是后者一次只能处理一个输入路径。但是前者兼容后者。


之后是输出路径

Ctrl+左键

进入org.apache.hadoop.mapreduce.lib.output.FileOutputFormat类中

setOutputPath

输出路径是唯一的,这里会读取主类中设置的输出路径,但是会以最后一个为准,后面设置的会自动覆盖前面设置的。而且输出路径目录必须为空,且已经创建好。(IOException是读写异常)

Job的提交

第一层:waitForCompletion

job.setNumReduceTasks(1);//设置reduce任务数量

job.waitForCompletion(true);//job的提交

setNumReduceTasks源码没有什么意义,我们直接来看waitForCompletion

Ctrl+左键

我们会给waitForCompletion传输一个布尔值,来判断是否打印job的执行情况。

org.apache.hadoop.mapreduce.Job类中

waitForCompletion

判断Job状态,如果处于DEFINE状态则通过submit()方法向集群提交job

    if (state == JobState.DEFINE) {

      submit();

    }

判断是否需要打印job的执行过程

 if (verbose) {

      monitorAndPrintJob();   //verbose为TRUE时打印

    } else {

。。。

        //verbose为FALSE时轮询任务是否完成

        Job.getCompletionPollInterval(cluster.getConf()); 

。。。

    }

waitForCompletion的功能是向集群提交任务,并等待任务完成,这个方法里面我们有一条语句需要详细分析。

第二层:submit

submit();向集群提交任务,并立即返回。

submit

进入submit方法后,首先会执行三个方法,分别是:

ensureState(JobState.DEFINE);    //判断job类型是否是DEFIND

setUseNewAPI();//指定job使用的是新版mapreduce的API

connect();//对封装Configuration设置的集群的信息的cluster对象初始化

                //创建真正的通信协议,被用于最终的job提交


之后产生一个对象负责job的运行进度

// 通过cluster中封装的集群信息(HDFS和client)获取JobSubmitter对象

//该对象负责最终向集群提交job并返回job的运行进度

final JobSubmitter submitter =

        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());


之后会将JobState的状态从DEFINE转换为RUNNING,开始执行Job

ClassNotFoundException {

        return submitter.submitJobInternal(Job.this, cluster);

      }

第三层:submitJobInternal

这个方法会将任务提交到集群中,是真正干活的方法。

1.在执行任务前,首先会检查输出路径是否配置,是否存在;正确应已配置但未创建,其默认配置参数为mapreduce.output.fileoutputformat.outputdir。

2.获取job中的集群配置信息,添加到分布式缓存中。

3.通过JobSubmissionFiles中的getStagingDir()方法获取作业执行时相关资源的存放路径。

4.获取当前主机ip与主机名,封装进Configuration对象中。

5.生成JobID并封装进Job。

6.构建提交作业的路径

7.权限与密钥相关配置

8.拷贝副本

9.将jar包上传到HDFS中

10.分片  (重要)

11.程序正式开始执行,并返回作业状态。

JobStatus submitJobInternal(Job job, Cluster cluster)

  throws ClassNotFoundException, InterruptedException, IOException {

    //validate the jobs output specs

    checkSpecs(job); //1.检查作业输出路径是否配置并且是否存在,必须配置且不存在

    Configuration conf = job.getConfiguration(); // 2.获取job中封装的Configuration对象

    addMRFrameworkToDistributedCache(conf); //2.添加到分布式缓存中

    // 3.通过getStagingDir获取作业执行时相关资源的存放路径

    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);


    //configure the command line options correctly on the submitting dfs

     //4.获取job的当前主机的IP,并将ip、主机名等相关信息封装进Configuration对象中

    InetAddress ip = InetAddress.getLocalHost();

    。。。。

    //5.生成作业ID,即jobID

    JobID jobId = submitClient.getNewJobID();

    //5.将jobID添加到job内  

    job.setJobID(jobId);

    //6.构造提交作业路径, jobStagingArea + /jobID

    Path submitJobDir = new Path(jobStagingArea, jobId.toString());

     。。。。

    // get delegation token for the dir    7.关于job提交路径权限的设置

      TokenCache.obtainTokensForNamenodes(job.getCredentials(),

          new Path[] { submitJobDir }, conf);

    。。。。

    //8.复制job与相关配置文件作为副本  

      copyAndConfigureFiles(job, submitJobDir);

    //8.获取配置文件路径

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);


      // Create the splits for the job    分片

      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));

      //10.写分片数据文件job.splits和分片元数据文件job.splitmetainfo,计算map任务数

      int maps = writeSplits(job, submitJobDir);

      conf.setInt(MRJobConfig.NUM_MAPS, maps);

      。。。。

     //9. Write job file to submit dir将Job文件(jar包)上传到任务提交文件夹(HDFS)

    writeConf(conf, submitJobFile); 

    。。。。

    //11.真正的提交作业到集群,并返回作业状态到status成员

   status = submitClient.submitJob

                                                   jobId, submitJobDir.toString(), job.getCredentials());  

    。。。。

分片:writeSplits

进行追踪后,可以很清晰的看到,这里是决定使用新版API或者旧版API的地方。

writeSplits

对新版API进行追踪。

新版API:writeNewSplits()

1.根据我们设置的inputFormat反射出一个inputFormat类型的对象input

2.调用他的getSplits方法来获取分片信息

3.调用JobSplitWriter.createSplitFiles方法将分片的信息写入到submitJobDir/job.split文件中。

private <T extends InputSplit>

  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,

      InterruptedException, ClassNotFoundException {

    Configuration conf = job.getConfiguration();

    InputFormat<?, ?> input =

      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

    //获取切片信息

    List<InputSplit> splits = input.getSplits(job);

    //创建一个数组,以split集合的切片大小为长度

    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest

    // go first

    Arrays.sort(array, new SplitComparator());

    JobSplitWriter.createSplitFiles(jobSubmitDir, conf,

        jobSubmitDir.getFileSystem(conf), array);

    return array.length;

  }

分片实际方法:getSplits()

这个方法在org.apache.hadoop.mapreduce.lib.input.FileInputFormat类中。

这个方法和submitJobInternal方法一样,都是用来干活的。

我们需要注意这一个while循环。

while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {

            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);

            splits.add(makeSplit(path, length-bytesRemaining, splitSize,

                        blkLocations[blkIndex].getHosts(),

                        blkLocations[blkIndex].getCachedHosts()));

            bytesRemaining -= splitSize;

          }

这个循环的判定语句是剩余的字节大小是否大于预设分片大小的1.1倍。

private static final double SPLIT_SLOP = 1.1; // 10% slop

也就是说当剩余字节大于预设分片大小的110%后,对剩余的文件继续分片,否则直接将剩余文件生成一个分片。

我们在MapReduce那一篇的分片补充知识中说过,分片是有1.1倍冗余的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351