Hive优化总结

一.常用参数

开启中间结果压缩  对于输入数据量有少许减少,但是cpu开销增大,对于单stage任务总体不理想

set hive.exec.compress.intermediate=true;

set mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec

开启最终输出压缩

set hive.exec.compress.output=true;

mapred.compress.map.output   map的输出是否压缩

mapred.map.output.compression.codec  map的输出压缩方式

mapred.output.compress reduce的输出是否压缩

mapred.output.compression.codecreduce的输出压缩方式

set mapred.output.compression.codec=org.apache.hadoop.io.compress.GZipCodec

减少MAP数量

设置MAP的分割文件大小 set mapred.max.split.size=512000000

增加MR中MARTASK的可使用内存内存  set mapreduce.map.memory.mb; 以解决MAP时的OOM

map执行时间:map任务启动和初始化的时间+逻辑处理的时间。

所以 如果小文件过多,可以执行set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;在MAP开始前进行小文件合并

调整reduce参数的值

set mapred.reduce.tasks = 15;

set hive.exec.reducers.bytes.per.reducer =

一般根据输入文件的总大小,用它的estimation函数来自动计算reduce的个数:reduce个数 = InputFileSize / bytes per reducer

设置并行度

set hive.exec.paralle=true 

set hive.exec.parallel.thread.number=16;

设置mapjoin

set hive.auto.convert.join = true;

set hive.mapjoin.smalltable.filesize = 2500000 ;//刷入内存表的大小(字节) 

设置严格模式(一般设置成全局级,而非会话级)

set hive.marped.mode=strict

防止用户执行那些可能意想不到的不好的影响的查询  分区表必须指定分区

开启动态分区排序优化 set hive.optimize.sort.dynamic.partition

  开启map任务的mapreduce会引入reduce过程,这样动态分区的那个字段比如日期在传到reducer时会被排序。由于分区字段是排序的,因此每个reducer只需要保持一个文件写入器(file writer)随时处于打开状态(不然每个动态分区都需要打开一个写入器),在收到来自特定分区的所有行后,关闭记录写入器(record writer),从而减小内存压力

二、日常sql编写技巧

1.将大表放后头

    Hive假定查询中最后的一个表是大表。它会将其它表缓存起来,然后扫描最后那个表。 因此通常需要将小表放前面,或者标记哪张表是大表:/*streamtable(table_name) */

2.使用相同的连接键

    当对3个或者更多个表进行join连接时,如果每个on子句都使用相同的连接键的话,那么只会产生一个MapReduce job。

3.尽量尽早地过滤数据

    减少每个阶段的数据量,对于分区表要加分区,同时只选择需要使用到的字段。

4.尽量原子化操作

    尽量避免一个SQL包含复杂逻辑,可以使用中间表来完成复杂的逻辑

5.limit 语句快速出结果

    尤其是spark sql

数据倾斜

原因

1)、key分布不均匀

2)、业务数据本身的特性

3)、建表时考虑不周

4)、某些SQL语句本身就有数据倾斜

解决方案:

场景1:join时小表放在前面,并开启mapjoin    set hive.auto.convert.join = true;

        或使用 set hive.optimize.skewinfo=table_B:(selleer_id) [ ( "0") ("1") ) ]   说明B表的值集中在0和1上

      set hive.optimize.skewjoin = true;

场景2:借助随机函数 对关联不到的 如0值或空值进行过滤或随机函数替换处理,避免倾斜

场景3:方案1:通过 union all将数据进行分离,聚集值单独处理

             方案2:在MAP端 预聚合 set hive.map.aggr=true

这个设置可以将顶层的聚合操作放在Map阶段执行,从而减轻清洗阶段数据传输和Reduce阶段的执行时间,提升总体性能。确点是更废内存

             方案3:    1.hive.groupby.skewindata=true  (set hive.groupby.mapaggr.checkinterval=100000;这个是group的键对应的记录条数超过这个值则会进行优化  注意:该参数仅支持单列

hive.groupby.skewindata=true 控制生成两个MR Job。

第一个MRJob 中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的GroupBy Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MRJob再根据预处理的数据结果按照GroupBy Key分布到Reduce中(这个过程可以保证相同的GroupBy Key被分布到同一个Reduce中

场景4: select count(distinct udid) from T 改写成  select count(1) from( select distinct udid from T) t;

区别:增加一个JOB,同时后者在执行子查询的时候,可以有多个reduce参与计算,而前者只有一个reduce


大表JOIN大表时性能问题

1.分桶

建表时采用表分桶的设计,同时join时可以尝试使用bucket map join;基本处理方法是将两个表在join key上做hash bucket,将较小表(sale_history)的bucket设置为较大表(call_result)的数倍。这样数据就会按照join key做hash bucket。这样做的话,小表依然会复制到各个节点上,map join的时候,小表的每一组bucket加载成hashtable,与对应的大表bucket做局部join。

如果两表的join key 都具有唯一性(是主键关联),还可以进一步做sort merge bucket map join ;做法是两表都做bucket的基础上,每个bucket内部还要进行排序,这样做得好处就是在两边的bucket要做局部join的时候,用类似merge sort算法中的merge操作一样把两个bucket顺序遍历一下即可。

1.1 条件

1) set hive.optimize.bucketmapjoin = true;

2) 一个表的bucket数是另一个表bucket数的整数倍

3) bucket列 == join列

