51cto赵强HADOOP学习(三)

MapReduce基本原理

image.png

基本概念

MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题。

MapReduce由两个阶段组成:Map和Reduce,用于只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。

这两个函数的形参是key、value对,表示函数的输入信息。

#jps
#start-all.sh
#jps
#hdfs dfs -lsr /
#hdfs dfs -cat /input/data.txt
#cd /root/training/hadoop-2.4.1/share/hadoop/mapreduce
#hadoop jar hadoop-mapreduce-examples-2.4.1.jar wordcount /input/data.txt /output
#hdfs dfs -lsr /
#hdfs dfs -cat /output/part-r-00000
hdfs dfs -cat /input/data.txt
image.png

第一个MapReduce程序

image.png
package demo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static void main(String[] args) throws Exception{
        //申明一个job
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        
        //指明程序的入口
        job.setJarByClass(WordCount.class);
        
        //指明输入的数据
        //FileInputFormat.addInputPath(job,new Path("/input/data.txt"));
                                                                          //第二种
                FileInputFormat.addInputPath(job,new Path(args[0]));
        //组装Mapper和Reducer
        //设置Mapper
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        
        //设置Reducer
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        
        //指明数据输出的路径
        //FileOutputFormat.setOutputPath(job, new Path("/output1"));
                FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //提交任务运行
        //job.waitForCompletion(true);
                job.waitForCompletion(false);

    }

}
                                  //  k1  v1        k2     v2
//class WordCountMapper extends Mapper<int, String, String, int>{
class WordCountMapper extends Mapper<LongWritable,Text,Text,LongWritable>{

    @Override
    protected void map(LongWritable key1, Text value1,Context context)
            throws IOException, InterruptedException {
        //分词
        //key1        value1
        //  1          I love Beijing
        String var = value1.toString();
        String[] words = var.split(" ");
        
        //统计每个单词的频率,得到k2和v2
        for(String word:words) {
            //                      k2                    v2
            context.write(new Text(word), new LongWritable(1));
            
        }
    }
    
    
    
}
                                    //k3        v3      k4      v4
class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,Context context) 
            throws IOException, InterruptedException {
        
        //key     values
        // I        (1,1)
        //得到每个单词总的频率
        long sum = 0;
        
        for(LongWritable value:values) {
            sum += value.get();
        }
        
        //将k4和v4输出
        context.write(key, new LongWritable(sum));
    }
    
    
}

右击程序,选择Export,Java,JAR file

image.png

image.png

image.png

image.png

image.png

image.png

上传到training目录下

#cd ~/training
#hadoop jar wc.jar
#hdfs dfs -lsr /output1
# hdfs dfs -cat /output1/part-r-00000
#hadoop jar wc.jar /input/data.txt /output2
#hdfs dfs -lsr /output2

MapReduce的序列化

序列化(Serialization)是指把结构化对象转化为字节流。

反序列化(Deserialization)是序列化的逆过程。即把字节流转回结构化对象

Java序列化(java.io.Serializable)

Hadoop序列化的特点

序列化格式特点:

-紧凑:高效使用存储空间。
-快速:读写数据的额外开销小
-可扩展:可透明地读取老格式的数据
-互操作:支持多语言的交互

Hadoop的序列化格式:Writable

Hadoop序列化的作用

序列化在分布式环境的两大作用:进程间通信,永久存储。
Hadoop节点间通信。
#more emp.csv
# hdfs dfs -put emp.csv /input/emp.csv
image.png

demo.se-Emp.java

package demo.se;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

//代表员工
public class Emp implements Writable{
    //7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
    private int empno;
    private String ename;
    private String job;
    private int mgr;
    private String hiredate;
    private int sal;
    private int comm;
    private int deptno;
    
    public Emp(){
        
        
    }
    
        @Override
    public String toString() {
        return "The salary of" + this.ename + "is" + this.sal;
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        // 反序列化
        this.empno = input.readInt();
        this.ename = input.readUTF();
        this.job = input.readUTF();
        this.mgr = input.readInt();
        this.hiredate = input.readUTF();
        this.sal = input.readInt();
        this.comm = input.readInt();
        this.deptno = input.readInt();
        
        
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // 序列化
        output.writeInt(empno);
        output.writeUTF(ename);
        output.writeUTF(job);
        output.writeInt(mgr);
        output.writeUTF(hiredate);
        output.writeInt(sal);
        output.writeInt(comm);
        output.writeInt(deptno);
    }

    public int getEmpno() {
        return empno;
    }

    public void setEmpno(int empno) {
        this.empno = empno;
    }

    public String getEname() {
        return ename;
    }

