第6章 MapReduce应用开发-Configuration详解

1、API解读
Resources

配置通过资源指定,资源以XML形式包含一组name/value对,每个资源通过String或Path指定,假如使用String指定资源,将检查classpath下是否有该名称对应的文件,若使用Path,则直接在本地文件系统查找。
除非显示关闭[new Configuration(false)],否则Hadoop默认从classpath下加载如下两个资源:
core-default.xml: hadoop默认值
core-site.xml: Hadoop站点相关配置.

Final Parameters

配置参数可以设置为final,资源声明为final,后续加载的资源无法改变该参数值,如:

  <property>
    <name>dfs.hosts.include</name>
    <value>/etc/hadoop/conf/hosts.include</value>
    <final>true</final>
  </property>

管理员在core-site.xml中定义了final参数,用户应用无法修改该值。

Variable Expansion(变量膨胀)

解析属性value字符串中的变量,可查找的变量值包括:
配置中定义的其他属性,如果配置中找不到该属性,查找System.getProperties()
如,一个配置资源包含了如下属性定义:

  <property>
    <name>basedir</name>
    <value>/user/${user.name}</value>
  </property>
  
  <property>
    <name>tempdir</name>
    <value>${basedir}/tmp</value>
  </property>

当调用conf.get("tempdir")${basedir}会被解析为这个配置中另一个属性,而${user.name}通常被解析为系统属性user.name值。默认的,若配置了已废弃参数,将引发告警信息,可在log4j.properties配置log4j.logger.org.apache.hadoop.conf.Configuration.deprecation以抑制告警。

2、加载配置文件
2.1 org.apache.hadoop.conf.Configuration加载类路径下core-default.xml、core-site.xml
  static{
    //print deprecation warning if hadoop-site.xml is found in classpath
    ClassLoader cL = Thread.currentThread().getContextClassLoader();
    if (cL == null) {
      cL = Configuration.class.getClassLoader();
    }
    if(cL.getResource("hadoop-site.xml")!=null) {
      LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " +
          "Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, "
          + "mapred-site.xml and hdfs-site.xml to override properties of " +
          "core-default.xml, mapred-default.xml and hdfs-default.xml " +
          "respectively");
    }
    addDefaultResource("core-default.xml");
    addDefaultResource("core-site.xml");
  }

通过上述代码告警信息可以看出,老版本hadoop配置文件hadoop-site.xml分拆为
core-site.xmlmapred-site.xmlhdfs-site.xml

2.2 org.apache.hadoop.mapreduce.Job加载mapreduce和yarn配置文件
  static {
    ConfigUtil.loadResources();
  }

org.apache.hadoop.mapreduce.util.ConfigUtil.loadResources()

  public static void loadResources() {
    addDeprecatedKeys();
    Configuration.addDefaultResource("mapred-default.xml");
    Configuration.addDefaultResource("mapred-site.xml");
    Configuration.addDefaultResource("yarn-default.xml");
    Configuration.addDefaultResource("yarn-site.xml");
  }
2.3 org.apache.hadoop.hdfs.HdfsConfiguration加载hdfs配置文件
  static {
    addDeprecatedKeys();

    // adds the default resources
    Configuration.addDefaultResource("hdfs-default.xml");
    Configuration.addDefaultResource("hdfs-site.xml");
  }

如上都有addDeprecatedKeys(),负责添加deprecated key与new key的对应关系。

3、org.apache.hadoop.conf.Configuration资源加载源码
3.1 新增资源文件

defaultResourcesCopyOnWriteArrayList对象,判断List是否存在该对象,不存在即添加,且重加载配置

  public static synchronized void addDefaultResource(String name) {
    if(!defaultResources.contains(name)) {
      defaultResources.add(name);
      for(Configuration conf : REGISTRY.keySet()) {
        if(conf.loadDefaults) {
          conf.reloadConfiguration();
        }
      }
    }
  }

  public synchronized void reloadConfiguration() {
    properties = null;                            // trigger reload
    finalParameters.clear();                      // clear site-limits
  }
