背景
最近在工作中有一个数据统计的任务,需要把一个万级别和一个亿级别的表join,通过查看hive日志,发现在reduce阶段出现了很严重的数据倾斜情况。故在此学习一下hive join原理和优化方法。
Hive join原理
通常的hive join指的是common join。
举个例子,目前我们手上:用户信息表user,交易记录表transaction两个表。user表如下:
uid | age | gender |
---|---|---|
1 | 20 | M |
2 | 30 | M |
3 | 25 | F |
4 | 23 | M |
transaction表如下:
tid | uid | item | amount |
---|---|---|---|
1 | 1 | 可乐 | 1 |
2 | 3 | 咖啡 | 1 |
3 | 3 | 面包 | 2 |
4 | 2 | 可乐 | 2 |
5 | 4 | 可乐 | 1 |
我们希望查询一张表,显示每一个交易的交易信息和用户信息。SQL代码如下:
SELECT
t.tid, u.uid, u.age, u.gender, t.item, t.amount
FROM
user u
JOIN
transaction t
ON
u.uid = t.uid;
其MapReduce的原理如下图:
大致步骤包含map -> shuffle sort -> reduce三步:
- 首先将原始的表映射成key-value的格式,其中join on里面的字段作为key。
- 然后按照key进行排序,这是一个shuffle的过程,这样相同的key就能够立马在同一个节点内了。
- 最后按照key进行reduce。不同的join类型(inner, left, right等)决定了不同的reduce函数。
大小表顺序
hive编译器默认join的时候,左表是小表,右表是大表,hive会将小表写入缓存,从而加快查询效率。例如user表是小表,transaction表是大表,我们应该写:
SELECT ...
FROM user u JOIN transaction t
ON ...
而不是:
SELECT ...
FROM transaction t JOIN user u
ON ...
使用mapJoin解决join数据倾斜
除了common join,hive还有一种map join算法,主要用于面对小表join大表的问题。原理如下图:
步骤描述如下:
- 在join任务开始前先创建一个本地mapReduce任务,将小表从hdfs读取到内存成为哈希表,然后把哈希表序列化成为哈希表文件。
- 在join任务启动时,会把这个哈希表文件读入hadoop分布式缓存(distributed cache)中,然后将这个哈希表发送到每个mapper上。在这个阶段,map任务扫描大表的每一条记录,并和小表哈希表进行关联,然后输出结果。
这样,就只有map而没有reduce了,加快了join效率,也解决了reduce侧的数据倾斜问题。
配置mapjoin有两种方法:
- 在select语句中增加一个标记,告诉hive编译器哪个是小表,以触发mapjoin:
SELECT /*+ MAPJOIN(u) */
t.tid, u.uid, u.age, u.gender, t.item, t.amount
FROM
user u
JOIN
transaction t
ON
u.uid = t.uid
- 第二种方法是设置属性值hive.auto.convert.JOIN为true,不过有些hive版本此参数默认是true,也就是说就算你不设置,hive也能识别出大表小表,自动开启mapjoin优化。此外,还可以设置小表的阈值,小于这个阈值的就认为是小表,默认是25M:
set hive.auto.convert.JOIN=true;
set hive.mapjoin.smalltable.filesize=25000000;
SELECT
t.tid, u.uid, u.age, u.gender, t.item, t.amount
FROM
user u
JOIN
transaction t
ON
u.uid = t.uid;
Reference
- 《Hive编程指南》,Edward Capriolo等著
- Hive Map Join 原理 - 腾讯云
- HIVE:JOIN原理、优化
- Hive SQL的编译过程