对配置文件的配置及解析是每个框架的基本且必不可少的部分,本文主要对Hadoop中的配置文件的解析类Configuration的基本结构及主要方法进行介绍。
Hadoop的配置文件的操作主要分为三个部分:配置的加载、属性读取和设置,本文将分别对其进行介绍。如下是Configuration的主要的成员属性:
/**
* 保存了所有的资源配置的来源信息,资源文件主要有以下几种形式: URL、String、Path、InputStream和Properties。
*/
private ArrayList<Resource> resources = new ArrayList<Resource>();
/**
* 记录了配置文件中配置的final类型的属性名称,标记为final之后如果另外有同名的属性,那么该属性将不会被替换
*/
private Set<String> finalParameters = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
/**
* 记录了配置初始化后更新过的属性,其键为更新的属性名,值为更新操作的来源
*/
private Map<String, String[]> updatingResource;
/**
* 记录了所有的属性,包括系统初始化以及后续设置的属性
*/
private Properties properties;
/**
* 记录了除了初始化之后手动调用set方法设置的属性
*/
private Properties overlay;
/**
* 记录了过期的属性
*/
private static AtomicReference<DeprecationContext> deprecationContext = new AtomicReference<DeprecationContext>(new DeprecationContext(null, defaultDeprecations));
1.配置的加载
Configuration中的配置文件的信息主要保存在resources属性中,该属性是一个ArrayList<Resource>类型,如下是Resource的结构:
private static class Resource {
private final Object resource;
private final String name;
// 构造方法,get和set方法略
}
这里的name表示资源文件的名称,resource则表示具体的资源,其用一个Object类型,但具体的类型如下:
- URL: 通过一个URL链接来进行读取;
- String: 从当前classpath下该字符串所指定的文件读取;
- Path: 以绝对路径指定的配置文件或者是使用url指定的配置;
- InputStream: 以流的形式指定的配置文件;
- Properties: 以属性配置类保存的配置信息。
除了可以通过上述方式进行配置的设置以外,Hadoop还设置了几个默认的配置文件:core-default.xml、core-site.xml和hadoop-site.xml,具体的读取代码如下:
static {
// Add default resources
addDefaultResource("core-default.xml");
addDefaultResource("core-site.xml");
// print deprecation warning if hadoop-site.xml is found in classpath
ClassLoader cL = Thread.currentThread().getContextClassLoader();
if (cL == null) {
cL = Configuration.class.getClassLoader();
}
if (cL.getResource("hadoop-site.xml") != null) {
LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " +
"Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, "
+ "mapred-site.xml and hdfs-site.xml to override properties of " +
"core-default.xml, mapred-default.xml and hdfs-default.xml " +
"respectively");
addDefaultResource("hadoop-site.xml");
}
}
这里需要注意的是,Hadoop对配置文件的读取并不是在Configuration类初始化时进行的,而是在获取配置信息时再进行读取,再此之前,配置相关的信息都保存在resources属性中。如下是addDefaultResource(String)方法的代码:
public static synchronized void addDefaultResource(String name) {
if(!defaultResources.contains(name)) {
defaultResources.add(name);
for(Configuration conf : REGISTRY.keySet()) {
if(conf.loadDefaults) {
conf.reloadConfiguration();
}
}
}
}
从上述代码可以看出,在添加配置文件时,其先检查默认配置文件中是否有该配置文件,如果不存在,则将其添加到defaultResources列表中,并且其会对Configuration配置项进行检查,如果其配置了加载默认配置文件,那么其就会对该配置中保存配置信息的properties和finalParameters进行清空,以此触发属性的重新加载。
在初始化时将相关配置信息添加到resources和defaultResources列表之后,配置文件中属性的读取是在客户端具体调用get*方法时进行的,以下是初始化配置信息的代码:
protected synchronized Properties getProps() {
if (properties == null) {
properties = new Properties();
Map<String, String[]> backup =
new ConcurrentHashMap<String, String[]>(updatingResource);
loadResources(properties, resources, quietmode);
if (overlay != null) {
properties.putAll(overlay);
for (Map.Entry<Object,Object> item: overlay.entrySet()) {
String key = (String)item.getKey();
String[] source = backup.get(key);
if(source != null) {
updatingResource.put(key, source);
}
}
}
}
return properties;
}
上述代码中,首先判断properties是否为空,为空则进行初始化,否则直接返回,这也就是前面为什么将properties和finalParameters进行清空之后能够触发属性的重新加载的原因。在初始化配置信息的时候首先对更新的资源过的资源进行备份,然后调用loadResources()方法加载配置文件的信息,加载完之后还会将用户调用set*方法设置的属性(保存在overlay中)添加到properties中,并且将用户设置的信息添加到updatingResource中。如下是loadResources()方法的具体代码:
private void loadResources(Properties properties, ArrayList<Resource> resources, boolean quiet) {
if(loadDefaults) {
for (String resource : defaultResources) {
loadResource(properties, new Resource(resource), quiet);
}
}
for (int i = 0; i < resources.size(); i++) {
Resource ret = loadResource(properties, resources.get(i), quiet);
if (ret != null) {
resources.set(i, ret);
}
}
}
loadResources是对所有的配置文件信息进行加载的方法,其主要包括两个部分:加载默认配置文件和加载资源文件。在加载配置文件时除了Properties和Resource属性之外,还有一个boolean类型的属性quiet,该属性表示是否以“安静”模式加载配置文件,即加载时是否打印加载日志。如下是通过loadResource()加载配置文件的具体代码:
private Resource loadResource(Properties properties,
Resource wrapper, boolean quiet) {
String name = UNKNOWN_RESOURCE;
try {
Object resource = wrapper.getResource();
name = wrapper.getName();
XMLStreamReader2 reader = null;
boolean returnCachedProperties = false;
if (resource instanceof URL) { // an URL resource
reader = (XMLStreamReader2)parse((URL)resource);
} else if (resource instanceof String) { // a CLASSPATH resource
URL url = getResource((String)resource);
reader = (XMLStreamReader2)parse(url);
} else if (resource instanceof Path) { // a file resource
// Can't use FileSystem API or we get an infinite loop
// since FileSystem uses Configuration API. Use java.io.File instead.
File file = new File(((Path)resource).toUri().getPath())
.getAbsoluteFile();
if (file.exists()) {
if (!quiet) {
LOG.debug("parsing File " + file);
}
reader = (XMLStreamReader2)parse(new BufferedInputStream(
new FileInputStream(file)), ((Path)resource).toString());
}
} else if (resource instanceof InputStream) {
reader = (XMLStreamReader2)parse((InputStream)resource, null);
returnCachedProperties = true;
} else if (resource instanceof Properties) {
overlay(properties, (Properties)resource);
}
if (reader == null) {
if (quiet) {
return null;
}
throw new RuntimeException(resource + " not found");
}
Properties toAddTo = properties;
if(returnCachedProperties) {
toAddTo = new Properties();
}
DeprecationContext deprecations = deprecationContext.get();
StringBuilder token = new StringBuilder();
String confName = null;
String confValue = null;
String confInclude = null;
boolean confFinal = false;
boolean fallbackAllowed = false;
boolean fallbackEntered = false;
boolean parseToken = false;
LinkedList<String> confSource = new LinkedList<String>();
while (reader.hasNext()) {
switch (reader.next()) {
case XMLStreamConstants.START_ELEMENT:
switch (reader.getLocalName()) {
case "property":
confName = null;
confValue = null;
confFinal = false;
confSource.clear();
// First test for short format configuration
int attrCount = reader.getAttributeCount();
for (int i = 0; i < attrCount; i++) {
String propertyAttr = reader.getAttributeLocalName(i);
if ("name".equals(propertyAttr)) {
confName = StringInterner.weakIntern(
reader.getAttributeValue(i));
} else if ("value".equals(propertyAttr)) {
confValue = StringInterner.weakIntern(
reader.getAttributeValue(i));
} else if ("final".equals(propertyAttr)) {
confFinal = "true".equals(reader.getAttributeValue(i));
} else if ("source".equals(propertyAttr)) {
confSource.add(StringInterner.weakIntern(
reader.getAttributeValue(i)));
}
}
break;
case "name":
case "value":
case "final":
case "source":
parseToken = true;
token.setLength(0);
break;
case "include":
// Determine href for xi:include
confInclude = null;
attrCount = reader.getAttributeCount();
for (int i = 0; i < attrCount; i++) {
String attrName = reader.getAttributeLocalName(i);
if ("href".equals(attrName)) {
confInclude = reader.getAttributeValue(i);
}
}
if (confInclude == null) {
break;
}
// Determine if the included resource is a classpath resource
// otherwise fallback to a file resource
// xi:include are treated as inline and retain current source
URL include = getResource(confInclude);
if (include != null) {
Resource classpathResource = new Resource(include, name);
loadResource(properties, classpathResource, quiet);
} else {
URL url;
try {
url = new URL(confInclude);
url.openConnection().connect();
} catch (IOException ioe) {
File href = new File(confInclude);
if (!href.isAbsolute()) {
// Included resources are relative to the current resource
File baseFile = new File(name).getParentFile();
href = new File(baseFile, href.getPath());
}
if (!href.exists()) {
// Resource errors are non-fatal iff there is 1 xi:fallback
fallbackAllowed = true;
break;
}
url = href.toURI().toURL();
}
Resource uriResource = new Resource(url, name);
loadResource(properties, uriResource, quiet);
}
break;
case "fallback":
fallbackEntered = true;
break;
case "configuration":
break;
default:
break;
}
break;
case XMLStreamConstants.CHARACTERS:
if (parseToken) {
char[] text = reader.getTextCharacters();
token.append(text, reader.getTextStart(), reader.getTextLength());
}
break;
case XMLStreamConstants.END_ELEMENT:
switch (reader.getLocalName()) {
case "name":
if (token.length() > 0) {
confName = StringInterner.weakIntern(token.toString().trim());
}
break;
case "value":
if (token.length() > 0) {
confValue = StringInterner.weakIntern(token.toString());
}
break;
case "final":
confFinal = "true".equals(token.toString());
break;
case "source":
confSource.add(StringInterner.weakIntern(token.toString()));
break;
case "include":
if (fallbackAllowed && !fallbackEntered) {
throw new IOException("Fetch fail on include for '"
+ confInclude + "' with no fallback while loading '"
+ name + "'");
}
fallbackAllowed = false;
fallbackEntered = false;
break;
case "property":
if (confName == null || (!fallbackAllowed && fallbackEntered)) {
break;
}
confSource.add(name);
DeprecatedKeyInfo keyInfo =
deprecations.getDeprecatedKeyMap().get(confName);
if (keyInfo != null) {
keyInfo.clearAccessed();
for (String key : keyInfo.newKeys) {
// update new keys with deprecated key's value
loadProperty(toAddTo, name, key, confValue, confFinal,
confSource.toArray(new String[confSource.size()]));
}
} else {
loadProperty(toAddTo, name, confName, confValue, confFinal,
confSource.toArray(new String[confSource.size()]));
}
break;
default:
break;
}
default:
break;
}
}
reader.close();
if (returnCachedProperties) {
overlay(properties, toAddTo);
return new Resource(toAddTo, name);
}
return null;
} catch (IOException e) {
LOG.fatal("error parsing conf " + name, e);
throw new RuntimeException(e);
} catch (XMLStreamException e) {
LOG.fatal("error parsing conf " + name, e);
throw new RuntimeException(e);
}
}
从代码中可以看出,在加载每个配置文件的时候,首先通过instanceof判断wrapper对象中Object类型的属性resource是什么类型,然后根据具体不同的类型将其转换为一个XMLStreamReader2类型的reader。对于转换之后的reader,其会依次读取xml配置文件中的具体标签信息,如:name、value、final、source等等,并且将读取后的信息保存在properties中,这里需要注意的是,在代码中的case "include"处可以看出,配置文件中如果使用了<include></include>标签引入其他的配置文件,那么其会递归的调用loadResource()方法对其进行读取。
在加载完配置
2.属性的获取
Configuration类主要是通过get方法获取相关属性的,其get方法的种类有三十多个,如:get(String)、get(String, String)、getBoolean(String, Boolean)、getClass(String, Class<?>)等等,但是最终调用的还是get(String, String)方法,该方法的作用是获取一个某个名称对应的属性值,具体代码如下:
public String get(String name, String defaultValue) {
// 处理过期的属性
String[] names = handleDeprecation(deprecationContext.get(), name);
String result = null;
// 对属性值中形如${foo.bar}引入的其他属性进行替换
for(String n : names) {
result = substituteVars(getProps().getProperty(n, defaultValue));
}
return result;
}
通过上述代码可以看出,get(String, String)方法主要做了两件事:处理过期键和对属性值进行标签替换处理。根据前面的介绍我们知道deprecationContext是一个AtomicReference<DeprecationContext>类型的变量,其保存有过期键的相关信息,并且这里使用AtomicReference包装,也就是说其获取和更新都是原子化的。这里我们首先讲解一下DeprecationContext具体实现方式,如下是该类的代码:
private static class DeprecationContext {
/**
* 保存有过期键信息,其值为主要是该过期键更新后的键的信息
*/
private final Map<String, DeprecatedKeyInfo> deprecatedKeyMap;
/**
* 对于DeprecatedKeyInfo,其有一个String[] newKeys属性,即更新后的键信息,这里reverseDeprecatedKeyMap的
* 键为newKeys中的各个字符串,而值则对应于newKeys所在的DeprecatedKeyInfo对象在deprecatedKeyMap所属的键
*/
private final Map<String, String> reverseDeprecatedKeyMap;
/**
* 根据另一个DeprecationContext对象实例化一个DeprecationContext对象,这里DeprecationDelta指的是除了DeprecationContext对象以外新增加的属性
*/
@SuppressWarnings("unchecked")
DeprecationContext(DeprecationContext other, DeprecationDelta[] deltas) {
HashMap<String, DeprecatedKeyInfo> newDeprecatedKeyMap = new HashMap<String, DeprecatedKeyInfo>();
HashMap<String, String> newReverseDeprecatedKeyMap = new HashMap<String, String>();
if (other != null) {
for (Entry<String, DeprecatedKeyInfo> entry : other.deprecatedKeyMap.entrySet()) {
newDeprecatedKeyMap.put(entry.getKey(), entry.getValue());
}
for (Entry<String, String> entry : other.reverseDeprecatedKeyMap.entrySet()) {
newReverseDeprecatedKeyMap.put(entry.getKey(), entry.getValue());
}
}
for (DeprecationDelta delta : deltas) {
if (!newDeprecatedKeyMap.containsKey(delta.getKey())) {
DeprecatedKeyInfo newKeyInfo = new DeprecatedKeyInfo(delta.getNewKeys(), delta.getCustomMessage());
newDeprecatedKeyMap.put(delta.key, newKeyInfo);
// 从这里可以看出,reverseDeprecatedKeyMap中的键为更新之后的过期键信息,而值为最初的过期键
for (String newKey : delta.getNewKeys()) {
newReverseDeprecatedKeyMap.put(newKey, delta.key);
}
}
}
this.deprecatedKeyMap = UnmodifiableMap.decorate(newDeprecatedKeyMap);
this.reverseDeprecatedKeyMap = UnmodifiableMap.decorate(newReverseDeprecatedKeyMap);
}
Map<String, DeprecatedKeyInfo> getDeprecatedKeyMap() {
return deprecatedKeyMap;
}
Map<String, String> getReverseDeprecatedKeyMap() {
return reverseDeprecatedKeyMap;
}
}
DeprecationContext中主要有两个属性:deprecatedKeyMap和reverseDeprecatedKeyMap。对于deprecatedKeyMap,其键是过期的键,而值则主要保存该键被替换之后的新键的信息;对于reverseDeprecatedKeyMap,其键为某个过期键更新之后的键,而值则了被更新的过期键。
在介绍了DeprecationContext的具体结构之后,我们继续来看get(String, String)方法中handleDeprecation()方法的具体处理方式,以下是该方法的具体代码:
private String[] handleDeprecation(DeprecationContext deprecations, String name) {
if (null != name) {
name = name.trim();
}
// 默认使用目标name作为返回值
String[] names = new String[]{name};
// 查询当前name所对应的过期键信息,并且获取其更新后的信息
DeprecatedKeyInfo keyInfo = deprecations.getDeprecatedKeyMap().get(name);
if (keyInfo != null) {
if (!keyInfo.getAndSetAccessed()) {
logDeprecation(keyInfo.getWarningMessage(name));
}
// 将当前过期键更新后的名称返回
names = keyInfo.newKeys;
}
// 如果没用通过set*方法设置的属性,那么直接返回
Properties overlayProperties = getOverlay();
if (overlayProperties.isEmpty()) {
return names;
}
// 查找当前name在DeprecatedKeyInfo中对应的最新的keys,如果某个key的属性在overlay(用户设置的key-value)中存在,那么就将该key对应的
// 值更新该key的值为所更新的过期键的值
for (String n : names) {
String deprecatedKey = deprecations.getReverseDeprecatedKeyMap().get(n); // 查找当前name是否在更新后的key中存在
if (deprecatedKey != null && overlayProperties.containsKey(n)) { // 当前name在DeprecatedKeyInfo更新后的名字中存在,且在overlayProperties中也存在当前name的键
String deprecatedValue = overlayProperties.getProperty(deprecatedKey);
if (deprecatedValue != null) {
getProps().setProperty(n, deprecatedValue); // 将DeprecatedKeyInfo中新的键与旧的键所对应的值关联起来,保存在properties和overlay中
overlayProperties.setProperty(n, deprecatedValue);
}
}
}
return names;
}
总结来说,在handleDeprecation()方法中,其会查询当前的name是否是过期键,如果不是,则将当前name组装为一个数组返回,如果是,则通过该过期键获取其所对应的更新之后的键,并将其作为返回值。除此之外,其还会在调用者设置的属性(overlayProperties)中查询其是否保存有当前过期键所更新的键名的信息,如果有,则将其更新为该过期键所对应的值。
接下来我们回头看看get(String, String)方法,在通过handleDeprecation()方法获取到该name的相关信息之后,这里主要调用了substituteVars()方法,该方法的主要作用为对获取到的属性值中的占位符如${foo.bar}进行替换,这里getProps()方法即为前面所讲解的通过配置文件初始化相关属性的方法。以下是substituteVars()的具体实现:
private String substituteVars(String expr) {
if (expr == null) {
return null;
}
String eval = expr;
for(int s = 0; s < MAX_SUBST; s++) {
// 返回值为长度为2的数组,该方法获取属性值中最内层占位符的起始和终止索引,如${prefix${foo.bar}suffix}将返回foo.bar的起始和终止索引
final int[] varBounds = findSubVariable(eval);
if (varBounds[SUB_START_IDX] == -1) {
return eval;
}
final String var = eval.substring(varBounds[SUB_START_IDX], varBounds[SUB_END_IDX]);
String val = null;
try {
// 判断是否为系统属性,系统属性以env.开头
if (var.startsWith("env.") && 4 < var.length()) {
String v = var.substring(4);
int i = 0;
for (; i < v.length(); i++) {
char c = v.charAt(i);
if (c == ':' && i < v.length() - 1 && v.charAt(i + 1) == '-') {
val = getenv(v.substring(0, i));
if (val == null || val.length() == 0) {
val = v.substring(i + 2);
}
break;
} else if (c == '-') {
val = getenv(v.substring(0, i));
if (val == null) {
val = v.substring(i + 1);
}
break;
}
}
if (i == v.length()) {
// i == v.length()说明为系统属性,那么返回值为系统属性
val = getenv(v);
}
} else {
// 如果不为系统属性,则从当前上下文环境中获取该属性
val = getProperty(var);
}
} catch(SecurityException se) {
LOG.warn("Unexpected SecurityException in Configuration", se);
}
if (val == null) {
// 如果既不是系统属性也不是环境属性,则在配置文件或者是用户设置的属性中查找
val = getRaw(var);
}
if (val == null) {
// val为null,说明该属性没有定义,则直接返回原始值
return eval;
}
final int dollar = varBounds[SUB_START_IDX] - "${".length();
final int afterRightBrace = varBounds[SUB_END_IDX] + "}".length(); // 因为这里SUB_END_INDEX实际上指向的就是"{"
final String refVar = eval.substring(dollar, afterRightBrace);
// 这里refVar即为要替换的占位符,如${foo.bar},走到这一步说明foo.bar有对应的属性值,即val,这里判断val中是否继续包含
// 有${foo.bar},如果有,则说明发生了嵌套占位,这种情况直接返回原始字符串,否则会发生无限循环
if (val.contains(refVar)) {
return expr;
}
eval = eval.substring(0, dollar) + val + eval.substring(afterRightBrace);
}
throw new IllegalStateException("Variable substitution depth too large: " + MAX_SUBST + " " + expr);
}
总结而言,substituteVars()方法主要对属性值中的占位符进行替换,对于属性值的获取,其可以通过三个途径进行:系统变量、当前环境变量和配置文件及用户设置的属性值,并且其优先级是:系统变量 > 当前环境变量 > 配置文件及用户设置的属性值。除此之外,该方法还会处理嵌套占位符,如${prefix${foo.bar}suffix},其是由内而外进行解析,直到不存在占位符,或者是进行解析的层数超过了20层(最外层的for循环控制,这里MAX_SUBST为20)。
3.属性设置
相对而言,属性的设置要简单一些,和get方法类似,虽然set方法也有很多,但其最终还是调用的set(String, String, String)方法,其第一个和第二个参数为要设置的键值对,第三个参数则为当前属性值的来源。如下是该方法的具体实现:
public void set(String name, String value, String source) {
Preconditions.checkArgument(name != null, "Property name must not be null");
Preconditions.checkArgument(value != null, "The value of property %s must not be null", name);
name = name.trim();
DeprecationContext deprecations = deprecationContext.get();
if (deprecations.getDeprecatedKeyMap().isEmpty()) {
// 初始化配置文件信息
getProps();
}
getOverlay().setProperty(name, value);
getProps().setProperty(name, value);
String newSource = (source == null ? "programmatically" : source);
if (!isDeprecated(name)) {
// 这里该name不是过期键分为两种情况,一种是在存储过期键的map(即deprecatedKeyMap)中没有相应数据,
// 而在更新的map(即reverseDeprecatedKeyMap)中有数据,第二种是在这两个map中都没有数据
updatingResource.put(name, new String[] {newSource});
// 判断该键是否为更新某一个过期键之后的键,如果是,则获取所有更新了该过期键的键
String[] altNames = getAlternativeNames(name);
if(altNames != null) {
// 更新所有该键所对应的过期键更新之后的键值
for(String n: altNames) {
if(!n.equals(name)) {
getOverlay().setProperty(n, value);
getProps().setProperty(n, value);
updatingResource.put(n, new String[] {newSource});
}
}
}
} else {
// 如果该键为过期键,则将该过期键更新之后的键的值都设置为新的值
String[] names = handleDeprecation(deprecationContext.get(), name);
String altSource = "because " + name + " is deprecated";
for(String n : names) {
getOverlay().setProperty(n, value);
getProps().setProperty(n, value);
updatingResource.put(n, new String[] {altSource});
}
}
}
对于set(String, String, String)方法,其不仅更新了当前键值对的属性,而且还判断了该键是否为过期键或者是更新过期键之后的键,如果是则将更新之后的键所对应的值都设置为新值。