3.2 加载资源文件
  /**
   * Stores the mapping of key to the resource which modifies or loads 
   * the key most recently
   * key以及修改或加载了该key值的资源文件数组
   */
  private Map<String, String[]> updatingResource;
  private Properties overlay;    //覆盖属性
  //获取属性,若属性为空,即读取解析所有资源文件中属性
  protected synchronized Properties getProps() {
    if (properties == null) {
      properties = new Properties();
      Map<String, String[]> backup =
          new ConcurrentHashMap<String, String[]>(updatingResource);
      loadResources(properties, resources, quietmode);

      if (overlay != null) {
        properties.putAll(overlay);
        for (Map.Entry<Object,Object> item: overlay.entrySet()) {
          String key = (String)item.getKey();
          String[] source = backup.get(key);
          if(source != null) {
            updatingResource.put(key, source);
          }
        }
      }
    }
    return properties;
  }

  //解析资源文件
  private void loadResources(Properties properties,
                             ArrayList<Resource> resources,
                             boolean quiet) {
    if(loadDefaults) {
      for (String resource : defaultResources) {
        loadResource(properties, new Resource(resource, false), quiet);
      }
    
      //support the hadoop-site.xml as a deprecated case
      if(getResource("hadoop-site.xml")!=null) {
        loadResource(properties, new Resource("hadoop-site.xml", false), quiet);
      }
    }
    
    for (int i = 0; i < resources.size(); i++) {
      Resource ret = loadResource(properties, resources.get(i), quiet);
      if (ret != null) {
        resources.set(i, ret);
      }
    }
  }

  private Resource loadResource(Properties properties, Resource wrapper, boolean quiet) {
     ...
     //支持如下资源类型配置解析
      if (resource instanceof URL) {                  // an URL resource
        doc = parse(builder, (URL)resource);
      } else if (resource instanceof String) {        // a CLASSPATH resource
        URL url = getResource((String)resource);
        doc = parse(builder, url);
      } else if (resource instanceof Path) {          // a file resource
        // Can't use FileSystem API or we get an infinite loop
        // since FileSystem uses Configuration API.  Use java.io.File instead.
        File file = new File(((Path)resource).toUri().getPath())
          .getAbsoluteFile();
        if (file.exists()) {
          if (!quiet) {
            LOG.debug("parsing File " + file);
          }
          doc = parse(builder, new BufferedInputStream(
              new FileInputStream(file)), ((Path)resource).toString());
        }
      } else if (resource instanceof InputStream) {
        doc = parse(builder, (InputStream) resource, null);
        returnCachedProperties = true;
      } else if (resource instanceof Properties) {
        overlay(properties, (Properties)resource);
      } else if (resource instanceof Element) {
        root = (Element)resource;
      }
     //...解析XML
}
3.3 获取属性值

属性包含属性名与属性值,属性名为String类型,属性值包含多种类型,包括Java基本数据类型(boolean、int、long、float),其他数据类型(String、Class、java.io.File)及String集合。

  public String get(String name) {
    String[] names = handleDeprecation(deprecationContext.get(), name);
    String result = null;
    for(String n : names) {
      result = substituteVars(getProps().getProperty(n));
    }
    return result;
  }

通过如上代码可知,若新增了资源文件(会调用了reloadConfiguration()),调用如上方法时,即会触发资源真正加载。

private String substituteVars(String expr) {
    if (expr == null) {
      return null;
    }
    String eval = expr;
    //MAX_SUBST = 20,防止变量循环调用或变量太多
    for (int s = 0; s < MAX_SUBST; s++) {  
      final int[] varBounds = findSubVariable(eval);   //获取变量名开始位置及结束位置
      if (varBounds[SUB_START_IDX] == -1) {
        return eval;
      }
      final String var = eval.substring(varBounds[SUB_START_IDX],
          varBounds[SUB_END_IDX]);  //获取变量名
      String val = null;
      if (!restrictSystemProps) {
        try {
          val = System.getProperty(var);    //优先从系统变量获取值
        } catch (SecurityException se) {
          LOG.warn("Unexpected SecurityException in Configuration", se);
        }
      }
      if (val == null) {     //不存在于系统变量,从配置文件中获取
        val = getRaw(var);
      }
      if (val == null) {
        return eval; // return literal ${var}: var is unbound
      }
      final int dollar = varBounds[SUB_START_IDX] - "${".length();
      final int afterRightBrace = varBounds[SUB_END_IDX] + "}".length();
      // 将变量替换为变量值,查找下一个变量
      eval = eval.substring(0, dollar)
             + val
             + eval.substring(afterRightBrace);
    }
    throw new IllegalStateException("Variable substitution depth too large: " 
                                    + MAX_SUBST + " " + expr);
  }

  public String getRaw(String name) {
    String[] names = handleDeprecation(deprecationContext.get(), name);
    String result = null;
    for(String n : names) {
      result = getProps().getProperty(n);
    }
    return result;
  }
