本文引用了技术世界文章,来自:http://www.jasongj.com/spark/committer/
问题引入
- Spark 输出数据到HDFS时,多个task同时写数据到HDFS,如何保证数据的一致性
- 如果打开了speculation ,两个相同的task实例写相同的数据到HDFS,如何保证只有一个commit成功
- 几十万的task,如何高效管理文件
【 三个重要目录 】
- ${output.dir.root}
- ${appAttempt} application attempt ID ,从0开始
- ${taskAttempt} task attempt ID, 从0开始
写入目录:
${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt}/${fileName}
写入步骤:
- 1检查是否需要commit
需要commit && 开启了commit coordinator
commit coordinator检查是否可以commit:
- 若该taskAttempt为失败的attempt,直接拒绝
- 若该taskAttempt成功,且commit coordinator未允许过其该task的其他attempt的commit请求,则允许
- 若该taskAttempt之前已经被同意过,则commit coordinator会继续同意
- 若该taskAttempt成功,但已经有其他该task的attempt被授权commit了,则拒绝该attempt的commit请求
不需要commit(则拒绝写入)
- 2 rename重命名
判断mapreduce.fileoutputcommiter.algorithm.version
该值为1时
task commitTask 会将 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} 目录重命名为${output.dir.root}/_temporary/${appAttempt}/${taskAttempt}
该值为2时
task commitTask 会将 ${output.dir.root}/_temporary/${appAttempt}/_temporary/${taskAttempt} 内所有文件移动到${output.dir.root}
- 3 整个job的commit
判断mapreduce.fileoutputcommiter.algorithm.version
该值为1,则遍历${output.dir.root}/_temporary/${appAttempt}/${taskAttempt},并将所有文件移动到${output.dir.root}
该值为2,无需移动
- 4 清理
Driver 清理_temporary的内容