此篇内容:hive自定义函数UDF、UDTF,压缩存储方式,hive优化、hive实际编程
基本涵盖了hive基础及常用场景,输出这篇单纯是为了忘记了时候可以有地方翻看查找。
一、熟练hive大数据编程
1、hql实现累加值
实际需求是我要实现一个分布趋势报表可视化,但是需求是要后一个分布组基于前一个分组再累加当前分布值,之后计算分布占比,从而最后一个分布组占比是100%的水平点,这样就是一个曲线趋势轻缓报表分析。
select cold_startup_time_distribute
,cold_startup_cnt_distribute
,cumulative_cold_startup_cnt_distribute
,cumulative_cold_startup_cnt_distribute/all_cold_startup_cnt_distribute as cumulative_cold_startup_rate
from
(select cold_startup_time_distribute
,sum(cold_startup_cnt_distribute) as cold_startup_cnt_distribute
,sum(sum(cold_startup_cnt_distribute)) over(order by cold_startup_time_distribute ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as cumulative_cold_startup_cnt_distribute
,sum(sum(cold_startup_cnt_distribute)) over(order by cold_startup_time_distribute ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as all_cold_startup_cnt_distribute
from
(select case when code_startup_time>=0 and code_startup_time<=250 then 250
when code_startup_time>250 and code_startup_time<=500 then 500
when code_startup_time>500 and code_startup_time<=750 then 750
when code_startup_time>750 and code_startup_time<=1000 then 1000
when code_startup_time>1000 and code_startup_time<=1250 then 1250
when code_startup_time>1250 and code_startup_time<=1500 then 1500
when code_startup_time>1500 and code_startup_time<=1750 then 1750
when code_startup_time>1750 and code_startup_time<=2000 then 2000
when code_startup_time>2000 and code_startup_time<=2250 then 2250
when code_startup_time>2250 and code_startup_time<=2500 then 2500
when code_startup_time>2500 and code_startup_time<=3000 then 3000
when code_startup_time>3000 then 99999999
end as cold_startup_time_distribute
,count(1) as cold_startup_cnt_distribute
from ext_metis.ads_reportform_cold_hot_start
where stat_date >= '2021-01-07'
and stat_date <= '2021-01-11'
and type = 'COLD'
group by case when code_startup_time>=0 and code_startup_time<=250 then 250
when code_startup_time>250 and code_startup_time<=500 then 500
when code_startup_time>500 and code_startup_time<=750 then 750
when code_startup_time>750 and code_startup_time<=1000 then 1000
when code_startup_time>1000 and code_startup_time<=1250 then 1250
when code_startup_time>1250 and code_startup_time<=1500 then 1500
when code_startup_time>1500 and code_startup_time<=1750 then 1750
when code_startup_time>1750 and code_startup_time<=2000 then 2000
when code_startup_time>2000 and code_startup_time<=2250 then 2250
when code_startup_time>2250 and code_startup_time<=2500 then 2500
when code_startup_time>2500 and code_startup_time<=3000 then 3000
when code_startup_time>3000 then 99999999
end
) s
group by cold_startup_time_distribute
) q
分析上面hql,在子查询中构造冷启动时长分布,之后count计算得出各分布的次数,然后在外层实现累加操作,sum(cold_startup_cnt_distribute)
就代表每个分布组的值,是要累加的值,over定义了一个统计窗口,ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
这一句定义了窗口的起点和终点,UNBOUNDED PRECEDING
表明从第一行开始,CURRENT ROW
为默认值当前行。ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
等价于ROWS UNBOUNDED PRECEDING
。sum(sum(cold_startup_cnt_distribute)) over(order by cold_startup_time_distribute ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
从而就实现了滚动累计值,按分布组升序,然后统计窗口的终点始终从当前行滚动到下一行,起点一直是第一行。
上面hql的累计值统计思路参考自此篇文章:https://www.cnblogs.com/adolfmc/p/12060837.html
关键是理解ROWS BETWEEN含义,也叫做WINDOW子句,但序列函数则不支持window子句,比如row_number()
PRECEDING:在前N行的意思;FOLLOWIND:在后N行的意思。
sum( sum(amount) ) over(order by month ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as cumulative_amount
表示统计前一个月和后一个月的累加值
学习窗口函数:http://blog.itpub.net/22207394/viewspace-1154123/
2、with tmp as (子查询块)
with tmp as (子查询块) 就相当于是用来存储一段hql语句的,提高hql代码的可读性,常用在union查询,不过对于性能优化来说它是不能减少表的读取次数的。
with t1 as (select * from table_name where stat_date=20201231),
t2 as (select * from table_name where stat_date=20210102)
select * from t1 union all select * from t2;
3、from ... insert 同时多表插入
这个做法的场景是当要从同一张表查询数据时,然后写往不同的表或者表分区就可采用此做法,可以达到性能优化,减少表的读取次数,而不用说得分开查询多次表写入不同表。
from table_name
insert overwrite table table_kpi_2021 partition(stat_date,date_type=1)
select channel_lev1
,sum(active_umid_cnt ) as active_umid_cnt
group by channel_lev1
insert overwrite table table_kpi_2021 partition(stat_date,date_type=7)
select channel_lev1
,sum(active_umid_cnt_7d_std) as active_umid_cnt
group by channel_lev1
insert overwrite table table_kpi_2021 partition(stat_date,date_type=30)
select channel_lev1
,sum(active_umid_cnt_30d_std) as active_umid_cnt
group by channel_lev1
4、与时间相关的函数
涉及到以下函数格式化的参数及含义
参数 | 含义 |
---|---|
%Y | 年,4位格式 |
%y | 年,2位格式 |
%m | 月,数字格式 |
%d | 天,数据格式 |
%H | 小时,24小时格式 |
%h | 小时,12小时格式 |
%i | 分 |
%s | 秒 |
%T | 时间,24小时(hh:mm:ss) |
-
datediff日期比较函数
此函数可以用在计算留存率上,第一个参数是结束日期,第二个是开始日期,返回结束日期减开始日期
select datediff('2020-07-05','2020-06-15'); //返回20,注意日期格式认准- ,如果是/则无效,得使用格式转换
-
date_sub函数和date_add函数
date_sub是把日期减去指定时间间隔,用来取前几天的日期,输入和输出都是带横杠的日期格式的数据。date_add是date_sub相反的,取后几天的日期,两函数会一个即可,因为可通过正负号调整达到同样的效果。
select date_sub('2020-03-04',2); //输出结果:2020-03-02
select date_add('2020-03-04',-2); //输出结果:2020-03-02
-
from_unixtime函数
把时间戳转换为指定格式的日期,注意这里的时间戳单位是秒,如果是毫秒时间戳则得先除以1000转换为秒,否则输出的日期会奇奇怪怪的。
select from_unixtime(1548648852,'yyyy-MM-dd'); //输出结果:2019-01-28
-
unix_timestamp函数
把日期转换成unix时间戳格式
select unix_timestamp('2019-01-28','yyyy-MM-dd'); //输出结果:1548604800
-
to_date函数
将横杠格式的日期字符串中,获取日期,返回值类型是string,类似的还有year函数和month函数和day函数和hour函数和minute函数和second函数,返回int类型
select to_date('2021-01-01 10:03:01'); //返回'2021-01-01' string类型
select year('2021-01-01 10:03:01'); //返回2021 int类型
select month('2021-01-05 10:03:01'); //返回1 int类型
select day('2021-01-05 10:03:01'); //返回5 int类型
select hour('2021-01-05 10:03:01'); //返回10 int类型
select minute('2021-01-05 10:03:01'); //返回3 int类型
select second('2021-01-05 10:03:01'); //返回1 int类型
-
weekofyear函数
返回当前日期在当前年的周数,返回值是int类型
select weekofyear('2021-01-10 17:52:15');
-
date_format函数
起格式化日期作用
select date_format('2021-01-10','yyyyMMdd');
- 返回当前时间
select CURRENT_DATE; //返回当天时间,2021-01-10
select CURRENT_TIMESTAMP; //返回当前时间,2021-01-10 18:07:44.344
select from_unixtime(unix_timestamp()); //返回当前时间,2021-01-10 18:07:56
select trunc('2021-02-10','MM'); //返回当月的第一天,2021-02-01
select trunc('2021-02-10','YEAR'); //返回当年的第一天,2021-01-01
-
对yyyyMMdd格式的日期和yyyy-MM-dd格式的日期相互转换
有几种方式,第一种是通过from_unixtime()+unix_timestamp()
转换时间戳方式转换,第二种是通过concat()+substr()
拼接截取方式转换,yyyy-mm-dd转换成yyyymmdd时还可以通过regex_replace()
正则匹配方式去掉横杠,或者通过date_format()
格式化日期。
参考文章:https://www.jianshu.com/p/1826f373ce71
5、列转行函数 (效果:多行转换成一行)
有collect_list()
,collect_set()
这2个函数,区别在于collect_list()
不对最终转换出来的结果列数据进行去重操作,而collect_set()
会进行去重操作,而且利用列转行函数有时还可以满足一些特殊需求,达到突破group by 的分组限制。下面附实际例子帮助理解:
//需求:实现统计用户从桌面启动app的总次数且展示启动的app列表,如果不利用collect_set函数,就无法即满足计算用户启动app总次数且同时展示启动的app列表
select stat_date
,imei
,collect_set(app名事件参数) as app_list
,count(1) as open_cnt
from table_name
where stat_date = --统计日期
and imei in() --用户标识
and pkg_name = '' --桌面包
and event_name = '' --启动事件名
group by stat_date
,imei
6、表生成函数(效果:一行转换成多行)
lateral view
表生成函数时常搭配explode
或spilt
切分函数对一些map类型或者array类型的字段进行拆分成多行数据,比如看下面例子,可以理解到把a表查询的cc字段拆分出来明细数据并生成一个spilt_datas字段,这样就可以看到一行转换成多行的效果。
select a.aa,a.bb,b.spilt_datas
from
(select aa,bb,cc from table_a) a
lateral view explode(split(a.cc,','))b as spilt_datas
7、grouping多维组合分析
简单来说,就是希望达到在对同一个数据集进行一次查询中按照不同维度组合的情况来分析数据,你可能会想到分开几次group by后再union all起来,但是这样的hql写起来就很冗长繁琐,所以可以通过使用grouping sets
这种语法简化,原理也就是等价于多次group by再union all起来的过程。实际在使用grouping sets
时会产生null值,为了避免与数据自身的null记录混淆,可以先对原始的null记录使用coalesce函数转换为other或者unknown。hive也提供了一种标记方法用来识别上述null值情况,就是有个GROUPING__ID
的字段(PS:注意这个字段是2个下划线),这个字段的生成机制是group by 后面内容的二进制对应的十进制值。
grouping sets
相当于灵活指定各维度的组合情况
with cube;
相当于所有维度的全组合
with rollup;
相当于降维分析,即排列在group by中的前一维度为null则后一维度必为null,前一维度不为null则后一维度随意
当hql中出现了count distinct这类去重统计语句且多维分析的维度超过了5个维度时,则会报错。此时需要设置下参数 set hive.new.job.grouping.set.cardinality = 256;
这里的256值是可以调整的,代表着维度组合的最高数,一般为大于2^维度值
即可,但多维分析最好不要超过7个维度,否则对集群压力挺大。
8、hive元数据表应用
需求:可通过hive元数据表实现一个功能,写shell脚本通过表字段定义查询到对应的数据库名和数据表。
背景:对于新入职员工做临时取数数据需求有极大帮助,因为数仓建设过程中存在许多张hive落地表,已有统计好的指标不知道可从那张表中获取,可能数据开发组中平时也缺乏共同维护一份专门记录常用数据表的表用途的文档。
以下列举几张常用重要的元数据表:
VERSION
记录hive版本信息表,不常用但重要,没了这张表直接hive客户端都启动不了
DBS
记录hive所有数据库的基本信息表,关联字段:DB_ID
TBLS
记录hive所有数据表的基本信息表,关联字段:TBL_ID
TABLE_PARAMS
记录hive所有存储表/视图的属性信息表,关联字段:TBL_ID
SDS
记录hive所有数据表的存储信息表,关联字段:SD_ID
COLUMNS_V2
记录hive所有数据表的字段信息表,关联字段:CD_ID
PARTITIONS
记录hive所有分区表的基本信息表,关联字段:TBL_ID
PARTITION_KEYS
记录hive所有分区表的字段信息表,关联字段:TBL_ID
select a.*
,b.CD_ID
,b.INPUT_FORMAT
,b.OUTPUT_FORMAT
,b.IS_COMPRESSED
,b.LOCATION
,c.COLUMN_NAME
,c.COMMENT
,c.TYPE_NAME
from (select * from TBLS where SD_ID = 228) a
join (select * from SDS where SD_ID = 228) b
on a.SD_ID = b.SD_ID
join (select * from COLUMNS_V2 where CD_ID = 108) c
1、元数据和真实数据是分开存放的,元数据默认存放在自有derby中(PS:但公司一般会修改为存放在mysql中),真实数据存放在hdfs中。附hive元数据学习文章:https://www.cnblogs.com/qingyunzong/p/8710356.html
2、shell脚本调用mysql命令,无需进入mysql客户端,其中-e参数主要是执行完后面sql语句后退出,附学习文章:https://blog.csdn.net/u012026446/article/details/81094027
9、get_json_object函数和json_tuple函数
这两个函数都是用来解析获取json格式的文本数据的,不过它俩就区别在get_json_object一次只能解析一个键对象的数据,而json_tuple一次可以解析多个键对象的数据。假设下面例子的content字段是存放着json数据(即键值对)
select get_json_object(content,'$.键名') from test //这样就解析获取到对应键名的数据了
select a.* from test lateral view json_tuple(content,'键名1','键名2','键名3') a as k1,k2,k3 //这样就解析获取多个键名的数据了
附一篇对hive解析json操作讲得很棒的文章:https://mp.weixin.qq.com/s/awCvlb9BzCRX-Da1_l1FYg
10、alter表结构修改
在表最后面新增字段新增hive列字段
alter table 表名 add columns (列名1 数据类型 comment '数据注释',列名2 数据类型 comment '数据注释');
重命名hive表
alter table 旧表名 rename to 新表名;
修改hive表列字段或数据类型
alter table 表名 change 要修改的列名 新列名 新数据类型;
删除hive表列字段
alter table 表名 replace columns(新的schema)
没有删除,相当于用新的schema替换原有的,这样经测试还可以使用此方式来修改列字段顺序,弥补hive新增字段只能默认放在最后一行的情况
模糊查询数据库或数据仓库中的数据表
在mysql中,show tables like '%模糊关键字%'
语句进行模糊查询表名,like关键字必须存在
在hive中,show tables like '模糊关键字*'
语句进行模糊查询表名,like关键字可省略
hive查找各自带函数:show functions
hive模糊查找各自带函数:show functions like '*substr'
hive展示自带函数的用法:desc function extended substr
union all和union
union all
对两个结果集进行并集操作,不进行去重操作,也没有进行排序
union
对两个结果集进行并集操作,但有进行去重操作和排序
11、hive语句使用正则表达式
- 1、
regexp_extract(第一个参数,第二个参数,第三个参数)
作用:根据正则规则匹配把字段的值拆分哪几组并选择第几组作为结果输出。第一个参数是字符串或者字段名,第二个参数是正则拆分规则,每个()拆分为一组,第三个参数是取第几组,0表示把整个正则表达式对应的结果全部返回。
举例:select regexp_extract(abcde_ver_lev2,'(abcdeOS-)([0-9]+\.[0-9]+)',2) from 表名
,拿到的结果是类似于7.3这种数据。 - 2、
regexp_replace(第一个参数,第二个参数,第三个参数)
作用:根据正则规则匹配把字段的值替换掉。第一个参数是字符串或者字段名,第二个参数是正则替换规则,第三个参数是替换的文本。 - 3、
字符型字段 rlike 正则规则
作用:把符合正则规则的给筛选出来,如果要把不符合的给筛选出来可以在前面加个not。
注意:sql正则表达式开始和结束标志^和$前面不需要\转义,和java不一样。
举例:①select phone from phone_table where phone not rlike '^1\d{10}$';
,拿到的就是不符合11位合理手机号的数据
②select * from 表名 where 字段名 rlike '^\\d+$';
,匹配全是数字的数据。
③select fphone * 表名 where phone rlike '^((\\+86)|(86))?1\\d{10}$'
,拿到的就是包含+86、86、1开头的手机号。
关于正则表达式的基础标识就看这篇文章
12、数据脱敏函数
hive有专门的脱敏函数供我们使用,就是mask()
函数,返回值是string类型,默认需要脱敏的数据中大写字母就自动转换为X
,小写字母就自动转换为x
,数字就自动转换为n
,也可通过mask()
函数的参数来自定义转换格式。
select mask(要加密字段) from 表名 //输出默认加密后的结果
select mask(要加密字段,'X','x','#') from 表名 //输出自定义加密后的结果
select mask_first_n(要加密的字段,n) from 表名 //对前n个字符进行脱敏
select mask_last_n(要加密的字段,n) from 表名 //对后n个字符进行脱敏
select mask_show_first_n(要加密的字段,n) from 表名 //对除了前n个字符之外的字符进行脱敏
select mask_show_last_n(要加密的字段,n) from 表名 //对除了后n个字符之外的字符进行脱敏
select mask_hash(字段) from 表名 //对字段进行hash操作,若是非string类型的字段此函数就返回null
附学习参考文章:https://blog.csdn.net/CPP_MAYIBO/article/details/104065839
13、常见函数
函数concat(str1,str2)
是用来连接字符串的
函数concat_ws(连接符,str1,str2)
也是用来连接字符串,但是第一个参数表示连接起来的分隔符
函数round(double类型参数,n)
表示四舍五入,保留小数点后n位精度,
函数floor(double类型参数)
表示向下取整,向下保留整数部分
函数reil(double类型参数)
表示向上取整,向上保留整数部分,注意:这三个函数的返回类型都是bigint。
函数coalesce(数据字段,'自定义值')
,这个函数实现对数据做空处理,当数据为空时则按定义的结果输出,可应用在数据清洗转换过程或者多维grouping分析场景。
14、查询中的join连接语句
hive支持join语句,但只支持等值连接,即不能出现这种select 列字段 from 表a join 表b on a.列字段>b.列字段
,因为hive默认是开启了严格安全模式。
一般在连接操作的时候,hive是从左到右的顺序执行的,所以这里可以提到join优化的问题,通常把连接的小表放在join前边,把大表放在join后边。还有当多表进行join连接时,每个on子句都使用相同的连接键的话,就只会产生一个MapReduce job。
select 列名 from 表名1 join 表名2 on (表1唯一字段=表2唯一字段)
等值连接
select 列名 from 表名1 left join 表名2 on (表1唯一字段=表2唯一字段)
左外连接 (左表有的都要出现)
select 列名 from 表名1 right join 表名2 on (表1唯一字段=表2唯一字段)
右外连接
select 列名 from 表名1 full outer join 表名2 on (表1唯一字段=表2唯一字段)
全外连接 (左右表有的都要出现)
select 列名 from 表名1 left semi join 表名2 on (表1唯一字段=表2唯一字段)
左半连接(右边的表只能在on子句中设置过滤条件,where不能设置过滤条件,最终select结果只允许出现左表,右表的数据被过滤掉了,左表的记录在右表中一旦找到对应的记录,右侧表既停止扫描 ,所以性能会高些),PS:没有右半连接。
二、UDF和UDAF和UDTF
UDF
:一进一出,类似于round函数转换效果
UDAF
:多进一出,类似于sum函数聚合效果
UDTF
:一进多出,类似于lateral view或explore函数扩散效果
使用自定义开发的udf函数
步骤:
1、要继承org.apache.hadoop.hive.ql.UDF
2、需要实现evaluate函数,evaluate是可以支持方法重载的
3、把程序打包成jar包扔到集群上
4、在hive脚本里创建函数使用 ADD JAR ... CREATE TEMPORARY FUNCTION ...
实际UDF例子:校验imei,匹配长度为12~20位,且只能包含大小写字母和数字的字符串
package com.xiaojiang.udf;
//实现UDF函数的JAVA类ValidIMEI
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
public class ValidIMEI extends UDF{
public int evaluate(String imei) {
if (StringUtils.isEmpty(imei)){
return 0;
}
if (formatMatch(imei)){
return 1;
}
return 0;
}
public boolean formatMatch(String bigDate){
String elString = "^[A-Za-z0-9]{12,20}";
Pattern pattern = Pattern.compile(elString);
Matcher m = pattern.matcher(bigDate);
return m.matches();
}
}
//打包成jar包后扔到集群上
hadoop fs -put validimei.jar /apps/hive/udf
//脚本中使用UDF功能
add jar hdfs:///apps/hive/udf/validimei.jar
create temperory function validimei as 'com.xiaojiang.udf.ValidIMEI';
select validimei(imei); //返回值1则合规imei,返回值0则不合规imei
使用自定义开发的udaf函数
步骤:(
UDAF自定义聚合函数需要搭配group by一同使用
)
1、需要实现UDAFEvaluator接口的方法,主要是以下几个方法
2、重写init()
、iterate()
、terminatePartial()
、merge()
、terminate()
init()
一般负责初始化内部字段,通常用来存放最终结果的变量
iterate()
每次对一个新的值进行聚合计算时都会调用该方法,一般根据计算结果更新用来存放最终结果的变量
terminatePartial()
部分聚合结果的时候调用该方法,必须返回一个封装了聚合计算当前状态的对象,类似于MapReduce的combiner
merge()
接受来自terminatePartial的返回结果进行合并,hive合并两部分聚合的时候调用该方法
terminate()
终止方法,返回最终聚合函数结果
3、把程序打包成jar包扔到集群上
4、在hive脚本里创建函数使用 ADD JAR ... CREATE TEMPORARY FUNCTION ...
三、压缩存储方式
hive的存储格式中,TEXTFILE
和SEQUENCEFILE
的存储格式是基于行存储的,ORCFILE
和PARQUET
是基于列式存储的,hive建表默认存储格式是TEXTFILE,数据不做压缩的,现在建表建议使用ORCFILE
格式。
- 行存储
行存储的形式是同一条数据的不同字段值都在相邻地方,所以当查找某一个值时行存储查询速度会较快些 - 列存储
列存储的形式是以列来聚集存储,相同字段的值聚集在一起,当经常查询少数字段时,这样能大大减少读取的数据量,更加灵活
//结合orc存储方式和snappy压缩方式,获得减少数数据表大小的性能优化,orc自带的ZLIB压缩方式略差与SNAPPY压缩方式
create table tmp_dwd_app_action_detail_test (
umid string comment '用户标识'
)
stored as orcfile //指定存储方式
tblproperties("orc.compress"="SNAPPY") //指定压缩方式
hive各种表存储格式对比,textfile不做压缩处理,orc格式是默认有做压缩的,采用的是ZLIB方式。表大小排序的话是:textfile>parquet>orc,查询表大小可以使用命令hadoop fs -du -h 表文件路径
四、hive优化
-
1、实在从hql语句逻辑层面解决不了时试试换个执行引擎
在工作中遇到在关联imei唯一键 时,偶尔会出现执行到reduce阶段时报错下面语句,看着报错信息像是关联时imei空值导致聚合失败,但我也做了空值过滤和空值转换处理了,可都不奏效,最后set hive.execution.engine=mr;
换了执行引擎MapReduce反而执行通过了,原来的是Tez引擎。
-
2、牢牢把握住explain语句查看执行计划,才能做到hive优化心中有数
发生数据倾斜时,看执行日志观察下到底是在哪个阶段耗时卡住的,尤其是那种100个reduce数跑剩最后一个reduce却一直卡在那里的情况,这时就要explain查看下执行计划看哪个阶段主要在做什么操作,才能对症优化hql语句,我在做快游戏广告报表优化的时候就遇到这个场景,优化效果:执行时间 3小时--> 5分钟。 -
3、小表JOIN大表
在早期的时候,join关联操作时,如果两表数据量差距很大,要记得把小表放在左表,大表放在右边,执行速度才会快些,不过现在hive已经做了优化解决了这个问题了,所以现在不管大小表join顺序其实对执行速度没多大影响了。 -
4、空key过滤
关联的过程是相同key对应的数据都会发送到相同的reducer上,如果某些空key过多是会导致内存不够的,从而引发join超时,所以必须得先过滤掉这些空key的异常数据
假设a表是包括许多空值的数据,b表是不包含空值的数据
//不做优化时的原始hql
select a.id
from a left join b
on a.id = b.id
//做空key过滤优化时的hql,利用子查询先处理掉后再关联
select a.id
from (select * from a where id is not null) a
join b
on a.id = b.id
-
5、空key转换
当然,有时候空值的数据又不一定是异常数据,还是需要保留的,但是空key过多都分配到一个reducer去了,这样执行起来就算不内存溢出也会发生数据倾斜情况,数据倾斜的话对集群资源的利用率来看的话是极其不利的,我们可以通过把空key虚拟成随机数,保证不是同一个空key,从而降低数据倾斜概率,虽然这样在对关联键做处理反而会总体增长执行时间,但却减轻了reducer负担。
//做空key转换优化时的hql,利用case when判断加随机数
select a.id
from a.left join b
on case when a.id is null then concat('hive'+rand()) else a.id end = b.id
观察数据倾斜情况可以从reducer机器执行时间的差值来对比,假设执行了30秒的hql,然后最长时间的reducer用了28秒,而最短时间的reducer用了10秒,30秒的hql中reducer时长就差距了10秒,那这就算是数据倾斜了,但假设是执行了3000秒的hql,reducer之前相差个10秒倒是无所谓。
- 6、map端优化
通常情况下,作业会通过input目录产生一个或多个map任务,map数主要取决与input的文件总个数,文件总大小,集群设置的文件块大小。
从hadoop2.7.3版本开始,HDFS的默认块大小block size是128M。每张hive表在hdfs上对应存储都是一个文件,关于执行task时,每一个128M的文件都是一个块block,每个块就用一个map任务来完成,若文件超过128M就分块,若小于128M则独立成块。
那么:①当小文件过多怎么办?
答案是map任务增多,map任务的启动和初始化时间远大于执行逻辑处理时间,从而集群造成资源浪费。
②是不是让每个文件都接近128M大小就毫无问题了呢?
答案是不可能,假设一个文件大小127M,但表只有一两个字段,文件大小是由几千万条记录撑大的,如果数据处理逻辑复杂则用一个map任务去执行也是很耗时的。
③是不是map数越多越好?
答案是这种说法是片面的,map数增多有利于提升并行度,但一个map在启动和初始化时间是远大于执行逻辑处理时间,越多的map启动初始化就造成很大的集群资源浪费。
减少map数量,降低资源浪费,如何做?
以下相当于是把小文件合并成大文件处理 (多合一)
//100000000B=100M
set mapred.max.split.size=100000000;
set mapred.min.split.size.per.node=100000000;
set mapred.min.split.size.per.rack=100000000;
//前3行设置是确定合并文件块的大小,>128M的文件按128M切块,>100M和<128M的文件按100M切块,剩下的<100M的小文件直接合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; //执行前合并小文件
有时候对hive进行优化,在执行时间上可能没什么大的改观,但是在计算资源上就有很大改善。
增大map数量,分担每个map处理的数据量提升任务效率,如何做?
以下相当于是把大文件拆分成小文件处理 (一拆多)
set mapred.reduce.tasks=10; //分担成10个小文件对应10个map任务执行,缩小了数据处理量
- 7、reduce端优化
reduce个数设置过大会产生很多小文件对namenode有影响,且输出的小文件偶尔也会作为下一个任务的输入导致出现小文件过多问题,设置过小又会导致单个reduce处理的数据量过大导致OOM异常,不指定时则hive会默认根据hive.exec.reducers.bytes.per.reducer(每个reduce任务处理数据量,默认1G)
和hive.exec.reducers.max(每个任务的最大reduce数,默认999)
来做min(hive.exec.reducers.max值,总输入数据量/hive.exec.reducers.bytes.per.reducer值)
计算,得出结果确定reduce个数。
set mapred.reduce.tasks=50; //指定reduce个数
那么:①reduce数是不是越多越好?
答案是错误的,同map数一样,启动reduce和初始化同样耗时和占资源。
②什么情况下当设置了参数指定reduce个数后还是只有单个reduce在跑?
本身输入数据量就小于1G
在做测数据量时没加group by分组汇总。比如select count(1) from test_table where stat_date = 20201228;
用了order by排序
关联出现了笛卡尔积
总结:要合理设置map和reduce数,默认不设置的话将由hive自行判断map和reduce个数
之所以说合理,就是因为设置map个数和reducer个数的设置是不绝对的,答案不唯一,具体见业务数据情况,过多过少都可能对优化没帮助,反而耗费任务资源,比如启动map任务耗用的时间大于逻辑处理的时间,map越多并行度就越高,影响map个数的因素有input文件总个数和input文件大小以及集群设置的文件块大小。
set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; //系统默认格式,设置在map执行前合并小文件,减少map数
set mapreduce.input.fileinputformat.split.maxsize = 100; //调整最大切片值,让maxSize值低于blocksize就可以增加map数
附上切片的公式:computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M //从公式可以看出假设要调小的话就调整maxSize,假设要调大就调整minSize
set mapreduce.job.reduces = 15; //有多少个reducer就会有多少个输出文件
-
8、巧用分区表和分桶表
在hive中查询时,执行顺序是先from后where之后再join,分区表在生产环境大量被使用,就是防止数据量大时全表扫描而造成缓慢,每一张hive表对应HDFS底层都是一个文件目录,而分区字段就相当于分子目录出来,hive要查到数据就必须要有实际数据和元数据,元数据存放在mysql中,元数据指的是表字段和定义,如果是通过load data方式加载数据进表的话是没有元数据信息的,所以可以先load数据后再使用命令msck repair table 表名;
一次性修复分区信息,或者先load数据后再使用命令alter table 表名 add partition 分区
一个个的添加分区信息。
分区表是划分存储路径,分桶表是针对数据文件 - 9、常见hive命令参数调整
hive报错:Caused by: java.lang.OutOfMemoryError: Java heap space
这问题主要原因是处理的数据量太大以至于程序占据了集群资源不够了,java的堆内存不足了,增加大小即可
容器设为10G,java.opt给它80%
set hive.tez.container.size=10240;
set hive.tez.java.opts=-Xmx8192m;
set hive.execution.engine=mr; // 设置执行引擎为MapReduce
set hive.execution.engine=tez; // 设置执行引擎为Tez tez在复杂SQL查询的性能是比mr还要优越的,而且还有==>>动态进度显示,mr只有进度百分比,相对简洁
set hive.new.job.grouping.set.cardinality=256; //做cube组合group by的时候超过5个维度则要配置这条参数
set hive.mapred.mode=nonstrict; //关闭严格模式
set hive.mapred.mode=strict; //开启严格模式,默认开启。
严格模式限制是在分区表查询时必须加分区字段过滤,order by排序查询时必须加limit限制条数,禁止笛卡尔积查询。
set hive.merge.mapfiles=true //默认是true开启的,合并map端输出文件
set hive.merge.mapredfiles=true //默认是false,合并reduce端输出文件
set hive.merge.size.per.task=256*1000*1000 //默认值256000000,合并文件的大小
set hive.map.aggr = true //开启map端聚合,默认是true开启的
set hive.grouby.mapaggr.checkinterval=100000 //在map端进行聚合操作的条目数
set hive.groupby.skewindata=true //默认是false关闭的,当为true会生成两个查询计划MR Job,第一个自动把相同key随机分配给不同的reducer,第二个根据预处理结果把group by key分布到同一个reducer中,最终聚合
-
10、关于count(distinct id)
hql语句中出现了distinct或者是order by这样都是走一个reducer的,尽量避免这类语句
//当数据量小的时候其实影响不大,直接使用下面即可
select count(distinct id) from a
//但是当数据量大时,distinct很耗性能,我们可以先group by去重,再count计算
select count(id)
from (select id from a group by id) a
-
11、分区裁剪、列裁剪
查询时尽量只取需要的列,有分区字段尽量通过分区字段过滤,少用select *
使用外关联时注意where过滤条件和on关联条件,避开造成先全表关联后才where过滤的情形,如下:
//下面这语句就造成了先全表关联后再where过滤了,这容易就掉这坑
select a.id
,count(1) as cnt
from a
left join b
on a.id = b.id
where a.stat_date = 20210124
group by a.id
//改造方式一,通过子查询优化语句
select a.id
,count(1) as cnt
from
(select a.id
from a
where a.stat_date = 20210124
) a
left join b
on a.id = b.id
group by a.id
//改造方式二,通过在关联条件优化语句
select a.id
,count(1) as cnt
from a
left join b
on a.stat_date = 20210124
and a.id = b.id
group by a.id
-
12、hive的表分析命令
统计表信息,主要用来优化查询,加速查询速度,刷新元数据。
analyze table test_jxb partition(stat_date=20210202) COMPUTE STATISTICS for COLUMNS
;
analyze table test_jxb partition(stat_date=20210202) COMPUTE STATISTICS
;
hive优化模块参考此文章:https://www.cnblogs.com/swordfall/p/11037539.html ,以及日常ETL工作实践积累。
hive注意事项看此文章:https://www.cnblogs.com/itlz/p/14267440.html
五、hive窗口函数
分析函数专题:http://lxw1234.com/archives/2015/07/367.htm
- sum、avg、min、max聚合函数的over() 窗口形式,这几个聚合函数是可以使用window子句的,即ROW BETWEEN,具体实践参考上面附的学习链接的分析函数(一)
-
ntile、row_number、rank、dense_rank序列函数的over() 窗口形式,
(注意:序列函数是不支持window子句的。)
ntile
是做分组内切片功能的,例如实际需求场景说要让取当月中用户活跃的前百分之三的那几天
select stat_date
,ntile(3) over(partition by stat_date order by uv DESC) as top3%_uv
from
(select stat_date
,count(distinct umid) as uv
from dwd_user_active_table
where stat_date between 20210101 and 20210130
group by stat_date
) s
row_number
是做分组内排名功能的,如果有排名相同的则随机依次排名,不考虑和存在相同名次
rank
同样是做分组内排名,存在相同名次,相同排名的话会在名次中留下空位
dense_rank
也是做分组内排名,存在相同名次,相同排名的话不会在名次中留下空位
-
cume_dist、percent_rank序列函数,这两个不常用,具体实践参考上面附的学习链接的分析函数(三)
cume_dist
的作用是:小于等于当前值的行数/分组内总行数
percent_rank
的作用是:分组内当前行的排名值-1/分组内总行数-1 -
lag、lead、first_value、last_value序列函数
lag
是用来往上移动统计窗口的,即数据整体往下对应移动,超出窗口的取默认值,否则为NULL
lead
也是用来往下移动统计窗口的,即数据整体往上对应移动,超出窗口的取默认值,否则为NULL
select imei
,sn
,pkg_name
,currentReportElapsedNanos
,currentReportWallClockNanos
,translate_timestamp as start_translate_timestamp
,lead(translate_timestamp,1,2553436800000) over(partition by imei,sn,pkg_name order by translate_timestamp) as end_translate_timestamp
,version_code
,version_string
,version_source
from ext_metis.dwd_common_proto_norm_uidmaps_detail
where stat_date = %(yyyymmdd)s
first_value
是取分组内截止到当前行的第一个值
last_value
是取分组内截止到当前行的最后一个值
如果要实现取分组内最后一个值的话则可以灵活变通下语句,即实现分组内倒序然后再用first_value函数取截止到当前行的第一个值,具体实践参考上面附的学习链接的分析函数(四)