map优化
优化并发个数
-
减少map数,合并小文件
- set mapred.max.split.size=100000000;单位b,代表一个map能处理多大的数据量
- set mapred.min.split.size.per.node=100000000;一个节点最少要处理多少数据
- set mapred.min.split.size.per.rack=100000000;
- set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
-
增加map数
- set mapred.map.tasks=10;
block大小会影响并发度
- set dfs.block.size = 128
相当于开启Combiner功能
- hive.map.aggr=true
reduce优化
优化并发个数
- hive.exec.reducers.bytes.per.reduce;reduce任务处理的数据量
- reduce个数计算公式
reduce个数=inputFileSize / bytes per reduce 就是文件大小除以一个reduce能处理的字节大小 - 调整reduce的个数
set mapred.reduce.tasks=10
MapReduce出现痛点
只有一个reduce情况
- 没有group by
- order by
- 发生在reduce阶段
- 全局排序,因此只有一个reduce,当输入数据规模较大时,计算时间消耗严重
- 建议用distribute by 和 sort by来替代
- sort by
- 发生在map阶段
- 不是全局排序,如果用sort by排序,并且设置多个reduce,每个reduce输出是有序的,但是不保证全局排序
- distribute by
- 控制map端的数据如何拆分给reduce,可控制分区文件的个数
- cluster by
- 相当于distribute by和sort by的结合
- 只默认升序排序
笛卡尔积
- 使用join的时候,尽量有效的使用on条件
如何加快查询速度
精华总结
-
横向尽可能多并发
一个节点的并发越多越好,前提是性能的保证 -
纵向尽可能少依赖
一个MapReduce能完成的任务,就不要选择多个任务来完成,控制任务数
1.分区partition
- where中的分区条件,会提前生效,不必特意做子查询,直接join和groupby
2.Map Join
- 指定表是小表,内存处理,通常不超过1个G或者50W记录
/*+MAPJOIN(tablelist)*/
3.union all
- 先把两张表union all,然后再做join或者group by,可以减少MapReduce的数量
- union和union all的区别:union相当于记录合并,union不合并,后者性能更优
4.multi-insert & multi group by
- 从一份基础表中按照不同的维度,一次组合出不同的数据
5.Automatic merge
- 为了多个小文件合并,当文件大小比阈值小时,hive会启动一个MapReduce进行合并
- 将数据量大的节点的数据中途随机分发进行预处理数据压缩,最后再合并数据
- 配置
- hive.merge.mapfiles=true 是否合并map输出文件,默认true
- hive.merge.mapredfiles=false是否合并reduce输出文件,默认false
- hive.merge.size.per.task=25610001000 合并文件的大小
6.Multi-Count Distinct
- 与精华总结相违背
- 一个MapReduce拆成多个的目的是为了降低数据倾斜的压力,出现数据倾斜时,帮忙完成一定的负载均衡
- 必须设置的参数:set hive.groupby.skewindata=true
- 示例
select dt,count(distinct uniq_id),count(distinct ip) from logs where dt = xxxx group by dt
7.并行执行
- set hive.exec.parallel=true
- 同步执行hive的多个阶段,hive在执行过程,将一个查询转化成一个或者多个阶段,某个特定的job可能包含众多的阶段,而这些阶段可能并非完全相互依赖的,也就是说可以并行执行的,这样可能使得整个job的执行时间缩短
如何加快join操作
- 语句优化
- 多表连接:如果join中多个表的join key是同一个,则join会转化为单个mr任务
- 表的连接顺序:指定大小表,Hive默认把左表数据放到缓存中,右边的表的数据做流数据
/*+STREAMTABLE(a)*/指定该表是大表
- 如果避免join过程中出现大量结果,尽可能在on中完成所有条件判断(少用where)
数据倾斜问题
业务场景
- 操作
- join
- group by
- count distinct
- 原因
- key分布不均
- 人为的建表疏忽
- 业务数据特点
- 症状
- 任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或者几个)reduce子任务未完成
- 查看未完成的子任务,可以看到本地读写数据量积累非常大,通常超过10GB可以认定为发生数据倾斜
- 倾斜度
- 平均记录数超过50w且最大记录数是超过平均数的4倍
- 最长时长比平均时长超过4分钟,且最大时长超过平均时长的2倍
- 万能方法
- hive.groupby.skewindata=true
- 把一个任务拆分成多个任务
大小表关联
- 原因
Hive在进行join时,按照join的key进行分发,而在join左边的表的数据会首先读入内存,如果左边表的key相对分散,读入内存的数据会比较小,join任务执行会比较快;而如果左边的表key比较集中,而这张表的数据量很大,那么数据倾斜就会比较严重,而如果这张表是小表,则还是应该把这张表放在join左边 - 思路
- 将key相对分散并且数据量小的表放在join的左边,这样可以有效减少内存溢出错误发生的几率
- 使用map join让小的维度表先进内存
- 方法
- small table join big table
大大表关联
- 原因
日志中有一部分的userid是空或者是0的情况,导致在用userid进行hash分桶的时候,会将日志中userid为0或者空的数据分到一起,导致了过大的斜率 - 思路
把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终结果 - 方法
on case when(x.uid = '-' or x.uid = '0' or x.uid is null)then concat('dp_hive_search',rand()) else x.uid end =f.userid
聚合时存在大量特殊值
- 原因
做count distinct时,该字段存在大量值为null或空的记录 - 思路
- count distinct时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在最后结果中加1
- 如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union
-
方法
空间换时间
-
案例
- 问题
同一个reduce上进行distinct操作时压力很大 -
方法