1数据倾斜
2.Group By
3合理设置Map数
4小文件进行合并
5.如何适当的增加map数
6.合理设置reduce数
7.不可拆分大文件引发的数据倾斜
8.业务无关的数据引发的数据倾斜
9.多维聚合计算数据膨胀引起的数据倾斜
10.无法削减中间的数据量引发的数据倾斜
11.两个hive数据表连接时引发的数据倾斜
1.数据倾斜
什么是数据倾斜
在单个节点任务所处理的数据量远大于同类型任务所处理的数据量,导致该节点成为整个作业的瓶颈,这是分布式系统不可能避免的问题。从本质上说,导致数据倾斜有两种原因,一是任务读取大文件,二是任务需要处理大量相同键的数据。
- 任务读取大文件,最常见的就是读取压缩的不可分割的大文件,具体在
6.不可拆分大文件引发的数据倾斜
会介绍。任务需要处理大量相同键的数据,这种情况有以下4中表现形式: - 数据含有大量无意义的数据,例如空值(null)、空字符串等。
- 含有倾斜数据在进行聚合计算时无法聚合中间结果,大量数据都需要经过shuffle阶段的处理,引起数据倾斜。
- 数据在计算时做多维数据集合,导致维度膨胀引起的数据倾斜。
- 两表进行join,都含有大量相同的倾斜数据键。
1.Group By
默认情况下,Map阶段同一Key数据分发给一个reduce,当一个key数据过大时就倾斜了。
并不是所有的聚合操作都需要在Reduce端完成,很多聚合操作都可以先在Map端进行部分聚合,最后在Reduce端得出最终结果。
1)开启Map端聚合参数设置
(1)是否在Map端进行聚合,默认为True
set hive.map.aggr = true;
(2)在Map端进行聚合操作的条目数目
set hive.groupby.mapaggr.checkinterval = 100000;
(3)有数据倾斜的时候进行负载均衡(默认是false)
set hive.groupby.skewindata = true;
- 当选项设定为 true,生成的查询计划会有两个MR Job。第一个MR Job中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MR Job再根据预处理的数据结果按照Group By Key分布到Reduce中(这个过程可以保证相同的Group By Key被分布到同一个Reduce中),最后完成最终的聚合操作。
1、MRjob
相同的key放在不同的reduce负载均衡的思想就是在map端聚合。
2、MRjob
第二次保证相同的key分布到同一个reduce中
2.Map数
1)通常情况下,作业会通过input的目录产生一个或者多个map任务。
主要的决定因素有:input的文件总个数,input的文件大小,集群设置的文件块大小(目前为128M,可在hive中通过set dfs.block.size;命令查看到,该参数不能自定义修改);
2)举例:
a) 假设input目录下有1个文件a,大小为780M,那么hadoop会将该文件a分隔成7个块(6个128m的块和1个12m的块),从而产生7个map数。
b) 假设input目录下有3个文件a,b,c大小分别为10m,20m,150m,那么hadoop会分隔成4个块(10m,20m,128m,22m),从而产生4个map数。即,如果文件大于块大小(128m),那么会拆分,如果小于块大小,则把该文件当成一个块。
3)是不是map数越多越好?
答案是否定的。如果一个任务有很多小文件(远远小于块大小128m),则每个小文件也会被当做一个块,用一个map任务来完成,而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的map数是受限的。
4)是不是保证每个map处理接近128m的文件块,就高枕无忧了?
答案也是不一定。比如有一个127m的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,如果map处理的逻辑比较复杂,用一个map任务去做,肯定也比较耗时。
针对上面的问题3和4,我们需要采取两种方式来解决:即减少map数和增加map数;
3.小文件进行合并
小文件进行合并
在map执行前合并小文件,减少map数:
1)参数设置(下面的API属于hadoop低版本的API)
set mapred.max.split.size=112345600;
set mapred.min.split.size.per.node=112345600;
set mapred.min.split.size.per.rack=112345600;
set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
这个参数表示执行前进行小文件合并,前面三个参数确定合并文件块的大小,大于文件块大小128m的,按照128m来分隔,小于128m,大于100m的,按照100m来分隔,把那些小于100m的(包括小文件和分隔大文件剩下的),进行合并。
4. 如何适当的增加map数
当input的文件都很大,任务逻辑复杂,map执行非常慢的时候,可以考虑增加Map数,来使得每个map处理的数据量减少,从而提高任务的执行效率。
假设有这样一个任务:
Select data_desc,
count(1),
count(distinct id),
sum(case when …),
sum(case when …),
sum(…)
from a group by data_desc
如果表a只有一个文件,大小为120M,但包含几千万的记录,如果用1个map去完成这个任务,肯定是比较耗时的,这种情况下,我们要考虑将这一个文件合理的拆分成多个,这样就可以用多个map任务去完成。
set mapreduce.job.reduces =10;
create table a_1 as
select * from a
distribute by rand(123);
这样会将a表的记录,随机的分散到包含10个文件的a_1表中,再用a_1代替上面sql中的a表,则会用10个map任务去完成。
每个map任务处理大于12M(几百万记录)的数据,效率肯定会好很多。
看上去,貌似这两种有些矛盾,一个是要合并小文件,一个是要把大文件拆成小文件,这点正是重点需要关注的地方,根据实际情况,控制map数量需要遵循两个原则:使大数据量利用合适的map数;使单个map任务处理合适的数据量;
5.reduce数
1)调整reduce个数方法一
- 每个Reduce处理的数据量默认是256MB
hive.exec.reducers.bytes.per.reducer=256123456
- 每个任务最大的reduce数,默认为1009
hive.exec.reducers.max=1009
- 计算reducer数的公式
N=min(参数2,总输入数据量/参数1)
2)调整reduce个数方法二
在hadoop的mapred-default.xml文件中修改
设置每个job的Reduce个数
set mapreduce.job.reduces = 15;
3)reduce个数并不是越多越好 - 过多的启动和初始化reduce也会消耗时间和资源;
- 另外,有多少个reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;
在设置reduce个数的时候也需要考虑这两个原则:处理大数据量利用合适的reduce数;使单个reduce任务处理数据量大小要合适;
6.不可拆分大文件引发的数据倾斜
- 当集群的数据量增长到一定规模,有些数据需要归档或者转储,这时候往往会对数据进行压缩;当对文件使用GZIP压缩等不支持文件分割操作的压缩方式,在日后有作业设计读取压缩后的文件时,该压缩文件只会被一个任务所读取。
- 如果该压缩文件很大,则处理该文件的Map需要花费的时间会远多于读取普通文件的Map时间,该Map任务会成为作业运行的瓶颈。这种情况也就是Map读取文件的数据倾斜。例如存在这样一张表t_des_info,如图是对这张表的描述
desc formatted t_desc_info
- t_desc_info表由3个GZIP压缩后的文件组成,如下图所示
- 其中,large_file.gz文件约200MB,在计算引擎在运行时,预先设置每个Map处理的数据量为128MB,但是计算引擎无法切分large_file.gz文件,所以该文件不会交给两个Map任务去读取,而是有且仅有一个任务在操作。
- t_des_info表有3个gz文件,任何涉及处理该表的数据都只会使用3个Map,例如下图所示的案例
select count(1) from t_desc_info
如果想要了解每个Map任务所读取的具体文件,可以关注我告诉你 哈哈哈
为避免因不可拆分大文件而引发的数据读取的倾斜,在数据压缩的时候可以采用lzo和bzip2和Zip等支持文件分割的压缩算法。
7.业务无关的数据引发的数据倾斜
实际业务中有些大量的NULL值或者一些无意义的数据参与到计算作业中,这些数据可能来自业务为上报或因数据规范将某类数据进行归一化变成空值或空字符串等形式。这些与业务无关的数据引入导致在进行分组聚合或者在执行表连接时发生数据倾斜。对于这类问题引发的数据倾斜。在计算过程中排除含有这类"异常"数据即可。
8.多维聚合计算数据膨胀引起的数据倾斜
在多维聚合计算时存在这样的场景:
select a,b,c,count(1) from T group by a,b,c with rollup
。
对于上述的SQL,可以拆解成4种类型的键进行分组聚合,它们分别是(a,b,c)、(a、b、null)、(a,null,null)和(null,null,null)。
如果T表的数据量很大,并且Map端的聚合不能很好地起到数据压缩的情况下,会导致Map端产出的数据急速膨胀,这种情况容易导致作业内存溢出的
异常。如果T表含有数据倾斜键,会加剧Shuffle过程的数据倾斜,对上述的情况我们会很自然地想到拆解上面的SQL语句,将rollup拆解成如下多个普通类型分组聚合的组合。
select a,b,c,count(1) from T
group by a,b,c;
select a,b,null,count(1) from T
group by a,b;
select a,null,null,count(1) from T
group by a;
select null,null,null,count(1) from T;
这是很笨拙的方法,如何分组聚合的列远不止3个列,那么需要拆解的SQL语句会更多。
解决方法:在hive中可以通过参数(hive.new.job.grouping.set.cardinality)配置的方式自动控制作业的拆解,该参数默认值是30.该参数表示针对grouping sets/rollups/cubes这类多维聚合的操作,如果最后拆解的键组合(上面的例子的组合是4)大于该值,会启用新的任务去处理大于该值之外的组合,
如果在处理数据时,某个分组聚合的列有较大的倾斜,可以适当调小该值。
9.无法削减中间的数据量引发的数据倾斜
在一些操作中无法削减中间结果,例如使用collect_list聚合函数,存在如下SQL:
select s_age,collect_list(s_score) list_score from student_tb_txt group by s_age
在student_tb_txt表中,s_age有数据倾斜,但如果数据量大到一定的数量,会导致处理倾斜的Reduce任务产生内存溢出的异常,针对这种场景,即使开启
hive.groupby.skewindata
配置参数,也不会起到优化的作用,反而会拖累整个作业的运行。启用该配置参数会将作业拆解成两个作业,第一个作业会尽可能将map的数据平均分配到Reduce阶段,并在这个阶段实现数据的预聚合,以减少第二个作业的处理的数据量;第二个作业在第一个作业处理的数据基础上进行结果的聚合。
hive.groupby.skewindata的核心作用在于生成的第一个作业能够有效减少数量。但是对于collect_list这类要求全量操作所有数据的中间结果的函数来说,明显起不到作用,反而因为引入新的作业增加了磁盘和网络I/O的负担,而导致性能变得更为低下。
解决方法:调整reduce所执行的内存大小,使用mapreduce.reduce.memory.mb这个参数(如果是Map任务内存瓶颈可以调整mapreduce.map.memory.mb)。
-
注意
如果Hive的客户端连接的HiveServer2一次性需要返回处理的数据很大,超过了启动Hive设置的Java堆(Xmx),也会导致HiveServer2服务内存溢出。
10.两个hive数据表连接时引发的数据倾斜(map join)
Hive优化(二)-map join和join原则
两表进行普通的repartition join时,如果表连接的键存在倾斜,那么在shuffle阶段必然会引起数据倾斜。
遇到这种情况,Hive的通常做法还是启用两个作业,第一个作业处理没有倾斜的数据,第二个作业将倾斜的数据存到分布式缓存中,分发到各个map任务所在节点。在map阶段完成join操作,即MapJoin,这避免了shuffle,从而避免了数据倾斜。