MapReduce2-3.1.1 分布式计算 实验示例(一)单词计数

大家好,我是Iggi。

今天我给大家分享的是MapReduce2-3.1.1版本的Word Count Ver1.0实验示例。

首先用一段文字简介MapReduce:

MapReduce最早是由Google公司研究提出的一种面向大规模数据处理的并行计算模型和方法。Google公司设计MapReduce的初衷主要是为了解决其搜索引擎中大规模网页数据的并行化处理。Google公司发明了MapReduce之后首先用其重新改写了其搜索引擎中的Web文档索引处理系统。但由于MapReduce可以普遍应用于很多大规模数据的计算问题,因此自发明MapReduce以后,Google公司内部进一步将其广泛应用于很多大规模数据处理问题。到目前为止,Google公司内有上万个各种不同的算法问题和程序都使用MapReduce进行处理。

2004年,开源项目Lucene(搜索索引程序库)和Nutch(搜索引擎)的创始人Doug Cutting发现MapReduce正是其所需要的解决大规模Web数据处理的重要技术,因而模仿Google MapReduce,基于Java设计开发了一个称为Hadoop的开源MapReduce并行计算框架和系统。自此,Hadoop成为Apache开源组织下最重要的项目,自其推出后很快得到了全球学术界和工业界的普遍关注,并得到推广和普及应用。

MapReduce的推出给大数据并行处理带来了巨大的革命性影响,使其已经成为事实上的大数据处理的工业标准。尽管MapReduce还有很多局限性,但人们普遍公认,MapReduce是到目前为止最为成功、最广为接受和最易于使用的大数据并行处理技术。MapReduce的发展普及和带来的巨大影响远远超出了发明者和开源社区当初的意料,以至于马里兰大学教授、2010年出版的《Data-Intensive Text Processing with MapReduce》一书的作者Jimmy Lin在书中提出:MapReduce改变了我们组织大规模计算的方式,它代表了第一个有别于冯·诺依曼结构的计算模型,是在集群规模而非单个机器上组织大规模计算的新的抽象模型上的第一个重大突破,是到目前为止所见到的最为成功的基于大规模计算资源的计算模型。

MapReduce是面向大数据并行处理的计算模型、框架和平台,它隐含了以下三层含义:

1)MapReduce是一个基于集群的高性能并行计算平台(Cluster Infrastructure)。它允许用市场上普通的商用服务器构成一个包含数十、数百至数千个节点的分布和并行计算集群。

2)MapReduce是一个并行计算与运行软件框架(Software Framework)。它提供了一个庞大但设计精良的并行计算软件框架,能自动完成计算任务的并行化处理,自动划分计算数据和计算任务,在集群节点上自动分配和执行任务以及收集计算结果,将数据分布存储、数据通信、容错处理等并行计算涉及到的很多系统底层的复杂细节交由系统负责处理,大大减少了软件开发人员的负担。

3)MapReduce是一个并行程序设计模型与方法(Programming Model & Methodology)。它借助于函数式程序设计语言Lisp的设计思想,提供了一种简便的并行程序设计方法,用Map和Reduce两个函数编程实现基本的并行计算任务,提供了抽象的操作和并行编程接口,以简单方便地完成大规模数据的编程和计算处理。

image.png

好,下面进入正题。介绍Java操作MapReduce2组件完成Word Count Ver2.0的操作。

首先,使用IDE建立Maven工程,建立工程时没有特殊说明,按照向导提示点击完成即可。重要的是在pom.xml文件中添加依赖包,内容如下图:

image.png

待系统下载好依赖的jar包后便可以编写程序了。

展示实验代码:

package linose.mapreduce;

import java.io.IOException;
import java.io.OutputStreamWriter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
//import org.apache.log4j.BasicConfigurator;


/**
 * Hello MapReduce!
 * Word Count V1.0
 * 本示例演示如何使用MapReduce组件统计单词出现的个数
 * 关于示例中出现的API方法可以参考如下连接:http://hadoop.apache.org/docs/r3.1.1/api/index.html
 */
