Hadoop 之 MapReduce 作业初体验

简单的 MapReduce 作业,需要一个 map 函数,一个 reduce 函数和一些用来运行作业的代码

// Mapper
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper
  extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;
  
  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    
    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}

// Reducer
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer
  extends Reducer<Text, IntWritable, Text, IntWritable> {
  
  @Override
  public void reduce(Text key, Iterable<IntWritable> values,
      Context context)
      throws IOException, InterruptedException {
    
    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}

横向扩展(Scaling out)

需要把数据存储在分布式文件系统中,通过使用 Hadoop 资源管理系统 YARN,Hadoop 可以将 MapReduce 计算转移到存储有部分数据的各机器上

相关概念

MapReduce 作业

MapReduce 作业 == 输入数据 + MapReduce程序 + 配置信息

任务分类

Hadoop 将作业分成若干个任务(task)来执行,其中包括两类任务:map 任务和 reduce 任务,这些任务运行在集群几点上,并通过 YARN 进行调度。如果一个任务失败,它将在另一个不同的节点上自动重新调度运行

分片(input split)

Hadoop 将 MapReduce 的输入数据划分成等长的小数据块,成为输入分片(input split)或简称“分片”
Hadoop 为每个分片构建一个 map 任务,并由该任务来运行用户自定义的 map 函数从而处理分片中的每条记录

分片切分的粒度

相对来说,分片被切分的越细,作业的负载平衡质量会更高。但是如果分片切分的太细,那么管理分片的总时间和构建 map 任务的总时间将决定作业的整个执行时间

对于大多数作业来说,一个合理的分片大小趋向于 HDFS 的一个块的大小(128MB)

数据本地化优化(data locality optimization)

Hadoop 在存储输入数据的节点上运行 map 任务,可以获得最佳性能,而无需使用宝贵的集群带宽资源

跨机架的 map 任务

有时对于一个 map 任务的输入分片来说,存储该分片的 HDFS 数据块副本的所有节点可能正在运行其他的 map 任务,此时作业调度需要从某一个数据块所在的机架中的一个节点上寻找一个空闲的 map 槽(slot)来运行该 map 任务,这将导致机架与机架之间的网络传输

为何最佳分片的大小应该与块大小相同?

如果分片跨越两个数据块,那么对于任何一个 HDFS 节点,基本上都不可能同时存储这两个数据块,因此分片中的部分数据需要通过网络传输到 map 任务运行的节点。这与使用本地数据运行整个 map 任务相比,显然效率更低

reduce 任务并不具备数据本地化的优势,单个 reduce 任务的输入通常来自于所有的 mapper 的输出;多个 reduce 任务,每个 map 任务针对输出进行分区

reduce 的输出通常存储在 HDFS 中以实现可靠存储。第一个副本存储在本地节点上,其他的副本处于可靠性考虑存储在其他机架的节点上

reduce 任务的数量并非由输入数据的大小决定,反而是独立指定的

combiner 函数

combiner 函数能够帮助减少 mapper 和 reducer 之间的数据传输量

// 通过如下方式调用来启用 combiner 函数
job.setComiberClass(XXXReducer.class)

Hadoop Streaming

Hadoop Streaming 使用 Unix 标准流作为 Hadoop 和应用程序之间的接口,所以可以使用任何编程语言通过标准输入/输出来写 MapReduce 程序

Streaming 天生适合用于文本处理。map 的输入数据通过标准输入流传递给 map 函数,并且是一行一行地传输,最后将结果行写到标准输出。map 输出的键-值对以一个制表符分隔的行,reduce 函数的输入格式与之相同并通过标准输入流进行传输。reduce 函数从标准输入流中读取输入行,该输入已由 Hadoop 框架根据键排过序,最后将结果写入标准输出

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,444评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,421评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,363评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,460评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,502评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,511评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,280评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,736评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,014评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,190评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,848评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,531评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,159评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,411评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,067评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,078评论 2 352

推荐阅读更多精彩内容

  • 先思考问题 我们处在一个大数据的时代已经是不争的事实,这主要表现在数据源多且大,如互联网数据,人们也认识到数据里往...
    墙角儿的花阅读 7,355评论 0 9
  • 参考:hadoop 学习笔记:mapreduce框架详解 [toc] 总结 Mapreduce是一个计算框架,既然...
    小小少年Boy阅读 870评论 0 12
  • 参考:hadoop 学习笔记:mapreduce框架详解 [toc] 总结 Mapreduce是一个计算框架,既然...
    小小少年Boy阅读 1,166评论 0 4
  • 思考问题 MapReduce总结 MapReduce MapReduce的定义MapReduce是一种编程模型, ...
    Sakura_P阅读 937评论 0 1
  • 今天晚上班级举行了一次篝火晚会有几个人借机喝酒然后装醉或许待在喝醉的皮囊里能让自己舒服些酒壮怂人胆再次恭喜这些影帝...
    3流浪阅读 220评论 0 0