    public void setEname(String ename) {
        this.ename = ename;
    }

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }

    public int getMgr() {
        return mgr;
    }

    public void setMgr(int mgr) {
        this.mgr = mgr;
    }

    public String getHiredate() {
        return hiredate;
    }

    public void setHiredate(String hiredate) {
        this.hiredate = hiredate;
    }

    public int getSal() {
        return sal;
    }

    public void setSal(int sal) {
        this.sal = sal;
    }

    public int getComm() {
        return comm;
    }

    public void setComm(int comm) {
        this.comm = comm;
    }

    public int getDeptno() {
        return deptno;
    }

    public void setDeptno(int deptno) {
        this.deptno = deptno;
    }
}

demo.se-EmpMain.java

package demo.se;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class EmpMain {

    public static void main(String[] args) throws Exception{
        //申明一个job
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        
        //指明程序的入口
        job.setJarByClass(EmpMain.class);
                
        //指明输入的数据
        FileInputFormat.setInputPaths(job,new Path(args[0]));
                
        //组装Mapper和Reducer
        //设置Mapper
        job.setMapperClass(EmpMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Emp.class);
        
        //指明数据输出的路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        //提交任务运行
        job.waitForCompletion(true);
    }

}
//7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
class EmpMapper extends Mapper<LongWritable,Text,LongWritable,Emp>{

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        //7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
        String str = value.toString();
        String[] words = str.split(",");
        
        //创建一个Emp的对象
        Emp emp = new Emp();
        
        //设置员工的属性
        emp.setEmpno(Integer.parseInt(words[0]));
        emp.setEname(words[1]);
        emp.setJob(words[2]);
        
        //设置员工的经理
        try {
            emp.setMgr(Integer.parseInt(words[3]));
        }catch(Exception ex) {
            emp.setMgr(0);
        }
        
        emp.setHiredate(words[4]);
        emp.setSal(Integer.parseInt(words[5]));
        
        //设置员工的奖金
        try {
            emp.setComm(Integer.parseInt(words[6]));
        }catch(Exception ex) {
            emp.setComm(0);
        }       
        emp.setDeptno(Integer.parseInt(words[7]));
        
        //输出            key:员工号                         value:员工hdfs 
        context.write(new LongWritable(emp.getEmpno()), emp);
    }   
}
打包。
# hadoop jar se.jar /input/emp.csv /outputemp
#hdfs dfs -lsr /outputemp
#hdfs dfs -cat /outputemp/part-r-00000

MapReduce的排序

在Map和Reduce阶段进行排序时,比较的是key2

value2是不参与排序比较的。

如果要想让value2也进行排序,需要把key2和value2组装成新的类,作为key2,才能参与比较。

demo.sort.Emp.java
package demo.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

//代表员工
public class Emp implements WritableComparable<Emp>{
    //7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
    private int empno;
    private String ename;
    private String job;
    private int mgr;
    private String hiredate;
    private int sal;
    private int comm;
    private int deptno;
    
    public Emp(){
        
        
    }
    
    @Override
    public int compareTo(Emp e) {
        //按照薪水进行排序
        if(this.sal >= e.sal) {
            return 1;
        }else {
            return -1;
        }
    }
    @Override
    public String toString() {
        return "The salary of" + this.ename + " is" + this.sal;
    }



    @Override
    public void readFields(DataInput input) throws IOException {
        // 反序列化
        this.empno = input.readInt();
        this.ename = input.readUTF();
        this.job = input.readUTF();
        this.mgr = input.readInt();
        this.hiredate = input.readUTF();
        this.sal = input.readInt();
        this.comm = input.readInt();
        this.deptno = input.readInt();
        
        
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // 序列化
        output.writeInt(empno);
        output.writeUTF(ename);
        output.writeUTF(job);
        output.writeInt(mgr);
        output.writeUTF(hiredate);
        output.writeInt(sal);
        output.writeInt(comm);
        output.writeInt(deptno);
    }

    public int getEmpno() {
        return empno;
    }

    public void setEmpno(int empno) {
        this.empno = empno;
    }

    public String getEname() {
        return ename;
    }

    public void setEname(String ename) {
        this.ename = ename;
    }

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }

    public int getMgr() {
        return mgr;
    }

    public void setMgr(int mgr) {
        this.mgr = mgr;
    }

    public String getHiredate() {
        return hiredate;
    }

    public void setHiredate(String hiredate) {
        this.hiredate = hiredate;
    }

    public int getSal() {
        return sal;
    }

    public void setSal(int sal) {
        this.sal = sal;
    }

    public int getComm() {
        return comm;
    }

    public void setComm(int comm) {
        this.comm = comm;
    }

    public int getDeptno() {
        return deptno;
    }

    public void setDeptno(int deptno) {
        this.deptno = deptno;
    }
    
    
}

