一小时搞定Mapreduce程序

之前一直用hive处理数据,觉得MR程序打包上传的比较麻烦,后来偶遇hive搞不定的文件网上找了个MR的例子稍微改一下感觉也比较方便,主要是处理速度快。

MR程序主要是有3各类:main函数类,map重载类,reduce重载类。

第一步:maven里面添加几个jar包:

代码如下:


第二步:main类:主要是调度MR程序的启动运行

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

public static void main(String[]args)throws Exception

{

Configuration conf =new Configuration();//从hadoop配置文件读取参数

 String [] otherArgs =new GenericOptionsParser(conf,args).getRemainingArgs();//从命令行读取参数

 if(otherArgs.length!=2)

{

System.err.println("Usage:wordcount");

System.exit(2);

}

Job job =new Job(conf,"wordcount");

job.setJarByClass(WordCount.class);

job.setMapperClass(TokenizerMapper.class);

job.setReducerClass(IntSumReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job,new Path(otherArgs[0]));

FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));

System.exit(job.waitForCompletion(true)?0:1);

}

}

第三步:Map类:主要是按行读取文件内容,根据自己需要处理(默认按回车换行分割,如果要改动需要重载某个函数)

代码如下:

import java.io.IOException; //导入异常处理类

import org.apache.hadoop.io.IntWritable; //导入整数类

import org.apache.hadoop.io.Text; //导入文本类

import org.apache.hadoop.mapreduce.Mapper; //导入Mapper类

public class TokenizerMapper extends Mapper{

    Text word = new Text();                //定义输出键

    //统计每行数据每个id,aaa字符出现的次数

    public void map(Object key,Text value,Context context)throws IOException,InterruptedException

    {

        int len = 0;

        String id="";

        len = appearNumber(value.toString(),"aaa");    //统计aaa出现次数

        index = value.toString().indexOf("\"id\":\"");

        if(index>0)

        {

            id = value.toString().substring(index+n,index+m); //取每行id值

            IntWritable one = new IntWritable(len);

            Text id_t = new Text(id);

            context.write(id_t,one);

        }

    }

    public static int appearNumber(String srcText, String findText) {

        int count = 0;

        int index = 0;

        while ((index = srcText.indexOf(findText, index)) != -1) {

            index = index + findText.length();

            count++;

        }

        return count;

    }

}

第四步:重载reduce类

  代码如下:

import java.io.IOException;import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

public class IntSumReducer extends Reducer{

 IntWritable result = new IntWritable();

 public void reduce(Text key,Iterable values,Context context)

            throws IOException,InterruptedException

    {

        int sum = 0;

        for(IntWritable val:values) //不断地将values中的IntWritable整数提取出来给val

        {

            sum=sum + val.get();

        }

        result.set(sum);

        context.write(key,result);  //每个id的aaa出现的次数求和输出

    }

}

第五步:打成jar包。上传hadoop,运行

hadoop  jar  mr_test1.jar  WordCount  hdfs:///myfile/test.log    hdfs:///myfile/output10

PS:hdfs:///myfile/2018042319.log  为输入日志文件

        hdfs:///myfile/output10 指定的输出目录

结果文件在:hdfs:///myfile/output10/part-r-00000

为了验证reduce阶段的作用,我曾把main函数中job.setReducerClass(IntSumReducer.class); 这句代码注掉,跑出的结果为每个id每行aaa出现的次数。将结果文件某个id  grep出来,例如结果有17行,然后对17行的value求和,和加上job.setReducerClass(IntSumReducer.class)这句,这个id的结果完全一致。

MR程序的运行速度对比

MR程序的运行速度感觉比较快,像这个日志文件约10G,一开始的时候出于简单的想法写了一个shell脚本去处理,放在内存192G,CPU64核物理机运行,发现每秒大概只能处理不到1000条数据,算下来跑完需要约10小时,想shell的结果和mr的结果进行对比,所以就没有kill。如果只是慢也就算了,shell脚本执行完几个小时,运维忽然通知这台服务器重启了,也没有太详细的dump信息,说大概是内存不足。。。幸亏只是一台节点机,无其它定时任务,所以对集群无影响。估计是shell管理内存有硬伤,系统本身也有一些内存没来得及释放。

上面的MR程序运行在同样配置的物理机集群,约40台,10G文件运行也就几分钟,对比shell的结果基本相同,感觉速度快了不止一个数量级。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,367评论 6 512
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,959评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,750评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,226评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,252评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,975评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,592评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,497评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,027评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,147评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,274评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,953评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,623评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,143评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,260评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,607评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,271评论 2 358

推荐阅读更多精彩内容

  • 前言 Netflix电影推荐的百万美金比赛,把“推荐”变成了时下最热门的数据挖掘算法之一。也正是由于Netflix...
    Alukar阅读 1,520评论 0 11
  • MapReduce编程重点把握 MapReduce核心概念 思考几个问题 词频统计wordcount的具体执行过程...
    胖胖的大罗阅读 720评论 0 1
  • 一场大雨吓跑了所有行人,给我一个与湖独处的机会。我撑起伞到湖畔散步。 脚下的灰犹豫不定,湖面的影躲躲藏藏。建筑物隐...
    陈果_周绿阅读 213评论 0 3
  • 在那些青春年少的岁月中,多少都会有些青涩美好的记忆,有的人相识,有的人分离,有的人成为朋友而有的人却咫尺天涯。 故...
    随峰星起阅读 193评论 0 0
  • 蒙顶甘露茶树花,欣逢电商发新芽。 昔日皇家贡品茶,今入寻常网民家。 ——素生 2017年8月16日 蒙顶山下
    杨共同学阅读 241评论 0 2