hadoop-distcp源码篇

0.背景介绍

~~~~DistCp(Distributed Copy)是Apache Hadoop自带的工具,目前存在两个版本,DistCp1和DistCp2。它是用于大规模集群内部或者集群之间的高性能拷贝工具。 它使用Map/Reduce实现文件分发,错误处理和恢复,以及报告生成。 它把文件和目录的列表作为map任务的输入,每个任务会完成源列表中部分文件的拷贝。
~~~~DistCp第一版使用了MapReduce并发拷贝数据,它将整个数据拷贝过程转化为一个map-only Job以加快拷贝速度。由于DistCp本质上是一个MapReduce作业,它需要保证文件中各个block的有序性,因此它的最小数据切分粒度是文件,也就是说,一个文件不能被切分成不同部分让多个任务并行拷贝,最小只能做到一个文件交给一个任务。

1.使用样例

1.1参数介绍

标识 描述 备注
-i 忽略失败 这个选项会比默认情况提供关于拷贝的更精确的统计, 同时它还将保留失败拷贝操作的日志,这些日志信息可以用于调试。最后,如果一个map失败了,但并没完成所有分块任务的尝试,这不会导致整个作业的失败。
-log <logdir> 记录日志到 <logdir> DistCp为每个文件的每次尝试拷贝操作都记录日志,并把日志作为map的输出。 如果一个map失败了,当重新执行时这个日志不会被保留。
-m <num_maps> 同时拷贝的最大数目 指定了拷贝数据时map的数目,默认是20。请注意并不是map数越多吞吐量越大。
-overwrite 覆盖目标 如果一个map失败并且没有使用-i选项,不仅仅那些拷贝失败的文件,这个分块任务中的所有文件都会被重新拷贝。
-update 如果源和目标的大小不一样则进行覆盖 像之前提到的,这不是"同步"操作。 执行覆盖的唯一标准是源文件和目标文件大小是否相同;如果不同,则源文件替换目标文件。
-strategy {dynamic/uniformsize} 选择要在DistCp中使用的复制策略。 默认情况下,使用uniformsize(每个map分配均衡的文件数量)。如果指定了“dynamic”,则使用DynamicInputFormat(动态进行map承担文件数量的分配,速度快的map分配的文件数量更多)。
... ... ...

1.2文件复制

hadoop distcp hdfs://nn1:8020/foo/bar hdfs://nn2:8020/bar/foo

将nn1上面的foo/bar文件复制导nn2的bar/foo路径下面。

1.3文件夹复制

hadoop distcp hdfs://nn1:8020/source/first hdfs://nn1:8020/source/second hdfs://nn2:8020/target

将nn1上面文件夹source/first和source/second下面的文件复制到nn2的target文件夹下面。

2.源码分析

2.1源码宏观流程

distcp宏观流程图.jpg

2.2源码细节分析

2.2.1细节主要的调用图

distcp.jpg

2.2.2步骤分析

  • shell入口
    进入hadoop文件
......
elif [ "$COMMAND" = "distcp" ] ; then
      CLASS=org.apache.hadoop.tools.DistCp
      CLASSPATH=${CLASSPATH}:${TOOL_PATH}
......
# Always respect HADOOP_OPTS and HADOOP_CLIENT_OPTS
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"

HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,NullAppender}"

export CLASSPATH=$CLASSPATH
exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"
;;
......
  • distcp的main方法
    Distcp是一个Tool、ToolRunner应用,Tool应用要求实现run方法。进入main方法主要有构造器构造distcp类;建立Cleanup,并加入ShutdownHookManager;最后运行ToolRunner.run执行复制功能。