demo.sort.EmpSortMain.java
package demo.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EmpSortMain {

    public static void main(String[] args) throws Exception{
        //申明一个job
                Configuration conf = new Configuration();
                Job job = new Job(conf);
                
                //指明程序的入口
                job.setJarByClass(EmpSortMain.class);
                        
                //指明输入的数据
                FileInputFormat.setInputPaths(job,new Path(args[0]));
                        
                //组装Mapper和Reducer
                //设置Mapper
                job.setMapperClass(EmpMapper.class);
                job.setMapOutputKeyClass(Emp.class);
                job.setMapOutputValueClass(NullWritable.class);
                
                //指明数据输出的路径
                FileOutputFormat.setOutputPath(job, new Path(args[1]));
                
                //提交任务运行
                job.waitForCompletion(true);

    }

}
//7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
class EmpMapper extends Mapper<LongWritable,Text,Emp,NullWritable>{

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        //7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
        String str = value.toString();
        String[] words = str.split(",");
        
        //创建一个Emp的对象
        Emp emp = new Emp();
        
        //设置员工的属性
        emp.setEmpno(Integer.parseInt(words[0]));
        emp.setEname(words[1]);
        emp.setJob(words[2]);
        
        //设置员工的经理
        try {
            emp.setMgr(Integer.parseInt(words[3]));
        }catch(Exception ex) {
            emp.setMgr(0);
        }
        
        emp.setHiredate(words[4]);
        emp.setSal(Integer.parseInt(words[5]));
        
        //设置员工的奖金
        try {
            emp.setComm(Integer.parseInt(words[6]));
        }catch(Exception ex) {
            emp.setComm(0);
        }       
        emp.setDeptno(Integer.parseInt(words[7]));
        
        //输出            key:Emp                         value:NullWritable
        context.write(emp,NullWritable.get());
    }   
}
#hadoop jar sort.jar /input/emp.csv /outputsortemp
#hdfs dfs -lsr /outputsortemp
#hdfs dfs -cat /outputsortemp/part-r-00000

MapReduce的分区

image.png

Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类

MapReduce有一个默认的分区规则:只会产生一个分区

什么是Combiner?

每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量

combiner最基本是实现本地key的归并,combiner具有类似本地的reduce功能。

如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。

WordCount.java

package demo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static void main(String[] args) throws Exception{
        //申明一个job
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        
        //指明程序的入口
        job.setJarByClass(WordCount.class);
        
        //指明输入的数据
        FileInputFormat.addInputPath(job,new Path(args[0]));
        
        //组装Mapper和Reducer
        //设置Mapper
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        
        //设置Combiner
        job.setCombinerClass(WordCountReducer.class);
        //设置Reducer
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        
        //指明数据输出的路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //提交任务运行
        job.waitForCompletion(true);
    }

}
                                  //  k1  v1        k2     v2
//class WordCountMapper extends Mapper<int, String, String, int>{
class WordCountMapper extends Mapper<LongWritable,Text,Text,LongWritable>{

    @Override
    protected void map(LongWritable key1, Text value1,Context context)
            throws IOException, InterruptedException {
        //分词
        //key1        value1
        //  1          I love Beijing
        String var = value1.toString();
        String[] words = var.split(" ");
        
        //统计每个单词的频率,得到k2和v2
        for(String word:words) {
            //                      k2                    v2
            context.write(new Text(word), new LongWritable(1));
            
        }
    }
    
    
    
}
                                    //k3        v3      k4      v4
class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,Context context) 
            throws IOException, InterruptedException {
        
        //key     values
        // I        (1,1)
        //得到每个单词总的频率
        long sum = 0;
        
        for(LongWritable value:values) {
            sum += value.get();
        }
        
        //将k4和v4输出
        context.write(key, new LongWritable(sum));
    }
    
    
}
#hadoop jar wcd.jar /input/data.txt /dd
#hdfs dfs -ls /dd
# hdfs dfs -cat /dd/part-r-00000

注意

-Combiner的输出是Reduce的输入,如果Combiner是可插拔的,添加Combiner绝不能改变最终的计算结果。所以Combiner值应该用于那种Reduce的输入key/value与输出key/value类型安全一致,且不影响最终结果的场景。不如累加,最大值等。

什么是Shuffle?

Shuffle的过程

image.png

image.png
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,524评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,869评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,813评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,210评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,085评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,117评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,533评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,219评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,487评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,582评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,362评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,218评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,589评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,899评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,176评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,503评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,707评论 2 335

推荐阅读更多精彩内容