1. 什么是数据倾斜, 如何处理
-
什么是数据倾斜
大量相同的key被partition到一台机器, 造成这台机器计算任务量极大, 而其他机器计算量任务极小 -
即使数据倾斜, UDAF函数也可能不产生效率低下的问题?
像sum, max, min, count等UDAF不怕数据倾斜, 因为会在map端进行一次数据聚合 -
count(distinct)最受数据倾斜影响?
因为count(distinct)要先按照group by分组, 再按distinct排序. 不会产生map端的聚合. 一般这中语句是很倾斜的;
比如男uv,女uv,淘宝一天30亿的pv,如果按性别分组,分配2个reduce,每个reduce处理15亿数据。 -
如何处理
- key上加上随机数打散, 然后最后计算结果时恢复成原来的key
- 过滤空值, 或者将空值变成一个字符串并加上随机前缀, 由于随机的key找不到相同的而执行失败
- 将倾斜的key拿出来单独处理, 最后和其它结果union all在一起(不去重不排序)
- 开启参数
hive.groupby.skewindata=true
这会生成2个任务:- 第一个任务: map的输出结果随机的分配到reduce上, 每个reduce按照key做第一次聚合
- 第二个任务: 将相同的key分布到同一个partition上, 再做第二次聚合
- 将
count(distinct)
替换成sum() group by
2. 如何调整mapper个数
mapper个数受一系列参数的影响, 最终由这些参数计算出一个表达式
-
mapred.map.tasks
: map任务的期望值, 默认为2 -
default_mapper_num
: 通过1个mapper处理1个block块的数据, 来计算一个mapper个数total_input_size : 输入文件的总大小 dfs.block.size : hdfs一个block块的大小 default_mapper_num : 为2个参数的商
-
splitNumber
: 通过把一个block块划分多个split, 每个mapper处理一个split的方式计算mapper个数mapred.min.split.size : split的最小size(默认值1B) mapred.max.split.size : split的最大size(默认值64MB) split_size = max(mapred.min.split.size, min(mapred.max.split.size, dfs.block.size)); split_num = total_input_size / split_size
因此 :
- 只有把
mapred.min.split.size
的值设置的比dfs.block.size
大, 才能达到1个mapper读取大于1个block块的目的 - 只有把
mapred.max.split.size
设置的比dfs.block.size
小, 才能达到1个block块被多个mapper处理的目的 .
最终得到mapper的个数计算公式为:
由于 mapred.max.split.size
默认为64M, 所以Hive每个map任务默认处理64M的数据(1个split)
3. 如何调整reduce个数
reducer的个数, 决定了job最后生成的文件个数, 这个数可以被用户直接指定; 若没有指定, hive会通过一个公司计算出来
- mapred.reduce.tasks : 直接指定reducer个数
- 公式推算
hive.exec.reducers.bytes.per.reducer : 每个reducer处理的最大数据量 hive.exec.reducers.max : 最大的reducer个数 最终计算得出reducer_num = MIN(total_input_size / reducers.bytes.per.reducer, reducers.max)
4. 什么时候需要合并文件
- 什么时候会出现小文件
- 动态分区插入数据,产生大量的小文件,从而导致map数量剧增。
- reducer数量太多
- 数据源本身就包含大量的小文件
- 输入时合并文件 : 通过控制split大小控制mapper个数
mapred.min.split.size : split的最小size(默认值1B) mapred.max.split.size=256000000; mapred.min.split.size.per.node=100000000 : 一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并) mapred.min.split.size.per.rack=100000000 : 执行Map前进行小文件合并(100M)
- 输出合并文件控制 :
hive.merge.mapfiles : 设置为true, 在mapped-only任务的输出进行合并 hive.merge.mapredfiles : 设置为true, 在map-reduce任务的输出进行合并 hive.merge.size.smallfiles.avgsize : 指定输出文件的均值大小, 当输出文件小于这个值时, 开启一个任务对小文件进行合并
5. 什么是严格模式?
严格模式下, hive会阻止用户进行三种类型的sql与
- 对分区表的查询, wher条件中没有指定分区列的值
- 产生笛卡尔积的join操作
- 使用了order by 却没有使用limit
6. 谈谈hive的优化
- 1. mapper个数和reducer个数的优化
-
2. 对于count(distinct) 语法的优化
使用count() group by进行优化因为这个语法在mapper端group by后, 会开启mapper端聚合减少reduce端的输入量; 而count(distinct) 计算全都压倒reduer, 把key分散到reducer后还要在每个reducer上对key进行排序去重
-
3. 对于order by 语法的优化
order by是全局排序, 只会生成1个reducer来处理所有mapper的输出. 但是往往我们并不需要在某个字段上完全的全局有序, 而是希望局部上有序, 所以可是使用其他字段先进行distribute by
,再在原字段上sort by
select uid,upload_time,event_type,record_data f
from calendar_record_log
where day_p between '20190101' and '20190131'
order by upload_time desc,event_type desc;
-- 这个语句在数据量很大时无法计算, 可以做如下改写
select uid,upload_time,event_type,record_data
from calendar_record_log
where day_p between '20190101' and '20190131'
distribute by uid
sort by upload_time desc,event_type desc;
-
4. 对于join语法的优化
-
使用
map join
当使用join时, 如果发现其中某个表很小, 可以完全放进内存的时候, 就可以将这张表指定成map join
分发的表. 有2中方法指定- 手动指定:
select /*+MAPJOIN(b)*/ a.a1,a.a2,b.b2 from tablea a JOIN tableb b ON a.a1=b.b1
缓存多张小表:
select /*+MAPJOIN(b,c)*/ a.a1,a.a2,b.b2 from tablea a JOIN tableb b ON a.a1=b.b1 JOIN tbalec c on a.a1=c.c1
- 在Hive0.11后,Hive默认启动该优化
可以通过以下两个属性来设置该优化的触发时机-
hive.auto.convert.join
: 默认值为true,自动开户MAPJOIN优化 -
hive.mapjoin.smalltable.filesize
: 默认值为2500000(25M), 如果表的大小小于此值就会被加载进内存中
-
此外, 使用使用MAPJOIN时,需要注意:
* LEFT OUTER JOIN的左表必须是大表;
* RIGHT OUTER JOIN的右表必须是大表; 调整map个数和reduce个数
-
7. 内部表和外部表?
内部表数据由Hive自身管理,外部表数据由HDFS管理;删除内部表会直接删除元数据(metadata)及存储数据;删除外部表仅仅会删除元数据,HDFS上的文件并不会被删除。
8. 分区表和分桶表的区别
- 分区表: Hive 数据表可以根据某些字段进行分区操作,细化数据管理,让部分查询更快,不同分区对应不同的目录;
- 分桶表:表和分区也可以进一步被划分为桶,分桶是相对分区进行更细粒度的划分。分桶将整个数据内容按照某列属性值的hash值进行区分,不同的桶对应不同的文件。