public class DistCp extends Configured implements Tool {
    ......
    public int run(String[] argv) {
        ......
    }
    ......
    ......
    public static void main(String argv[]) {
        System.out.println("test");
        int exitCode;
        try {
          DistCp distCp = new DistCp();
          Cleanup CLEANUP = new Cleanup(distCp);
    
          ShutdownHookManager.get().addShutdownHook(CLEANUP,
            SHUTDOWN_HOOK_PRIORITY);
          exitCode = ToolRunner.run(getDefaultConf(), distCp, argv);
        }
        catch (Exception e) {
          LOG.error("Couldn't complete DistCp operation: ", e);
          exitCode = DistCpConstants.UNKNOWN_ERROR;
        }
        System.exit(exitCode);
      }
    ......
}
  • run方法
    OptionsParser类是distcp单独实现的参数解析工具类。将输入参数解析成DistCpOptions inputOptions类型。如常见的参数overwrite = false等等。程序中相关参数都有特定的默认值,比如map数量的默认值是20,分配策略类的默认方式是均衡分配等等。
    setTargetPathExists():从参数中解析出目标路径。
    execute():核心执行方法。
@Override
  public int run(String[] argv) {
    if (argv.length < 1) {
      OptionsParser.usage();
      return DistCpConstants.INVALID_ARGUMENT;
    }
    
    try {
      inputOptions = (OptionsParser.parse(argv));
      setTargetPathExists();
      LOG.info("Input Options: " + inputOptions);
    } catch (Throwable e) {
      LOG.error("Invalid arguments: ", e);
      System.err.println("Invalid arguments: " + e.getMessage());
      OptionsParser.usage();      
      return DistCpConstants.INVALID_ARGUMENT;
    }
    
    try {
      execute();
    } catch (InvalidInputException e) {
      LOG.error("Invalid input: ", e);
      return DistCpConstants.INVALID_ARGUMENT;
    } catch (DuplicateFileException e) {
      LOG.error("Duplicate files in input path: ", e);
      return DistCpConstants.DUPLICATE_INPUT;
    } catch (AclsNotSupportedException e) {
      LOG.error("ACLs not supported on at least one file system: ", e);
      return DistCpConstants.ACLS_NOT_SUPPORTED;
    } catch (XAttrsNotSupportedException e) {
      LOG.error("XAttrs not supported on at least one file system: ", e);
      return DistCpConstants.XATTRS_NOT_SUPPORTED;
    } catch (Exception e) {
      LOG.error("Exception encountered ", e);
      return DistCpConstants.UNKNOWN_ERROR;
    }
    return DistCpConstants.SUCCESS;
  }
  • execute()
    在execute()方法中,会调用createAndSubmitJob()创建MR任务,准备数据,设定数据输入格式,并把任务提交到hadoop集群运行,最后等待任务执行完毕。于是我们可以看到,主体功能实现就在createAndSubmitJob()这个函数体里,工程中其它的各个类无非都是为这个函数接口服务的。
public Job execute() throws Exception {
    Job job = createAndSubmitJob();

    if (inputOptions.shouldBlock()) {
      waitForJobCompletion(job);
    }
    return job;
  }
public Job createAndSubmitJob() throws Exception {
    assert inputOptions != null;
    assert getConf() != null;
    Job job = null;
    try {
      synchronized(this) {
        //Don't cleanup while we are setting up.
        metaFolder = createMetaFolderPath();
        jobFS = metaFolder.getFileSystem(getConf());
        job = createJob();
      }
      if (inputOptions.shouldUseDiff()) {
        if (!DistCpSync.sync(inputOptions, getConf())) {
          inputOptions.disableUsingDiff();
        }
      }
      createInputFileListing(job);

      job.submit();
      submitted = true;
    } finally {
      if (!submitted) {
        cleanup();
      }
    }

    String jobID = job.getJobID().toString();
    job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
    LOG.info("DistCp job-id: " + jobID);

    return job;
  }
  • metaFolder
    一个Path类型。其存放着distcp工具需要的元数据信息,也就是所有需要拷贝的源目录/文件信息列表。这些数据在一个fileList.seq文件中以Key/Value结构进行保存,其中Key是源文件的Text格式的相对路径,而Value则记录源文件的FileStatus格式的org.apache.hadoop.fs.FileStatus信息,这里FileStatus是hadoop已经封装好了的描述HDFS文件信息的类,metafolder目录中的fileList.seq最终会作为参数传递给MR任务中的Mapper。
  private Path createMetaFolderPath() throws Exception {
    Configuration configuration = getConf();
    Path stagingDir = JobSubmissionFiles.getStagingDir(
            new Cluster(configuration), configuration);
    Path metaFolderPath = new Path(stagingDir, PREFIX + String.valueOf(rand.nextInt()));
    if (LOG.isDebugEnabled())
      LOG.debug("Meta folder location: " + metaFolderPath);
    configuration.set(DistCpConstants.CONF_LABEL_META_FOLDER, metaFolderPath.toString());    
    return metaFolderPath;
  }
  • job = createJob()
    创建MR job的地方。
  private Job createJob() throws IOException {
    String jobName = "distcp";
    String userChosenName = getConf().get(JobContext.JOB_NAME);
    if (userChosenName != null)
      jobName += ": " + userChosenName;
    Job job = Job.getInstance(getConf());
    job.setJobName(jobName);
    job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
    job.setJarByClass(CopyMapper.class);
    configureOutputFormat(job);

    job.setMapperClass(CopyMapper.class);
    job.setNumReduceTasks(0);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputFormatClass(CopyOutputFormat.class);
    job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
    job.getConfiguration().set(JobContext.NUM_MAPS,
                  String.valueOf(inputOptions.getMaxMaps()));

    if (inputOptions.getSslConfigurationFile() != null) {
      setupSSLConfig(job);
    }

    inputOptions.appendToConf(job.getConfiguration());
    return job;
  }

