MapReduce Map端 join 的一个例子

什么是 Join

Join,翻译过来是 加入、连接、结合的意思。
而在数据处理中,join 是对表的操作。表是数据存储的一种形式,就像 excel 中的表一样。
我们为了想得到想要的结果,需要分析多张表,而 把两张 或更多的表 进行结合,这样的操作 就叫 Join。

那 在 MapReduce 中的 Join 就是指上面的操作,只不过可能不是处理的表,而是文件,或者是从表存储的介质 比如 MySql、Hbase 中读取的数据。

举个 MapReduce 中 使用 Join 的例子:

比如我们有两个文件,分别存储 订单信息:products.txt,和 商品信息:orders.txt ,详细数据如下:

products.txt:

//商品ID,商品名称,商品类型(数字表示,我们假设有一个数字和具体类型的映射)
p0001,xiaomi,001
p0002,chuizi,001

orders.txt:

//订单号,时间,商品id,购买数量 
1001,20170710,p0001,1 
1002,20170710,p0001,3 
1003,20170710,p0001,3 
1004,20170710,p0002,1

我们想象有多个商品,并有海量的订单信息,并且存储在多个 HDFS 块中。

如果我们想统计 每个商品的 购买数量,即这样的形式:

xiaomi,7
chuizi,1

该怎么处理?

我们分析上面我们想要的结果,商品名称和销量,这两个属性分别存放到不同的文件中,那我们就要考虑 在一个地方(mapper)读取这两个文件的数据,并把数据在一个地方(reducer)进行结合。这就是 MapReduce 中的 Join 了。

我们用代码实现上面的过程 (只写出最主要的代码):
Mapper:

Text outKey = new Text();
Text outValue = new Text();
StringBuilder sb = new StringBuilder();

protected void map {
    String[] split = value.toString().split(",");
    
    //两个文件 在一个 mapper 中处理
    if(name.equals("products.txt")) {
    
        //取商品ID 作为 输出key 和 商品名称 作为 输出value,即 第0、1 的数据
        outKey.set(split[0]);
        outValue.set("product#" + split[1]);
        context.write(outKey, outValue);
        
    } else {
        //取商品ID 作为 输出key 和 购买数量 作为 输出value,即 第2、3 的数据
        outKey.set(split[2]);
        outValue.set("order#" + split[3]);
        context.write(outKey, outValue);
    }
}

Reducer:

//用来存放:商品ID、商品名称
List<String> productsList = new ArrayList<>();

//用来存放:商品ID、购买数量
List<String> ordersList = new ArrayList<>();

Text outValue = new Text();

protected void reduce {

    for (Text text : values) {
        String value = text.toString();
        
        if(value.startsWith("product#")) {
            productsList.add(value.split("#")[1]); //取出 商品名称
            
        } else if(value.startsWith("order#")){
            ordersList.add(text.toString().split("#")[1]); //取出商品的销量
        }
    }
    int totalOrders = 0;
    for (int i=0; i < productsList.size(); i++) {
        for (int j=0; j < ordersList.size(); j++) {
            totalOrders += ordersList.get(j);
        }
        outValue.set(productsList.get(i) + "\t" + totalOrders );
        //最后的输出是:商品ID、商品名称、购买数量
        context.write(key, outValue);
    }
}

上面的代码即是 Join 的过程。

下面我们说下 Map Join。

什么是 Map Join

Map Join 是指 Join 发生在 MapReduce 的 Map 阶段,而我们通常在 MapReduce 中把两张或多个表结合,是在 Reduce 端处理。

为什么要使用 Map Join

我们假设 某些商品卖的特别好,比如小米手机,产生了大量的订单,数据量特别大;而某些商品销量惨淡,比如锤子手机。那么某一个 Reduce Task 处理了大量数据,某个 Reduce Task 可能只处理几条数据,即产生了数据倾斜的问题。

而 我们上面举的例子,商品相对于订单 的数据量来说,是非常小的,可能一个商城 有几百种商品,而订单量可能达到上千万。

而 Map Join,适用于: 合并两个表数据,但有数据倾斜的问题,且一张表的数据量很小,另一张表数据量很大的情况。

如何实现 Map Join

我们可以通过 DistributedCache 来实现。
DistributedCache 是一个提供给Map/Reduce框架的工具,用来缓存指定的文件到 每一个 slave 节点上。

我们通过 DistributedCache 把小文件发给每一个 Mapper,在 每一个 Mapper 中实现 上面 Reducer 的功能,这样发送到 Reducer 中的数据已经是聚合过的数据,数据量大大减少,也就解决了数据倾斜的问题。

代码流程:

  1. 在 Main 方法中调用 DistributedCache 的Api 把小文件保存起来。
  2. 在 Mapper 的 setup() 中通过 DistributedCache 的 Api 来获取小文件,并保存到 HashMap 中。
    Mapper 中有一个 setup() 方法,在 map() 之前执行,通常做一些初始化工作。
  3. 在 map() 中 从HasnMap 中读取数据,并读取大文件,把两个文件进行合并。
  4. 在 Reducer 中进一步处理。
看一下具体代码:

Main:

public static void main(String[] args) {
    ...
    DistributedCache.addCacheFile(new Path("...").toUri() , conf);
}

Mapper 中的 setup():

private static Map<String,String> productMap =  new HashMap<>();

@Override
protected void setup(Context context) throws IOException, InterruptedException {
    //Path[] files = context.getLocalCacheFiles();
    Path[] files = DistributedCache.getLocalCacheFiles(conf);
    
    String strPath = files[0].toUri().toString();
    
    BufferedReader br = new BufferedReader(new FileReader(strPath));
    
    while((readLine = br.readLine()) != null) {
        String[] split = readLine.split(",");
        String productId = split[0];
        String productName = split[1];

        productMap.put(productId, productName);
    }
}

Mapper 中的 map():

@Override
protected void map {
    //读取的 orders.txt 中的数据
    String[] split = value.toString().split(",");

    String productId = split[2];
    String saleSum = split[3];

    String productName = productMap.get(productId);

    outKey.set(productId);
    outValue.set(productName + "\t" + saleSum);

    context.write(outKey, outValue);
}

这样就完成了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容