在开发或者面试过程中,如何解决hive的数据倾斜问题是不可避免的。
发生数据倾斜的根本原因在于,shuffle之后,key的分布不均匀,使得大量key集中在某个reduce节点,导致此节点处理的数据量过大。要解决此问题,主要可以分为两大块:一是尽量不shuffle;二是shuffle之后,在reduce节点上的key分布尽量均匀。
具体来说,在实际开发过程中,以下几个业务过程可能会导致数据倾斜:
1.不可拆分的大文件引发的数据倾斜
2.业务无关的数据引发的数据倾斜
3.多维聚合计算数据膨胀引起的数据倾斜
4.无法削减中间结果的数据量引发的数据倾斜
5.两个hive数据表连接时引发的数据倾斜
接下来,我们对这五个问题逐一分析。
1.不可拆分的大文件引发的数据倾斜
当对文件使用GZIP压缩等不支持文件分割操作的压缩方式,在日后有作业涉及读取压缩后的文件时,该压缩文件只会被一个任务所读取。如果该压缩文件很大,则处理该文件的Map需要花费的时间会远多于读取普通文件的Map时间,该Map任务会成为作业运行的瓶颈。这种情况也就是Map读取文件的数据倾斜。
2.业务无关的数据引发的数据倾斜
实际业务中可能有大量的null值或者一些无意义的数据参与到计算作业中,这些数据可能来自业务为上报或者数据规范将某类数据进行归一化变成空值或空字符串等形式,这些与业务无关的数据引入导致在分组聚合或者在执行表连接时发生数据倾斜。
倾斜原因:
key相同的太集中,导致倾斜(很多和业务无关的null值)
解决方案:
对于group by操作,有两种解决思路:1.直接对null值进行过滤,2.对null值添加随机前缀。
对于join操作,有两种思路:1.手工分割,2.对null值添加随机前缀。
对于上述解决方案,其实也可以与本文开头的两类思路对应起来,对null值进行过滤和手工分割,这些都属于尽量不shuffle。添加随机前缀则属于shuffle之后,使得在reduce节点上的key尽量分布均匀。
3.多维聚合计算数据膨胀引起的数据倾斜
在多维聚合计算时,如果进行分组聚合的字段较多,如下:
select a,b,c,count(1)from log group by a,b,c with rollup;
注:对于最后的with rollup关键字不知道大家用过没,with rollup是用来在分组统计数据的基础上再进行统计汇总,即用来得到group by的汇总信息。
如果上面的log表的数据量很大,并且Map端的聚合不能很好地起到数据压缩的情况下,会导致Map端产出的数据急速膨胀,这种情况容易导致作业内存溢出。如果log表含有数据倾斜key,会加剧Shuffle过程的数据倾斜。
注:对于最后的with rollup关键字拆分为如下几个sql:
SELECT a, b, c, COUNT(1)
FROM log
GROUP BY a, b, c;
SELECT a, b, NULL, COUNT(1)
FROM log
GROUP BY a, b;
SELECT a, NULL, NULL, COUNT(1)
FROM log
GROUP BY a;
SELECT NULL, NULL, NULL, COUNT(1)
FROM log;
但是,上面这种方式不太友好,因为现在是对3个字段进行分组聚合,如果是5个或者10个,则需要拆解的SQL语句会更多。
在Hive中可以通过参数 hive.new.job.grouping.set.cardinality 配置的方式自动控制作业的拆解,该参数默认值是30。表示针对grouping sets/rollups/cubes这类多维聚合的操作,如果最后拆解的键组合大于该值,会启用新的任务去处理大于该值之外的组合。如果在处理数据时,某个分组聚合的列有较大的倾斜,可以适当调小该值。
- 确实无法减少数据量引发的数据倾斜
在一些操作中无法削减中间结果,例如使用collect_list聚合函数,存在如下SQL:
select s_age,collect_list(s_score) list_score
from student
group by s_age
s_age有数据倾斜,但如果数据量大到一定的数量,会导致处理倾斜的Reduce任务产生内存溢出的异常。
是否可以开启hive.groupby.skewindata参数来优化。我们接下来分析下:
开启该配置会将作业拆解成两个作业,第一个作业会尽可能将Map的数据平均分配到Reduce阶段,并在这个阶段实现数据的预聚合,以减少第二个作业处理的数据量;第二个作业在第一个作业处理的数据基础上进行结果的聚合。
hive.groupby.skewindata的核心作用在于生成的第一个作业能够有效减少数量。但是对于collect_list这类要求全量操作所有数据的中间结果的函数来说,明显起不到作用,反而因为引入新的作业增加了磁盘和网络I/O的负担,而导致性能变得更为低下。
解决方案:
这类问题最直接的方式就是调整reduce所执行的内存大小。
调整reduce的内存大小使用mapreduce.reduce.memory.mb这个配置。
5.表连接时引发的数据倾斜
两表进行join时,如果表连接的键存在倾斜,那么在shuffle阶段必然会引起数据倾斜。
解决方案:
5.1 MapJoin
通常做法是将倾斜的数据存到分布式缓存中,分发到各个Map任务所在节点。在map阶段完成了join操作,即MapJoin,这避免了Shuffle,从而避免了数据倾斜。
set hive.auto.convert.join = true;
set hive.mapjoin.smalltable.filesize=25000000;
优点: 运行时转换为mapjoin,无reduce阶段,运行时间极短
缺点: 适用场景有限,需要占用分布式内存。由于是将小表加载进内存所以需要注意小表的大小。
5.2 手工分割
适用场景有限,因为倾斜的key,除了key=A之外,还有其他key也可能会发生倾斜。
5.3大表添加N中随机前缀,小表膨胀N倍数据
适用场景:小表不是很小,不太方便用mapjoin。
通常做法是对大表的key添加N中随机前缀,对小表和N中随机前缀做笛卡尔积,使得大表生成的key在小表里都有可能找到对应的key,解决思路可参考如下代码:
select a.*
from a
left join (
select concat(c.rand_num,'_',d.key) as key from(
select rand_num from dual LATERAL VIEW explode(array(0,1,2,3,4,5,6,7,8,9)) rand_num_list as rand_num
)c join d
)b on concat(cast(9*rand() as int), '_', a.key) =b.key
where a.ds='2019-09-05'
优点:可适当降低倾斜程度
缺点:N的取值不太好确定。而且数据膨胀后,会增加资源消耗
总结
以上就是面对hive数据倾斜时,经常采用的一些方法。我们在遇到实际问题时,可以参考上述的方案,但是不需要完全照搬。因为处理数据倾斜是一个综合性的事情,考察的不仅是技术能力,更是对业务的熟悉程度。在处理数据倾斜问题时,我们首先要对表中的字段的意思有所了解。以上面的五种情况来说,对于null值,除了剔除以及添加随机前缀,我们可以进一步了解业务,在源表中是否可能将null值进行补全,或者在源表中是否可以给null值赋予一个随机值。再比如进行join操作时,如果有数据倾斜,可以先看是否有数据发散的情况,如果有数据发散,可以考虑先处理数据发散的问题。可以对代码进行拆分,先保证数据不发散,再进行join。
我们不管处理什么问题,不管使用的工具是hive、spark或者python,首先要做到熟悉业务,了解数据的含义。如果抛开业务,只是照搬书本上的所讲的技术,对代码进行调优,有时候也能解决问题,但是对自己的提升较为片面。