重点看这两行代码,job.setInputFormatClass主要是设定job中任务的分配策略,分为UniformSizeInputFormat和DynamicInputFormat两种,UniformSizeInputFormat表示均衡分配任务,也就是设定的map中,每个map分配同样的任务数,DynamicInputFormat表示动态分为任务书,也就是动态的根据每个map运行的速度来分为具体map的任务数;job.setMapperClass主要设定map任务的具体逻辑。

job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
job.setMapperClass(CopyMapper.class)
  • CopyMapper类的源码
    拷贝工作实际做的地方,主要看setup()、map()方法
public void setup(Context context) throws IOException, InterruptedException {
    conf = context.getConfiguration();

    syncFolders = conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false);
    ignoreFailures = conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false);
    skipCrc = conf.getBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), false);
    overWrite = conf.getBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(), false);
    append = conf.getBoolean(DistCpOptionSwitch.APPEND.getConfigLabel(), false);
    preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch.
        PRESERVE_STATUS.getConfigLabel()));

    targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH));
    Path targetFinalPath = new Path(conf.get(
            DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
    targetFS = targetFinalPath.getFileSystem(conf);

    if (targetFS.exists(targetFinalPath) && targetFS.isFile(targetFinalPath)) {
      overWrite = true; // When target is an existing file, overwrite it.
    }

    if (conf.get(DistCpConstants.CONF_LABEL_SSL_CONF) != null) {
      initializeSSLConf(context);
    }
  }
  public void map(Text relPath, CopyListingFileStatus sourceFileStatus,
          Context context) throws IOException, InterruptedException {
    Path sourcePath = sourceFileStatus.getPath();

    if (LOG.isDebugEnabled())
      LOG.debug("DistCpMapper::map(): Received " + sourcePath + ", " + relPath);

    Path target = new Path(targetWorkPath.makeQualified(targetFS.getUri(),
                          targetFS.getWorkingDirectory()) + relPath.toString());

    EnumSet<DistCpOptions.FileAttribute> fileAttributes
            = getFileAttributeSettings(context);
    final boolean preserveRawXattrs = context.getConfiguration().getBoolean(
        DistCpConstants.CONF_LABEL_PRESERVE_RAWXATTRS, false);

    final String description = "Copying " + sourcePath + " to " + target;
    context.setStatus(description);

    LOG.info(description);

    try {
      CopyListingFileStatus sourceCurrStatus;
      FileSystem sourceFS;
      try {
        sourceFS = sourcePath.getFileSystem(conf);
        final boolean preserveXAttrs =
            fileAttributes.contains(FileAttribute.XATTR);
        sourceCurrStatus = DistCpUtils.toCopyListingFileStatus(sourceFS,
          sourceFS.getFileStatus(sourcePath),
          fileAttributes.contains(FileAttribute.ACL), 
          preserveXAttrs, preserveRawXattrs);
      } catch (FileNotFoundException e) {
        throw new IOException(new RetriableFileCopyCommand.CopyReadException(e));
      }

      FileStatus targetStatus = null;

      try {
        targetStatus = targetFS.getFileStatus(target);
      } catch (FileNotFoundException ignore) {
        if (LOG.isDebugEnabled())
          LOG.debug("Path could not be found: " + target, ignore);
      }

      if (targetStatus != null && (targetStatus.isDirectory() != sourceCurrStatus.isDirectory())) {
        throw new IOException("Can't replace " + target + ". Target is " +
            getFileType(targetStatus) + ", Source is " + getFileType(sourceCurrStatus));
      }

      if (sourceCurrStatus.isDirectory()) {
        createTargetDirsWithRetry(description, target, context);
        return;
      }

      FileAction action = checkUpdate(sourceFS, sourceCurrStatus, target);
      if (action == FileAction.SKIP) {
        LOG.info("Skipping copy of " + sourceCurrStatus.getPath()
                 + " to " + target);
        updateSkipCounters(context, sourceCurrStatus);
        context.write(null, new Text("SKIP: " + sourceCurrStatus.getPath()));
      } else {
        copyFileWithRetry(description, sourceCurrStatus, target, context,
            action, fileAttributes);
      }

      DistCpUtils.preserve(target.getFileSystem(conf), target, sourceCurrStatus,
          fileAttributes, preserveRawXattrs);
    } catch (IOException exception) {
      handleFailures(exception, sourceFileStatus, target, context);
    }
  }
  • copyFileWithRetry
    拷贝动作,这个函数最底层调用的是常用的Java输入输出流的方式,以此方式来完成点对点拷贝。即copyToFile里面的copyBytes(sourceFileStatus, sourceOffset, outStream, BUFFER_SIZE, context);方法。
private void copyFileWithRetry(String description,
      FileStatus sourceFileStatus, Path target, Context context,
      FileAction action, EnumSet<DistCpOptions.FileAttribute> fileAttributes)
      throws IOException {
    long bytesCopied;
    try {
      bytesCopied = (Long) new RetriableFileCopyCommand(skipCrc, description,
          action).execute(sourceFileStatus, target, context, fileAttributes);
    } catch (Exception e) {
      context.setStatus("Copy Failure: " + sourceFileStatus.getPath());
      throw new IOException("File copy failed: " + sourceFileStatus.getPath() +
          " --> " + target, e);
    }
    incrementCounter(context, Counter.BYTESEXPECTED, sourceFileStatus.getLen());
    incrementCounter(context, Counter.BYTESCOPIED, bytesCopied);
    incrementCounter(context, Counter.COPY, 1);
  }

至此,拷贝的job任务完成设定、提交以及执行,也就意味着distcp命令执行完成,即实现了跨集群的文件拷贝。

文本资源来自于互联网和书本整理,仅供学习,有侵权联系删除。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,509评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,806评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,875评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,441评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,488评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,365评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,190评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,062评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,500评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,706评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,834评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,559评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,167评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,779评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,912评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,958评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,779评论 2 354

推荐阅读更多精彩内容

  • 先思考问题 我们处在一个大数据的时代已经是不争的事实,这主要表现在数据源多且大,如互联网数据,人们也认识到数据里往...
    墙角儿的花阅读 7,363评论 0 9
  • Zookeeper用于集群主备切换。 YARN让集群具备更好的扩展性。 Spark没有存储能力。 Spark的Ma...
    Yobhel阅读 7,271评论 0 34
  • Hadoop MapReduce 整个MR的过程可以分解为下面几步 读取数据 Map reduce output ...
    流浪山人阅读 602评论 0 1
  • 为什么要有Hadoop? 从计算机诞生到现今,积累了海量的数据,这些海量的数据有结构化、半结构化、非 结构的数据...
    _Levi__阅读 755评论 1 0
  • Hadoop部署方式 本地模式 伪分布模式(在一台机器中模拟,让所有进程在一台机器上运行) 集群模式 服务器只是一...
    陈半仙儿阅读 1,613评论 0 9