业务背景
项目中将两个表进行join,一个大表,一个小表,在平时200 executor-core * 20G executor-memory的资源下跑的挺好的,随着业务数据的增加,有一天,这个任务就跑不出来了,重试5次每次都失败,最后任务报错;
报错时,俩表情况如下:大表的数据量约为278亿,1TB左右,另一个的数据量约为480万,4GB左右;通过DAG图发现,任务卡在俩表join的那个stage上;
报错信息
1.spark sql实现报错
当使用SparkSQL对俩表进行join时,报错为:
org.apache.spark.shuffle.MetadataFetchFailedException:Missing an output location for shuffle 0
以及
org.apache.spark.shuffle.FetchFailedException:Failed to connect to hostname:port
2.rdd实现报错
当使用rdd对俩表进行join时,报错为:
WARN TaskSetManager:Lost task 17.1 in stage 4.1:java.io.FileNotFoundException:一个文件/目录
以及
org.apache.spark.shuffle.FetchFailedException:Error in opening FileSegmentManagedBuffer
梳理过程
通过看这些错哈,能发现是在join时产生的shuffle出了问题,那么我们对shuffle进行分析一下:shuffle过程分为两部分shuffle write和shuffle read;
其中shuffle write就相当于write in local memory,这个过程中的分区数,是由上一个阶段的rdd分区数来决定的;shuffle read就是把数据读出来,然后在根据其对应的key进行reduce,得到结果;
分析了这两个过程,发现报错中,Missing an output location for shuffle 0
还有其他的报错,原因都是因为,在一个task执行对应数据计算的时候,太大了,最终失败,导致心跳检测无法检测到或者是超过了connection-time,所以最终找不到这个task及结果;那么我们还是得从降低这个task中处理的数据来入手。
优化策略
首先,减小分区数据的最直接的办法,就是将整个数据都变小,所以尽可能在shuffle前,把改filter掉的数据全都过滤掉;其次,最粗暴的办法就是直接增大每个executor-memory,直接增大每个task的可支配内存;接着,通过“少量多次”的思想,把shuffle时的数据,分成尽可能多的、合理的分区数,官方建议分区数为程序总executor-core的2~3倍;最后,那就是投机取巧一点的办法了,想办法避开shuffle,如:使用map side join或broadcast join来避免shuffle过程;
小tips:两表join时,将较小的表放在右边,这样会将小表读进内存,与接下来的大表匹配;hive则相反;
总结一下:
- 1.增大资源
- 2.filter
- 3.调整shuffle partition的数量
- 4.是否数据倾斜
- 5.map side join或broadcast join
具体实施效果
此次出现的bug,不适用于方案2、5,因为那些数据都是要用的,并且数据量太大,不支持广播等;
方案1
把整个程序的资源增加到executor-core=3; executor-num=100; executor-memory=30g; driver-memory=20g最终的结果还是在join时的最后两个task上失败了;
方案3
使用了SparkSQL,在Spark SQL中设置shuffle时的分区时,应该设置参数spark.sql.shuffle.partition
,这个参数默认为200,按照官方建议的总程序executor-core的2~3倍,设置值为800;
并且为了更好的将数据打散,将两个表单独select出来后,小表的分区从自身的200 repartition到800,大表则按照自身的3200参与计算;
发现运行结果,最终卡在join最后的4个task上,4个task失败,导致最终失败;
方案4
通过查看,发现最终失败的几个task,各自计算的memory都为17.8G及以上,因此最终还是数据倾斜,调整数据倾斜的方式有很多,因为我想早点睡觉,之后如果翻看到的同学,就去翻翻前面的文章吧。
最终定位到此次数据倾斜的原因是因为,两个表的join字段的数据类型不一致,大表的关联字段为String型,小表的关联字段为bigint型;在关联前,对小表执行cast(bigint to string)
,然后再join,并加上以上方案的行为,之后的task分区就变得均匀多了,成功运行~