1 整体优化思路
1、从数据任务本身出发,优化业务逻辑代码,偏业务优化
2、 从集群的资源参数进行优化,偏技术优化
3、从全局的角度观察任务的调度设置是否合理,优化任务优先级
4、从数据模型的易用性和复用性出发,落地中间可复用的模型表
2 详细优化策略
1、分区裁剪和列裁剪
查询分区表时需要增加分区过滤条件。只查询分析所需要的列,避免select *操作。
2、提前数据收敛
能够在子查询中完成的过滤要在子查询中完成
3、谓词下推PPD
在map端执行的谓词过滤称为谓词下推,在reduce端执行的过滤为非下推。开启谓词下推的方法 hive.optimized.ppd = true。在执行join关联时,谓词无论在where 还是on中都会下推,在执行full join关联时无论在where 还是 on中都不会下推。保留表的过滤条件在on中时谓词不会下推。空表的过滤条件在where中时谓词不会下推。(left join左表为保留表,右表为空表,right join反之)。若过滤条件中存在不确定函数如,rand()和unixtimestamp()等,谓词不会下推。
4. 选择合适的存储方式和数据格式
行存优点:所有数据存放在一起,INSERT/UPDATE操作比较方便。缺点:查询个别列时,所有列的数据均会被读取。使用场景:适合点查,返回基于少,基于简单索引查询。适用于增删改较多的场景。
列存优点:所有列均可以被作为索引,投影效率比较高,查询时只有涉及到的列会被读取,同列同质数据压缩比比较高。缺点:INSERT/UPDATE操作比较麻烦,查询时返回的列数据需要被重新组装。不适合点查。使用场景:统计分析类查询(OLAP大量汇聚计算,且对列操作较少,分组管理较多),即时查询,查询条件不确定,行存表难以使用索引。
从更新频率来看:更新频率高,适合行存。从插入频率来看:一次性批量插入适合列存。多次频率少量插入适合行存。从表的列数来看:如果表的列数较多,且每次查询涉及列较少(低于50%),适合列存。如果表的列数较少,且每次查询的列数较多适合行存。从压缩比来看:列存压缩率比较高,但是会消耗更多的CPU资源。
5、合理使用排序
order by 全局排序,全局只有一个reduce,数据量大时性能较差,慎用。
sort by 局部排序,即只保证每个reduce内部有序。
distribute by 按照制定的字段将数据分发到不同的reduce上,distribute by控制了数据从map端到reduce端的分发,结合给定的字段和reduce数量完成hash分发。
cluster by既可以实现sort by的功能,又可以实现distribute by的功能。当排序字段和分区字段相同时,可以使用cluster by 代替distribute by + sort by。
对于海量数据取TOP可以内部使用sort by,外部使用order by,selec id,num from (select id,num from table sort by id limit 10)order by num limit 10
6、多路输出
适用于一次查询和多次写入的场景,减少表的读取次数,提升性能。最多可128路输出。
7、小文件优化
本质原因:小文件过多是由于在向hive表导入数据时产生的。MR中有多少个reduce就有多少个文件,文件数=reduce *分区数,如果job没有reduce,那文件数=map数*分区数。所以需要调整reduce和分区数来控制小文件的数量。
影响:1)小文件过多会增加NameNode的压力,元数据较多占用内存,影响HDFS的性能。2)从Hive查询来看,每个文件被当成一个数据块,需要启动一个map任务来完成。而map任务的启动和初始化时间远大于逻辑处理时间,会造成较大的资源浪费。
优化思路:
1)使用hive命令进行合并,concatenate。alter table A conccatenate
2)调整参数减少map数,设置map输入合并小文件。
设置map输入合并小文件
-- 每个Map最大输入文件大小 set mapred.max.split.size = 10240000;
-- 一个节点上的split的至少的大小 set mapred.min.split.size.per.node = 10240000;
-- 一个交换机下split的至少的大小 set mapred.min.split.size.per.rack = 10240000
-- map前执行小文件合并 set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
设置map输出和reduce输出合并小文件
-- 设置map端输出进行合并,默认为true。 set hive.merge.mapfiles = true;
-- 设置reduce端输出合并,默认为false,set hive.merge.mapredfiles = true;
-- 设置合并文件大小 set hive.merge.size.per.task = 256*1000*1000
-- 当输出文件平均大小小于该值时,启动一个独立的MapReduce任务进行文件合并 set hive.merge.small.files.avgsize = 160000000;
3)调整参数减少reduce数
-- 直接设置reduce个数 set mapred.reduce.job.reduces = 10;
8、join 优化
1)提前数据收敛,保证join时无关数据不参与关联
2)left semi join,只返回左表数据,如果右表有一条匹配则跳过,而join可能会出现重复数据。右边过滤条件写on里。
3)大表join小表 小表放在左边,大表放在右边。join在reduce阶段,在hive 2.x之前会把左表加载到内存,hive2.x之后已经自动优化了。
4)启用map join,mapjoin就是把join的表直接分发到map端的内存中,即在map端来执行join操作。提高执行效率,如果表较小,可以启用map join。set hive.auto.convert.join = true,大表小表阈值 set hive.mapjoin.smalltable.filesize = 200000;
5) 大表join大表场景 A:空key过滤,过滤空key的数据 B:空key转换,转换key的数据进行关联时打散key
6)避免笛卡尔积
9、count distinct和group by
count distinct 计算去重指标,数据量不大时和group by性能差别不大。数据量较大时count distinct比较耗费性能,只有一个reduce task来执行。容易reduce端数据倾斜,通常优化使用里层group by ,外层count来代替。
hive 3.x新增了对count(distinct )的优化,通过set hive.optimize.countdistinct配置,可以进行自动优化。里层group by外层count会生成两个job任务,会消耗更多的I/O资源。
1)distinct是用于去重,group by设计目的是用于统计聚合。
2)单纯去重操作使用distinct,速度是快于group by的
3)distinct要针对查询的全部字段去重,而group by可以针对要查询的全部字段中的部分字段去重,主要作用是:获取数据表中以分组字段为依据的其他统计数据。
两者执行方式不同,distinct主要对数据进行两两比较,需要遍历整个表。group by分组类似先建立索引再查索引,当数据量大时,group by速度快于distinct。
10、参数调优
set hive.optimize.countditsinct = true;开启对count(distinct )的优化
set hive.auto.convert.join = true;开启map join
set hive.mapjoin.smalltable.filesize = 220000 设置mapjoin的大小表
set hive.exec.parallel = true 开启并行执行
set hive.exec.parallel.thread.numbers = 16;同一个SQL允许最大并行度,默认为8.会将SQL没有相互依赖的stage并行执行。
set hive.map.aggr = true 开启map端聚合
set hive.groupby.skewindata = true 当有数据倾斜时,进行负载均衡优化,生成的查询有两个MR任务,第一个MR任务中map随机打散数据,在reduce端进行聚合,第二个Job按group by的key分布到reduce,执行聚合操作
set hive.mapred.mode = strict 设置严格模式。1)分区查询需要增加分区过滤条件。2)order by必须增加limit。 3)限制了笛卡尔积查询
set hive.input.format = org.apahce.hadoop.ql.io.CombineHiveInputForamt 设置map端合并小文件
set hive.exec.compress.output = true 设置hive查询结果是否压缩
set mapreduce.output.fileoutputformat.compress = true;设置MapReduce Job的结果输出是否使用压缩
set hive.cbo.enable=false;关闭CBO优化,默认值true开启,可以自动优化HQL中多个JOIN的顺序,并选择合适的JOIN算法
11、解决数据倾斜问题
数据倾斜有三种类型:shuffle倾斜、数据膨胀倾斜、输入倾斜。
导致数据倾斜的原因,key的分布不均导致不同的task处理的数据量发生倾斜。
shuflle倾斜优化方案:
1)将reduce join改为map join,适用于大表join小表。思路,使用broadcast变量和map算子实现join操作。优点:对join操作大致的数据倾斜效果非常好,因为不会发生shuffle。缺点:使用场景少,只适合大表join小表。
2)过滤少量导致数据倾斜的key。判断少数几个数据量较多的导致数据倾斜的key是否对计算结果不太重要,若不重要可以直接过滤。优点:实现简单、效果好,且对整体计算影响不大。缺点:适用场景少,有些情况下key不能被过滤,或则key数据量比较多,或者key不固定。方案扩展,如果导致倾斜的key数量较少且不能被过滤,可以使用倾斜key map join 倾斜 union 非倾斜key reduce join 非倾斜。
3)两阶段聚合。局部聚合+全局聚合。适用场景:reducebykey等聚合类shuffle算子或SQL中使用group by语句导致的数据倾斜问题效果较好。思路:第一次是局部聚合,先给每一个key打上一个随机数,执行reducebykey聚合操作,然后将前缀去掉,进行全局聚合。优点:对于聚合类shuflle操作导致的数据倾斜效果比较好,通常可以解决或极大缓解,将spark性能提升数倍以上。缺点:仅适用于聚合类的shuffle操作,使用范围窄,如果join类的shuflle还需要其他的方案。
4)使用随机前缀和RDD扩容。适用场景:如果RDD在join时导致数据倾斜的key有很多,那么拆分key也没有太大意义。思路:查看RDD/HIVE的数据分布情况,找到导致数据倾斜的那个RDD或hive表,将该RDD的每一条数据都增加一个n以内的随机前缀。对另外一个RDD而言扩展为n条数据,扩展出来的每条数据都打上一个0~N的随机前缀。最终将两个RDD进行join即可。优点:对join的数据倾斜基本都可以处理,而且效果显著,性能提升不错。缺点:更多的是缓解数据倾斜,而不是彻底规避,需要对整个RDD进行扩容,对内存资源要求比较高。
输入倾斜。Input file存在数据倾斜,有些文件存储数据多,有些存储数据少。数据量大的task执行慢,优化方案:大文件拆分,小文件合并。
膨胀倾斜。两个对多对的表进行关联会发生数据膨胀(笛卡尔积)。解决方案:能否去掉一些热点的大key。能否增加一些关联条件,减少最终的结果数据。能否在数据范围上做减少,对于笛卡尔积的关联需要把数据限制在1亿条内,如果是M>>N的关联,可以考虑将N进行广播,对M切分成多个小分片进行map join。