0x00 什么是hive
hive是构建在hadoop之上的数据仓库组件,是目前大数据领域最常用的数据仓库开源实现框架,hive定义了一种类sql语言,称为hql,hive能将hql转换成mapreduce,提交到hadoop执行,当然如果执行引擎设置为其他,例如spark,会将hql转换成spark job执行
0x01 hive的执行流程
参考资料:
hive官方文档design
如下图,hive的结构以及执行过程
[图片上传失败...(image-6c2390-1574612092305)]
UI user interface
用户接口,与用户的交互接口,包括jdbc、odbc、cli、web interface,其中jdbc和odbc通过Thrift Server连接driverdriver、compiler
接到用户提交的sql后,driver发送给compiler翻译成执行计划,compiler先查询元数据,确定sql语句没有问题然后生成执行计划并优化,返回给driver,然后发送给execution engineexecution engine
engine接到查询计划后,编译成hadoop任务,发送给hadoop执行,然后返回结果,如果是ddl语句,还会联系metastore,另外hive的后续版本也加入了tez和spark执行引擎,配置hive.execution.engine参数可以修改,默认是mr连接方式
通常在bin目录下的hive脚本,执行就可以直接访问hive,也就是我们常用的hive cli,另外一种方式是hive0.11之后的新的cli连接方式beeline,需要启动hiveserver2(默认端口10000),beeline是一种类似于jdbc的连接方式,beeline相对于hive cli有以下优势
1、hive cli需要将访问元数据的用户名和密码信息配置在hive-site.xml中
2、hive cli只支持单用户访问
具体客户端的区别见0x02
0x02 hive客户端
运行时会启动一个runJar进程,在本地编译hql,然后提交到hadoop执行
单用户操作
安全性较低
hive0.11之后推出的新客户端,hive推荐使用
基于SQLLine的jdbc客户端
需要启动hiveserver2(jdbc方式访问也需要)
更纯粹的客户端,hql提交到hiveserver2编译优化执行
有更丰富的权限控制
0x03 hive权限
HIVE LanguageManual Authorization
hive一般不需要做权限控制,因为大部分访问都是内网的访问,但是hive也提供了几种权限控制用例
首先从hive的两个作用来考虑权限
1、作为数据存储层
这种权限控制一般基于hdfs文件系统,只能做到表级别或表分区级别的读写控制
2、作为sql引擎
这个又分两种
hive cli客户端,这种和第一种类似,只能做到基于文件系统的权限控制
beeline客户端,这种是基于hiveserver2的,能做到sql标准的权限控制,类似于mysql的权限控制
0x04 hive排序的几种方式
- order by
order by和关系型数据库的排序类似,不同的地方是,当hive开启strict模式时,order by必须带上limit,因为order by是在最后将所有数据到一个reduce中进行排序,执行时间会很长
- sort by
sort by也是排序,但是sort by是将每个reduce中的数据排序,如果不止一个reduce,最终结果可能是部分有序
- distribute by
distribute by不是用来排序,主要是将key分区到不同的reduce,能保证相同的key在一个reducce处理,但是不保证同一个reduce中的key有序
- cluster by
cluster by相当于distribute by + sort by,除了将key分区以外还保证key在reduce上有序
0x05 hive strict模式
hive严格模式,通过hive.maper.mode=strict(nostrict)设置开启或关闭,这个模式主要是为了防止用户做一些非常耗时的操作而影响整个集群,例如:
1、分区表查询不对分区做限制
2、order by不带limit
。。。
这些操作在严格模式下都会报错
0x06 内部表和外部表
内部表和外部表是hive的两种表,主要区别是hive是否管理表的数据
- 内部表(managed table)
默认建表都是内部表
数据由hive管理,存储在hive.metastore.warehouse.dir配置的hdfs目录
drop表会将数据也删除
适用于hive自己管理生命周期的表或者临时表
- 外部表(external)
建表时要加上 create external table
drop表时不会删除数据文件(如果设置了external.table.purge=true也会删除数据)
适用于数据已经存在的情况(并且数据较大)
0x07 分桶表
LanguageManual DDL BucketedTables
分桶表提供了比分区更细粒度的数据划分方式,能将字段按hash取模划分成指定份数
- 创建方法
CREATE TABLE user_info_bucketed(user_id BIGINT, firstname STRING, lastname STRING)
COMMENT 'A bucketed copy of user_info'
PARTITIONED BY(ds STRING)
CLUSTERED BY(user_id) INTO 256 BUCKETS;
- 插入数据
set hive.enforce.bucketing = true; -- (Note: Not needed in Hive 2.x onward)
FROM user_id
INSERT OVERWRITE TABLE user_info_bucketed
PARTITION (ds='2009-02-25')
SELECT userid, firstname, lastname WHERE ds='2009-02-25';
hive0.x和1.x时,在插入数据前需要执行set hive.enforce.bucketing = true,这个参数会自动根据插入表的分桶数设置reduce个数,如果不设置这个参数需要设置好reduce(set mapred.reduce.tasks=?)个数,并且使用cluster by查询语句
- 注意事项
分桶规则是按照hash函数来的,具体的hash函数是根据桶的个数来的,但是对于数据和字符串或其他数据类型使用的hash函数不同,最后得到的桶也不同,所以对于插入和查询,确保数据类型不变,才能正确的查到数据
0x08 开窗函数
Windowing and Analytics Functions
开窗函数是为了解决聚合运算每组只返回一个值的问题,开窗函数可以为聚合运算每组返回多个值,oracle中也叫分析函数
window_func() over(partition by [col1,col2...] [order by [col1,col2...]] windowing_clause)
window_func() 窗口函数
包括所有聚合函数,还有一些常用的分析函数,例如rank()、row_number()、lead()等partition by 开窗的列
可以指定多个列,和group by的分组类似order by 排序字段
用来指定窗口中数据顺序的列windowing_clause 开窗范围
限定开窗的范围,以当前行为参照,在前几行或后几行的范围上计算聚合值
只有窗口函数是聚合函数时才能使用,例如count、sum、max等,但是rank、row_number不能使用,并且一定要有order by
(ROWS | RANGE) BETWEEN (UNBOUNDED | [num]) PRECEDING AND ([num] PRECEDING | CURRENT ROW | (UNBOUNDED | [num]) FOLLOWING)
(ROWS | RANGE) BETWEEN CURRENT ROW AND (CURRENT ROW | (UNBOUNDED | [num]) FOLLOWING)
(ROWS | RANGE) BETWEEN [num] FOLLOWING AND (UNBOUNDED | [num]) FOLLOWING
- 常见用法
数据
name | score |
---|---|
a | 2 |
a | 3 |
a | 4 |
a | 7 |
a | 9 |
1、如果窗口函数是聚合函数,并且windowing_clause省略时,并且有order by时,窗口函数计算范围是第一行到当前行
-- sql1
select name,score,sum(score) over(partition by name order by score) as x from t
-- sql2
select name,score,sum(score) over(partition by name) as x from t
结果1: 窗口内从第一行截止到当前行的和
name | score | x |
---|---|---|
a | 2 | 2 |
a | 3 | 5 |
a | 4 | 9 |
a | 7 | 16 |
a | 9 | 25 |
结果2: 窗口内所有数据和
name | score | x |
---|---|---|
a | 2 | 25 |
a | 3 | 25 |
a | 4 | 25 |
a | 7 | 25 |
a | 9 | 25 |
2、rank、row_number都是排名函数,区别是row_number没有相同的排名,rank有并列排名
3、lead和lag用法
lag(expr,Bigint offset, default) 取当前行之前的offset行数据,没有就是default值
lead(expr,Bigint offset, default) 取当前行之后的offset行数据,没有就是default值
4、windowing_clause用法
-- sql1
select name,score,sum(score) over(partition by name order by score rows BETWEEN 1 PRECEDING and 2 PRECEDING) as x from t
-- sql2
select name,score,sum(score) over(partition by name order by score rows BETWEEN 1 PRECEDING and 2 PRECEDING) as x from t
结果1:从当前行向上第一行到第二行求和
name | score | x |
---|---|---|
a | 2 | \N |
a | 3 | 2 |
a | 4 | 5 |
a | 7 | 7 |
a | 9 | 11 |
结果2:从当前行向下第一行到第二行求和
name | score | x |
---|---|---|
a | 2 | 7 |
a | 3 | 11 |
a | 4 | 16 |
a | 7 | 9 |
a | 9 | \N |
利用开窗函数的特性可以巧妙的解决很多问题,例如:查询连续登陆N天的用户
0x09 调优
Cost-based optimization in Hive
所有优化问题都可以从上到下优化,优化效果也是从上到下依次递减,对于hive来说,掌握hive的执行原理,根据具体的业务,选择更好的存储表和查询方式往往可能得到更好的优化结果。所以遇到优化问题,可以从以下几个方面考虑
1、数据组织方式是否可以优化,例如只是查询某一块的数据,是否可以将表建为分区表或分桶表
2、数据是否存在倾斜,hive底层的mapreduce是分布式并行处理框架,所以不怕数据多,而怕数据倾斜
3、如果数据没有倾斜,还有可能是资源不足造成的,可以优化sql,尝试减少job数
4、调整hive参数,增加资源
0x10 数据倾斜
hive中数据倾斜一般是因为输入到reduce端的数据不均衡导致的,部分reduce处理的数据过多,一直未完成,具体表现就是进度一直卡在99%,也叫做长尾问题
在sql中通常join、group by、count distinct操作的时候可能会发生长尾问题
1、join
空值较多或者某个key数据量太大,可以从几个方面考虑,如果是大小表join,可以使用mapjoin先将小表加载到内存,然后在join,如果两张表都比较大,就要尽量将大表去重,减少join的数据量,如果大表数据还是很大,就要从业务上考虑能否优化处理方式
-- mapjoin
select /*+mapjoin(smallT)*/ from bigT left outer join smallT on bigT.key = smallT.key
2、group by
某个key值数据量太大,可以将key单独提出来处理,或者使用随机数将这个key拆分处理,最后在进行加总
-- 原sql
SELECT Key,COUNT(*) AS Cnt FROM TableName GROUP BY Key;
-- 假设长尾的Key已经找到是KEY001
SELECT a.Key
, SUM(a.Cnt) AS Cnt
FROM (
SELECT Key
, COUNT(*) AS Cnt
FROM TableName
GROUP BY Key,
CASE
WHEN Key = 'KEY001' THEN Hash(Random()) % 50
ELSE 0
END
) a
GROUP BY a.Key;
3、count distinct key最终会将所有不同的key汇聚到最后一个reduce统计,如果key值太多,可能需要处理很长时间,我们可以先使用group by key,然后为每个组附上一个随机数字段,然后针对随机数进行group by,计算sum,最后在外层对每组随机数的sum进行加总
select sum(t1.cnt) as res from (
select tag,count(1) as cnt from (
select key,rand()*100 as tag from t group by key
) t1 group by tag
) t2
-- 也可以先group by最后在count的方式
SELECT SUM(PV) AS Pv
, COUNT(*) AS UV
FROM (
SELECT COUNT(*) AS Pv
, uid
FROM UserLog
GROUP BY uid
) a;
需要注意的是,随机数的解决方案都要借助子查询的方式,如果长尾不是特别严重,这种处理方式不一定更好
0x11 hive参数
关于优化的一些常用参数,并不完全
-- 设置reduce数
set mapred.reduce.tasks = 256
-- 开启join倾斜优化
set hive.optimize.skewjoin = true
-- 基于输入文件大小,将join优化为mapjoin
set hive.auto.convert.join = true
-- 小表阈值(byte),如果小表大小小于这个值,hive会使用mapjoin
set hive.mapjoin.smalltable.filesize=25000000
-- hive是否开启对group by的倾斜优化
set hive.groupby.skewindata=true
0x12 join优化之Predicate Pushdown
谓词下推,指将数据的筛选条件尽量下推到底层,避免过多的数据传输,hive参数为set hive.optimize.ppd=true,默认是开启的,先来看两个sql
select t1.*,t2.* from
(select * from a where pt = '20191120) t1
left outer join
(select * from b where pt = '20191120) t2
on t1.key = t2.key
select a.*,b.*
from a left outer join b
on a.key = b.key and b.pt = '20191120'
where a.pt = '20191120
outer join的执行顺序
1、将join的表做笛卡尔积
2、筛选on条件
3、补全主表,left补左表,right补右表
4、筛选where条件
对于第二个sql,a表的分区裁剪就会下推到join前执行
因为执行顺序的关系,对于b表的分区裁剪不能放在where中,因为对于a存在,b补null的数据最终会过滤掉,导致结果和第一个sql不一致
所以对于outer join,分区的筛选条件放在on和where上是有很大差别的,特别是full outer join,使用的时候要非常注意,为了避免不必要的错误,一般对join的表使用子查询,将分区筛选直接放在子查询中,然后join,这是最简便的方式。
详情可参考:阿里云文档join on条件