3、序列化与反序列化
  public void readFields(DataInput in) throws IOException {
    clear();
    int size = WritableUtils.readVInt(in);
    for(int i=0; i < size; ++i) {
      String key = org.apache.hadoop.io.Text.readString(in);
      String value = org.apache.hadoop.io.Text.readString(in);
      set(key, value); 
      String sources[] = WritableUtils.readCompressedStringArray(in);
      if(sources != null) {
        updatingResource.put(key, sources);
      }
    }
  }

  public void write(DataOutput out) throws IOException {
    Properties props = getProps();
    WritableUtils.writeVInt(out, props.size());
    for(Map.Entry<Object, Object> item: props.entrySet()) {
      org.apache.hadoop.io.Text.writeString(out, (String) item.getKey());
      org.apache.hadoop.io.Text.writeString(out, (String) item.getValue());
      WritableUtils.writeCompressedStringArray(out, 
          updatingResource.get(item.getKey()));
    }
  }
4、示例
<!-- configuration-1.xml -->
<?xml version="1.0"?>
<configuration>
    <property>
        <name>color</name>
        <value>yellow</value>
        <description>Color</description>
    </property>

    <property>
        <name>size</name>
        <value>10</value>
        <description>Size</description>
    </property>

    <property>
        <name>weight</name>
        <value>heavy</value>
        <final>true</final>
        <description>Weight</description>
    </property>

    <property>
        <name>size-weight</name>
        <value>${size},${weight}</value>
        <description>Size and weight</description>
    </property>
</configuration>

<!-- configuration-2.xml -->
<?xml version="1.0"?>
<configuration>
    <property>
        <name>color</name>
        <value>white</value>
    </property>

    <property>
        <name>weight</name>
        <value>light</value>
    </property>
</configuration>
    @Test
    public void test() {
        Configuration conf = new Configuration();
        conf.addResource("configuration-1.xml");

        assertEquals(conf.get("color"), "yellow");
        assertEquals(conf.getInt("size", 100), 10);
        assertEquals(conf.get("weight"), "heavy");
        assertEquals(conf.get("size-weight"), "10,heavy");

        conf.addResource("configuration-2.xml");
        assertEquals(conf.get("color"), "white");
        assertEquals(conf.get("weight"), "heavy");
        assertEquals(conf.get("width", "middle"), "middle");
        assertEquals(conf.get("size-weight"), "10,heavy");
    }

1、可以根据数据类型(原始类型)进行读取
2、若属性不存在,可以返回默认值
3、属性value支持变量(如${size})填充,获取属性value时,会解析变量

  • a、若变量不存在,返回空字符串
  • b、存在与变量同名属性,返回该属性value
  • c、程序运行时,使用运行参数填充,如java -Dsize=10 xxx.jar,优先级比b高
  • d、程序运行时,使用入参填充,如java xxx.jar -Dsize=10hadoop jar xxx -Dsize=10
    与c有本质区别,传入值存入args数组,可以使用GenericOptionsParser解析并set至configuration

注意: 运行时参数指定的同名属性值,不会覆盖属性文件中设置的同名属性值
如:java -Dcolor=black -Dweight=light ...
println(conf.get("color")) 不覆盖同名属性值,输出仍为配置文件中值white
println(conf.get("size-weight")) 覆盖属性值变量,输出 10,light

5、GenericOptionsParserToolRunner

GenericOptionsParser可以读取命令行入参,并将入参设置到Configuration

  /** 
   * Create a <code>GenericOptionsParser</code> to parse given options as well 
   * as generic Hadoop options. 
   * @param conf the configuration to modify  
   * @param options options built by the caller 
   * @param args User-specified arguments
   * @throws IOException 
   */
  public GenericOptionsParser(Configuration conf,
      Options options, String[] args) throws IOException {
    parseGeneralOptions(options, conf, args);
    this.conf = conf;
  }

  private void parseGeneralOptions(Options opts, Configuration conf, 
      String[] args) throws IOException {
    opts = buildGeneralOptions(opts);
    CommandLineParser parser = new GnuParser();
    try {
      commandLine = parser.parse(opts, preProcessForWindows(args), true);
      processGeneralOptions(conf, commandLine);
    } catch(ParseException e) {
      LOG.warn("options parsing failed: "+e.getMessage());

      HelpFormatter formatter = new HelpFormatter();
      formatter.printHelp("general options are: ", opts);
    }
  }

