05_hadoop_wordcounts_yarn的配置_mr的简单使用

1.导入jar包

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.0</version>
        </dependency>
    </dependencies>

2. 继承Mapper

org.apache.hadoop.mapreduce.Mapper 下的

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
  1. KEYIN
    是map task 读取到的数据的key的类型,是一行的启始偏移量Long
  2. VALUEIN
    是map task读取到的数据的value的类型,是一行的内容Stirng
  3. KEYOUT
    是用户的自定义map方法要返回的数据kv数据的key的类型
  4. VALUEOUT
    用户的自定map方法要返回的结果kv数据的value的数据类型

在mapreduce中,map参数的数据传输给reduce,需要进行序列化和反序列化,而JDK中的原生序列化机制产生的数据比较冗余,就会导致MAPREDUCE运行过程中传输效率低
所以 hadoop专门设计了自己的序列化机制
hadoop为常用数据类型封装了自己的实现了hadoop序列化

java基本类型 Long String Integer Float
hadoop基本类型 LongWritable Text IntWritable FloatWritable

core:

public class WorkcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

2 另外的一个class 继承 Reducer

Reducer<Text, IntWritable, Text, IntWritable>
core:

public class WordcountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        int sun = 0;
        Iterator<IntWritable> iterator = values.iterator();
        while (iterator.hasNext()){
            IntWritable va = iterator.next();
            sun+=va.get();
        }

        context.write(key, new IntWritable(sun));
    }
}

3 配置yarn

在每台机器上配置

node manager在物理上应该跟data node部署在一起
resource manager在物理上应该独立部署在一台专门的机器上
此处为demo 所有 namenode 与resourcemanager 放在同一台服务器

  1. 修改每台机器配置文件:
    vi yarn-site.xml
<property>
<name>yarn.resourcemanager.hostname</name>
<value>vm01</value>
</property>

<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>

<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>2048</value>
</property> 

<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>2</value>
</property> 

其中vm01为 resource manager
mapreduce_shuffle为固定写法
1024 分配的内存 最小2048MB 因为yarn.app.mapreduce.am.resource.mb 需要有1536MB 如果设置小雨1536 运行yarn程序时会报错。当然不是运行就占用这么大的内存,而是最大可以用这么多。
2 为cpu个数 最小1个

  1. 启动yarn集群:start-yarn.sh (注:该命令应该在resource manager所在的机器上执行,否则resource manager会在执行这个命令的机器上启动)
  2. 用jps检查yarn的进程,用web浏览器查看yarn的web控制台
    http://vm01:8088

4 运行

4.1 写windows提到交到Linux中yarn的程序

package com.looc.main;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * @author chenPeng
 * @version 1.0.0
 * @ClassName JobSubmitter.java
 * @Description TODO
 * @createTime 2019年02月01日 16:55:00
 */
public class JobSubmitter {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        //在代码中设置 获取HDFS以及yarn的用户身份
        System.setProperty("HADOOP_USER_NAME", "root");

        //设置job运行时的参数
        Configuration conf = new Configuration();

        //1. 设置job运行时需要访问的默认文件系统
        conf.set("fs.defaultFS", "hdfs://vm01:9000");
        //2. 设置job提交到哪里去
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "vm01");
        //3. 如果是在windows上运行就设置为跨平台,如果是在Linux系统上就不需要。因为windows上路径问题,和jar包执行命令不同
        conf.set("mapreduce.app-submission.cross-platform","true");

        //设置job
        Job job = Job.getInstance(conf);
        //1. 封装参数:jar包所在位置,写死适用于在window的ide上执行,也需要事先打好jar包,自动获取使用与在Linux系统上运行
        job.setJar("H:\\BaiduNetdiskDownload\\bigDateTempJar\\wr.jar");
        //job.setJarByClass(JobSubmitter.class);

        //2. 封装参数:此次job所需调用的Mapper实现类,Reducer实现类
        job.setMapperClass(WorkcountMapper.class);
        job.setReducerClass(WordcountReduce.class);

        //3. 封装参数:此次job的Mapper实现类和,Reducer实现类产生的结果数据的key,value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //此处是demo如果存在就删除
        Path outPutPath = new Path("/wordCount/outPut");
        FileSystem fileSystem = FileSystem.get(new URI("hdfs://vm01:9000"),conf,"root");
        if (fileSystem.exists(outPutPath)){
            fileSystem.delete(outPutPath, true);
        }


        //4. 封装参数:此次job需要处理的输入数据所在的路径,以及输出路径
        FileInputFormat.setInputPaths(job, new Path("/wordCount/resource"));
        FileOutputFormat.setOutputPath(job, outPutPath);

        //5. 封装参数:想要启动reduce task的数量
        job.setNumReduceTasks(2);

        //6. 提交job给yarn  此处为等待yarn执行完,并将执行过程输出到控制台,可以用job.submit();来提交
        boolean res = job.waitForCompletion(true);

        System.exit(res? 0:1);

    }
}