4) 必须是应用在map join的场景中

1.2 注意

1)如果表不是bucket的,只是做普通join


2.业务场景处理:

       看懂实现逻辑即可,核心逻辑,无外乎是维护一个大客户表,对数据进行放大,结合随机函数,生成新的关联键,确保只关联到其中的一条即可,将数据的集中度进行打散

  1).通用方案

      此方案的思路是建立一个numbers表,其值只有一列int 行,比如从1到10(具体值可根据倾斜程度确定),然后放大B表10倍,再取模join。代码如下:

      select

         m.buyer_id,

        sum(pay_cnt_90day)  as pay_cnt_90day,

        sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,

        sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,

        sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,

        sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,

        sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,

        sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5

      from (

        select  a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day

        from (  select buyer_id,  seller_id,  pay_cnt_90day   from table_A)  a

        join

               (

          select  /*+mapjoin(members)*/

            seller_id,  sale_level ,member

          from table_B

            join members

          )  b

        on  a.seller_id  = b.seller_id

          and mod(a.pay_cnt_90day,10)+1 = b.number 

        )  m

      group by m.buyer_id

         此思路的核心在于,既然按照seller_id分发会倾斜,那么再人工增加一列进行分发,这样之前倾斜的值的倾斜程度会减少到原来的1/10,可以通过配置numbers表改放大倍数来降低倾斜程度,

      但这样做的一个弊端是B表也会膨胀N倍。

  2).专用方案

        通用方案的思路把B表的每条数据都放大了相同的倍数,实际上这是不需要的,只需要把大卖家放大倍数即可:需要首先知道大卖家的名单,即先建立一个临时表动态存放每天最新的大卖家(

      比如dim_big_seller),同时此表的大卖家要膨胀预先设定的倍数(1000倍)。

        在A表和B表分别新建一个join列,其逻辑为:如果是大卖家,那么concat一个随机分配正整数(0到预定义的倍数之间,本例为0~1000);如果不是,保持不变。具体代码如下:

      select

         m.buyer_id,

        sum(pay_cnt_90day)  as pay_cnt_90day,

        sum(case when m.sale_level = 0  then pay_cnt_90day  end)  as pay_cnt_90day_s0,

        sum(case when m.sale_level = 1  then pay_cnt_90day  end)  as pay_cnt_90day_s1,

        sum(case when m.sale_level = 2  then pay_cnt_90day  end)  as pay_cnt_90day_s2,

        sum(case when m.sale_level = 3  then pay_cnt_90day  end)  as pay_cnt_90day_s3,

        sum(case when m.sale_level = 4  then pay_cnt_90day  end)  as pay_cnt_90day_s4,

        sum(case when m.sale_level = 5  then pay_cnt_90day  end)  as pay_cnt_90day_s5

      from (

        select  a.buer_id,  a.seller_id,  b.sale_level, a.pay_cnt_90day

        from (  

          select  /*+mapjoin(big)*/

             buyer_id,  seller_id,  pay_cnt_90day,

             if(big.seller_id is not null, concat(  table_A.seller_id,  'rnd',  cast(  rand() * 1000 as bigint ), table_A.seller_id)  as seller_id_joinkey

              from table_A

               left outer join

             --big表seller_id有重复,请注意一定要group by 后再join,保证table_A的行数保持不变

             (select seller_id  from dim_big_seller  group by seller_id)big

             on table_A.seller_id = big.seller_id

        )  a

        join

               (

          select  /*+mapjoin(big)*/

            seller_id,  sale_level ,

            --big表的seller_id_joinkey生成逻辑和上面的生成逻辑一样

            coalesce(seller_id_joinkey,table_B.seller_id) as seller_id_joinkey

          from table_B

            left out join

          --table_B表join大卖家表后大卖家行数扩大1000倍,其它卖家行数保持不变

          (select seller_id, seller_id_joinkey from dim_big_seller) big

          on table_B.seller_id= big.seller_id

          )  b

        on  a.seller_id_joinkey= b.seller_id_joinkey

        )  m

      group by m.buyer_id

      相比通用方案,专用方案的运行效率明细好了许多,因为只是将B表中大卖家的行数放大了1000倍,其它卖家的行数保持不变,但同时代码复杂了很多,而且必须首先建立大数据表。

   3).动态一分为二

      实际上方案2和3都用了一分为二的思想,但是都不彻底,对于mapjoin不能解决的问题,终极解决方案是动态一分为二,即对倾斜的键值和不倾斜的键值分开处理,不倾斜的正常join即可,

    倾斜的把他们找出来做mapjoin,最后union all其结果即可。

      但是此种解决方案比较麻烦,代码复杂而且需要一个临时表存放倾斜的键值。代码如下:

      --由于数据倾斜,先找出90天买家超过10000的卖家

      insert overwrite table  temp_table_B

      select 

        m.seller_id,  n.sale_level

      from (

        select   seller_id

        from (

          select seller_id,count(buyer_id) as byr_cnt

          from table_A

          group by seller_id

          ) a

        where a.byr_cnt >10000

        ) m

      left join 

      (

       select seller_id, sale_level  from table_B

      ) n

        on m.seller_id = n.seller_id;

      --对于90天买家超过10000的卖家直接mapjoin,对其它卖家直接正常join即可。

大表关联案例  引用自  https://www.cnblogs.com/shaosks/p/9491905.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容