hadoop(5)MapReduce初识

1 MapReduce是什么

MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题.这个可以跟排序中的归并排序类比下思想。归并采用的是分治法,也就是将一个问题分解成多个小问题,求出每个小问题的结果,然后再合并小结果成最终的结果。MapReduce亦然。
MapReduce由两个阶段组成:Map和Reduce,Map阶段进行分组计算。Reduce阶段进行合并结果。用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。
这两个函数的形参是key、value对,表示函数的输入信息。

2 MapReduce工作原理

MapReduce的整个流程,需要用户进行实现的也只有两部分内容。就是Map阶段的处理和Reduce阶段的处理。总的流程,如图:


MapReduce总流程

2.1 Map

  • 读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。这里生成的键key是根据每一行开始字符的下标位置,解析示例如下:
123 abc// 解析之后为0,'123 abc'
321 cbd// 解析之后为8,'321 cbd'
tomy hello// 解析之后为16,'tomy hello'
  • Map具体的逻辑部分,对输入的key,value(就是上一步解析出来的key,value)进行处理。解析完成之后输出新的key,value。这一部分是用户自己实现

2.2 Shuffle

这一部分是系统自动完成,只是需要进行说明下,不然在进行编程的时候发现Map阶段的输出和Reduce阶段的输入不对称,但是也可以进行变成干预,如后面介绍的Partitioner编程、自定义排序编程、Combiner编程

  • 对Map输出的key,value进行分区,对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。然后将分组后的数据进行归约

2.3 Reduce

  • 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。
  • Reduce过程,对输入的key、value处理,转换成新的key、value输出。这一部分是用户自己实现
  • 把reduce的输出保存到文件中。

3 编码实践

  • 实现功能:对文件中的单词进行统计,如:
// 统计内容
how are you
// 统计结果
how 1
are 1
you 1
  • 实现的主要包含三个java类,WordCount、WordCountMapper、WordCountReduce
  • WordCount
package com.jiyx.test.mapred.job;

import com.jiyx.test.mapred.mapper.WordCountMapper;
import com.jiyx.test.mapred.reducer.WordCountReduce;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * 主类
 * @author jiyx
 * @create 2018-10-13-16:01
 */
public class WordCount {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 构建job对象
        Job job = Job.getInstance();

        // 需要设置main方法所在的类
        job.setJarByClass(WordCount.class);

        // 设置mapper
        job.setMapperClass(WordCountMapper.class);
        // Map的输出类型设置可以省略,不过只能在reducer的输入(key,value)和输出(key,value)类型相等时才行
        // 也就是map的输出类型和reducer的输出类型一致的时候才能省略
        // 这里不相同,不能省略
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(ByteWritable.class);
        // 需要统计的文件
        FileInputFormat.setInputPaths(job, new Path("/wc.txt"));

        // 设置reducer
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // 统计结果输出
        FileOutputFormat.setOutputPath(job, new Path("/wcResult"));

        // 提交任务,并打印过程信息
        job.waitForCompletion(true);
    }
}
  • WordCountMapper
package com.jiyx.test.mapred.mapper;

import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Map
 * @author jiyx
 * @create 2018-10-13-15:42
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, ByteWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 对输入的value进行拆分后直接输出,如{18:"hello java hello"},输出为"hello",1和"java",1和"hello",1
        for (String newKey : value.toString().split(" ")) {
            context.write(new Text(newKey), new ByteWritable((byte) 1));
        }
    }
}
  • WordCountReduce
package com.jiyx.test.mapred.reducer;

import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * Reduce
 * @author jiyx
 * @create 2018-10-13-15:53
 */
public class WordCountReduce extends Reducer<Text, ByteWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<ByteWritable> values, Context context) throws IOException, InterruptedException {
        // 因为输入的是类似于{"hello": 1, 1, 1, 1}这种模式,所以直接++
        long count = 0;
        Iterator<ByteWritable> iterator = values.iterator();
        for (; iterator.hasNext(); iterator.next(), count++) {
        }
        context.write(key, new LongWritable(count));
    }

}
  • 将文件打包成以WordCount为主mainClass的jar包,如果是使用idea且打出的jar包不能执行的话,可以转到idea生成可执型jar包进行查看。
    这里使用的是hadoop三种运行模式中的伪分布模式,感觉会比较麻烦一点,后期采用本地模式进行
  • 然后将jar包上传到hadoop服务器,使用hadoop jar xxx.jar 执行jar包就可以了,至此第一个入门程序完成。

4 MapReduce的执行流程

  • 客户端提交一个mr的jar包给JobClient(提交方式:hadoop jar ...)
  • JobClient通过RPC和ResourceManager进行通信,返回一个存放jar包的地址(HDFS)和jobId
  • client将jar包写入到HDFS当中(path = hdfs上的地址 + jobId,默认是写十份,mr结束删除文件)
  • 开始提交任务(任务的描述信息,不是具体jar包,而是包括jobid、jar存放位置、配置等等的信息)
  • ResourceManager进行初始化任务
  • 读取HDFS上的要处理的文件,开始计算输入分片,每一个分片对应一个MapperTask
  • NodeManager通过心跳机制领取任务(任务的描述信息)
  • 下载所需的jar,配置文件等
  • NodeManager启动一个java child子进程,用来执行具体的任务(MapperTask或ReducerTask)
  • 将结果写入到HDFS当中

4 为什么hadoop适合小文件

  • 在执行MapReduce之前,原始数据被切割成若干个split,每个split作为一个map任务的输入,在map执行过程中split会被分解成一个个记录(key-value对),map会依次处理每一个记录。
  • FileInputFormat只划分比HDFS Block大的文件,所以FileInputFormat划分的结果就是这个文件或者是这个文件中的一部分。
  • 如果一个文件的大小比Block小,将不会被划分,这也就是Hadoop处理大文件的效率要比处理很多小文件的效率高的原因。
  • 当Hadoop处理很多小文件(文件大小小于HDFS Block大小)的时候,由于FileInputFormat不会对小文件进行划分,所以每一个小文件都会被当做一个split并分配一个map任务,导致效率低下。
    例如:一个G的文件,会被划分成8个128MB的split,并分配8个map任务处理,而10000个100KB的文件会被10000个map任务处理。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 目的这篇教程从用户的角度出发,全面地介绍了Hadoop Map/Reduce框架的各个方面。先决条件请先确认Had...
    SeanC52111阅读 1,842评论 0 1
  • MapReduce:超大机群上的简单数据处理 摘要 MapReduce是一个编程模型,和处理,产生大数据集的相关实...
    lucode阅读 1,560评论 0 5
  • 2017年1月21日至2017年2月14日,这段时间回老家过春节去了。那么简单回顾下春节期间在老家的收获。 1.常...
    天涯侠客阅读 215评论 0 0
  • 早上的阳光有多久没打在你的身上了,那新鲜的空气是否早已忘却了味道。 ――题记...
    九二钰枫阅读 218评论 0 1
  • 在我学生生涯中,父亲只教我初一。父亲特别严厉,目光炯炯有神,里面藏着慑人的威严,神态安祥。从灵魂深处传来的信息,同...
    玉妮阅读 749评论 6 3

友情链接更多精彩内容