public class AppVer1 
{ 
    public static void main( String[] args ) throws IOException, ClassNotFoundException, InterruptedException
    {   
        /**
         * 设定MapReduce示例拥有HDFS的操作权限
         */
        System.setProperty("HADOOP_USER_NAME", "hdfs"); 
        
        /**
         * 为了清楚的看到输出结果,暂将集群调试信息缺省。
         * 如果想查阅集群调试信息,取消注释即可。
         */
        //BasicConfigurator.configure();
        
        /**
         * MapReude实验准备阶段:
         * 定义HDFS文件路径
         */
        String defaultFS = "hdfs://master2.linose.cloud.beijing.com:8020";
        String inputPath = defaultFS + "/index.dirs/input.txt";
        String outputPath = defaultFS + "/index.dirs/output";
        
        /**
         * 生产配置,并获取HDFS对象
         */
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", defaultFS);
        FileSystem system = FileSystem.get(conf);
        
        /**
         * 定义输入路径,输出路径
         */
        Path inputHdfsPath = new Path(inputPath);
        Path outputHdfsPath = new Path(outputPath);
        
        /**
         * 如果实验数据文件不存在则创建数据文件
         */
        if (!system.exists(inputHdfsPath)) {
            
            FSDataOutputStream inputStream = system.create(inputHdfsPath);
            OutputStreamWriter outputStream = new OutputStreamWriter(inputStream);
            
            outputStream.write("芒果 菠萝 西瓜 橘子 草莓 \n");
            outputStream.write("草莓 橘子 苹果 荔枝 蓝莓 \n");
            outputStream.write("天天 菇娘 释迦 软枣子 癞瓜 蛇皮果 \n");
            outputStream.write("香蕉 菠萝 鸭梨 柚子 苹果 \n");
            outputStream.write("草莓 橘子 桂圆 荔枝 香蕉 \n");
            outputStream.write("苹果 菠萝 草莓 弥猴桃 芒果 \n");
            outputStream.write("苹果 香蕉 提子 橘子 菠萝 \n");
            outputStream.write("西瓜 苹果 香蕉 橙子 提子 \n");
            outputStream.write("香蕉 鸭梨 西瓜 葡萄 芒果 \n");
            outputStream.write("苹果 樱桃 香蕉 葡萄 橘子 \n");
            outputStream.write("西瓜 葡萄 桃 车厘子 香蕉 榴莲 瓜 火龙果 荔枝 \n");
            
            outputStream.close();
            inputStream.close();
        }
    
        /**
         * 如果实验结果目录存在,遍历文件内容全部删除
         */
        if (system.exists(outputHdfsPath)) {
            RemoteIterator<LocatedFileStatus> fsIterator = system.listFiles(outputHdfsPath, true);
            LocatedFileStatus fileStatus;
            while (fsIterator.hasNext()) {
                fileStatus = fsIterator.next();
                system.delete(fileStatus.getPath(), false);
            }
            system.delete(outputHdfsPath, false);
        }

        /**
         * 创建MapReduce任务并设定Job名称
         */
        JobConf jobConf = new JobConf(conf, WordCountVer1.class);
        jobConf.setJobName("Word Count Ver1.0:");
    
        /**
         * 指定输入输出的默认格式类
         */
        jobConf.setInputFormat(TextInputFormat.class);
        jobConf.setOutputFormat(TextOutputFormat.class);

        /**
         * 设置输入文件与输出文件
         */
        FileInputFormat.setInputPaths(jobConf, inputHdfsPath);
        FileOutputFormat.setOutputPath(jobConf, outputHdfsPath);
        
        /**
         * 指定Reduce类输出类型Key类型与Value类型
         */
        jobConf.setOutputKeyClass(Text.class);
        jobConf.setOutputValueClass(IntWritable.class);

        /**
         * 指定自定义Map类,Reduce类,开启Combiner函数。
         */
        jobConf.setMapperClass(WordCountVer1.Map.class);
        jobConf.setCombinerClass(WordCountVer1.Reduce.class);
        jobConf.setReducerClass(WordCountVer1.Reduce.class);
        
        /**
         * 提交作业
         */
        RunningJob run = JobClient.runJob(jobConf);

        /**
         * 然后轮询进度,直到作业完成。
         */
        float progress = 0.0f;
        do {
            progress = run.setupProgress();
            System.out.println("Word Count Ver1.0: 的当前进度:" + progress * 100);
            Thread.sleep(1000);
        } while (progress != 1.0f && !run.isComplete());
        
        /**
         * 如果成功,查看输出文件内容
         */
        if (run.isSuccessful()) {
            RemoteIterator<LocatedFileStatus> fsIterator = system.listFiles(outputHdfsPath, true);
            LocatedFileStatus fileStatus;
            while (fsIterator.hasNext()) {
                fileStatus = fsIterator.next();
                FSDataInputStream outputStream = system.open(fileStatus.getPath());
                IOUtils.copyBytes(outputStream, System.out, conf, false);
                outputStream.close();
                System.out.println("--------------------------------------------");
            }
        }
    }
}

展示MapReduce2-3.1.1组件编写Word Count Ver1.0测试类:

package linose.mapreduce;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

/**
 * WordCount示例
 * @author Iggi
 *
 */
public class WordCountVer1 {

    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter report) throws IOException {
            int counter = 0;
            while(values.hasNext()) {
                counter += values.next().get();
            }
            output.collect(key, new IntWritable(counter));
        }
    }
    
    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter report) throws IOException {
            
            String[] words = value.toString().split(" ");
            for(String word: words){
                output.collect(new Text(word), new IntWritable(1));
            }
        }
    }
}

下图为测试结果:


image.png

至此,MapReduce2-3.1.1 Word Count Ver1.0 实验示例演示完毕。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,287评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,346评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,277评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,132评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,147评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,106评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,019评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,862评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,301评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,521评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,682评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,405评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,996评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,651评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,803评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,674评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,563评论 2 352