Spark CommitCoordinator 保证数据一致性

本文引用了技术世界文章,来自:http://www.jasongj.com/spark/committer/

问题引入

  • Spark 输出数据到HDFS时,多个task同时写数据到HDFS,如何保证数据的一致性
  • 如果打开了speculation ,两个相同的task实例写相同的数据到HDFS,如何保证只有一个commit成功
  • 几十万的task,如何高效管理文件

【 三个重要目录 】

  • ${output.dir.root}
  • ${appAttempt} application attempt ID ,从0开始
  • ${taskAttempt} task attempt ID, 从0开始

写入目录:

${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}/${fileName}

写入步骤:

  • 1检查是否需要commit
  需要commit && 开启了commit coordinator
     commit coordinator检查是否可以commit:
      - 若该taskAttempt为失败的attempt,直接拒绝
      - 若该taskAttempt成功,且commit coordinator未允许过其该task的其他attempt的commit请求,则允许
      - 若该taskAttempt之前已经被同意过,则commit coordinator会继续同意
      - 若该taskAttempt成功,但已经有其他该task的attempt被授权commit了,则拒绝该attempt的commit请求
  不需要commit(则拒绝写入)
  • 2 rename重命名
    判断mapreduce.fileoutputcommiter.algorithm.version
该值为1时
task commitTask 会将 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} 目录重命名为${output.dir.root}/_temporary/${appAttempt}/${taskAttempt}

该值为2时
task commitTask 会将 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} 内所有文件移动到${output.dir.root}
  • 3 整个job的commit
    判断mapreduce.fileoutputcommiter.algorithm.version
该值为1,则遍历${output.dir.root}/_temporary/${appAttempt}/${taskAttempt},并将所有文件移动到${output.dir.root}
该值为2,无需移动
  • 4 清理
    Driver 清理_temporary的内容
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容