4.2 windows环境下模拟运行

public class JobSubWin {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        job.setJarByClass(JobSubWin.class);

        job.setMapperClass();
        job.setReducerClass();

        job.setMapOutputValueClass();
        job.setMapOutputKeyClass();

        job.setOutputValueClass();
        job.setOutputKeyClass();

        FileInputFormat.setInputPaths(job, new Path("H:\BaiduNetdiskDownload\bigDateTempJar\\input"));
        FileOutputFormat.setOutputPath(job, new Path("H:\BaiduNetdiskDownload\bigDateTempJar\\output"));

        job.setNumReduceTasks(3);

        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);
    }
}

4.3 将jar包导入到Linux服务器上运行

  1. 将jar包放置任何一台装了hadoop的Linux机器上
  2. 使用命令
hadoop jar xxx.jar xxx.xxx.xxx.JobSubLinux

core:

public class JobSubLinux {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        job.setJarByClass(JobSubWin.class);

        job.setMapperClass();
        job.setReducerClass();

        job.setMapOutputValueClass();
        job.setMapOutputKeyClass();

        job.setOutputValueClass();
        job.setOutputKeyClass();

        FileInputFormat.setInputPaths(job, new Path("/wordCount/input"));
        FileOutputFormat.setOutputPath(job, new Path("/wordCount/output"));

        job.setNumReduceTasks(3);

        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);
    }
}

5 实列demo

统计某个电话号码的全部上传流量,全部下载流量,以及全部上传和下载流量
如果需要使用自定义对象,那么该对象需要实现Writable接口,重写write和readFields方法 必须有无参构造因为需要反序列化
需要注意: write和readFields的顺序以及方法要一样
如下:

  1. bean类
package com.looc.流量数据分析demo;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author chenPeng
 * @version 1.0.0
 * @ClassName FlowBean.java
 * @Description TODO
 * @createTime 
 */
public class FlowBean implements Writable {
    private Integer upFlow;
    private Integer dFlow;
    private Integer amountFlow;
    private String phone;

    public FlowBean() {}
    
    public FlowBean(Integer upFlow, Integer dFlow, String phone) {
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.amountFlow = upFlow + dFlow;
        this.phone = phone;
    }

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    public Integer getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Integer upFlow) {
        this.upFlow = upFlow;
    }

    public Integer getdFlow() {
        return dFlow;
    }

    public void setdFlow(Integer dFlow) {
        this.dFlow = dFlow;
    }

    public Integer getAmountFlow() {
        return amountFlow;
    }

    public void setAmountFlow(Integer amountFlow) {
        this.amountFlow = amountFlow;
    }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(upFlow);
        dataOutput.writeInt(dFlow);
        dataOutput.writeInt(amountFlow);
        dataOutput.writeUTF(phone);
    }

    public void readFields(DataInput dataInput) throws IOException {
        upFlow = dataInput.readInt();
        dFlow = dataInput.readInt();
        amountFlow = dataInput.readInt();
        phone = dataInput.readUTF();
    }
}

  1. mapper类
package com.looc.流量数据分析demo;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author chenPeng
 * @version 1.0.0
 * @ClassName FlowCountMapper.java
 * @Description TODO
 * @createTime 
 */
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();

        String[] fields = line.split("\t");

        String phone = fields[1];
        Integer upFlow = Integer.parseInt(fields[fields.length-3]);
        Integer dFlow = Integer.parseInt(fields[fields.length-2]);

        context.write(new Text(phone), new FlowBean(upFlow, dFlow, phone));
    }
}

  1. reduce类
package com.looc.流量数据分析demo;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * @author chenPeng
 * @version 1.0.0
 * @ClassName FlowCountReducer.java
 * @Description TODO
 * @createTime 
 */
