spark写入动态分区表小文件过多的问题
在大数据中使用动态分区表时总是要考虑写入小文件过多的问题,这个问题分为两种情况。
1.由于分区键太多导致的小文件问题
2.由于写入文件太多导致的小文件问题
第一种偏向于业务问题或者设计问题,我们主要讨论的是第二种情况。
我们假设一种场景
sparksql 默认分区数200 分区表一共有500个分区 这样保存下来需要存200*500 =100000个文件。
如果每个分区中的数据量确实很大当然没有问题。但是实际更可能遇到的情况是小部分分区数据量比较大,其他的大部分分区数据量较小。最好的办法是我们期望可以通过配置一个参数用来控制输出文件大小,比如每256m生成一个文件。这样数据量较小的分区只会生成一个文件,数据量大的分区也不会出现单个文件过大,导致查询时并行度不足的问题。
如果是用hive,我们可以通过配置以下参数来达到目的
set hive.exec.reducers.bytes.per.reducer=67108864; --设置每个reducer处理大约64MB数据。
set hive.merge.mapfiles=true; --在Map任务完成后合并小文件。
set hive.merge.mapredfiles=true; --在Reduce任务完成后合并小文件。
set hive.merge.smallfiles.avgsize=16000000; --当输出文件的平均大小小于该值时,会启动一个独立的MapReduce任务进行文件merge。
set hive.merge.size.per.task=256*1000*1000; --设置当输出文件大小小于这个值时,触发文件合并。
如果用spark3.0以上版本,我们可以通过配置sparksql的partiton大小来达到目的
set spark.sql.adaptive.enabled=true --开启spark自适应优化
set spark.sql.adaptive.coalescePartitions.enabled=true --Spark 会根据目标大小(由 指定) spark.sql.adaptive.advisoryPartitionSizeInBytes 合并连续的随机分区,以避免过多的小任务。
set spark.sql.adaptive.coalescePartitions.minPartitionNum=5 --合并后的最小随机分区数。如果未设置,则默认值为 Spark 群集的默认并行度。此配置仅在同时启用和 spark.sql.adaptive.coalescePartitions.enabled 启用时 spark.sql.adaptive.enabled 才有效。
set spark.sql.adaptive.coalescePartitions.initialPartitionNum=200 --合并前的初始随机分区数。如果未设置,则等于 spark.sql.shuffle.partitions 。此配置仅在同时启用和 spark.sql.adaptive.coalescePartitions.enabled 启用时 spark.sql.adaptive.enabled 才有效。
set spark.sql.adaptive.advisoryPartitionSizeInBytes=64*1024*1024 --自适应优化期间随机分区的建议大小(以字节为单位)(当为 true 时 spark.sql.adaptive.enabled )。当 Spark 合并小的随机分区或拆分倾斜的随机分区时,它就会生效。
如果使用的是spark2.x,没有直接的参数,一个比较方便的方案是在写入前使用coalesce减小写入分区,之后再通过限制输出文件行数的方法防止输出过大的文件
df.coalesce(5) //这里设置了输出时的文件数,也同时是写入文件的并行度,需要根据能接受的小文件数量酌情配置
.write
.option("maxRecordsPerFile",500000) //设置了每个文件的最大记录数,如果过会另起一个文件写。也可以设置全局参数指定spark.sql.files.maxRecordsPerFile
.mode("append")
.saveAsTable("existing_hive_table")