简介
由于Spark应用写数据到Hive表时,容易因为shuffle数过多导致生成过多小文件,影响集群存储利用率;故需要一个能避免读写冲突的小文件合并工具。
工具类
object CombineSmallFiles {
/**
* Step 1:读取指定目录下需要合并文件的目录和目录下的文件名,修改文件名后缀为.old:相当于记录为需要合并
*/
def searchFiles(spark: SparkSession, hdfs: FileSystem, srcStr: String, fileType: String): Unit = {
val srcPath = new Path(srcStr)
val fileStatus = hdfs.listStatus(srcPath)
var smallFileCount: Int = 0
fileStatus.foreach(file => {
// 对大小小于128M的文件进行标记
if (!file.isDirectory && !file.getPath.getName.startsWith(".")
&& (hdfs.getContentSummary(file.getPath).getLength < 134217728)) {
if (!file.getPath.getName.endsWith(".old")) {
hdfs.rename(file.getPath, new Path(file.getPath + ".old"))
}
smallFileCount += 1
} else if (file.isDirectory && !file.getPath.getName.startsWith(".")) {
searchFiles(spark, hdfs, file.getPath.toUri.getPath, fileType)
}
})
if (smallFileCount > 1) {
combineSmallFile(spark, hdfs, srcPath, fileType)
}
}
/**
* 合并
*/
def combineSmallFile(spark: SparkSession, hdfs: FileSystem, srcPath: Path, fileType: String): Unit = {
val srcStr = srcPath.toUri.getPath
val combineStr = srcStr + "/.combine"
//如果因为程序中断导致combine遗留合并后的文件,则移动后清除
moveCombineFileAndRemove(hdfs, srcStr, combineStr)
// Step 2:获取目录下.old的文件,读取写入临时目录并生成文件.combine
spark
.read
.format(fileType)
.load(srcStr + "/*.old")
.repartition(1)
.write
.format(fileType)
.save(combineStr)
// Step3:删除.old -> mv .combine下文件到源目录 -> 删除.combine
hdfs.listStatus(srcPath).foreach(file => {
// 对.old结尾的文件清除
if (!file.isDirectory && file.getPath.getName.endsWith(".old")) {
hdfs.delete(file.getPath, true)
}
})
moveCombineFileAndRemove(hdfs, srcStr, combineStr)
}
/**
* 移动合并文件并清除合并用的临时目录
*
* @param hdfs
* @param srcStr
* @param combineStr
*/
def moveCombineFileAndRemove(hdfs: FileSystem, srcStr: String, combineStr: String): Unit = {
val combinePath = new Path(combineStr)
if (!hdfs.exists(combinePath)) return
hdfs.listStatus(combinePath).foreach(combineFile => {
if (combineFile.getPath.getName.startsWith("part-")) {
hdfs.rename(combineFile.getPath, new Path(srcStr + "/" + combineFile.getPath.getName + ".combine"))
hdfs.deleteOnExit(combineFile.getPath)
}
})
hdfs.delete(new Path(combineStr), true)
}
}
调用
/**
* 运维程序:小文件合并
*/
object BT_OPS_CombineSmallFiles {
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName("BT_OPS_CombineSmallFiles")
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "134217728")
.enableHiveSupport()
.getOrCreate()
val hdfs: FileSystem = org.apache.hadoop.fs.FileSystem.get(spark.sparkContext.hadoopConfiguration)
val srcPath = "你的HDFS路径"
searchFiles(spark, hdfs,srcPath, "orc")
spark.stop
}
}
TBC:可通过读取MySQL配置表来指定需要合并的目录、文件类型,方便随时修改。