业务背景
Spark读取文件时,对于可切分的文件,会将文件切分为一系列 Split ,每个Split对应一个Task。一般而言,Split的大小与HDFS的Block大小相当,即128MB。
对于Parquet文件,因为是按列存储,在读取数据时,可按列剪枝。而划分Split时并未考虑列剪枝。因此当所选取的列数远小于总列数时,实际每个Split内需要读取的数据量远小于Split大小,从而使得每个Task处理的数据量非常小(根据实际测试,可能只有几MB甚至几KB)
因此可根据目标列数占总列数的比例相应增大Split大小,从而保证一个Split内目标列读取数据总量与一个Block大小相当,即128MB。
优化方式
手动设置参数
默认配置
spark.sql.files.maxPartitionBytes=134217728 // 该参数决定Spark SQL读取可切分文件时,每个Partition / Split的最大值,单位Byte。
spark.sql.files.openCostInBytes=4194304 // 该参数决定Spark SQL读取文件时,每个Partition / Split的最小值,单位Byte。
通过如下SQL计算记录总条数
select count(device_id) from log_daily where date='20210202' and hour='12'
总Task个数为8779个
Task读取数据量中位数为2.7MB
通过如下参数设置,将Split大小扩大10倍,每个Partition / Split的最小值扩大4倍
set spark.sql.files.maxPartitionBytes= 1073741824;
set spark.sql.files.openCostInBytes= 16777216;
执行后,总Task个数为1098个
Task读取数据量中位数为扩大为22.6MB
可以看到Task的并行度下降,使用资源量可以得到控制,同时执行速度并没有多大变化
自动设置参数
通过如下参数设置,将开启自适应文件切分
set spark.sql.parquet.adaptiveFileSplit=true;
例:一张表共10个字段,5个IntegerType(宽度为4),5个LongType(宽度为8),总宽度为60。
现对其中两个字段进行查询,一个IntegerType,一个LongType,总宽度为12。故放大倍数为 60 / 12 = 5
注:该方案只对列式存储有用。