hive简要机制
hive 利用HDFS存储,利用MR查询,执行程序运行在yarn上,是基于hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张表,并提供类SQL查询功能
本质:将HQL转化为MR程序
hive优缺点
优点:采用类SQL语法,简单、容易上手
避免了去写MR,减少了开发人员的学习成本
执行延迟比较高,常用于数据分析,对实时性要求不高的情况
优势在于处理大数据
有内置函数,也支持用户自定义函数
缺点:HQL表达能力有限(迭代式运算无法表达 不擅长数据挖掘)
效率比较低(自动生成MR,不够智能化 调优比较困难)
hive执行过程:
1、通过解析器,将SQL字符串转化成抽象语法树AST,对AST进行语法分析,判断语义是否有错误,
2、通过编译器,将AST编译生成逻辑执行计划
3、通过优化器,对逻辑执行计划进行优化
4、通过执行器,把逻辑计划转换成可以运行的物理计划,就是MR或者Spark
导入数据:
使用导入数据时,会使用到into和overwrite into两个关键字,into是在当前表追加数据,而overwrite into是删除当前表的数据然后在导入数据。
1、本地文件导入hive:load data local inpath '路径' into table 表名;
【方式1 :文件的路径必须使用绝对路径】
2、从HDFS导入hive 上边的语句不加local
3、通过查询语句向表中插入数据(Insert):insert into table 表名select , , from 表名 执行MR
4、查询语句创建表并加载数据:create table 表名 as select .... from 表名
5、创建表时通过Location 指定加载数据路径:上传数据到创建表时指定的路径即可
6、sqoop的方式:Import数据到指定的hive表中
数据导出:
1、Insert导出:insert [local] directory '路径' [row format delimited fields terminated by ""【格式化】]select * from 表名
2、hadoop 命令导出到本地 -get
3、hive shell 命令导出:hive -f/-e 执行语句或者脚本>file
4、sqoop导出:export
创建表的相关字段说明:
external:指定是外部表,默认是内部表
comment:为表和列添加注释
partitioned by创建分区表
clustered by:创建分桶表
stored as:指定存储文件的类型
row format delimited fields terminated by""
location:指定表在HDFS上的存储位置
hive 文件的格式 stored as:
textfile:默认格式,导入数据时会直接把数据文件拷贝到hdfs上。存储方式为行存储;磁盘开销大,数据解析开销大;hive不会对数据进行切分,从而无法对数据进行并行操作。
sequencefile:除了上述的格式外,不能直接从本地文件导入数据,数据要先导入到textfile格式的表中,然后再从表中用insert导入表中;或者用复制表结构及数据的方式
二进制文件,以<k,v>的形式序列化到文件中;行存储;可分割 压缩;
rcfile:数据按行分块每块按列存储;压缩快 ;
orcfile:按行分块每块按列存储; 压缩快; 效率比fcfile高
parquet:列存储
内部表(管理表)VS 外部表:
内部表:创建时,会将数据移动到数据仓库指向的路径,删除时,元数据和数据一起被删掉
外部表:创建时,仅记录数据所在的路径,不对数据的位置做任何改变,删除时,只会删除元数据,不会删除数据,
分区表:就是分目录,把大的数据集分割成小的数据集;查询时,通过where子句的表达式选择指定的分区,
数据上传到分区目录上,让分区表与数据产生关联的三种方式:
1、上传数据后修复:修复:msck repair table 表名
2、上传后添加分区:添加分区 alter table 表名addpartition(month="201709")
3、创建文件夹后load data到分区
二级分区表,就是有两个分区的条件
排序:
全局排序:order by :一个reduce asc(升序 默认) desc (降序)
内部排序:sort by:对每个reduce内部怕排序
分区排序:distribute by: 对partition 排序 ,要结合sort by 使用,【distribute by要写在sort by之前】
cluster by:distribute by 和sort by字段相同时,可以使用cluster by,包括两个的功能。但是排序只能升序排序,不能指定
行存储VS列存储:
行存储的特点:查询满足条件的一整行数据的时候,列存储需要去每个聚集的字段找到对应的每个列的值,行存储只需要找到中一个值,其余的值都在相邻的地方所以此时行存储的速度较快;
列存储的特点:查询只需要少数字段的时候,能减少读取的数据量;每个字段的数据类型一定是相同的,列存储可以针对性的设计更好的设计压缩算法
hive的数据倾斜:
由于数据分布不均匀,造成数据大量集中到一点
表现:任务进度长时间维持在100%附近,查看任务监控页面,发现只有少量reduce任务未完成,以为处理的数据量和其他的reduce差异过大,单一reduce处理的记录数与平均记录数相差太大,最长时间远大于平均时长
原因:key分布不均匀(通过hash算法分配reduce数)
业务数据本身的特性
建表考虑不周全
某些HQL语句本身存在数据倾斜
场景:空值产生的数据倾斜;
常有信息丢失的问题,在进行表关联的时候,只能通过空值进行关联,就会碰到数据倾斜
解决:1、为空的不参与关联 表连接的时候指定not null
2、给空值赋一个新值
方法2比方法1效率好,不但io少了,而且作业数也少了。在1中,表读取两次,jods是2。在2中,job是1,。这个优化适合无效的id产生的数据倾斜。把倾斜的数据分到不同的reduce上,解决数据倾斜。?????
不同数据类型关联产生数据倾斜:
一个表的字段是int,另一个表的字段既有int也有String,表连接的时候默认的hash操作会按照int进行分配,这样导致所有的String类型的id都分配到一个reduce中;
小表join大表 可能产生数据倾斜
小表不大不小,map join不支持这么小的表,若是使用普通的join,又会碰到数据倾斜 ???
使用mapjoin,
总结:使map输出数据更均匀的分布到reduce中,由于Hash算法的局限性,按照key Hash会或多或少的造成数据倾斜,
hive中join分为Common join和Map join:
Common join:如果不指定MapJoin或者不符合mapjoin的条件,那么hive的解析器会默认执行Common join,在reduce阶段完成join。整个过程包含Map,Shuffle,reduce
map阶段:读取源表的数据,map输出时,以join on条件中的列作为key,map输出的value为join之后所关心的列(select或者where中需要用到的),同时value中还会包含表的Tag信息,用于表明此value对应那个表。
shuffle阶段:根据key的值进行hash,并将key/value按照hash值推送至不同的reduce中,这样确保两个表中相同的key位于同一个reduce中。
reduce阶段:根据key的值完成join操作,期间通过Tag来标识不同表中的数据
mapjoin:通常用于一张很小的表和一张很大的表进行进的场景,具体小表有多小,有参数hive.mapjoin.smalltable.filesize来决定,默认值是25M,满足条件的话,Hive在执行时候会自动转化为MapJoin,或使用hint提示 /*+
mapjoin(table) */执行MapJoin【map阶段完成join】
如图中的流程,首先是Task A,它是一个Local Task(在客户端本地执行的Task),负责扫描小表b的数据,将其转换成一个HashTable的数据结构,并写入本地的文件中,之后将该文件加载到DistributeCache中,该HashTable的数据结构可以抽象为:
|key| value|
| 1 | 26 |
| 2 | 34 |
接下来是Task B,该任务是一个没有Reduce的MR,启动MapTasks扫描大表a,在Map阶段,根据a的每一条记录去和DistributeCache中b表对应的HashTable关联,并直接输出结果。
-
由于MapJoin没有Reduce,所以由Map直接输出结果文件,有多少个Map Task,就有多少个结果文件
补充数据倾斜的解决方案:
1、参数配置;
设置map端聚合:set hive.map.aggr=true;
数据倾斜时进行负载均衡:set hive.groupby.skewindata=true;
2、 [endif]小表join大表时,采用mapjoin
大表join大表时0值或者空值过多时,将0值或者空值赋一个新值,把空值的 key 变成一个字符串加上随机数
3、 group by 分组的维度过少,每个维度的值过多,导致处理某值的reduce耗时很久
4、 count distinct 特殊值过多,处理耗时 可以先去重在进行统计
5、map和reduce的优化
小文件过多的时候合并小文件;set hive.merge.mapfiles=true
单个文件过大可以设置map的个数