入口
hdfs脚本中,关于dfs的操作,统一走了org.apache.hadoop.fs.FsShell这个类。这个代码在脚本第153行
elif [ "$COMMAND" = "dfs" ] ; then
CLASS=org.apache.hadoop.fs.FsShell
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"d
这个类中的main方法,只有简单的几行
/**
* main() has some simple utility methods
* @param argv the command and its arguments
* @throws Exception upon error
*/
public static void main(String argv[]) throws Exception {
FsShell shell = newShellInstance();
Configuration conf = new Configuration();
conf.setQuietMode(false);
shell.setConf(conf);
int res;
try {
res = ToolRunner.run(shell, argv);
} finally {
shell.close();
}
System.exit(res);
}
ToolRunner是一个公用的执行器工具,想要使用这个工具来执行任务,对应的任务执行逻辑类需要实现Tool接口以及这个接口里的run(String[] args)方法
public class ToolRunner {
/**
* Runs the given <code>Tool</code> by {@link Tool#run(String[])}, after
* parsing with the given generic arguments. Uses the given
* <code>Configuration</code>, or builds one if null.
*
* Sets the <code>Tool</code>'s configuration with the possibly modified
* version of the <code>conf</code>.
*
* @param conf <code>Configuration</code> for the <code>Tool</code>.
* @param tool <code>Tool</code> to run.
* @param args command-line arguments to the tool.
* @return exit code of the {@link Tool#run(String[])} method.
*/
public static int run(Configuration conf, Tool tool, String[] args)
throws Exception{
if(conf == null) {
conf = new Configuration();
}
GenericOptionsParser parser = new GenericOptionsParser(conf, args);
//set the configuration back, so that Tool can configure itself
tool.setConf(conf);
//get the args w/o generic hadoop args
String[] toolArgs = parser.getRemainingArgs();
return tool.run(toolArgs);
}
/**
* Runs the <code>Tool</code> with its <code>Configuration</code>.
*
* Equivalent to <code>run(tool.getConf(), tool, args)</code>.
*
* @param tool <code>Tool</code> to run.
* @param args command-line arguments to the tool.
* @return exit code of the {@link Tool#run(String[])} method.
*/
public static int run(Tool tool, String[] args)
throws Exception{
return run(tool.getConf(), tool, args);
}
我们从Fs的main方法里看到,执行的时候Tool接口的实现类传入的是FsShell对象,所以回去看FsShell类的run方法。
/**
* run
*/
@Override
public int run(String argv[]) throws Exception {
// initialize FsShell
init();
int exitCode = -1;
if (argv.length < 1) {
printUsage(System.err);
} else {
String cmd = argv[0];
Command instance = null;
try {
/**
* 这行代码是获取对应的执行逻辑类,HDFS把所有的命令都使用Command接口做了封装
*/
instance = commandFactory.getInstance(cmd);
if (instance == null) {
throw new UnknownCommandException();
}
exitCode = instance.run(Arrays.copyOfRange(argv, 1, argv.length));
} catch (IllegalArgumentException e) {
displayError(cmd, e.getLocalizedMessage());
if (instance != null) {
printInstanceUsage(System.err, instance);
}
} catch (Exception e) {
// instance.run catches IOE, so something is REALLY wrong if here
LOG.debug("Error", e);
displayError(cmd, "Fatal internal error");
e.printStackTrace(System.err);
}
}
return exitCode;
}
通过IDEA的工具我们可以发现,Command内部定义了一些抽象方法,Command的run方法按照规定的顺序调用抽象方法,Command的子类是FsCommand和DFSAdminCommand,每个实现类下面的都有若干个子类,这是典型的模板方法模式。总体的类结构如下图所示
有了这个结构,我们查看具体的dfs每个操作的时候,就可以查看对应的具体实现类就可以了。
远程执行过程
Command类是在本地机器上的Java进程里执行的,dfs操作是在HDFS集群上执行。hadoop处理多个进程间的通信使用了protobuf框架,HDFS的客户端和服务端使用的协议是org.apache.hadoop.hdfs.protocol.ClientProtocol。代码里客户端的DFSClient类对这个协议的代理类做了封装,服务端NameNodeRPCServer对这个协议做了实现。执行具体命令的时候,command实现类通过DFSClient实例与NameNodeRPCServer做通信。具体的执行步骤是在NameNode内部完成的。