Hadoop实验——MapReduce编程(2)

实验目的

  1. 通过实验掌握基本的MapReduce编程方法。
  2. 掌握用MapReduce解决一些常见的数据处理问题,包括数据去重、数据排序和数据挖掘等。
  3. 通过操作MapReduce的实验,模仿实验内容,深入理解MapReduce的过程,和shuffle的具体意义。

实验平台

  • 操作系统:Ubuntu-16.04
  • Hadoop版本:2.6.0
  • JDK版本:1.8
  • IDE:Eclipse

实验内容和要求

一,编程实现文件二次排序:

  1. 现有一个输入文件,包含两列数据,要求先按照第一列整数大小排序,如果第一列相同,按照第二列整数大小排序。下面是输入文件和输出文件的一个样例供参考。
  • 输入文件secondarysort.txt的样例如下:
20 21 
50 51 
50 52 
50 53 
50 54 
60 51 
60 53 
60 52 
60 56 
60 57 
70 58 
60 61 
70 54 
70 55 
70 56 
70 57 
70 58 
1 2 
3 4 
5 6 
7 82 
203 21 
50 512 
50 522 
50 53 
530 54 
40 511 
20 53 
20 522 
60 56 
60 57 
740 58 
63 61 
730 54 
71 55 
71 56 
73 57 
74 58 
12 211 
31 42 
50 62 
7 8
  • 输出文件的样例如下:


实验过程:

  1. 创建文件secondarysort.txt


    将上面样例内容复制进去
  2. 在HDFS建立secondarysort_input文件夹(执行这步之前要开启hadoop相关进程)


  3. 上传输入文件到HDFS中的secondarysort_input文件夹


  4. 接着打开eclipse
    Eclipse的使用
    1. 点开上次实验的项目,找到 src 文件夹,右键选择 New -> Class


    2. 输入 Package 和 Name,然后Finish


    3. 写好Java代码(给的代码里要修改HDFS和本地路径),右键选择 Run As -> Run on Hadoop,结果在HDFS系统中查看


实验代码:

package cn.edu.zucc.mapreduce2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LoggerJob {
    public static class LoggerMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        private IntWritable counter = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String result = handleLog(line);
            if (result != null && result.length() > 0) {
                context.write(new Text(result), counter);
            }
        }

        private String handleLog(String line) {
            StringBuffer sBuffer = new StringBuffer();
            try {
                if (line.length() > 0) {
                    if (line.indexOf("GET") > 0) {
                        String tmp = line.substring(line.indexOf("GET"),
                                line.indexOf("HTTP/1.0"));
                        sBuffer.append(tmp.trim());
                    } else if (line.indexOf("POST") > 0) {
                        String tmp = line.substring(line.indexOf("POST"),
                                line.indexOf("HTTP/1.0"));
                        sBuffer.append(tmp.trim());
                    }
                } else {
                    return null;
                }

            } catch (Exception e) {
                e.printStackTrace();
            }
            return sBuffer.toString();
        }

    }

    public static class LoggerReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                              Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));

        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        String[] otherArgs = new String[]{"loggerjob_input",
                "loggerjob_output"};
        if (otherArgs.length != 2) {
            System.err.println("Usage: loggerjob <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "loggerjob");
        job.setJarByClass(LoggerJob.class);
        job.setMapperClass(LoggerMapper.class);
        job.setReducerClass(LoggerReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
模仿上题完成以下内容:对于输入文件,要求依次按照顺序对字段进行排序,如果第一个字段排好后再根据第二个字段的升序排序最后在根据第三个字段进行排序,对下面是输入文件和输出文件的一个样例供参考。
  • 输入文件的样例如下:
a1,b2,c5
a4,b1,c3
a1,b2,c4
a2,b2,c4
a2,b1,c4
a4,b1,c2
  • 输出文件的结果为:
a1  b2,c4
a1  b2,c5
a2  b1,c4
a2  b2,c4
a4  b1,c2
a4  b1,c3

二,编写程序实现对非结构化日志文件处理:

  1. 现有一个输入文件,要求区分不同的HTTP请求对文件进行输出。下面是输入文件和输出文件的一个样例供参考。
  • 输入文件的样例如下:
127.0.0.1 - - [03/Jul/2014:23:53:32 +0800] "POST /service/addViewTimes_544.htm HTTP/1.0" 200 2 0.004
127.0.0.1 - - [03/Jul/2014:23:54:53 +0800] "GET /html/20140620/900.html HTTP/1.0" 200 151770 0.054
127.0.0.1 - - [03/Jul/2014:23:57:42 +0800] "GET /html/20140620/872.html HTTP/1.0" 200 52373 0.034
127.0.0.1 - - [03/Jul/2014:23:58:17 +0800] "POST /service/addViewTimes_900.htm HTTP/1.0" 200 2 0.003
127.0.0.1 - - [03/Jul/2014:23:58:51 +0800] "GET / HTTP/1.0" 200 70044 0.057
127.0.0.1 - - [03/Jul/2014:23:58:51 +0800] "GET / HTTP/1.0" 200 70044 0.057
  • 输出文件的结果为:
GET /   2
GET /html/20140620/872.html 1
GET /html/20140620/900.html 1
POST /service/addViewTimes_544.htm  1
POST /service/addViewTimes_900.htm  1

实验过程:

  1. 创建文件


    将上面样例内容复制进去
  2. 在HDFS建立loggerjob_input文件夹


  3. 上传样例到HDFS中的loggerjob_input文件夹


  4. 到eclipse上执行代码

实验代码:

package cn.edu.zucc.mapreduce2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LoggerJob {
    public static class LoggerMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        private IntWritable counter = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String result = handleLog(line);
            if (result != null && result.length() > 0) {
                context.write(new Text(result), counter);
            }
        }

        private String handleLog(String line) {
            StringBuffer sBuffer = new StringBuffer();
            try {
                if (line.length() > 0) {
                    if (line.indexOf("GET") > 0) {
                        String tmp = line.substring(line.indexOf("GET"),
                                line.indexOf("HTTP/1.0"));
                        sBuffer.append(tmp.trim());
                    } else if (line.indexOf("POST") > 0) {
                        String tmp = line.substring(line.indexOf("POST"),
                                line.indexOf("HTTP/1.0"));
                        sBuffer.append(tmp.trim());
                    }
                } else {
                    return null;
                }

            } catch (Exception e) {
                e.printStackTrace();
                System.out.println(line);
            }
            return sBuffer.toString();
        }

    }

    public static class LoggerReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));

        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        String[] otherArgs = new String[] { "loggerjob_input",
                "loggerjob_output" };
        if (otherArgs.length != 2) {
            System.err.println("Usage: loggerjob <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "loggerjob");
        job.setJarByClass(LoggerJob.class);
        job.setMapperClass(LoggerMapper.class);
        job.setReducerClass(LoggerReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
模仿上题完成以下内容:对于气象数据,要求获取数据中的最高温度和最低温度。下面是输入文件和输出文件的一个样例供参考。
  • 输入文件的样例如下:
0067011990999991950051507004888888889999999N9+00001+9999999999999999999999
0067011990999991950051512004888888889999999N9+00221+9999999999999999999999
0067011990999991950051518004888888889999999N9-00111+9999999999999999999999
0067011990999991949032412004888888889999999N9+01111+9999999999999999999999

数据说明:
第15-19个字符是year
第45-50位是温度表示,+表示零上 -表示零下,且温度的值不能是9999,9999表示异常数据。
第50位值只能是0、1、4、5、9几个数字

  • 输出文件结果为:
1949    111
1950    3

三,编写程序实现对输入文件进行倒排索引:

  1. 现有三个文本文档,需要根据单词查找文档,并且还要考虑权重问题。下面是输入文件和输出文件的一个样例供参考。
  • 输入文件的d1.txt的样例如下:
Mapreduce is simple is easy
  • 输入文件的d2.txt的样例如下:
Mapreduce is powerful is userful
  • 输入文件的d3.txt的样例如下:
Hello Mapreduce Bye Mapreduce
  • 输出文件的结果为:
Bye hdfs://localhost:9000/user/tiny/invertedindex_input/d3.txt:1;
Hello   hdfs://localhost:9000/user/tiny/invertedindex_input/d3.txt:1;
Mapreduce   hdfs://localhost:9000/user/tiny/invertedindex_input/d3.txt:2;hdfs://localhost:9000/user/tiny/invertedindex_input/d2.txt:1;hdfs://localhost:9000/user/tiny/invertedindex_input/d1.txt:1;
easy    hdfs://localhost:9000/user/tiny/invertedindex_input/d1.txt:1;
is  hdfs://localhost:9000/user/tiny/invertedindex_input/d1.txt:2;hdfs://localhost:9000/user/tiny/invertedindex_input/d2.txt:2;
powerful    hdfs://localhost:9000/user/tiny/invertedindex_input/d2.txt:1;
simple  hdfs://localhost:9000/user/tiny/invertedindex_input/d1.txt:1;
userful hdfs://localhost:9000/user/tiny/invertedindex_input/d2.txt:1;
  1. 创建文件


    将上面样例内容复制进去
  2. 在HDFS建立invertedindex_input文件夹


  3. 上传样例到HDFS中的invertedindex_input文件夹


  4. 到eclipse上执行代码

实验代码:

package cn.edu.zucc.mapreduce2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InvertedIndex {
    public static class InvertedIndexMap extends Mapper<Object, Text, Text, Text> {
        private static final Text one = new Text("1");
        private Text keyInfo = new Text();
        private FileSplit split;

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            split = (FileSplit) context.getInputSplit();
            String line = value.toString();
            String[] strings = line.split(" ");
            for (String s : strings) {
                keyInfo.set(s + ":" + this.split.getPath().toString());
                context.write(keyInfo, one);
            }
        }
    }

    public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> {
        Text info = new Text();

        @Override
        public void reduce(Text key, Iterable<Text> values, Context contex) throws IOException, InterruptedException {
            Integer sum = 0;
            for (Text value : values) {
                sum += Integer.parseInt(value.toString());
            }
            int splitIndex = key.toString().indexOf(":");
            info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
            key.set(key.toString().substring(0, splitIndex));
            contex.write(key, info);
        }
    }

    public static class InvertedIndexReduce extends Reducer<Text, Text, Text, Text> {
        private Text result = new Text();

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String fileList = "";
            for (Text value : values) {
                fileList += value.toString() + ";";
            }
            result.set(fileList);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        String[] otherArgs = new String[]{"invertedindex_input",
                "invertedindex_output"};
        if (otherArgs.length != 2) {
            System.err.println("Usage: InvertedIndex <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "InvertedIndex");
        job.setJarByClass(InvertedIndex.class);
        job.setMapperClass(InvertedIndexMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(InvertedIndexReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }
}
模仿上题完成以下内容:对于输入文件,请编写MapReduce程序,统计文件中的通话记录情况。下面是输入文件和输出文件的一个样例供参考。
  • 输入文件 A 的样例如下:
13588888888 112
13678987879 13509098987
18987655436 110
2543789 112
15699807656 110
011-678987 112
  • 输出文件 B 的结果为:
110 15699807656|18987655436|
112 011-678987|2543789|13588888888|
13509098987 13678987879|
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容