Hive JOIN的MapReduce原理和优化

背景

最近在工作中有一个数据统计的任务,需要把一个万级别和一个亿级别的表join,通过查看hive日志,发现在reduce阶段出现了很严重的数据倾斜情况。故在此学习一下hive join原理和优化方法。

Hive join原理

通常的hive join指的是common join。
举个例子,目前我们手上:用户信息表user,交易记录表transaction两个表。user表如下:

uid age gender
1 20 M
2 30 M
3 25 F
4 23 M

transaction表如下:

tid uid item amount
1 1 可乐 1
2 3 咖啡 1
3 3 面包 2
4 2 可乐 2
5 4 可乐 1

我们希望查询一张表,显示每一个交易的交易信息和用户信息。SQL代码如下:

SELECT
    t.tid, u.uid, u.age, u.gender, t.item, t.amount
FROM
    user u
JOIN
    transaction t
ON
    u.uid = t.uid;

其MapReduce的原理如下图:


JOIN的mapReduce原理

大致步骤包含map -> shuffle sort -> reduce三步:

  • 首先将原始的表映射成key-value的格式,其中join on里面的字段作为key。
  • 然后按照key进行排序,这是一个shuffle的过程,这样相同的key就能够立马在同一个节点内了。
  • 最后按照key进行reduce。不同的join类型(inner, left, right等)决定了不同的reduce函数。

大小表顺序

hive编译器默认join的时候,左表是小表,右表是大表,hive会将小表写入缓存,从而加快查询效率。例如user表是小表,transaction表是大表,我们应该写:

SELECT ...
FROM user u JOIN transaction t
ON ...

而不是:

SELECT ...
FROM transaction t JOIN user u
ON ...

使用mapJoin解决join数据倾斜

除了common join,hive还有一种map join算法,主要用于面对小表join大表的问题。原理如下图:


mapjoin.png

步骤描述如下:

  • 在join任务开始前先创建一个本地mapReduce任务,将小表从hdfs读取到内存成为哈希表,然后把哈希表序列化成为哈希表文件。
  • 在join任务启动时,会把这个哈希表文件读入hadoop分布式缓存(distributed cache)中,然后将这个哈希表发送到每个mapper上。在这个阶段,map任务扫描大表的每一条记录,并和小表哈希表进行关联,然后输出结果。

这样,就只有map而没有reduce了,加快了join效率,也解决了reduce侧的数据倾斜问题。
配置mapjoin有两种方法:

  1. 在select语句中增加一个标记,告诉hive编译器哪个是小表,以触发mapjoin:
SELECT /*+ MAPJOIN(u) */
    t.tid, u.uid, u.age, u.gender, t.item, t.amount
FROM
    user u
JOIN
    transaction t
ON
    u.uid = t.uid
  1. 第二种方法是设置属性值hive.auto.convert.JOIN为true,不过有些hive版本此参数默认是true,也就是说就算你不设置,hive也能识别出大表小表,自动开启mapjoin优化。此外,还可以设置小表的阈值,小于这个阈值的就认为是小表,默认是25M:
set hive.auto.convert.JOIN=true;
set hive.mapjoin.smalltable.filesize=25000000;

SELECT
    t.tid, u.uid, u.age, u.gender, t.item, t.amount
FROM
    user u
JOIN
    transaction t
ON
    u.uid = t.uid;

Reference

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容