//添加fs jt conf D libjars files archives配置项
private static Options buildGeneralOptions(Options opts) {
    Option fs = OptionBuilder.withArgName("local|namenode:port")
    .hasArg()
    .withDescription("specify a namenode")
    .create("fs");
    Option jt = OptionBuilder.withArgName("local|resourcemanager:port")
    .hasArg()
    .withDescription("specify a ResourceManager")
    .create("jt");
    Option oconf = OptionBuilder.withArgName("configuration file")
    .hasArg()
    .withDescription("specify an application configuration file")
    .create("conf");
    Option property = OptionBuilder.withArgName("property=value")
    .hasArg()
    .withDescription("use value for given property")
    .create('D');
    Option libjars = OptionBuilder.withArgName("paths")
    .hasArg()
    .withDescription("comma separated jar files to include in the classpath.")
    .create("libjars");
    Option files = OptionBuilder.withArgName("paths")
    .hasArg()
    .withDescription("comma separated files to be copied to the " +
           "map reduce cluster")
    .create("files");
    Option archives = OptionBuilder.withArgName("paths")
    .hasArg()
    .withDescription("comma separated archives to be unarchived" +
                     " on the compute machines.")
    .create("archives");
    
    // file with security tokens
    Option tokensFile = OptionBuilder.withArgName("tokensFile")
    .hasArg()
    .withDescription("name of the file with the tokens")
    .create("tokenCacheFile");
     
    opts.addOption(fs);
    ..
    return opts;
  }

//解析配置参数
private void processGeneralOptions(Configuration conf,
      CommandLine line) throws IOException {
    if (line.hasOption("fs")) {     //设置fs.defaultFS
      FileSystem.setDefaultUri(conf, line.getOptionValue("fs"));
    }

    if (line.hasOption("jt")) {   //设置jobtracker
      String optionValue = line.getOptionValue("jt");
      if (optionValue.equalsIgnoreCase("local")) {
        conf.set("mapreduce.framework.name", optionValue);
      }

      conf.set("yarn.resourcemanager.address", optionValue, 
          "from -jt command line option");
    }
    if (line.hasOption("conf")) {     //新增配置资源
      String[] values = line.getOptionValues("conf");
      for(String value : values) {
        conf.addResource(new Path(value));
      }
    }

    if (line.hasOption('D')) {        //设置配置项
      String[] property = line.getOptionValues('D');
      for(String prop : property) {
        String[] keyval = prop.split("=", 2);
        if (keyval.length == 2) {
          conf.set(keyval[0], keyval[1], "from command line");
        }
      }
    }

    if (line.hasOption("libjars")) {
      conf.set("tmpjars", 
               validateFiles(line.getOptionValue("libjars"), conf),
               "from -libjars command line option");
      //setting libjars in client classpath
      URL[] libjars = getLibJars(conf);
      if(libjars!=null && libjars.length>0) {
        conf.setClassLoader(new URLClassLoader(libjars, conf.getClassLoader()));
        Thread.currentThread().setContextClassLoader(
            new URLClassLoader(libjars, 
                Thread.currentThread().getContextClassLoader()));
      }
    }
    if (line.hasOption("files")) {
      conf.set("tmpfiles", 
               validateFiles(line.getOptionValue("files"), conf),
               "from -files command line option");
    }
    if (line.hasOption("archives")) {
      conf.set("tmparchives", 
                validateFiles(line.getOptionValue("archives"), conf),
                "from -archives command line option");
    }
    conf.setBoolean("mapreduce.client.genericoptionsparser.used", true);
    
    // tokensFile
    if(line.hasOption("tokenCacheFile")) {
      String fileName = line.getOptionValue("tokenCacheFile");
      // check if the local file exists
      FileSystem localFs = FileSystem.getLocal(conf);
      Path p = localFs.makeQualified(new Path(fileName));
      if (!localFs.exists(p)) {
          throw new FileNotFoundException("File "+fileName+" does not exist.");
      }
      if(LOG.isDebugEnabled()) {
        LOG.debug("setting conf tokensFile: " + fileName);
      }
      UserGroupInformation.getCurrentUser().addCredentials(
          Credentials.readTokenStorageFile(p, conf));
      conf.set("mapreduce.job.credentials.binary", p.toString(),
               "from -tokenCacheFile command line option");
    }
  }

可以通过new GenericOptionsParser(conf, args);将入参设置至conf,也可以ToolRunner.run(new XxxDriver(), args);将入参设置至conf,XxxDriver继承Configured实现Tool接口

  public static int run(Tool tool, String[] args) 
    throws Exception{
    return run(tool.getConf(), tool, args);
  }

  public static int run(Configuration conf, Tool tool, String[] args) 
    throws Exception{
    if(conf == null) {
      conf = new Configuration();
    }
    GenericOptionsParser parser = new GenericOptionsParser(conf, args);
    //set the configuration back, so that Tool can configure itself
    tool.setConf(conf);
    
    //get the args w/o generic hadoop args
    String[] toolArgs = parser.getRemainingArgs();
    return tool.run(toolArgs);
  }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,099评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,828评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,540评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,848评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,971评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,132评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,193评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,934评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,376评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,687评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,846评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,537评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,175评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,887评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,134评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,674评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,741评论 2 351

推荐阅读更多精彩内容