public class FlowCountReducer extends Reducer<Text ,FlowBean,Text, FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {

        int upSum = 0;
        int dSun = 0;

        for (FlowBean value : values) {
            upSum+=value.getUpFlow();
            dSun+=value.getdFlow();
        }

        context.write(key,new FlowBean(upSum, dSun, key.toString()));
    }
}

  1. 提交类
package com.looc.流量数据分析demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author chenPeng
 * @version 1.0.0
 * @ClassName FlowJobSub.java
 * @Description TODO
 * @createTime 
 */
public class FlowJobSub {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(FlowJobSub.class);

        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path("E:\\soft\\java\\ideaProject\\hadoop\\file\\数据流量分析demo\\input"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\soft\\java\\ideaProject\\hadoop\\file\\数据流量分析demo\\output"));

        job.waitForCompletion(true);

    }
}

  1. 数据源
1363157985066   137****0503 00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com        24  27  2481    24681   200
1363157995052   138****4101 5C-0E-8B-C7-F1-E0:CMCC  120.197.40.4            4   0   264 0   200
1363157991076   139****5656 20-10-7A-28-CC-0A:CMCC  120.196.100.99          2   4   132 1512    200
1363154400022   139****1106 5C-0E-8B-8B-B1-50:CMCC  120.197.40.4            4   0   240 0   200
1363157993044   182****5961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99  iface.qiyi.com  视频网站    15  12  1527    2106    200
1363157995074   841****3    5C-0E-8B-8C-E8-20:7DaysInn  120.197.40.4    122.72.52.12        20  16  4116    1432    200
1363157993055   135****9658 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          18  15  1116    954 200
1363157995033   159****3257 5C-0E-8B-C7-BA-20:CMCC  120.197.40.4    sug.so.360.cn   信息安全    20  20  3156    2936    200
1363157983019   137****9419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82          4   0   240 0   200
1363157984041   136****7991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4    s19.cnzz.com    站点统计    24  9   6960    690 200
1363157973098   150****5858 5C-0E-8B-C7-F7-90:CMCC  120.197.40.4    rank.ie.sogou.com   搜索引擎    28  27  3659    3538    200
1363157986029   159****2119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99  www.umeng.com   站点统计    3   3   1938    180 200
1363157992093   135****9658 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          15  9   918 4938    200
1363157986041   134****3104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4            3   3   180 180 200
1363157984040   136****6565 5C-0E-8B-8B-B6-00:CMCC  120.197.40.4    2052.flash2-http.qq.com 综合门户    15  12  1938    2910    200
1363157995093   139****4466 00-FD-07-A2-EC-BA:CMCC  120.196.100.82  img.qfc.cn      12  12  3008    3720    200
1363157982040   135****8823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99  y0.ifengimg.com 综合门户    57  102 7335    110349  200
1363157986072   183****3382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99  input.shouji.sogou.com  搜索引擎    21  18  9531    2412    200
1363157990043   139****7413 00-1F-64-E1-E6-9A:CMCC  120.196.100.55  t3.baidu.com    搜索引擎    69  63  11058   48243   200
1363157988072   137****8710 00-FD-07-A4-7B-08:CMCC  120.196.100.82          2   2   120 120 200
1363157985066   137****8888 00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com        24  27  2481    24681   200
1363157993055   135****6666 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          18  15  1116    954 200
  1. 输出结果
134****3104 180 180 360
135****8823 7335 110349 117684
135****6666 1116 954 2070
135****9658 2034 5892 7926
136****6565 1938 2910 4848
136****7991 6960 690 7650
137****9419 240 0 240
137****0503 2481 24681 27162
137****8888 2481 24681 27162
137****8710 120 120 240
138****4101 264 0 264
139****4466 3008 3720 6728
139****7413 11058 48243 59301
139****1106 240 0 240
139****5656 132 1512 1644
150****5858 3659 3538 7197
159****3257 3156 2936 6092
159****2119 1938 180 2118
182****5961 1527 2106 3633
183****3382 9531 2412 11943
841****3    4116 1432 5548
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,427评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,551评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,747评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,939评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,955评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,737评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,448评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,352评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,834评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,992评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,133评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,815评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,477评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,022评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,147评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,398评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,077评论 2 355

推荐阅读更多精彩内容