一、说明
MapReduce提供的表连接操作包括:Map端join、Reduce端join、semi join(半连接)。
Map端join是指数据到达map处理函数之前进行合并的,效率要远远高于Reduce端join;因为Reduce端join是把所有的数据都经过Shuffle,非常消耗资源。
common join/shuffle join/reduce join (都是指同一个)。
1、Map join(也叫作 boardcast join)
连接的数据中,有一份数据比较小,小数据全部加载到内存,按关键字建立索引。
大数据文件作为map的输入文件,对map()函数每一对输入,都能够方便地和已加载到内存的小数据进行连接。
把连接结果按key输出,经过shuffle阶段,reduce端得到的就是已经按key分组的、并且连接好了的数据。
这种方法,要使用hadoop中的DistributedCache把小数据分布到各个计算节点,每个map节点都要把小数据库加载到内存,按关键字建立索引。
这种方法有明显的局限性:有一份数据比较小,在map端,能够把它加载到内存,并进行join操作。
2、Reduce join
在map阶段, 把关键字作为key输出,并在value中标记出数据是来自data1还是data2。
因为在shuffle阶段已经自然按key分组,reduce阶段,判断每一个value是来自data1还是data2,在内部分成2组,做集合的乘积。
这种方法有2个问题:
1、map阶段没有对数据瘦身,shuffle的网络传输和排序性能很低。
2、reduce端对2个集合做乘积计算,很耗内存,容易导致OOM。
3、semi join
semi join就是所谓的半连接,其实仔细一看就是reduce join的一个变种,就是在map端过滤掉一些数据,在网络中只传输参与连接的数据,不参与连接的数据不必在网络中进行传输,从而减少了shuffle的网络传输量,使整体效率得到提高,其他思想和reduce join是一模一样的。说得更加接地气一点就是将小表中参与join的key单独抽出来通过DistributedCach分发到相关节点,然后将其取出放到内存中(可以放到HashSet中),在map阶段扫描连接表,将join key不在内存HashSet中的记录过滤掉,让那些参与join的记录通过shuffle传输到reduce端进行join操作,其他的和reduce join都是一样的。
semi join也是有一定的适用范围的,其抽取出来进行join的key是要放到内存中的,所以不能够太大,否则容易在Map端造成OOM。
4、使用BloomFilter过滤空连接的数据
对其中一份数据在内存中建立BloomFilter,另外一份数据在连接之前,用BloomFilter判断它的key是否存在,如果不存在,那这个记录是空连接,可以忽略。
5、使用内存服务器,扩大节点的内存空间
针对map join,可以把一份数据存放到专门的内存服务器,在map()方法中,对每一个<key,value>的输入对,根据key到内存服务器中取出数据,进行连接。
6、使用mapreduce专为join设计的包
jar: mapreduce-client-core.jar
package: org.apache.hadoop.mapreduce.lib.join