1. API Overview
Resources
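A configuration is specified by resources. A resource contains a set of name/value pairs as XML, and each resource is named by either a String or a Path. If it is named by a String, the classpath is searched for a file with that name; if it is named by a Path, the local filesystem is examined directly, without consulting the classpath.
Unless this is explicitly turned off with new Configuration(false), Hadoop by default loads the following two resources from the classpath:
core-default.xml: the read-only Hadoop defaults.
core-site.xml: the site-specific configuration of a given Hadoop installation.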
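To make the "XML resource holding name/value pairs" idea concrete, here is a minimal sketch that reads such a resource into a map using only the JDK's XML parser. ResourceSketch and its parse method are hypothetical names invented for this illustration; this is not Hadoop's parser and it ignores elements such as final and description.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical helper (not part of Hadoop): collects <property> name/value pairs.
class ResourceSketch {
    static Map<String, String> parse(String xml) {
        Document doc;
        try {
            doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        } catch (Exception e) {
            // Wrap checked parser exceptions so callers need no throws clause.
            throw new IllegalArgumentException("not a well-formed resource", e);
        }
        Map<String, String> props = new LinkedHashMap<>();
        NodeList list = doc.getElementsByTagName("property");
        for (int i = 0; i < list.getLength(); i++) {
            Element property = (Element) list.item(i);
            String name = childText(property, "name");
            String value = childText(property, "value");
            if (name != null && value != null) {
                props.put(name, value); // a later definition replaces an earlier one
            }
        }
        return props;
    }

    // Returns the trimmed text of the first child element with the given tag, or null.
    private static String childText(Element parent, String tag) {
        NodeList nodes = parent.getElementsByTagName(tag);
        return nodes.getLength() == 0 ? null : nodes.item(0).getTextContent().trim();
    }
}
```

With the real API, the two ways of naming a resource correspond to conf.addResource("configuration-1.xml") (a String, resolved on the classpath) and conf.addResource(new Path(value)) (a Path, resolved on the local filesystem), both of which appear later in this article.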
Final Parameters
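A configuration parameter can be declared final. Once a resource declares a value final, no resource loaded afterwards can change that value. For example: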
<property>
<name>dfs.hosts.include</name>
<value>/etc/hadoop/conf/hosts.include</value>
<final>true</final>
</property>
Administrators typically declare parameters final in core-site.xml so that user applications cannot alter their values.
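The rule can be modelled in a few lines. The sketch below is a simplified stand-in, not Hadoop's implementation: FinalParamsSketch and Prop are hypothetical names, and each resource is represented as an in-memory list instead of an XML file.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified model of final parameters (hypothetical class, not Hadoop code).
class FinalParamsSketch {
    // One <property> entry; isFinal mirrors <final>true</final>.
    static final class Prop {
        final String name;
        final String value;
        final boolean isFinal;

        Prop(String name, String value, boolean isFinal) {
            this.name = name;
            this.value = value;
            this.isFinal = isFinal;
        }
    }

    private final Map<String, String> values = new LinkedHashMap<>();
    private final Set<String> finalParameters = new HashSet<>();

    // Loads one resource; resources are applied in the order they are loaded.
    void load(List<Prop> resource) {
        for (Prop p : resource) {
            if (finalParameters.contains(p.name)) {
                continue; // an earlier resource declared this name final: ignore the override
            }
            values.put(p.name, p.value);
            if (p.isFinal) {
                finalParameters.add(p.name);
            }
        }
    }

    String get(String name) {
        return values.get(name);
    }
}
```

Loading a second resource that redefines a final name leaves the first value in place, while non-final names are overridden as usual.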
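Variable Expansion
Variables in a property's value string are expanded when the value is read. A variable can resolve to:
another property defined in this Configuration, or
a Java system property from System.getProperties().
The API documentation describes system properties as the fallback when no configuration property matches; note, however, that the substituteVars() source quoted in section 3.3 consults System.getProperty() first.
For example, suppose a configuration resource contains the following property definitions: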
<property>
<name>basedir</name>
<value>/user/${user.name}</value>
</property>
<property>
<name>tempdir</name>
<value>${basedir}/tmp</value>
</property>
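When conf.get("tempdir") is called, ${basedir} is resolved to another property in this Configuration, while ${user.name} is ordinarily resolved to the value of the system property user.name.
By default, a warning is logged whenever a deprecated configuration parameter is used. These warnings can be suppressed by configuring log4j.logger.org.apache.hadoop.conf.Configuration.deprecation in log4j.properties.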
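The expansion described above can be sketched with a regular expression and a plain map. This is an illustration under stated assumptions, not Hadoop's code: VarExpansionSketch is a hypothetical class, the lookup order (system property first, then configuration) follows the substituteVars() source quoted in section 3.3, and the limit of 20 substitutions mirrors MAX_SUBST from that source.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified model of ${var} expansion (hypothetical class, not Hadoop code).
class VarExpansionSketch {
    private static final Pattern VAR = Pattern.compile("\\$\\{([^}$\\s]+)\\}");
    private static final int MAX_SUBST = 20;

    static String expand(String expr, Map<String, String> conf) {
        if (expr == null) {
            return null;
        }
        String eval = expr;
        for (int s = 0; s < MAX_SUBST; s++) {
            Matcher m = VAR.matcher(eval);
            if (!m.find()) {
                return eval; // nothing left to expand
            }
            String var = m.group(1);
            String val = System.getProperty(var); // system properties are consulted first
            if (val == null) {
                val = conf.get(var);              // then other configuration properties
            }
            if (val == null) {
                return eval; // unbound variable: the literal ${var} is kept
            }
            // Replace this occurrence; the next pass rescans the result, so
            // variables contained in the substituted value are expanded too.
            eval = eval.substring(0, m.start()) + val + eval.substring(m.end());
        }
        throw new IllegalStateException(
            "Variable substitution depth too large: " + MAX_SUBST + " " + expr);
    }
}
```

With basedir defined as /user/${user.name} and tempdir as ${basedir}/tmp, expanding tempdir takes two passes: one for ${basedir} and one for the ${user.name} it introduces. A self-referencing definition such as a=${a} never terminates on its own, which is why the substitution count is capped.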
2. Loading configuration files
2.1 org.apache.hadoop.conf.Configuration
The static initializer loads core-default.xml and core-site.xml from the classpath:
static{
//print deprecation warning if hadoop-site.xml is found in classpath
ClassLoader cL = Thread.currentThread().getContextClassLoader();
if (cL == null) {
cL = Configuration.class.getClassLoader();
}
if(cL.getResource("hadoop-site.xml")!=null) {
LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " +
"Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, "
+ "mapred-site.xml and hdfs-site.xml to override properties of " +
"core-default.xml, mapred-default.xml and hdfs-default.xml " +
"respectively");
}
addDefaultResource("core-default.xml");
addDefaultResource("core-site.xml");
}
As the warning message in the code above shows, hadoop-site.xml, the configuration file used by older Hadoop versions, has been split into core-site.xml, mapred-site.xml and hdfs-site.xml.
2.2 org.apache.hadoop.mapreduce.Job
The static initializer loads the MapReduce and YARN configuration files:
static {
ConfigUtil.loadResources();
}
org.apache.hadoop.mapreduce.util.ConfigUtil.loadResources()
public static void loadResources() {
addDeprecatedKeys();
Configuration.addDefaultResource("mapred-default.xml");
Configuration.addDefaultResource("mapred-site.xml");
Configuration.addDefaultResource("yarn-default.xml");
Configuration.addDefaultResource("yarn-site.xml");
}
2.3 org.apache.hadoop.hdfs.HdfsConfiguration
The static initializer loads the HDFS configuration files:
static {
addDeprecatedKeys();
// adds the default resources
Configuration.addDefaultResource("hdfs-default.xml");
Configuration.addDefaultResource("hdfs-site.xml");
}
Both ConfigUtil.loadResources() and HdfsConfiguration call addDeprecatedKeys(), which registers the mapping from each deprecated key to its replacement key.
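The effect of such a mapping can be illustrated with a small model. DeprecationSketch is a hypothetical class written for this article, and it simplifies matters by storing values only under the new key; Hadoop's own bookkeeping is more involved. The key pair used in the usage note, fs.default.name and fs.defaultFS, is a real Hadoop deprecation.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of deprecated-key handling (hypothetical class, not Hadoop code).
class DeprecationSketch {
    private final Map<String, String> deprecatedToNew = new HashMap<>();
    private final Map<String, String> values = new HashMap<>();

    // Registers that oldKey has been replaced by newKey.
    void addDeprecation(String oldKey, String newKey) {
        deprecatedToNew.put(oldKey, newKey);
    }

    // Maps a possibly deprecated key to the key actually used for storage.
    private String resolve(String key) {
        return deprecatedToNew.getOrDefault(key, key);
    }

    void set(String key, String value) {
        values.put(resolve(key), value);
    }

    String get(String key) {
        return values.get(resolve(key));
    }
}
```

After addDeprecation("fs.default.name", "fs.defaultFS"), a value set under either name can be read back under either name, which is what lets old job configurations keep working.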
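3. org.apache.hadoop.conf.Configuration resource-loading source
3.1 Adding a resource
defaultResources is a CopyOnWriteArrayList. addDefaultResource() checks whether the list already contains the given name; if it does not, the name is added and every registered Configuration that loads defaults is told to reload.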
public static synchronized void addDefaultResource(String name) {
if(!defaultResources.contains(name)) {
defaultResources.add(name);
for(Configuration conf : REGISTRY.keySet()) {
if(conf.loadDefaults) {
conf.reloadConfiguration();
}
}
}
}
public synchronized void reloadConfiguration() {
properties = null; // trigger reload
finalParameters.clear(); // clear site-limits
}
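The interplay of the two methods above is easier to see in a stripped-down model. LazyReloadSketch below is a hypothetical class, not Hadoop code: it keeps the same three ingredients (a static list of default resource names, a weak registry of live instances, and a properties field that is nulled to request a reload) and counts how often a load really happens.

```java
import java.util.Map;
import java.util.Properties;
import java.util.WeakHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Simplified model of addDefaultResource()/reloadConfiguration() (hypothetical class).
class LazyReloadSketch {
    private static final CopyOnWriteArrayList<String> DEFAULT_RESOURCES =
        new CopyOnWriteArrayList<>();
    // Weak keys: an instance that is no longer referenced can be garbage collected.
    private static final Map<LazyReloadSketch, Object> REGISTRY = new WeakHashMap<>();

    private Properties properties; // null means "must be (re)loaded on next access"
    int loadCount = 0;             // number of real loads, for demonstration only

    LazyReloadSketch() {
        synchronized (LazyReloadSketch.class) {
            REGISTRY.put(this, null);
        }
    }

    static synchronized void addDefaultResource(String name) {
        if (!DEFAULT_RESOURCES.contains(name)) {
            DEFAULT_RESOURCES.add(name);
            for (LazyReloadSketch conf : REGISTRY.keySet()) {
                conf.reloadConfiguration(); // only marks the instance stale
            }
        }
    }

    synchronized void reloadConfiguration() {
        properties = null;
    }

    synchronized Properties getProps() {
        if (properties == null) {
            properties = new Properties();
            loadCount++;
            // Stand-in for parsing: record which default resources were seen.
            properties.setProperty("loaded.resources", String.join(",", DEFAULT_RESOURCES));
        }
        return properties;
    }
}
```

Adding a new default resource costs nothing at the time of the call; the price is paid by the next getProps() on each affected instance. Adding a name that is already registered changes nothing.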
3.2 Loading resources
/**
* Stores the mapping of key to the resource which modifies or loads
* the key most recently
* (the value is the array of resources that modified or loaded the key)
*/
private Map<String, String[]> updatingResource;
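private Properties overlay; // overlay properties, applied on top of the values loaded from resources
// Returns the properties; if they have not been loaded yet (properties == null), all resources are read and parsed first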
protected synchronized Properties getProps() {
if (properties == null) {
properties = new Properties();
Map<String, String[]> backup =
new ConcurrentHashMap<String, String[]>(updatingResource);
loadResources(properties, resources, quietmode);
if (overlay != null) {
properties.putAll(overlay);
for (Map.Entry<Object,Object> item: overlay.entrySet()) {
String key = (String)item.getKey();
String[] source = backup.get(key);
if(source != null) {
updatingResource.put(key, source);
}
}
}
}
return properties;
}
// Loads and parses the resources
private void loadResources(Properties properties,
ArrayList<Resource> resources,
boolean quiet) {
if(loadDefaults) {
for (String resource : defaultResources) {
loadResource(properties, new Resource(resource, false), quiet);
}
//support the hadoop-site.xml as a deprecated case
if(getResource("hadoop-site.xml")!=null) {
loadResource(properties, new Resource("hadoop-site.xml", false), quiet);
}
}
for (int i = 0; i < resources.size(); i++) {
Resource ret = loadResource(properties, resources.get(i), quiet);
if (ret != null) {
resources.set(i, ret);
}
}
}
private Resource loadResource(Properties properties, Resource wrapper, boolean quiet) {
...
// The following resource types can be parsed
if (resource instanceof URL) { // an URL resource
doc = parse(builder, (URL)resource);
} else if (resource instanceof String) { // a CLASSPATH resource
URL url = getResource((String)resource);
doc = parse(builder, url);
} else if (resource instanceof Path) { // a file resource
// Can't use FileSystem API or we get an infinite loop
// since FileSystem uses Configuration API. Use java.io.File instead.
File file = new File(((Path)resource).toUri().getPath())
.getAbsoluteFile();
if (file.exists()) {
if (!quiet) {
LOG.debug("parsing File " + file);
}
doc = parse(builder, new BufferedInputStream(
new FileInputStream(file)), ((Path)resource).toString());
}
} else if (resource instanceof InputStream) {
doc = parse(builder, (InputStream) resource, null);
returnCachedProperties = true;
} else if (resource instanceof Properties) {
overlay(properties, (Properties)resource);
} else if (resource instanceof Element) {
root = (Element)resource;
}
// ... parse the XML
}
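3.3 Reading property values
A property consists of a name and a value. The name is a String; the value can be read as one of several types: Java primitives (boolean, int, long, float), other types (String, Class, java.io.File), and collections of Strings.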
public String get(String name) {
String[] names = handleDeprecation(deprecationContext.get(), name);
String result = null;
for(String n : names) {
result = substituteVars(getProps().getProperty(n));
}
return result;
}
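As the code above shows, after a resource has been added (which calls reloadConfiguration()), the resources are actually loaded only when a method such as get() is next called.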
private String substituteVars(String expr) {
if (expr == null) {
return null;
}
String eval = expr;
// MAX_SUBST = 20 guards against circular references and excessive nesting of variables
for (int s = 0; s < MAX_SUBST; s++) {
final int[] varBounds = findSubVariable(eval); // start and end positions of the variable name
if (varBounds[SUB_START_IDX] == -1) {
return eval;
}
final String var = eval.substring(varBounds[SUB_START_IDX],
varBounds[SUB_END_IDX]); // extract the variable name
String val = null;
if (!restrictSystemProps) {
try {
val = System.getProperty(var); // system properties are tried first
} catch (SecurityException se) {
LOG.warn("Unexpected SecurityException in Configuration", se);
}
}
if (val == null) { // not a system property: look it up in the configuration
val = getRaw(var);
}
if (val == null) {
return eval; // return literal ${var}: var is unbound
}
final int dollar = varBounds[SUB_START_IDX] - "${".length();
final int afterRightBrace = varBounds[SUB_END_IDX] + "}".length();
// substitute the value for the variable, then look for the next variable
eval = eval.substring(0, dollar)
+ val
+ eval.substring(afterRightBrace);
}
throw new IllegalStateException("Variable substitution depth too large: "
+ MAX_SUBST + " " + expr);
}
public String getRaw(String name) {
String[] names = handleDeprecation(deprecationContext.get(), name);
String result = null;
for(String n : names) {
result = getProps().getProperty(n);
}
return result;
}
3.4 Serialization and deserialization
public void readFields(DataInput in) throws IOException {
clear();
int size = WritableUtils.readVInt(in);
for(int i=0; i < size; ++i) {
String key = org.apache.hadoop.io.Text.readString(in);
String value = org.apache.hadoop.io.Text.readString(in);
set(key, value);
String sources[] = WritableUtils.readCompressedStringArray(in);
if(sources != null) {
updatingResource.put(key, sources);
}
}
}
public void write(DataOutput out) throws IOException {
Properties props = getProps();
WritableUtils.writeVInt(out, props.size());
for(Map.Entry<Object, Object> item: props.entrySet()) {
org.apache.hadoop.io.Text.writeString(out, (String) item.getKey());
org.apache.hadoop.io.Text.writeString(out, (String) item.getValue());
WritableUtils.writeCompressedStringArray(out,
updatingResource.get(item.getKey()));
}
}
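The wire layout used by write() and readFields() above is "entry count, then key and value per entry". The round trip below reproduces that layout with JDK classes only. It is a sketch under assumptions: ConfSerializationSketch is a hypothetical class, writeInt/writeUTF stand in for Hadoop's WritableUtils.writeVInt and Text.writeString, and the per-key source array is left out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of Configuration's write()/readFields() (hypothetical class).
class ConfSerializationSketch {
    // Writes the number of entries, then each key followed by its value.
    static byte[] write(Map<String, String> props) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(props.size());
            for (Map.Entry<String, String> e : props.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeUTF(e.getValue());
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the entry count, then exactly that many key/value pairs.
    static Map<String, String> readFields(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            Map<String, String> props = new LinkedHashMap<>();
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                String value = in.readUTF();
                props.put(key, value);
            }
            return props;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Writing the count first is what lets the reader know when to stop without a terminator.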
4. Example
<!-- configuration-1.xml -->
<?xml version="1.0"?>
<configuration>
<property>
<name>color</name>
<value>yellow</value>
<description>Color</description>
</property>
<property>
<name>size</name>
<value>10</value>
<description>Size</description>
</property>
<property>
<name>weight</name>
<value>heavy</value>
<final>true</final>
<description>Weight</description>
</property>
<property>
<name>size-weight</name>
<value>${size},${weight}</value>
<description>Size and weight</description>
</property>
</configuration>
<!-- configuration-2.xml -->
<?xml version="1.0"?>
<configuration>
<property>
<name>color</name>
<value>white</value>
</property>
<property>
<name>weight</name>
<value>light</value>
</property>
</configuration>
@Test
public void test() {
Configuration conf = new Configuration();
conf.addResource("configuration-1.xml");
// JUnit's assertEquals takes the expected value first
assertEquals("yellow", conf.get("color"));
assertEquals(10, conf.getInt("size", 100));
assertEquals("heavy", conf.get("weight"));
assertEquals("10,heavy", conf.get("size-weight"));
// configuration-2.xml overrides color, but weight is final and keeps its value
conf.addResource("configuration-2.xml");
assertEquals("white", conf.get("color"));
assertEquals("heavy", conf.get("weight"));
// width is not defined anywhere, so the supplied default is returned
assertEquals("middle", conf.get("width", "middle"));
assertEquals("10,heavy", conf.get("size-weight"));
}
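1. Values can be read as a specific (primitive) data type.
2. If a property does not exist, a caller-supplied default value can be returned.
3. Property values may contain variables (such as ${size}), which are expanded when the value is read:
- a. If the variable cannot be resolved, it is left unexpanded and the literal ${...} text stays in the returned value (the "return eval" branch of substituteVars() in section 3.3).
- b. If a property with the same name as the variable exists, that property's value is used.
- c. A JVM system property supplied at launch, e.g. java -Dsize=10 -jar xxx.jar, is also used, and it takes precedence over b.
- d. A program argument supplied at launch, e.g. java -jar xxx.jar -Dsize=10 or hadoop jar xxx -Dsize=10, is fundamentally different from c: the value arrives in the args array, where GenericOptionsParser can parse it and set it on the Configuration.
Note: a system property given at launch does not override a property of the same name defined in a configuration file; it only affects variable expansion.
For example, with java -Dcolor=black -Dweight=light ...
println(conf.get("color")) does not pick up the system property; it still prints white, the value from the configuration files.
println(conf.get("size-weight")) uses the system property for the ${weight} variable and prints 10,light.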
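Notes 1 and 2 above (typed reads and default values) boil down to "parse the stored string, or fall back to the default when the name is absent". A minimal sketch over a plain map follows; TypedGetSketch is a hypothetical class, and Hadoop's own getters handle more input forms than this illustration does.

```java
import java.util.Map;

// Simplified model of typed getters with defaults (hypothetical class, not Hadoop code).
class TypedGetSketch {
    // Returns the stored string, or defaultValue when the name is not defined.
    static String get(Map<String, String> props, String name, String defaultValue) {
        String v = props.get(name);
        return v == null ? defaultValue : v;
    }

    // Parses the stored string as an int, or returns defaultValue when the name is not defined.
    static int getInt(Map<String, String> props, String name, int defaultValue) {
        String v = props.get(name);
        return v == null ? defaultValue : Integer.parseInt(v.trim());
    }

    // Accepts "true"/"false" in any letter case; any other or missing value yields defaultValue.
    static boolean getBoolean(Map<String, String> props, String name, boolean defaultValue) {
        String v = props.get(name);
        if (v == null) {
            return defaultValue;
        }
        v = v.trim();
        if ("true".equalsIgnoreCase(v)) {
            return true;
        }
        if ("false".equalsIgnoreCase(v)) {
            return false;
        }
        return defaultValue;
    }
}
```

This is the same shape as conf.getInt("size", 100) and conf.get("width", "middle") in the test above: the second argument is used only when the name has no value.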
5. GenericOptionsParser and ToolRunner
GenericOptionsParser reads the command-line arguments and sets the generic options it recognizes on the Configuration.
/**
* Create a <code>GenericOptionsParser</code> to parse given options as well
* as generic Hadoop options.
* @param conf the configuration to modify
* @param options options built by the caller
* @param args User-specified arguments
* @throws IOException
*/
public GenericOptionsParser(Configuration conf,
Options options, String[] args) throws IOException {
parseGeneralOptions(options, conf, args);
this.conf = conf;
}
private void parseGeneralOptions(Options opts, Configuration conf,
String[] args) throws IOException {
opts = buildGeneralOptions(opts);
CommandLineParser parser = new GnuParser();
try {
commandLine = parser.parse(opts, preProcessForWindows(args), true);
processGeneralOptions(conf, commandLine);
} catch(ParseException e) {
LOG.warn("options parsing failed: "+e.getMessage());
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("general options are: ", opts);
}
}
// Adds the fs, jt, conf, D, libjars, files, archives and tokenCacheFile options
private static Options buildGeneralOptions(Options opts) {
Option fs = OptionBuilder.withArgName("local|namenode:port")
.hasArg()
.withDescription("specify a namenode")
.create("fs");
Option jt = OptionBuilder.withArgName("local|resourcemanager:port")
.hasArg()
.withDescription("specify a ResourceManager")
.create("jt");
Option oconf = OptionBuilder.withArgName("configuration file")
.hasArg()
.withDescription("specify an application configuration file")
.create("conf");
Option property = OptionBuilder.withArgName("property=value")
.hasArg()
.withDescription("use value for given property")
.create('D');
Option libjars = OptionBuilder.withArgName("paths")
.hasArg()
.withDescription("comma separated jar files to include in the classpath.")
.create("libjars");
Option files = OptionBuilder.withArgName("paths")
.hasArg()
.withDescription("comma separated files to be copied to the " +
"map reduce cluster")
.create("files");
Option archives = OptionBuilder.withArgName("paths")
.hasArg()
.withDescription("comma separated archives to be unarchived" +
" on the compute machines.")
.create("archives");
// file with security tokens
Option tokensFile = OptionBuilder.withArgName("tokensFile")
.hasArg()
.withDescription("name of the file with the tokens")
.create("tokenCacheFile");
opts.addOption(fs);
..
return opts;
}
// Applies the parsed options to the configuration
private void processGeneralOptions(Configuration conf,
CommandLine line) throws IOException {
if (line.hasOption("fs")) { // sets fs.defaultFS
FileSystem.setDefaultUri(conf, line.getOptionValue("fs"));
}
if (line.hasOption("jt")) { // sets the ResourceManager address (formerly the JobTracker)
String optionValue = line.getOptionValue("jt");
if (optionValue.equalsIgnoreCase("local")) {
conf.set("mapreduce.framework.name", optionValue);
}
conf.set("yarn.resourcemanager.address", optionValue,
"from -jt command line option");
}
if (line.hasOption("conf")) { // adds configuration resources
String[] values = line.getOptionValues("conf");
for(String value : values) {
conf.addResource(new Path(value));
}
}
if (line.hasOption('D')) { // sets individual properties
String[] property = line.getOptionValues('D');
for(String prop : property) {
String[] keyval = prop.split("=", 2);
if (keyval.length == 2) {
conf.set(keyval[0], keyval[1], "from command line");
}
}
}
if (line.hasOption("libjars")) {
conf.set("tmpjars",
validateFiles(line.getOptionValue("libjars"), conf),
"from -libjars command line option");
//setting libjars in client classpath
URL[] libjars = getLibJars(conf);
if(libjars!=null && libjars.length>0) {
conf.setClassLoader(new URLClassLoader(libjars, conf.getClassLoader()));
Thread.currentThread().setContextClassLoader(
new URLClassLoader(libjars,
Thread.currentThread().getContextClassLoader()));
}
}
if (line.hasOption("files")) {
conf.set("tmpfiles",
validateFiles(line.getOptionValue("files"), conf),
"from -files command line option");
}
if (line.hasOption("archives")) {
conf.set("tmparchives",
validateFiles(line.getOptionValue("archives"), conf),
"from -archives command line option");
}
conf.setBoolean("mapreduce.client.genericoptionsparser.used", true);
// tokensFile
if(line.hasOption("tokenCacheFile")) {
String fileName = line.getOptionValue("tokenCacheFile");
// check if the local file exists
FileSystem localFs = FileSystem.getLocal(conf);
Path p = localFs.makeQualified(new Path(fileName));
if (!localFs.exists(p)) {
throw new FileNotFoundException("File "+fileName+" does not exist.");
}
if(LOG.isDebugEnabled()) {
LOG.debug("setting conf tokensFile: " + fileName);
}
UserGroupInformation.getCurrentUser().addCredentials(
Credentials.readTokenStorageFile(p, conf));
conf.set("mapreduce.job.credentials.binary", p.toString(),
"from -tokenCacheFile command line option");
}
}
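The handling of the D option above (split on the first "=", set the pair, hand everything else back to the tool) can be sketched without the commons-cli dependency. GenericArgsSketch is a hypothetical class that understands only -D and stops at the first argument that is not a -D option; the other generic options and GnuParser's full syntax are out of scope.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of -D handling in GenericOptionsParser (hypothetical class).
class GenericArgsSketch {
    final Map<String, String> conf = new LinkedHashMap<>();
    final List<String> remaining = new ArrayList<>();

    GenericArgsSketch(String[] args) {
        int i = 0;
        while (i < args.length) {
            String arg = args[i];
            String prop;
            if (arg.equals("-D") && i + 1 < args.length) {
                prop = args[i + 1];       // form: -D key=value
                i += 2;
            } else if (arg.startsWith("-D") && arg.length() > 2) {
                prop = arg.substring(2);  // form: -Dkey=value
                i += 1;
            } else {
                break;                    // first non-option: stop parsing
            }
            // Split on the first '=' only, so values may themselves contain '='.
            String[] keyval = prop.split("=", 2);
            if (keyval.length == 2) {
                conf.put(keyval[0], keyval[1]);
            }
        }
        for (; i < args.length; i++) {
            remaining.add(args[i]);       // left for the application itself
        }
    }
}
```

The remaining list plays the role of getRemainingArgs(): the application sees only the arguments that were not consumed as generic options.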
Command-line arguments can be applied to conf either by calling new GenericOptionsParser(conf, args) directly, or through ToolRunner.run(new XxxDriver(), args), where XxxDriver extends Configured and implements the Tool interface.
public static int run(Tool tool, String[] args)
throws Exception{
return run(tool.getConf(), tool, args);
}
public static int run(Configuration conf, Tool tool, String[] args)
throws Exception{
if(conf == null) {
conf = new Configuration();
}
GenericOptionsParser parser = new GenericOptionsParser(conf, args);
//set the configuration back, so that Tool can configure itself
tool.setConf(conf);
//get the args w/o generic hadoop args
String[] toolArgs = parser.getRemainingArgs();
return tool.run(toolArgs);
}